1
Fork 0

Rework packetcache loss statistics.

Don't maintain loss, which is too error-prone.  Instead, maintain
expected and received packet counts, and compute loss from that.
This commit is contained in:
Juliusz Chroboczek 2021-04-30 16:26:17 +02:00
parent cd1ac8a8c9
commit 9b1d814b58
4 changed files with 76 additions and 57 deletions

View File

@ -48,12 +48,13 @@ type frame struct {
type Cache struct { type Cache struct {
mu sync.Mutex mu sync.Mutex
//stats //stats
last uint16 last uint16
cycle uint16 cycle uint16
lastValid bool lastValid bool
expected uint32 expected uint32
lost uint32 totalExpected uint32
totalLost uint32 received uint32
totalReceived uint32
// bitmap // bitmap
bitmap bitmap bitmap bitmap
// buffered keyframe // buffered keyframe
@ -259,18 +260,19 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker
cache.last = seqno cache.last = seqno
cache.lastValid = true cache.lastValid = true
cache.expected++ cache.expected++
cache.received++
} else { } else {
cmp := compare(cache.last, seqno) cmp := compare(cache.last, seqno)
if cmp < 0 { if cmp < 0 {
cache.received++
cache.expected += uint32(seqno - cache.last) cache.expected += uint32(seqno - cache.last)
cache.lost += uint32(seqno - cache.last - 1)
if seqno < cache.last { if seqno < cache.last {
cache.cycle++ cache.cycle++
} }
cache.last = seqno cache.last = seqno
} else if cmp > 0 { } else if cmp > 0 {
if cache.lost > 0 { if cache.received < cache.expected {
cache.lost-- cache.received++
} }
} }
} }
@ -355,7 +357,6 @@ func (cache *Cache) Expect(n int) {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
cache.expected += uint32(n) cache.expected += uint32(n)
cache.lost += uint32(n)
} }
// get retrieves a packet from a slice of entries. // get retrieves a packet from a slice of entries.
@ -510,23 +511,34 @@ func (cache *Cache) ResizeCond(capacity int) bool {
return true return true
} }
// Stats contains cache statistics
type Stats struct {
Received, TotalReceived uint32
Expected, TotalExpected uint32
ESeqno uint32
}
// GetStats returns statistics about received packets. If reset is true, // GetStats returns statistics about received packets. If reset is true,
// the statistics are reset. // the statistics are reset.
func (cache *Cache) GetStats(reset bool) (uint32, uint32, uint32, uint32) { func (cache *Cache) GetStats(reset bool) Stats {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
expected := cache.expected s := Stats{
lost := cache.lost Received: cache.received,
totalLost := cache.totalLost + cache.lost TotalReceived: cache.totalReceived + cache.received,
eseqno := uint32(cache.cycle)<<16 | uint32(cache.last) Expected: cache.expected,
TotalExpected: cache.totalExpected + cache.expected,
ESeqno: uint32(cache.cycle)<<16 | uint32(cache.last),
}
if reset { if reset {
cache.totalExpected += cache.expected
cache.expected = 0 cache.expected = 0
cache.totalLost += cache.lost cache.totalReceived += cache.received
cache.lost = 0 cache.received = 0
} }
return expected, lost, totalLost, eseqno return s
} }
// ToBitmap takes a non-empty sorted list of seqnos, and computes a bitmap // ToBitmap takes a non-empty sorted list of seqnos, and computes a bitmap

View File

@ -499,13 +499,13 @@ func TestCacheStatsFull(t *testing.T) {
for i := 0; i < 32; i++ { for i := 0; i < 32; i++ {
cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) cache.Store(uint16(i), 0, false, false, []byte{uint8(i)})
} }
expected, lost, totalLost, eseqno := cache.GetStats(false) stats := cache.GetStats(false)
if expected != 32 || if stats.Received != 32 ||
lost != 0 || stats.TotalReceived != 32 ||
totalLost != 0 || stats.Expected != 32 ||
eseqno != 31 { stats.TotalExpected != 32 ||
t.Errorf("Expected 32, 0, 0, 31, got %v, %v, %v, %v", stats.ESeqno != 31 {
expected, lost, totalLost, eseqno) t.Errorf("Expected 32, 32, 32, 32, 31, got %v", stats)
} }
} }
@ -516,13 +516,13 @@ func TestCacheStatsDrop(t *testing.T) {
cache.Store(uint16(i), 0, false, false, []byte{uint8(i)}) cache.Store(uint16(i), 0, false, false, []byte{uint8(i)})
} }
} }
expected, lost, totalLost, eseqno := cache.GetStats(false) stats := cache.GetStats(false)
if expected != 32 || if stats.Received != 30 ||
lost != 2 || stats.TotalReceived != 30 ||
totalLost != 2 || stats.Expected != 32 ||
eseqno != 31 { stats.TotalExpected != 32 ||
t.Errorf("Expected 32, 1, 1, 31, got %v, %v, %v, %v", stats.ESeqno != 31 {
expected, lost, totalLost, eseqno) t.Errorf("Expected 30, 30, 32, 32, 31, got %v", stats)
} }
} }
@ -535,13 +535,13 @@ func TestCacheStatsUnordered(t *testing.T) {
} }
cache.Store(uint16(8), 0, false, false, []byte{8}) cache.Store(uint16(8), 0, false, false, []byte{8})
cache.Store(uint16(10), 0, false, false, []byte{10}) cache.Store(uint16(10), 0, false, false, []byte{10})
expected, lost, totalLost, eseqno := cache.GetStats(false) stats := cache.GetStats(false)
if expected != 32 || if stats.Received != 32 ||
lost != 0 || stats.TotalReceived != 32 ||
totalLost != 0 || stats.Expected != 32 ||
eseqno != 31 { stats.TotalExpected != 32 ||
t.Errorf("Expected 32, 0, 0, 31, got %v, %v, %v, %v", stats.ESeqno != 31 {
expected, lost, totalLost, eseqno) t.Errorf("Expected 32, 32, 32, 32, 31, got %v", stats)
} }
} }
@ -555,12 +555,12 @@ func TestCacheStatsNack(t *testing.T) {
cache.Expect(2) cache.Expect(2)
cache.Store(uint16(8), 0, false, false, []byte{8}) cache.Store(uint16(8), 0, false, false, []byte{8})
cache.Store(uint16(10), 0, false, false, []byte{10}) cache.Store(uint16(10), 0, false, false, []byte{10})
expected, lost, totalLost, eseqno := cache.GetStats(false) stats := cache.GetStats(false)
if expected != 34 || if stats.Received != 32 ||
lost != 2 || stats.TotalReceived != 32 ||
totalLost != 2 || stats.Expected != 34 ||
eseqno != 31 { stats.TotalExpected != 34 ||
t.Errorf("Expected 34, 2, 2, 31, got %v, %v, %v, %v", stats.ESeqno != 31 {
expected, lost, totalLost, eseqno) t.Errorf("Expected 32, 32, 34, 34, 31, got %v", stats)
} }
} }

View File

@ -773,12 +773,18 @@ func sendUpRTCP(conn *rtpUpConnection) error {
reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
for _, t := range tracks { for _, t := range tracks {
updateUpTrack(t) updateUpTrack(t)
expected, lost, totalLost, eseqno := t.cache.GetStats(true) stats := t.cache.GetStats(true)
if expected == 0 { var totalLost uint32
expected = 1 if stats.TotalExpected > stats.TotalReceived {
totalLost = stats.TotalExpected - stats.TotalReceived
} }
if lost >= expected { var fractionLost uint32
lost = expected - 1 if stats.Expected > stats.Received {
lost := stats.Expected - stats.Received
fractionLost = lost * 256 / stats.Expected
if fractionLost >= 255 {
fractionLost = 255
}
} }
t.mu.Lock() t.mu.Lock()
@ -794,9 +800,9 @@ func sendUpRTCP(conn *rtpUpConnection) error {
reports = append(reports, rtcp.ReceptionReport{ reports = append(reports, rtcp.ReceptionReport{
SSRC: uint32(t.track.SSRC()), SSRC: uint32(t.track.SSRC()),
FractionLost: uint8((lost * 256) / expected), FractionLost: uint8(fractionLost),
TotalLost: totalLost, TotalLost: totalLost,
LastSequenceNumber: eseqno, LastSequenceNumber: stats.ESeqno,
Jitter: t.jitter.Jitter(), Jitter: t.jitter.Jitter(),
LastSenderReport: uint32(srNTPTime >> 16), LastSenderReport: uint32(srNTPTime >> 16),
Delay: uint32(delay), Delay: uint32(delay),

View File

@ -22,11 +22,12 @@ func (c *webClient) GetStats() *stats.Client {
} }
tracks := up.getTracks() tracks := up.getTracks()
for _, t := range tracks { for _, t := range tracks {
expected, lost, _, _ := t.cache.GetStats(false) s := t.cache.GetStats(false)
if expected == 0 { var loss uint8
expected = 1 if s.Expected > s.Received {
loss = uint8((s.Expected - s.Received) * 100 /
s.Expected)
} }
loss := uint8(lost * 100 / expected)
jitter := time.Duration(t.jitter.Jitter()) * jitter := time.Duration(t.jitter.Jitter()) *
(time.Second / time.Duration(t.jitter.HZ())) (time.Second / time.Duration(t.jitter.HZ()))
rate, _ := t.rate.Estimate() rate, _ := t.rate.Estimate()