1
Fork 0

Rework packet recovery in diskwriter.

We now request a keyframe if we lose too many packets.
This commit is contained in:
Juliusz Chroboczek 2021-11-28 13:58:07 +01:00
parent 1f9e45a30b
commit fdf1fb4b16
1 changed files with 40 additions and 19 deletions

View File

@ -410,27 +410,28 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
if valid(t.lastSeqno) {
lastSeqno := uint16(value(t.lastSeqno))
if ((p.SequenceNumber - lastSeqno) & 0x8000) == 0 {
// jump forward
count := p.SequenceNumber - lastSeqno
if count > 0 && count < 128 {
for i := lastSeqno + 1; i != p.SequenceNumber; i++ {
// different buf each time
buf := make([]byte, 1504)
n := t.remote.GetPacket(i, buf, true)
if n == 0 {
continue
}
p := new(rtp.Packet)
err := p.Unmarshal(buf)
if err == nil {
t.writeRTP(p)
}
if count < 256 {
for i := uint16(1); i < count; i++ {
recover(t, lastSeqno + i)
}
} else {
requestKeyframe(t)
}
t.lastSeqno = some(uint32(p.SequenceNumber))
} else {
// jump backward
count := lastSeqno - p.SequenceNumber
if count >= 512 {
t.lastSeqno = none
requestKeyframe(t)
}
}
} else {
t.lastSeqno = some(uint32(p.SequenceNumber))
}
t.lastSeqno = some(uint32(p.SequenceNumber))
err = t.writeRTP(p)
if err != nil {
return 0, err
@ -438,6 +439,29 @@ func (t *diskTrack) Write(buf []byte) (int, error) {
return len(buf), nil
}
func recover(t *diskTrack, seqno uint16) {
// since the samplebuilder retains packets, use a fresh buffer
buf := make([]byte, 1504)
n := t.remote.GetPacket(seqno, buf, true)
if n == 0 {
return
}
p := new(rtp.Packet)
err := p.Unmarshal(buf)
if err != nil {
return
}
t.writeRTP(p)
}
func requestKeyframe(t *diskTrack) {
now := time.Now()
if now.Sub(t.kfRequested) > 500*time.Millisecond {
t.remote.RequestKeyframe()
t.kfRequested = now
}
}
// writeRTP writes the packet without doing any loss recovery.
// Called locked.
func (t *diskTrack) writeRTP(p *rtp.Packet) error {
@ -513,10 +537,7 @@ func (t *diskTrack) writeBuffered(force bool) error {
if keyframe {
t.lastKf = now
} else if t.writer == nil || now.Sub(t.lastKf) > 4*time.Second {
if now.Sub(t.kfRequested) > time.Second {
t.remote.RequestKeyframe()
t.kfRequested = now
}
requestKeyframe(t)
return nil
}