diff --git a/packetcache/packetcache.go b/packetcache/packetcache.go index 80a4106..95dc0be 100644 --- a/packetcache/packetcache.go +++ b/packetcache/packetcache.go @@ -38,13 +38,6 @@ type bitmap struct { bitmap uint32 } -// frame is used for storing the last keyframe -type frame struct { - timestamp uint32 - complete bool - entries []entry -} - type Cache struct { mu sync.Mutex //stats @@ -55,10 +48,11 @@ type Cache struct { totalExpected uint32 received uint32 totalReceived uint32 + // last seen keyframe + keyframe uint16 + keyframeValid bool // bitmap bitmap bitmap - // buffered keyframe - keyframe frame // the actual cache tail uint16 entries []entry @@ -162,94 +156,6 @@ func (bitmap *bitmap) get(next uint16) (bool, uint16, uint16) { return true, first, uint16(bm >> 1) } -// insert inserts a packet into a frame. -func (frame *frame) insert(seqno uint16, timestamp uint32, marker bool, data []byte) bool { - n := len(frame.entries) - i := 0 - if n == 0 || seqno > frame.entries[n-1].seqno { - // fast path - i = n - } else { - for i < n { - if frame.entries[i].seqno >= seqno { - break - } - i++ - } - - if i < n && frame.entries[i].seqno == seqno { - // duplicate - return false - } - } - - if n >= maxFrame { - // overflow - return false - } - - lam := uint16(len(data)) - if marker { - lam |= 0x8000 - } - e := entry{ - seqno: seqno, - lengthAndMarker: lam, - timestamp: timestamp, - } - copy(e.buf[:], data) - - if i >= n { - frame.entries = append(frame.entries, e) - return true - } - frame.entries = append(frame.entries, entry{}) - copy(frame.entries[i+1:], frame.entries[i:]) - frame.entries[i] = e - return true -} - -// store checks whether a packet is part of the current keyframe and, if -// so, inserts it. -func (frame *frame) store(seqno uint16, timestamp uint32, first bool, marker bool, data []byte) bool { - if first { - if frame.timestamp != timestamp { - frame.timestamp = timestamp - frame.complete = false - frame.entries = frame.entries[:0] - } - } else if len(frame.entries) > 0 { - if frame.timestamp != timestamp { - delta := seqno - frame.entries[0].seqno - if (delta&0x8000) == 0 && delta > 0x4000 { - frame.complete = false - frame.entries = frame.entries[:0] - } - return false - } - } else { - return false - } - - done := frame.insert(seqno, timestamp, marker, data) - if done && !frame.complete { - marker := false - fst := frame.entries[0].seqno - for i := 1; i < len(frame.entries); i++ { - if frame.entries[i].seqno != fst+uint16(i) { - return done - } - if frame.entries[i].marker() { - marker = true - } - } - if marker { - frame.complete = true - } - } - return done -} - // Store stores a packet in the cache. It returns the first seqno in the // bitmap, and the index at which the packet was stored. func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker bool, buf []byte) (uint16, uint16) { @@ -270,6 +176,10 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker cache.cycle++ } cache.last = seqno + if cache.keyframeValid && + compare(cache.keyframe, seqno) > 0 { + cache.keyframeValid = false + } } else if cmp > 0 { if cache.received < cache.expected { cache.received++ @@ -278,9 +188,9 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker } cache.bitmap.set(seqno) - done := cache.keyframe.store(seqno, timestamp, keyframe, marker, buf) - if done && !cache.keyframe.complete { - completeKeyframe(cache) + if keyframe { + cache.keyframe = seqno + cache.keyframeValid = true } i := cache.tail @@ -297,58 +207,6 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker return cache.bitmap.first, i } -// completeKeyFrame attempts to complete the current keyframe. -func completeKeyframe(cache *Cache) { - l := len(cache.keyframe.entries) - if l == 0 { - return - } - first := cache.keyframe.entries[0].seqno - last := cache.keyframe.entries[l-1].seqno - count := (last - first) // may wrap around - if count > 0x4000 { - // this shouldn't happen - return - } - var buf []byte - if count > 1 { - if buf == nil { - buf = make([]byte, BufSize) - } - for i := uint16(1); i < count; i++ { - n, ts, marker := get(first+i, cache.entries, buf) - if n > 0 { - cache.keyframe.store( - first+i, ts, false, marker, buf, - ) - } - } - } - if !cache.keyframe.complete { - // Try to find packets after the last one. - for { - l := len(cache.keyframe.entries) - if cache.keyframe.entries[l-1].marker() { - break - } - if buf == nil { - buf = make([]byte, BufSize) - } - seqno := cache.keyframe.entries[l-1].seqno + 1 - n, ts, marker := get(seqno, cache.entries, buf) - if n <= 0 { - break - } - done := cache.keyframe.store( - seqno, ts, false, marker, buf, - ) - if !done || marker { - break - } - } - } -} - // Expect records that we expect n additional packets. func (cache *Cache) Expect(n int) { if n <= 0 { @@ -384,12 +242,7 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 { cache.mu.Lock() defer cache.mu.Unlock() - n, _, _ := get(seqno, cache.keyframe.entries, result) - if n > 0 { - return n - } - - n, _, _ = get(seqno, cache.entries, result) + n, _, _ := get(seqno, cache.entries, result) if n > 0 { return n } @@ -397,17 +250,13 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 { return 0 } -func (cache *Cache) Last() (bool, uint16, uint32) { +func (cache *Cache) Last() (uint16, bool) { cache.mu.Lock() defer cache.mu.Unlock() if !cache.lastValid { - return false, 0, 0 + return 0, false } - len, ts, _ := get(cache.last, cache.entries, nil) - if len == 0 { - return false, 0, 0 - } - return true, cache.last, ts + return cache.last, true } // GetAt retrieves a packet from the cache assuming it is at the given index. @@ -427,32 +276,15 @@ func (cache *Cache) GetAt(seqno uint16, index uint16, result []byte) uint16 { ) } -// Keyframe returns the last buffered keyframe. It returns the frame's -// timestamp and a boolean indicating if the frame is complete. -func (cache *Cache) Keyframe() (uint32, bool, []uint16) { +// Keyframe returns the seqno of the last seen keyframe +func (cache *Cache) Keyframe() (uint16, bool) { cache.mu.Lock() defer cache.mu.Unlock() - if len(cache.keyframe.entries) == 0 { - return 0, false, nil + if !cache.keyframeValid { + return 0, false } - - seqnos := make([]uint16, len(cache.keyframe.entries)) - for i := range cache.keyframe.entries { - seqnos[i] = cache.keyframe.entries[i].seqno - } - return cache.keyframe.timestamp, cache.keyframe.complete, seqnos -} - -func (cache *Cache) KeyframeSeqno() (bool, uint16, uint32) { - cache.mu.Lock() - defer cache.mu.Unlock() - - if len(cache.keyframe.entries) == 0 { - return false, 0, 0 - } - - return true, cache.keyframe.entries[0].seqno, cache.keyframe.timestamp + return cache.keyframe, true } func (cache *Cache) resize(capacity int) { diff --git a/packetcache/packetcache_test.go b/packetcache/packetcache_test.go index 2c1db0d..5dbc035 100644 --- a/packetcache/packetcache_test.go +++ b/packetcache/packetcache_test.go @@ -22,7 +22,7 @@ func TestCache(t *testing.T) { buf2 := randomBuf() cache := New(16) - found, _, _ := cache.Last() + _, found := cache.Last() if found { t.Errorf("Found in empty cache") } @@ -30,13 +30,12 @@ func TestCache(t *testing.T) { _, i1 := cache.Store(13, 42, false, false, buf1) _, i2 := cache.Store(17, 42, false, false, buf2) - found, seqno, ts := cache.Last() + seqno, found := cache.Last() if !found { t.Errorf("Not found") } - if seqno != 17 || ts != 42 { - t.Errorf("Expected %v, %v, got %v, %v", - 17, 42, seqno, ts) + if seqno != 17 { + t.Errorf("Expected %v, got %v", 17, seqno) } buf := make([]byte, BufSize) @@ -170,84 +169,6 @@ func TestCacheGrowCond(t *testing.T) { } } -func TestKeyframe(t *testing.T) { - cache := New(16) - packet := make([]byte, 1) - buf := make([]byte, BufSize) - - found, _, _ := cache.KeyframeSeqno() - if found { - t.Errorf("Found keyframe in empty cache") - } - - cache.Store(7, 57, true, false, packet) - if cache.keyframe.complete { - t.Errorf("Expected false, got true") - } - cache.Store(8, 57, false, true, packet) - if !cache.keyframe.complete { - t.Errorf("Expected true, got false") - } - - ts, c, kf := cache.Keyframe() - if ts != 57 || !c || len(kf) != 2 { - t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2) - } - - found, seqno, ts := cache.KeyframeSeqno() - if !found || seqno != 7 || ts != 57 { - t.Errorf("Got %v %v %v, expected %v %v", found, seqno, ts, 7, 57) - } - - for _, i := range kf { - l := cache.Get(i, buf) - if int(l) != len(packet) { - t.Errorf("Couldn't get %v", i) - } - } - - for i := 0; i < 32; i++ { - cache.Store(uint16(9+i), uint32(58+i), false, false, packet) - } - - ts, c, kf = cache.Keyframe() - if ts != 57 || !c || len(kf) != 2 { - t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2) - } - for _, i := range kf { - l := cache.Get(i, buf) - if int(l) != len(packet) { - t.Errorf("Couldn't get %v", i) - } - } -} - -func TestKeyframeUnsorted(t *testing.T) { - cache := New(16) - packet := make([]byte, 1) - - cache.Store(7, 57, false, false, packet) - cache.Store(9, 57, false, false, packet) - cache.Store(10, 57, false, true, packet) - cache.Store(6, 57, true, false, packet) - _, c, kf := cache.Keyframe() - if len(kf) != 2 || c { - t.Errorf("Got %v %v, expected 2", c, kf) - } - cache.Store(8, 57, false, false, packet) - - _, c, kf = cache.Keyframe() - if len(kf) != 5 || !c { - t.Errorf("Got %v %v, expected 5", c, kf) - } - for i, v := range kf { - if v != uint16(i+6) { - t.Errorf("Position %v, expected %v, got %v\n", - i, i+6, v) - } - } -} - func TestBitmap(t *testing.T) { value := uint64(0xcdd58f1e035379c0) packet := make([]byte, 1) diff --git a/rtpconn/rtpwriter.go b/rtpconn/rtpwriter.go index a57a9e0..64b132a 100644 --- a/rtpconn/rtpwriter.go +++ b/rtpconn/rtpwriter.go @@ -206,9 +206,10 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error { } } -func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) { +func sendSequence(kf, last uint16, track conn.DownTrack, cache *packetcache.Cache) { buf := make([]byte, packetcache.BufSize) - for _, seqno := range kf { + seqno := kf + for ((last - seqno) & 0x8000) == 0 { bytes := cache.Get(seqno, buf) if bytes == 0 { return @@ -218,6 +219,7 @@ func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) { if err != nil { return } + seqno++ } } @@ -253,14 +255,12 @@ func rtpWriterLoop(writer *rtpWriter, track *rtpUpTrack) { action.track.SetCname(cname) } - found, _, lts := track.cache.Last() - kts, _, kf := track.cache.Keyframe() - if found && len(kf) > 0 { - if ((lts-kts)&0x80000000) != 0 || - lts-kts < 2*90000 { - // we got a recent keyframe - go sendKeyframe( - kf, + last, foundLast := track.cache.Last() + kf, foundKf := track.cache.Keyframe() + if foundLast && foundKf { + if last-kf < 40 { // modulo 2^16 + go sendSequence( + kf, last, action.track, track.cache, ) @@ -331,11 +331,11 @@ func nackWriter(track *rtpUpTrack) { // drop any nacks before the last keyframe var cutoff uint16 - found, seqno, _ := track.cache.KeyframeSeqno() + seqno, found := track.cache.Keyframe() if found { cutoff = seqno } else { - last, lastSeqno, _ := track.cache.Last() + lastSeqno, last := track.cache.Last() if !last { // NACK on a fresh track? Give up. return