diff --git a/packetcache/packetcache.go b/packetcache/packetcache.go index 9de29eb..8f6828d 100644 --- a/packetcache/packetcache.go +++ b/packetcache/packetcache.go @@ -9,9 +9,18 @@ const BufSize = 1500 const maxFrame = 1024 type entry struct { - seqno uint16 - length uint16 - buf [BufSize]byte + seqno uint16 + lengthAndMarker uint16 + timestamp uint32 + buf [BufSize]byte +} + +func (e *entry) length() uint16 { + return e.lengthAndMarker & 0x7FFF +} + +func (e *entry) marker() bool { + return (e.lengthAndMarker & 0x8000) != 0 } type bitmap struct { @@ -22,6 +31,7 @@ type bitmap struct { type frame struct { timestamp uint32 + complete bool entries []entry } @@ -52,8 +62,18 @@ func New(capacity int) *Cache { } } +func compare(s1, s2 uint16) int { + if s1 == s2 { + return 0 + } + if ((s2 - s1) & 0x8000) != 0 { + return 1 + } + return -1 +} + func seqnoInvalid(seqno, reference uint16) bool { - if ((seqno - reference) & 0x8000) == 0 { + if compare(reference, seqno) < 0 { return false } @@ -73,7 +93,7 @@ func (bitmap *bitmap) set(seqno uint16) { return } - if ((seqno - bitmap.first) & 0x8000) != 0 { + if compare(bitmap.first, seqno) > 0 { return } @@ -104,11 +124,10 @@ func (cache *Cache) BitmapGet(next uint16) (bool, uint16, uint16) { func (bitmap *bitmap) get(next uint16) (bool, uint16, uint16) { first := bitmap.first - count := next - first - if (count&0x8000) != 0 || count == 0 { - // next is in the past + if compare(first, next) >= 0 { return false, first, 0 } + count := next - first if count > 17 { count = 17 } @@ -129,59 +148,93 @@ func (bitmap *bitmap) get(next uint16) (bool, uint16, uint16) { return true, first, uint16(bm >> 1) } -func (frame *frame) store(seqno uint16, timestamp uint32, first bool, data []byte) { +func (frame *frame) insert(seqno uint16, timestamp uint32, marker bool, data []byte) bool { + n := len(frame.entries) + i := 0 + if n == 0 || seqno > frame.entries[n-1].seqno { + // fast path + i = n + } else { + for i < n { + if frame.entries[i].seqno >= seqno { + break + } + i++ + } + + if i < n && frame.entries[i].seqno == seqno { + // duplicate + return false + } + } + + if n >= maxFrame { + // overflow + return false + } + + lam := uint16(len(data)) + if marker { + lam |= 0x8000 + } + e := entry{ + seqno: seqno, + lengthAndMarker: lam, + timestamp: timestamp, + } + copy(e.buf[:], data) + + if i >= n { + frame.entries = append(frame.entries, e) + return true + } + frame.entries = append(frame.entries, entry{}) + copy(frame.entries[i+1:], frame.entries[i:]) + frame.entries[i] = e + return true +} + +func (frame *frame) store(seqno uint16, timestamp uint32, first bool, marker bool, data []byte) bool { if first { if frame.timestamp != timestamp { frame.timestamp = timestamp + frame.complete = false frame.entries = frame.entries[:0] } } else if len(frame.entries) > 0 { if frame.timestamp != timestamp { delta := seqno - frame.entries[0].seqno - if (delta & 0x8000) == 0 && delta > 0x4000 { + if (delta&0x8000) == 0 && delta > 0x4000 { + frame.complete = false frame.entries = frame.entries[:0] } - return + return false } } else { - return + return false } - i := 0 - for i < len(frame.entries) { - if frame.entries[i].seqno >= seqno { - break + done := frame.insert(seqno, timestamp, marker, data) + if done && !frame.complete { + marker := false + fst := frame.entries[0].seqno + for i := 1; i < len(frame.entries); i++ { + if frame.entries[i].seqno != fst+uint16(i) { + return done + } + if frame.entries[i].marker() { + marker = true + } + } + if marker { + frame.complete = true } - i++ } - - if i < len(frame.entries) && frame.entries[i].seqno == seqno { - // duplicate - return - } - - if len(frame.entries) >= maxFrame { - // overflow - return - } - - e := entry{ - seqno: seqno, - length: uint16(len(data)), - } - copy(e.buf[:], data) - - if i >= len(frame.entries) { - frame.entries = append(frame.entries, e) - return - } - frame.entries = append(frame.entries, entry{}) - copy(frame.entries[i+1:], frame.entries[i:]) - frame.entries[i] = e + return done } // Store a packet, setting bitmap at the same time -func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, buf []byte) (uint16, uint16) { +func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker bool, buf []byte) (uint16, uint16) { cache.mu.Lock() defer cache.mu.Unlock() @@ -190,7 +243,7 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, buf []b cache.lastValid = true cache.expected++ } else { - if ((cache.last - seqno) & 0x8000) != 0 { + if compare(cache.last, seqno) <= 0 { cache.expected += uint32(seqno - cache.last) cache.lost += uint32(seqno - cache.last - 1) if seqno < cache.last { @@ -205,17 +258,76 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, buf []b } cache.bitmap.set(seqno) - cache.keyframe.store(seqno, timestamp, keyframe, buf) + done := cache.keyframe.store(seqno, timestamp, keyframe, marker, buf) + if done && !cache.keyframe.complete { + completeKeyframe(cache) + } i := cache.tail cache.entries[i].seqno = seqno copy(cache.entries[i].buf[:], buf) - cache.entries[i].length = uint16(len(buf)) + lam := uint16(len(buf)) + if marker { + lam |= 0x8000 + } + cache.entries[i].lengthAndMarker = lam + cache.entries[i].timestamp = timestamp cache.tail = (i + 1) % uint16(len(cache.entries)) return cache.bitmap.first, i } +func completeKeyframe(cache *Cache) { + l := len(cache.keyframe.entries) + if l == 0 { + return + } + first := cache.keyframe.entries[0].seqno + last := cache.keyframe.entries[l-1].seqno + count := (last - first) // may wrap around + if count > 0x4000 { + // this shouldn't happen + return + } + var buf []byte + if count > 1 { + if buf == nil { + buf = make([]byte, BufSize) + } + for i := uint16(1); i < count; i++ { + n, ts, marker := get(first+i, cache.entries, buf) + if n > 0 { + cache.keyframe.store( + first+i, ts, false, marker, buf, + ) + } + } + } + if !cache.keyframe.complete { + // Try to find packets after the last one. + for { + l := len(cache.keyframe.entries) + if cache.keyframe.entries[l-1].marker() { + break + } + if buf == nil { + buf = make([]byte, BufSize) + } + seqno := cache.keyframe.entries[l-1].seqno + 1 + n, ts, marker := get(seqno, cache.entries, buf) + if n <= 0 { + break + } + done := cache.keyframe.store( + seqno, ts, false, marker, buf, + ) + if !done || marker { + break + } + } + } +} + func (cache *Cache) Expect(n int) { if n <= 0 { return @@ -225,29 +337,29 @@ func (cache *Cache) Expect(n int) { cache.expected += uint32(n) } -func get(seqno uint16, entries []entry, result []byte) uint16 { +func get(seqno uint16, entries []entry, result []byte) (uint16, uint32, bool) { for i := range entries { - if entries[i].length == 0 || entries[i].seqno != seqno { + if entries[i].lengthAndMarker == 0 || entries[i].seqno != seqno { continue } - return uint16(copy( - result[:entries[i].length], - entries[i].buf[:]), - ) + n := uint16(copy( + result[:entries[i].length()], + entries[i].buf[:])) + return n, entries[i].timestamp, entries[i].marker() } - return 0 + return 0, 0, false } func (cache *Cache) Get(seqno uint16, result []byte) uint16 { cache.mu.Lock() defer cache.mu.Unlock() - n := get(seqno, cache.keyframe.entries, result) + n, _, _ := get(seqno, cache.keyframe.entries, result) if n > 0 { return n } - n = get(seqno, cache.entries, result) + n, _, _ = get(seqno, cache.entries, result) if n > 0 { return n } @@ -266,24 +378,24 @@ func (cache *Cache) GetAt(seqno uint16, index uint16, result []byte) uint16 { return 0 } return uint16(copy( - result[:cache.entries[index].length], + result[:cache.entries[index].length()], cache.entries[index].buf[:]), ) } -func (cache *Cache) Keyframe() (uint32, []uint16) { +func (cache *Cache) Keyframe() (uint32, bool, []uint16) { cache.mu.Lock() defer cache.mu.Unlock() if len(cache.keyframe.entries) == 0 { - return 0, nil + return 0, false, nil } seqnos := make([]uint16, len(cache.keyframe.entries)) for i := range cache.keyframe.entries { seqnos[i] = cache.keyframe.entries[i].seqno } - return cache.keyframe.timestamp, seqnos + return cache.keyframe.timestamp, cache.keyframe.complete, seqnos } func (cache *Cache) resize(capacity int) { diff --git a/packetcache/packetcache_test.go b/packetcache/packetcache_test.go index 0b2f6fe..756ae83 100644 --- a/packetcache/packetcache_test.go +++ b/packetcache/packetcache_test.go @@ -20,8 +20,8 @@ func TestCache(t *testing.T) { buf1 := randomBuf() buf2 := randomBuf() cache := New(16) - _, i1 := cache.Store(13, 0, false, buf1) - _, i2 := cache.Store(17, 0, false, buf2) + _, i1 := cache.Store(13, 0, false, false, buf1) + _, i2 := cache.Store(17, 0, false, false, buf2) buf := make([]byte, BufSize) @@ -62,7 +62,7 @@ func TestCacheOverflow(t *testing.T) { cache := New(16) for i := 0; i < 32; i++ { - cache.Store(uint16(i), 0, false, []byte{uint8(i)}) + cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) } for i := 0; i < 32; i++ { @@ -84,7 +84,7 @@ func TestCacheGrow(t *testing.T) { cache := New(16) for i := 0; i < 24; i++ { - cache.Store(uint16(i), 0, false, []byte{uint8(i)}) + cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) } cache.Resize(32) @@ -107,7 +107,7 @@ func TestCacheShrink(t *testing.T) { cache := New(16) for i := 0; i < 24; i++ { - cache.Store(uint16(i), 0, false, []byte{uint8(i)}) + cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) } cache.Resize(12) @@ -155,12 +155,18 @@ func TestKeyframe(t *testing.T) { packet := make([]byte, 1) buf := make([]byte, BufSize) - cache.Store(7, 57, true, packet) - cache.Store(8, 57, true, packet) + cache.Store(7, 57, true, false, packet) + if cache.keyframe.complete { + t.Errorf("Expected false, got true") + } + cache.Store(8, 57, false, true, packet) + if !cache.keyframe.complete { + t.Errorf("Expected true, got false") + } - ts, kf := cache.Keyframe() - if ts != 57 || len(kf) != 2 { - t.Errorf("Got %v %v, expected %v %v", ts, len(kf), 57, 2) + ts, c, kf := cache.Keyframe() + if ts != 57 || !c || len(kf) != 2 { + t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2) } for _, i := range kf { l := cache.Get(i, buf) @@ -170,12 +176,12 @@ func TestKeyframe(t *testing.T) { } for i := 0; i < 32; i++ { - cache.Store(uint16(9 + i), uint32(58 + i), false, packet) + cache.Store(uint16(9+i), uint32(58+i), false, false, packet) } - ts, kf = cache.Keyframe() - if ts != 57 || len(kf) != 2 { - t.Errorf("Got %v %v, expected %v %v", ts, len(kf), 57, 2) + ts, c, kf = cache.Keyframe() + if ts != 57 || !c || len(kf) != 2 { + t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2) } for _, i := range kf { l := cache.Get(i, buf) @@ -189,26 +195,28 @@ func TestKeyframeUnsorted(t *testing.T) { cache := New(16) packet := make([]byte, 1) - cache.Store(7, 57, true, packet) - cache.Store(9, 57, true, packet) - cache.Store(8, 57, true, packet) - cache.Store(10, 57, true, packet) - cache.Store(6, 57, true, packet) - cache.Store(8, 57, true, packet) + cache.Store(7, 57, false, false, packet) + cache.Store(9, 57, false, false, packet) + cache.Store(10, 57, false, true, packet) + cache.Store(6, 57, true, false, packet) + _, c, kf := cache.Keyframe() + if len(kf) != 2 || c { + t.Errorf("Got %v %v, expected 2", c, kf) + } + cache.Store(8, 57, false, false, packet) - _, kf := cache.Keyframe() - if len(kf) != 5 { - t.Errorf("Got length %v, expected 5", len(kf)) + _, c, kf = cache.Keyframe() + if len(kf) != 5 || !c { + t.Errorf("Got %v %v, expected 5", c, kf) } for i, v := range kf { - if v != uint16(i + 6) { + if v != uint16(i+6) { t.Errorf("Position %v, expected %v, got %v\n", - i, i + 6, v) + i, i+6, v) } } } - func TestBitmap(t *testing.T) { value := uint64(0xcdd58f1e035379c0) packet := make([]byte, 1) @@ -218,7 +226,7 @@ func TestBitmap(t *testing.T) { var first uint16 for i := 0; i < 64; i++ { if (value & (1 << i)) != 0 { - first, _ = cache.Store(uint16(42+i), 0, false, packet) + first, _ = cache.Store(uint16(42+i), 0, false, false, packet) } } @@ -234,13 +242,13 @@ func TestBitmapWrap(t *testing.T) { cache := New(16) - cache.Store(0x7000, 0, false, packet) - cache.Store(0xA000, 0, false, packet) + cache.Store(0x7000, 0, false, false, packet) + cache.Store(0xA000, 0, false, false, packet) var first uint16 for i := 0; i < 64; i++ { if (value & (1 << i)) != 0 { - first, _ = cache.Store(uint16(42+i), 0, false, packet) + first, _ = cache.Store(uint16(42+i), 0, false, false, packet) } } @@ -258,7 +266,7 @@ func TestBitmapGet(t *testing.T) { for i := 0; i < 64; i++ { if (value & (1 << i)) != 0 { - cache.Store(uint16(42+i), 0, false, packet) + cache.Store(uint16(42+i), 0, false, false, packet) } } @@ -300,7 +308,7 @@ func TestBitmapPacket(t *testing.T) { for i := 0; i < 64; i++ { if (value & (1 << i)) != 0 { - cache.Store(uint16(42+i), 0, false, packet) + cache.Store(uint16(42+i), 0, false, false, packet) } } @@ -358,7 +366,7 @@ func BenchmarkCachePutGet(b *testing.B) { for i := 0; i < b.N; i++ { seqno := uint16(i) - cache.Store(seqno, 0, false, buf) + cache.Store(seqno, 0, false, false, buf) for _, ch := range chans { ch <- seqno } @@ -409,7 +417,7 @@ func BenchmarkCachePutGetAt(b *testing.B) { for i := 0; i < b.N; i++ { seqno := uint16(i) - _, index := cache.Store(seqno, 0, false, buf) + _, index := cache.Store(seqno, 0, false, false, buf) for _, ch := range chans { ch <- is{index, seqno} } diff --git a/rtpconn/rtpreader.go b/rtpconn/rtpreader.go index 27f729b..8511f96 100644 --- a/rtpconn/rtpreader.go +++ b/rtpconn/rtpreader.go @@ -59,7 +59,7 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { first, index := track.cache.Store( packet.SequenceNumber, packet.Timestamp, - kf, buf[:bytes], + kf, packet.Marker, buf[:bytes], ) _, rate := track.rate.Estimate() diff --git a/rtpconn/rtpwriter.go b/rtpconn/rtpwriter.go index b0bc040..f977d39 100644 --- a/rtpconn/rtpwriter.go +++ b/rtpconn/rtpwriter.go @@ -209,7 +209,7 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error { } func sendKeyframe(track conn.DownTrack, cache *packetcache.Cache) { - _, kf := cache.Keyframe() + _, _, kf := cache.Keyframe() if len(kf) == 0 { return }