From 9b1d814b5841bf3962a6b3b7a625687f1fc9bc32 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Fri, 30 Apr 2021 16:26:17 +0200 Subject: [PATCH] Rework packetcache loss statistics. Don't maintain loss, which is too error-prone. Instead, maintain expected and received packet counts, and compute loss from that. --- packetcache/packetcache.go | 48 +++++++++++++++++----------- packetcache/packetcache_test.go | 56 ++++++++++++++++----------------- rtpconn/rtpconn.go | 20 +++++++----- rtpconn/rtpstats.go | 9 +++--- 4 files changed, 76 insertions(+), 57 deletions(-) diff --git a/packetcache/packetcache.go b/packetcache/packetcache.go index 23c2d60..80a4106 100644 --- a/packetcache/packetcache.go +++ b/packetcache/packetcache.go @@ -48,12 +48,13 @@ type frame struct { type Cache struct { mu sync.Mutex //stats - last uint16 - cycle uint16 - lastValid bool - expected uint32 - lost uint32 - totalLost uint32 + last uint16 + cycle uint16 + lastValid bool + expected uint32 + totalExpected uint32 + received uint32 + totalReceived uint32 // bitmap bitmap bitmap // buffered keyframe @@ -259,18 +260,19 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker cache.last = seqno cache.lastValid = true cache.expected++ + cache.received++ } else { cmp := compare(cache.last, seqno) if cmp < 0 { + cache.received++ cache.expected += uint32(seqno - cache.last) - cache.lost += uint32(seqno - cache.last - 1) if seqno < cache.last { cache.cycle++ } cache.last = seqno } else if cmp > 0 { - if cache.lost > 0 { - cache.lost-- + if cache.received < cache.expected { + cache.received++ } } } @@ -355,7 +357,6 @@ func (cache *Cache) Expect(n int) { cache.mu.Lock() defer cache.mu.Unlock() cache.expected += uint32(n) - cache.lost += uint32(n) } // get retrieves a packet from a slice of entries. @@ -510,23 +511,34 @@ func (cache *Cache) ResizeCond(capacity int) bool { return true } +// Stats contains cache statistics +type Stats struct { + Received, TotalReceived uint32 + Expected, TotalExpected uint32 + ESeqno uint32 +} + // GetStats returns statistics about received packets. If reset is true, // the statistics are reset. -func (cache *Cache) GetStats(reset bool) (uint32, uint32, uint32, uint32) { +func (cache *Cache) GetStats(reset bool) Stats { cache.mu.Lock() defer cache.mu.Unlock() - expected := cache.expected - lost := cache.lost - totalLost := cache.totalLost + cache.lost - eseqno := uint32(cache.cycle)<<16 | uint32(cache.last) + s := Stats{ + Received: cache.received, + TotalReceived: cache.totalReceived + cache.received, + Expected: cache.expected, + TotalExpected: cache.totalExpected + cache.expected, + ESeqno: uint32(cache.cycle)<<16 | uint32(cache.last), + } if reset { + cache.totalExpected += cache.expected cache.expected = 0 - cache.totalLost += cache.lost - cache.lost = 0 + cache.totalReceived += cache.received + cache.received = 0 } - return expected, lost, totalLost, eseqno + return s } // ToBitmap takes a non-empty sorted list of seqnos, and computes a bitmap diff --git a/packetcache/packetcache_test.go b/packetcache/packetcache_test.go index 04eee38..2c1db0d 100644 --- a/packetcache/packetcache_test.go +++ b/packetcache/packetcache_test.go @@ -499,13 +499,13 @@ func TestCacheStatsFull(t *testing.T) { for i := 0; i < 32; i++ { cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) } - expected, lost, totalLost, eseqno := cache.GetStats(false) - if expected != 32 || - lost != 0 || - totalLost != 0 || - eseqno != 31 { - t.Errorf("Expected 32, 0, 0, 31, got %v, %v, %v, %v", - expected, lost, totalLost, eseqno) + stats := cache.GetStats(false) + if stats.Received != 32 || + stats.TotalReceived != 32 || + stats.Expected != 32 || + stats.TotalExpected != 32 || + stats.ESeqno != 31 { + t.Errorf("Expected 32, 32, 32, 32, 31, got %v", stats) } } @@ -516,13 +516,13 @@ func TestCacheStatsDrop(t *testing.T) { cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) } } - expected, lost, totalLost, eseqno := cache.GetStats(false) - if expected != 32 || - lost != 2 || - totalLost != 2 || - eseqno != 31 { - t.Errorf("Expected 32, 1, 1, 31, got %v, %v, %v, %v", - expected, lost, totalLost, eseqno) + stats := cache.GetStats(false) + if stats.Received != 30 || + stats.TotalReceived != 30 || + stats.Expected != 32 || + stats.TotalExpected != 32 || + stats.ESeqno != 31 { + t.Errorf("Expected 30, 30, 32, 32, 31, got %v", stats) } } @@ -535,13 +535,13 @@ func TestCacheStatsUnordered(t *testing.T) { } cache.Store(uint16(8), 0, false, false, []byte{8}) cache.Store(uint16(10), 0, false, false, []byte{10}) - expected, lost, totalLost, eseqno := cache.GetStats(false) - if expected != 32 || - lost != 0 || - totalLost != 0 || - eseqno != 31 { - t.Errorf("Expected 32, 0, 0, 31, got %v, %v, %v, %v", - expected, lost, totalLost, eseqno) + stats := cache.GetStats(false) + if stats.Received != 32 || + stats.TotalReceived != 32 || + stats.Expected != 32 || + stats.TotalExpected != 32 || + stats.ESeqno != 31 { + t.Errorf("Expected 32, 32, 32, 32, 31, got %v", stats) } } @@ -555,12 +555,12 @@ func TestCacheStatsNack(t *testing.T) { cache.Expect(2) cache.Store(uint16(8), 0, false, false, []byte{8}) cache.Store(uint16(10), 0, false, false, []byte{10}) - expected, lost, totalLost, eseqno := cache.GetStats(false) - if expected != 34 || - lost != 2 || - totalLost != 2 || - eseqno != 31 { - t.Errorf("Expected 34, 2, 2, 31, got %v, %v, %v, %v", - expected, lost, totalLost, eseqno) + stats := cache.GetStats(false) + if stats.Received != 32 || + stats.TotalReceived != 32 || + stats.Expected != 34 || + stats.TotalExpected != 34 || + stats.ESeqno != 31 { + t.Errorf("Expected 32, 32, 34, 34, 31, got %v", stats) } } diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index 7ed36a1..ab2d262 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -773,12 +773,18 @@ func sendUpRTCP(conn *rtpUpConnection) error { reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) for _, t := range tracks { updateUpTrack(t) - expected, lost, totalLost, eseqno := t.cache.GetStats(true) - if expected == 0 { - expected = 1 + stats := t.cache.GetStats(true) + var totalLost uint32 + if stats.TotalExpected > stats.TotalReceived { + totalLost = stats.TotalExpected - stats.TotalReceived } - if lost >= expected { - lost = expected - 1 + var fractionLost uint32 + if stats.Expected > stats.Received { + lost := stats.Expected - stats.Received + fractionLost = lost * 256 / stats.Expected + if fractionLost >= 255 { + fractionLost = 255 + } } t.mu.Lock() @@ -794,9 +800,9 @@ func sendUpRTCP(conn *rtpUpConnection) error { reports = append(reports, rtcp.ReceptionReport{ SSRC: uint32(t.track.SSRC()), - FractionLost: uint8((lost * 256) / expected), + FractionLost: uint8(fractionLost), TotalLost: totalLost, - LastSequenceNumber: eseqno, + LastSequenceNumber: stats.ESeqno, Jitter: t.jitter.Jitter(), LastSenderReport: uint32(srNTPTime >> 16), Delay: uint32(delay), diff --git a/rtpconn/rtpstats.go b/rtpconn/rtpstats.go index 3e9f57e..a6a3fd3 100644 --- a/rtpconn/rtpstats.go +++ b/rtpconn/rtpstats.go @@ -22,11 +22,12 @@ func (c *webClient) GetStats() *stats.Client { } tracks := up.getTracks() for _, t := range tracks { - expected, lost, _, _ := t.cache.GetStats(false) - if expected == 0 { - expected = 1 + s := t.cache.GetStats(false) + var loss uint8 + if s.Expected > s.Received { + loss = uint8((s.Expected - s.Received) * 100 / + s.Expected) } - loss := uint8(lost * 100 / expected) jitter := time.Duration(t.jitter.Jitter()) * (time.Second / time.Duration(t.jitter.HZ())) rate, _ := t.rate.Estimate()