From b042bed9a30a841b97784e3fe73f220b2dde99f5 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Wed, 29 Apr 2020 03:03:47 +0200 Subject: [PATCH] Maintain reception statistics, send receiver reports. --- client.go | 80 +++++++++++++++++++++++++++++++++++++- group.go | 9 +++-- packetcache/packetcache.go | 64 +++++++++++++++++++++++++++--- 3 files changed, 142 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 5c721e2..76a329c 100644 --- a/client.go +++ b/client.go @@ -277,6 +277,8 @@ func addUpConn(c *client, id string) (*upConnection, error) { sendICE(c, id, candidate) }) + go rtcpUpSender(c, conn) + pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) { c.mu.Lock() u, ok := c.up[id] @@ -303,6 +305,8 @@ func addUpConn(c *client, id string) (*upConnection, error) { } go upLoop(conn, track) + + go rtcpUpListener(conn, track, receiver) }) return conn, nil @@ -357,6 +361,78 @@ func upLoop(conn *upConnection, track *upTrack) { } } +func rtcpUpListener(conn *upConnection, track *upTrack, r *webrtc.RTPReceiver) { + for { + ps, err := r.ReadRTCP() + if err != nil { + if err != io.EOF { + log.Printf("ReadRTCP: %v", err) + } + return + } + + for _, p := range ps { + switch p := p.(type) { + case *rtcp.SenderReport: + atomic.StoreUint32(&track.lastSenderReport, + uint32(p.NTPTime>>16)) + case *rtcp.SourceDescription: + default: + log.Printf("RTCP: %T", p) + } + } + } +} + +func sendRR(c *client, conn *upConnection) error { + c.mu.Lock() + if len(conn.tracks) == 0 { + c.mu.Unlock() + return nil + } + + ssrc := conn.tracks[0].track.SSRC() + + reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) + for _, t := range conn.tracks { + expected, lost, eseqno := t.cache.GetStats(true) + if expected == 0 { + expected = 1 + } + if lost >= expected { + lost = expected - 1 + } + reports = append(reports, rtcp.ReceptionReport{ + SSRC: t.track.SSRC(), + LastSenderReport: atomic.LoadUint32(&t.lastSenderReport), + FractionLost: uint8((lost * 256) / expected), + TotalLost: lost, + LastSequenceNumber: eseqno, + }) + } + c.mu.Unlock() + + return conn.pc.WriteRTCP([]rtcp.Packet{ + &rtcp.ReceiverReport{ + SSRC: ssrc, + Reports: reports, + }, + }) +} + +func rtcpUpSender(c *client, conn *upConnection) { + for { + time.Sleep(time.Second) + err := sendRR(c, conn) + if err != nil { + if err == io.EOF || err == io.ErrClosedPipe { + return + } + log.Printf("WriteRTCP: %v", err) + } + } +} + func delUpConn(c *client, id string) { c.mu.Lock() defer c.mu.Unlock() @@ -498,7 +574,7 @@ func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConn conn.tracks = append(conn.tracks, track) remoteTrack.addLocal(track) - go rtcpListener(c.group, conn, track, s) + go rtcpDownListener(c.group, conn, track, s) return conn, s, nil } @@ -509,7 +585,7 @@ func msSinceEpoch() uint64 { return uint64(time.Since(epoch) / time.Millisecond) } -func rtcpListener(g *group, conn *downConnection, track *downTrack, s *webrtc.RTPSender) { +func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrtc.RTPSender) { for { ps, err := s.ReadRTCP() if err != nil { diff --git a/group.go b/group.go index f6149fb..f1b4326 100644 --- a/group.go +++ b/group.go @@ -21,10 +21,11 @@ import ( ) type upTrack struct { - track *webrtc.Track - cache *packetcache.Cache - maxBitrate uint64 - lastPLI uint64 + track *webrtc.Track + cache *packetcache.Cache + maxBitrate uint64 + lastPLI uint64 + lastSenderReport uint32 mu sync.Mutex local []*downTrack diff --git a/packetcache/packetcache.go b/packetcache/packetcache.go index 48b84d3..50aa6f0 100644 --- a/packetcache/packetcache.go +++ b/packetcache/packetcache.go @@ -13,10 +13,18 @@ type entry struct { } type Cache struct { - mu sync.Mutex - first uint16 // the first seqno - bitmap uint32 - tail int // the next entry to be rewritten + mu sync.Mutex + //stats + last uint16 + cycle uint16 + lastValid bool + expected uint32 + lost uint32 + // bitmap + first uint16 + bitmap uint32 + // packet cache + tail int entries []entry } @@ -26,9 +34,21 @@ func New(capacity int) *Cache { } } +func seqnoInvalid(seqno, reference uint16) bool { + if ((seqno - reference) & 0x8000) == 0 { + return false + } + + if reference - seqno > 0x100 { + return true + } + + return false +} + // Set a bit in the bitmap, shifting first if necessary. func (cache *Cache) set(seqno uint16) { - if cache.bitmap == 0 { + if cache.bitmap == 0 || seqnoInvalid(seqno, cache.first) { cache.first = seqno cache.bitmap = 1 return @@ -65,6 +85,25 @@ func (cache *Cache) Store(seqno uint16, buf []byte) uint16 { cache.mu.Lock() defer cache.mu.Unlock() + if !cache.lastValid || seqnoInvalid(seqno, cache.last) { + cache.last = seqno + cache.lastValid = true + cache.expected++ + } else { + if ((cache.last - seqno) & 0x8000) != 0 { + cache.expected += uint32(seqno - cache.last) + cache.lost += uint32(seqno - cache.last - 1) + if seqno < cache.last { + cache.cycle++ + } + cache.last = seqno + } else { + if cache.lost > 0 { + cache.lost-- + } + } + } + cache.set(seqno) cache.entries[cache.tail].seqno = seqno @@ -102,3 +141,18 @@ func (cache *Cache) BitmapGet() (uint16, uint16) { cache.first += 17 return first, bitmap } + +func (cache *Cache) GetStats(reset bool) (uint32, uint32, uint32) { + cache.mu.Lock() + defer cache.mu.Unlock() + + expected := cache.expected + lost := cache.lost + eseqno := uint32(cache.cycle)<<16 | uint32(cache.last) + + if reset { + cache.expected = 0 + cache.lost = 0 + } + return expected, lost, eseqno +}