From 5a1ef1ddd7a3ce0f72c2c46299a43a1ed7866a57 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Thu, 30 Apr 2020 21:22:00 +0200 Subject: [PATCH] Use a proper accessor for timestampedByterate. --- client.go | 26 ++++---------------------- group.go | 24 +++++++++++++++++++++++- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/client.go b/client.go index 7344fa4..b7aeb0c 100644 --- a/client.go +++ b/client.go @@ -579,12 +579,6 @@ func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConn return conn, s, nil } -var epoch = time.Now() - -func msSinceEpoch() uint64 { - return uint64(time.Since(epoch) / time.Millisecond) -} - func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrtc.RTPSender) { for { ps, err := s.ReadRTCP() @@ -603,18 +597,7 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt log.Printf("sendPLI: %v", err) } case *rtcp.ReceiverEstimatedMaximumBitrate: - ms := msSinceEpoch() - // this is racy -- a reader might read the - // data between the two writes. This shouldn't - // matter, we'll recover at the next sample. - atomic.StoreUint64( - &track.maxBitrate.bitrate, - p.Bitrate, - ) - atomic.StoreUint64( - &track.maxBitrate.timestamp, - uint64(ms), - ) + track.maxBitrate.Set(p.Bitrate, msSinceEpoch()) case *rtcp.ReceiverReport: for _, r := range p.Reports { if r.SSRC == track.track.SSRC() { @@ -658,10 +641,8 @@ func updateUpBitrate(up *upConnection) { track.maxBitrate = ^uint64(0) local := track.getLocal() for _, l := range local { - ms := atomic.LoadUint64(&l.maxBitrate.timestamp) - bitrate := atomic.LoadUint64(&l.maxBitrate.bitrate) - loss := atomic.LoadUint32(&l.loss) - if now < ms || now > ms+5000 || bitrate == 0 { + bitrate := l.maxBitrate.Get(now) + if bitrate == ^uint64(0) { continue } @@ -673,6 +654,7 @@ func updateUpBitrate(up *upConnection) { minrate2 = 512000 } if bitrate < minrate2 { + loss := atomic.LoadUint32(&l.loss) if loss <= 13 { // less than 10% loss, go ahead bitrate = minrate2 diff --git a/group.go b/group.go index 5366c32..f54219a 100644 --- a/group.go +++ b/group.go @@ -68,11 +68,33 @@ type upConnection struct { tracks []*upTrack } +func msSinceEpoch() uint64 { + return uint64(time.Since(epoch) / time.Millisecond) +} + +var epoch = time.Now() + type timeStampedBitrate struct { bitrate uint64 timestamp uint64 } +func (tb *timeStampedBitrate) Set(bitrate, timestamp uint64) { + // this is racy -- a reader might read the + // data between the two writes. This shouldn't + // matter, we'll recover at the next sample. + atomic.StoreUint64(&tb.bitrate, bitrate) + atomic.StoreUint64(&tb.timestamp, timestamp) +} + +func (tb *timeStampedBitrate) Get(now uint64) uint64 { + ts := atomic.LoadUint64(&tb.timestamp) + if now < ts || now > ts + 1000 { + return ^uint64(0) + } + return atomic.LoadUint64(&tb.bitrate) +} + type downTrack struct { track *webrtc.Track remote *upTrack @@ -695,7 +717,7 @@ func getClientStats(c *client) clientStats { loss := atomic.LoadUint32(&t.loss) conns.tracks = append(conns.tracks, trackStats{ bitrate: uint64(t.rate.Estimate()) * 8, - maxBitrate: atomic.LoadUint64(&t.maxBitrate.bitrate), + maxBitrate: t.maxBitrate.Get(msSinceEpoch()), loss: uint8((loss * 100) / 256), }) }