From 9093339b62231b89994865a3d5301a9fe4940fd2 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Wed, 4 Aug 2021 02:25:56 +0200 Subject: [PATCH] Rework handling of buffered keyframes. Instead of buffering the last keyframe, we merely keep track of its seqno, and use the main cache for recovering. We also send the whole sequence of packets rather than just the keyframe itself. --- packetcache/packetcache.go | 206 +++----------------------------- packetcache/packetcache_test.go | 87 +------------- rtpconn/rtpwriter.go | 24 ++-- 3 files changed, 35 insertions(+), 282 deletions(-) diff --git a/packetcache/packetcache.go b/packetcache/packetcache.go index 80a4106..95dc0be 100644 --- a/packetcache/packetcache.go +++ b/packetcache/packetcache.go @@ -38,13 +38,6 @@ type bitmap struct { bitmap uint32 } -// frame is used for storing the last keyframe -type frame struct { - timestamp uint32 - complete bool - entries []entry -} - type Cache struct { mu sync.Mutex //stats @@ -55,10 +48,11 @@ type Cache struct { totalExpected uint32 received uint32 totalReceived uint32 + // last seen keyframe + keyframe uint16 + keyframeValid bool // bitmap bitmap bitmap - // buffered keyframe - keyframe frame // the actual cache tail uint16 entries []entry @@ -162,94 +156,6 @@ func (bitmap *bitmap) get(next uint16) (bool, uint16, uint16) { return true, first, uint16(bm >> 1) } -// insert inserts a packet into a frame. -func (frame *frame) insert(seqno uint16, timestamp uint32, marker bool, data []byte) bool { - n := len(frame.entries) - i := 0 - if n == 0 || seqno > frame.entries[n-1].seqno { - // fast path - i = n - } else { - for i < n { - if frame.entries[i].seqno >= seqno { - break - } - i++ - } - - if i < n && frame.entries[i].seqno == seqno { - // duplicate - return false - } - } - - if n >= maxFrame { - // overflow - return false - } - - lam := uint16(len(data)) - if marker { - lam |= 0x8000 - } - e := entry{ - seqno: seqno, - lengthAndMarker: lam, - timestamp: timestamp, - } - copy(e.buf[:], data) - - if i >= n { - frame.entries = append(frame.entries, e) - return true - } - frame.entries = append(frame.entries, entry{}) - copy(frame.entries[i+1:], frame.entries[i:]) - frame.entries[i] = e - return true -} - -// store checks whether a packet is part of the current keyframe and, if -// so, inserts it. -func (frame *frame) store(seqno uint16, timestamp uint32, first bool, marker bool, data []byte) bool { - if first { - if frame.timestamp != timestamp { - frame.timestamp = timestamp - frame.complete = false - frame.entries = frame.entries[:0] - } - } else if len(frame.entries) > 0 { - if frame.timestamp != timestamp { - delta := seqno - frame.entries[0].seqno - if (delta&0x8000) == 0 && delta > 0x4000 { - frame.complete = false - frame.entries = frame.entries[:0] - } - return false - } - } else { - return false - } - - done := frame.insert(seqno, timestamp, marker, data) - if done && !frame.complete { - marker := false - fst := frame.entries[0].seqno - for i := 1; i < len(frame.entries); i++ { - if frame.entries[i].seqno != fst+uint16(i) { - return done - } - if frame.entries[i].marker() { - marker = true - } - } - if marker { - frame.complete = true - } - } - return done -} - // Store stores a packet in the cache. It returns the first seqno in the // bitmap, and the index at which the packet was stored. func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker bool, buf []byte) (uint16, uint16) { @@ -270,6 +176,10 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker cache.cycle++ } cache.last = seqno + if cache.keyframeValid && + compare(cache.keyframe, seqno) > 0 { + cache.keyframeValid = false + } } else if cmp > 0 { if cache.received < cache.expected { cache.received++ @@ -278,9 +188,9 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker } cache.bitmap.set(seqno) - done := cache.keyframe.store(seqno, timestamp, keyframe, marker, buf) - if done && !cache.keyframe.complete { - completeKeyframe(cache) + if keyframe { + cache.keyframe = seqno + cache.keyframeValid = true } i := cache.tail @@ -297,58 +207,6 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker return cache.bitmap.first, i } -// completeKeyFrame attempts to complete the current keyframe. -func completeKeyframe(cache *Cache) { - l := len(cache.keyframe.entries) - if l == 0 { - return - } - first := cache.keyframe.entries[0].seqno - last := cache.keyframe.entries[l-1].seqno - count := (last - first) // may wrap around - if count > 0x4000 { - // this shouldn't happen - return - } - var buf []byte - if count > 1 { - if buf == nil { - buf = make([]byte, BufSize) - } - for i := uint16(1); i < count; i++ { - n, ts, marker := get(first+i, cache.entries, buf) - if n > 0 { - cache.keyframe.store( - first+i, ts, false, marker, buf, - ) - } - } - } - if !cache.keyframe.complete { - // Try to find packets after the last one. - for { - l := len(cache.keyframe.entries) - if cache.keyframe.entries[l-1].marker() { - break - } - if buf == nil { - buf = make([]byte, BufSize) - } - seqno := cache.keyframe.entries[l-1].seqno + 1 - n, ts, marker := get(seqno, cache.entries, buf) - if n <= 0 { - break - } - done := cache.keyframe.store( - seqno, ts, false, marker, buf, - ) - if !done || marker { - break - } - } - } -} - // Expect records that we expect n additional packets. func (cache *Cache) Expect(n int) { if n <= 0 { @@ -384,12 +242,7 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 { cache.mu.Lock() defer cache.mu.Unlock() - n, _, _ := get(seqno, cache.keyframe.entries, result) - if n > 0 { - return n - } - - n, _, _ = get(seqno, cache.entries, result) + n, _, _ := get(seqno, cache.entries, result) if n > 0 { return n } @@ -397,17 +250,13 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 { return 0 } -func (cache *Cache) Last() (bool, uint16, uint32) { +func (cache *Cache) Last() (uint16, bool) { cache.mu.Lock() defer cache.mu.Unlock() if !cache.lastValid { - return false, 0, 0 + return 0, false } - len, ts, _ := get(cache.last, cache.entries, nil) - if len == 0 { - return false, 0, 0 - } - return true, cache.last, ts + return cache.last, true } // GetAt retrieves a packet from the cache assuming it is at the given index. @@ -427,32 +276,15 @@ func (cache *Cache) GetAt(seqno uint16, index uint16, result []byte) uint16 { ) } -// Keyframe returns the last buffered keyframe. It returns the frame's -// timestamp and a boolean indicating if the frame is complete. -func (cache *Cache) Keyframe() (uint32, bool, []uint16) { +// Keyframe returns the seqno of the last seen keyframe +func (cache *Cache) Keyframe() (uint16, bool) { cache.mu.Lock() defer cache.mu.Unlock() - if len(cache.keyframe.entries) == 0 { - return 0, false, nil + if !cache.keyframeValid { + return 0, false } - - seqnos := make([]uint16, len(cache.keyframe.entries)) - for i := range cache.keyframe.entries { - seqnos[i] = cache.keyframe.entries[i].seqno - } - return cache.keyframe.timestamp, cache.keyframe.complete, seqnos -} - -func (cache *Cache) KeyframeSeqno() (bool, uint16, uint32) { - cache.mu.Lock() - defer cache.mu.Unlock() - - if len(cache.keyframe.entries) == 0 { - return false, 0, 0 - } - - return true, cache.keyframe.entries[0].seqno, cache.keyframe.timestamp + return cache.keyframe, true } func (cache *Cache) resize(capacity int) { diff --git a/packetcache/packetcache_test.go b/packetcache/packetcache_test.go index 2c1db0d..5dbc035 100644 --- a/packetcache/packetcache_test.go +++ b/packetcache/packetcache_test.go @@ -22,7 +22,7 @@ func TestCache(t *testing.T) { buf2 := randomBuf() cache := New(16) - found, _, _ := cache.Last() + _, found := cache.Last() if found { t.Errorf("Found in empty cache") } @@ -30,13 +30,12 @@ func TestCache(t *testing.T) { _, i1 := cache.Store(13, 42, false, false, buf1) _, i2 := cache.Store(17, 42, false, false, buf2) - found, seqno, ts := cache.Last() + seqno, found := cache.Last() if !found { t.Errorf("Not found") } - if seqno != 17 || ts != 42 { - t.Errorf("Expected %v, %v, got %v, %v", - 17, 42, seqno, ts) + if seqno != 17 { + t.Errorf("Expected %v, got %v", 17, seqno) } buf := make([]byte, BufSize) @@ -170,84 +169,6 @@ func TestCacheGrowCond(t *testing.T) { } } -func TestKeyframe(t *testing.T) { - cache := New(16) - packet := make([]byte, 1) - buf := make([]byte, BufSize) - - found, _, _ := cache.KeyframeSeqno() - if found { - t.Errorf("Found keyframe in empty cache") - } - - cache.Store(7, 57, true, false, packet) - if cache.keyframe.complete { - t.Errorf("Expected false, got true") - } - cache.Store(8, 57, false, true, packet) - if !cache.keyframe.complete { - t.Errorf("Expected true, got false") - } - - ts, c, kf := cache.Keyframe() - if ts != 57 || !c || len(kf) != 2 { - t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2) - } - - found, seqno, ts := cache.KeyframeSeqno() - if !found || seqno != 7 || ts != 57 { - t.Errorf("Got %v %v %v, expected %v %v", found, seqno, ts, 7, 57) - } - - for _, i := range kf { - l := cache.Get(i, buf) - if int(l) != len(packet) { - t.Errorf("Couldn't get %v", i) - } - } - - for i := 0; i < 32; i++ { - cache.Store(uint16(9+i), uint32(58+i), false, false, packet) - } - - ts, c, kf = cache.Keyframe() - if ts != 57 || !c || len(kf) != 2 { - t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2) - } - for _, i := range kf { - l := cache.Get(i, buf) - if int(l) != len(packet) { - t.Errorf("Couldn't get %v", i) - } - } -} - -func TestKeyframeUnsorted(t *testing.T) { - cache := New(16) - packet := make([]byte, 1) - - cache.Store(7, 57, false, false, packet) - cache.Store(9, 57, false, false, packet) - cache.Store(10, 57, false, true, packet) - cache.Store(6, 57, true, false, packet) - _, c, kf := cache.Keyframe() - if len(kf) != 2 || c { - t.Errorf("Got %v %v, expected 2", c, kf) - } - cache.Store(8, 57, false, false, packet) - - _, c, kf = cache.Keyframe() - if len(kf) != 5 || !c { - t.Errorf("Got %v %v, expected 5", c, kf) - } - for i, v := range kf { - if v != uint16(i+6) { - t.Errorf("Position %v, expected %v, got %v\n", - i, i+6, v) - } - } -} - func TestBitmap(t *testing.T) { value := uint64(0xcdd58f1e035379c0) packet := make([]byte, 1) diff --git a/rtpconn/rtpwriter.go b/rtpconn/rtpwriter.go index a57a9e0..64b132a 100644 --- a/rtpconn/rtpwriter.go +++ b/rtpconn/rtpwriter.go @@ -206,9 +206,10 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error { } } -func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) { +func sendSequence(kf, last uint16, track conn.DownTrack, cache *packetcache.Cache) { buf := make([]byte, packetcache.BufSize) - for _, seqno := range kf { + seqno := kf + for ((last - seqno) & 0x8000) == 0 { bytes := cache.Get(seqno, buf) if bytes == 0 { return @@ -218,6 +219,7 @@ func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) { if err != nil { return } + seqno++ } } @@ -253,14 +255,12 @@ func rtpWriterLoop(writer *rtpWriter, track *rtpUpTrack) { action.track.SetCname(cname) } - found, _, lts := track.cache.Last() - kts, _, kf := track.cache.Keyframe() - if found && len(kf) > 0 { - if ((lts-kts)&0x80000000) != 0 || - lts-kts < 2*90000 { - // we got a recent keyframe - go sendKeyframe( - kf, + last, foundLast := track.cache.Last() + kf, foundKf := track.cache.Keyframe() + if foundLast && foundKf { + if last-kf < 40 { // modulo 2^16 + go sendSequence( + kf, last, action.track, track.cache, ) @@ -331,11 +331,11 @@ func nackWriter(track *rtpUpTrack) { // drop any nacks before the last keyframe var cutoff uint16 - found, seqno, _ := track.cache.KeyframeSeqno() + seqno, found := track.cache.Keyframe() if found { cutoff = seqno } else { - last, lastSeqno, _ := track.cache.Last() + lastSeqno, last := track.cache.Last() if !last { // NACK on a fresh track? Give up. return