1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-25 10:05:58 +01:00

Implement loss handling in diskwriter.

This commit is contained in:
Juliusz Chroboczek 2021-07-11 23:09:19 +02:00
parent 39d8cf72fe
commit b0c39fca22

View file

@ -255,7 +255,7 @@ type maybeUint32 uint64
const none maybeUint32 = 0 const none maybeUint32 = 0
func some(value uint32) maybeUint32 { func some(value uint32) maybeUint32 {
return maybeUint32(uint64(1 << 32) | uint64(value)) return maybeUint32(uint64(1<<32) | uint64(value))
} }
func valid(m maybeUint32) bool { func valid(m maybeUint32) bool {
@ -270,10 +270,10 @@ type diskTrack struct {
remote conn.UpTrack remote conn.UpTrack
conn *diskConn conn *diskConn
writer webm.BlockWriteCloser writer webm.BlockWriteCloser
builder *samplebuilder.SampleBuilder builder *samplebuilder.SampleBuilder
lastSeqno maybeUint32
origin maybeUint32 origin maybeUint32
kfRequested time.Time kfRequested time.Time
lastKf time.Time lastKf time.Time
@ -476,8 +476,7 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
return 0, nil return 0, nil
} }
codec := t.remote.Codec() // samplebuilder retains packets
data := make([]byte, len(buf)) data := make([]byte, len(buf))
copy(data, buf) copy(data, buf)
p := new(rtp.Packet) p := new(rtp.Packet)
@ -487,6 +486,46 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
return 0, nil return 0, nil
} }
if valid(t.lastSeqno) {
lastSeqno := uint16(value(t.lastSeqno))
if ((p.SequenceNumber - lastSeqno) & 0x8000) == 0 {
count := p.SequenceNumber - lastSeqno
if count > 0 && count < 128 {
var nacks []uint16
for i := lastSeqno + 1; i != p.SequenceNumber; i++ {
// different buf each time
buf := make([]byte, 1504)
n := t.remote.GetRTP(i, buf)
if n > 0 {
p := new(rtp.Packet)
err := p.Unmarshal(buf)
if err == nil {
t.writeRTP(p)
}
} else {
nacks = append(nacks, i)
}
}
if len(nacks) > 0 {
t.remote.Nack(t.conn.remote, nacks)
}
}
}
}
t.lastSeqno = some(uint32(p.SequenceNumber))
err = t.writeRTP(p)
if err != nil {
return 0, err
}
return len(buf), nil
}
// writeRTP writes the packet without doing any loss recovery.
// Called locked.
func (t *diskTrack) writeRTP(p *rtp.Packet) error {
codec := t.remote.Codec()
if strings.EqualFold(codec.MimeType, "video/vp9") { if strings.EqualFold(codec.MimeType, "video/vp9") {
var vp9 codecs.VP9Packet var vp9 codecs.VP9Packet
_, err := vp9.Unmarshal(p.Payload) _, err := vp9.Unmarshal(p.Payload)
@ -509,7 +548,7 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
for { for {
sample, ts := t.builder.PopWithTimestamp() sample, ts := t.builder.PopWithTimestamp()
if sample == nil { if sample == nil {
return len(buf), nil return nil
} }
keyframe := true keyframe := true
@ -528,7 +567,7 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
t.conn.warn( t.conn.warn(
"Write to disk " + err.Error(), "Write to disk " + err.Error(),
) )
return 0, err return err
} }
} }
} else { } else {
@ -540,7 +579,7 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
"Write to disk " + "Write to disk " +
err.Error(), err.Error(),
) )
return 0, err return err
} }
} }
} }
@ -554,7 +593,7 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
t.remote.RequestKeyframe() t.remote.RequestKeyframe()
t.kfRequested = now t.kfRequested = now
} }
return 0, nil return nil
} }
if t.writer == nil { if t.writer == nil {
@ -569,7 +608,7 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
tm := ts / (t.remote.Codec().ClockRate / 1000) tm := ts / (t.remote.Codec().ClockRate / 1000)
_, err := t.writer.Write(keyframe, int64(tm), sample.Data) _, err := t.writer.Write(keyframe, int64(tm), sample.Data)
if err != nil { if err != nil {
return 0, err return err
} }
} }
} }