1
Fork 0

Move monotonic time to separate package, use microseconds.

This commit is contained in:
Juliusz Chroboczek 2020-05-02 15:27:47 +02:00
parent 1f50b42ee0
commit 7961d7279b
4 changed files with 70 additions and 22 deletions

View File

@ -18,6 +18,7 @@ import (
"time" "time"
"sfu/estimator" "sfu/estimator"
"sfu/mono"
"sfu/packetcache" "sfu/packetcache"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -319,10 +320,10 @@ func upLoop(conn *upConnection, track *upTrack) {
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
var local []*downTrack var local []*downTrack
var localTime time.Time var localTime uint64
for { for {
now := time.Now() now := mono.Microseconds()
if now.Sub(localTime) > time.Second/2 { if now < localTime || now > localTime + 500000 {
local = track.getLocal() local = track.getLocal()
localTime = now localTime = now
} }
@ -598,7 +599,9 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt
log.Printf("sendPLI: %v", err) log.Printf("sendPLI: %v", err)
} }
case *rtcp.ReceiverEstimatedMaximumBitrate: case *rtcp.ReceiverEstimatedMaximumBitrate:
track.maxBitrate.Set(p.Bitrate, msSinceEpoch()) track.maxBitrate.Set(p.Bitrate,
mono.Microseconds(),
)
case *rtcp.ReceiverReport: case *rtcp.ReceiverReport:
for _, r := range p.Reports { for _, r := range p.Reports {
if r.SSRC == track.track.SSRC() { if r.SSRC == track.track.SSRC() {
@ -609,7 +612,9 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt
} }
} }
case *rtcp.TransportLayerNack: case *rtcp.TransportLayerNack:
maxBitrate := track.maxBitrate.Get(msSinceEpoch()) maxBitrate := track.maxBitrate.Get(
mono.Microseconds(),
)
bitrate := track.rate.Estimate() bitrate := track.rate.Estimate()
if uint64(bitrate) < maxBitrate { if uint64(bitrate) < maxBitrate {
sendRecovery(p, track) sendRecovery(p, track)
@ -640,7 +645,7 @@ func trackKinds(down *downConnection) (audio bool, video bool) {
} }
func updateUpBitrate(up *upConnection) { func updateUpBitrate(up *upConnection) {
now := msSinceEpoch() now := mono.Microseconds()
for _, track := range up.tracks { for _, track := range up.tracks {
track.maxBitrate = ^uint64(0) track.maxBitrate = ^uint64(0)
@ -678,8 +683,8 @@ func updateUpBitrate(up *upConnection) {
func (up *upConnection) sendPLI(track *upTrack) error { func (up *upConnection) sendPLI(track *upTrack) error {
last := atomic.LoadUint64(&track.lastPLI) last := atomic.LoadUint64(&track.lastPLI)
now := msSinceEpoch() now := mono.Microseconds()
if now >= last && now-last < 200 { if now >= last && now-last < 200000 {
return nil return nil
} }
atomic.StoreUint64(&track.lastPLI, now) atomic.StoreUint64(&track.lastPLI, now)

View File

@ -17,6 +17,7 @@ import (
"time" "time"
"sfu/estimator" "sfu/estimator"
"sfu/mono"
"sfu/packetcache" "sfu/packetcache"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
@ -68,28 +69,22 @@ type upConnection struct {
tracks []*upTrack tracks []*upTrack
} }
func msSinceEpoch() uint64 {
return uint64(time.Since(epoch) / time.Millisecond)
}
var epoch = time.Now()
type timeStampedBitrate struct { type timeStampedBitrate struct {
bitrate uint64 bitrate uint64
timestamp uint64 microseconds uint64
} }
func (tb *timeStampedBitrate) Set(bitrate, timestamp uint64) { func (tb *timeStampedBitrate) Set(bitrate, us uint64) {
// this is racy -- a reader might read the // this is racy -- a reader might read the
// data between the two writes. This shouldn't // data between the two writes. This shouldn't
// matter, we'll recover at the next sample. // matter, we'll recover at the next sample.
atomic.StoreUint64(&tb.bitrate, bitrate) atomic.StoreUint64(&tb.bitrate, bitrate)
atomic.StoreUint64(&tb.timestamp, timestamp) atomic.StoreUint64(&tb.microseconds, us)
} }
func (tb *timeStampedBitrate) Get(now uint64) uint64 { func (tb *timeStampedBitrate) Get(now uint64) uint64 {
ts := atomic.LoadUint64(&tb.timestamp) ts := atomic.LoadUint64(&tb.microseconds)
if now < ts || now > ts + 1000 { if now < ts || now > ts+4000000 {
return ^uint64(0) return ^uint64(0)
} }
return atomic.LoadUint64(&tb.bitrate) return atomic.LoadUint64(&tb.bitrate)
@ -720,7 +715,7 @@ func getClientStats(c *client) clientStats {
loss := atomic.LoadUint32(&t.loss) loss := atomic.LoadUint32(&t.loss)
conns.tracks = append(conns.tracks, trackStats{ conns.tracks = append(conns.tracks, trackStats{
bitrate: uint64(t.rate.Estimate()) * 8, bitrate: uint64(t.rate.Estimate()) * 8,
maxBitrate: t.maxBitrate.Get(msSinceEpoch()), maxBitrate: t.maxBitrate.Get(mono.Microseconds()),
loss: uint8((loss * 100) / 256), loss: uint8((loss * 100) / 256),
}) })
} }

19
mono/mono.go Normal file
View File

@ -0,0 +1,19 @@
package mono
import (
"time"
)
var epoch = time.Now()
func fromDuration(d time.Duration, hz uint32) uint64 {
return uint64(d) * uint64(hz) / uint64(time.Second)
}
func Now(hz uint32) uint64 {
return fromDuration(time.Since(epoch), hz)
}
func Microseconds() uint64 {
return Now(1000000)
}

29
mono/mono_test.go Normal file
View File

@ -0,0 +1,29 @@
package mono
import (
"testing"
"time"
)
func differs(a, b, delta uint64) bool {
if a < b {
a, b = b, a
}
return a - b >= delta
}
func TestMono(t *testing.T) {
a := Now(48000)
time.Sleep(4 * time.Millisecond)
b := Now(48000) - a
if differs(b, 4 * 48, 16) {
t.Errorf("Expected %v, got %v", 4 * 48, b)
}
c := Microseconds()
time.Sleep(4 * time.Millisecond)
d := Microseconds() - c
if differs(d, 4000, 1000) {
t.Errorf("Expected %v, got %v", 4000, d)
}
}