diff --git a/client.go b/client.go index 90eadea..af5d72a 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,7 @@ import ( "time" "sfu/estimator" + "sfu/mono" "sfu/packetcache" "github.com/gorilla/websocket" @@ -319,10 +320,10 @@ func upLoop(conn *upConnection, track *upTrack) { buf := make([]byte, packetcache.BufSize) var packet rtp.Packet var local []*downTrack - var localTime time.Time + var localTime uint64 for { - now := time.Now() - if now.Sub(localTime) > time.Second/2 { + now := mono.Microseconds() + if now < localTime || now > localTime + 500000 { local = track.getLocal() localTime = now } @@ -598,7 +599,9 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt log.Printf("sendPLI: %v", err) } case *rtcp.ReceiverEstimatedMaximumBitrate: - track.maxBitrate.Set(p.Bitrate, msSinceEpoch()) + track.maxBitrate.Set(p.Bitrate, + mono.Microseconds(), + ) case *rtcp.ReceiverReport: for _, r := range p.Reports { if r.SSRC == track.track.SSRC() { @@ -609,7 +612,9 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt } } case *rtcp.TransportLayerNack: - maxBitrate := track.maxBitrate.Get(msSinceEpoch()) + maxBitrate := track.maxBitrate.Get( + mono.Microseconds(), + ) bitrate := track.rate.Estimate() if uint64(bitrate) < maxBitrate { sendRecovery(p, track) @@ -640,7 +645,7 @@ func trackKinds(down *downConnection) (audio bool, video bool) { } func updateUpBitrate(up *upConnection) { - now := msSinceEpoch() + now := mono.Microseconds() for _, track := range up.tracks { track.maxBitrate = ^uint64(0) @@ -678,8 +683,8 @@ func updateUpBitrate(up *upConnection) { func (up *upConnection) sendPLI(track *upTrack) error { last := atomic.LoadUint64(&track.lastPLI) - now := msSinceEpoch() - if now >= last && now-last < 200 { + now := mono.Microseconds() + if now >= last && now-last < 200000 { return nil } atomic.StoreUint64(&track.lastPLI, now) diff --git a/group.go b/group.go index 9b66cef..f8a73ba 100644 --- a/group.go +++ b/group.go @@ -17,6 +17,7 @@ import ( "time" "sfu/estimator" + "sfu/mono" "sfu/packetcache" "github.com/pion/webrtc/v2" @@ -68,28 +69,22 @@ type upConnection struct { tracks []*upTrack } -func msSinceEpoch() uint64 { - return uint64(time.Since(epoch) / time.Millisecond) -} - -var epoch = time.Now() - type timeStampedBitrate struct { - bitrate uint64 - timestamp uint64 + bitrate uint64 + microseconds uint64 } -func (tb *timeStampedBitrate) Set(bitrate, timestamp uint64) { +func (tb *timeStampedBitrate) Set(bitrate, us uint64) { // this is racy -- a reader might read the // data between the two writes. This shouldn't // matter, we'll recover at the next sample. - atomic.StoreUint64(&tb.bitrate, bitrate) - atomic.StoreUint64(&tb.timestamp, timestamp) + atomic.StoreUint64(&tb.bitrate, bitrate) + atomic.StoreUint64(&tb.microseconds, us) } func (tb *timeStampedBitrate) Get(now uint64) uint64 { - ts := atomic.LoadUint64(&tb.timestamp) - if now < ts || now > ts + 1000 { + ts := atomic.LoadUint64(&tb.microseconds) + if now < ts || now > ts+4000000 { return ^uint64(0) } return atomic.LoadUint64(&tb.bitrate) @@ -720,7 +715,7 @@ func getClientStats(c *client) clientStats { loss := atomic.LoadUint32(&t.loss) conns.tracks = append(conns.tracks, trackStats{ bitrate: uint64(t.rate.Estimate()) * 8, - maxBitrate: t.maxBitrate.Get(msSinceEpoch()), + maxBitrate: t.maxBitrate.Get(mono.Microseconds()), loss: uint8((loss * 100) / 256), }) } diff --git a/mono/mono.go b/mono/mono.go new file mode 100644 index 0000000..e17eafb --- /dev/null +++ b/mono/mono.go @@ -0,0 +1,19 @@ +package mono + +import ( + "time" +) + +var epoch = time.Now() + +func fromDuration(d time.Duration, hz uint32) uint64 { + return uint64(d) * uint64(hz) / uint64(time.Second) +} + +func Now(hz uint32) uint64 { + return fromDuration(time.Since(epoch), hz) +} + +func Microseconds() uint64 { + return Now(1000000) +} diff --git a/mono/mono_test.go b/mono/mono_test.go new file mode 100644 index 0000000..f592c83 --- /dev/null +++ b/mono/mono_test.go @@ -0,0 +1,29 @@ +package mono + +import ( + "testing" + "time" +) + +func differs(a, b, delta uint64) bool { + if a < b { + a, b = b, a + } + return a - b >= delta +} + +func TestMono(t *testing.T) { + a := Now(48000) + time.Sleep(4 * time.Millisecond) + b := Now(48000) - a + if differs(b, 4 * 48, 16) { + t.Errorf("Expected %v, got %v", 4 * 48, b) + } + + c := Microseconds() + time.Sleep(4 * time.Millisecond) + d := Microseconds() - c + if differs(d, 4000, 1000) { + t.Errorf("Expected %v, got %v", 4000, d) + } +}