diff --git a/conn.go b/conn.go index 7cd1498..6eaacc7 100644 --- a/conn.go +++ b/conn.go @@ -281,6 +281,9 @@ type rtpDownTrack struct { maxREMBBitrate *bitrate rate *estimator.Estimator stats *receiverStats + srTime uint64 + srNTPTime uint64 + rtt uint64 } func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error { diff --git a/group.go b/group.go index 31dcba5..c4c8c7c 100644 --- a/group.go +++ b/group.go @@ -518,6 +518,7 @@ type trackStats struct { bitrate uint64 maxBitrate uint64 loss uint8 + rtt time.Duration jitter time.Duration } @@ -589,6 +590,8 @@ func getClientStats(c *webClient) clientStats { conns := connStats{id: down.id} for _, t := range down.tracks { jiffies := rtptime.Jiffies() + rtt := rtptime.ToDuration(atomic.LoadUint64(&t.rtt), + rtptime.JiffiesPerSec) loss, jitter := t.stats.Get(jiffies) j := time.Duration(jitter) * time.Second / time.Duration(t.track.Codec().ClockRate) @@ -596,6 +599,7 @@ func getClientStats(c *webClient) clientStats { bitrate: uint64(t.rate.Estimate()) * 8, maxBitrate: t.GetMaxBitrate(jiffies), loss: uint8(uint32(loss) * 100 / 256), + rtt: rtt, jitter: j, }) } diff --git a/rtptime/rtptime.go b/rtptime/rtptime.go index 4efad55..0892d46 100644 --- a/rtptime/rtptime.go +++ b/rtptime/rtptime.go @@ -10,7 +10,7 @@ func FromDuration(d time.Duration, hz uint32) uint64 { return uint64(d) * uint64(hz) / uint64(time.Second) } -func toDuration(tm uint64, hz uint32) time.Duration { +func ToDuration(tm uint64, hz uint32) time.Duration { return time.Duration(tm * uint64(time.Second) / uint64(hz)) } @@ -29,6 +29,10 @@ func Jiffies() uint64 { return Now(JiffiesPerSec) } +func TimeToJiffies(tm time.Time) uint64 { + return FromDuration(tm.Sub(epoch), JiffiesPerSec) +} + var ntpEpoch = time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC) func NTPToTime(ntp uint64) time.Time { diff --git a/rtptime/rtptime_test.go b/rtptime/rtptime_test.go index 3304e5e..40cd552 100644 --- a/rtptime/rtptime_test.go +++ b/rtptime/rtptime_test.go @@ -11,7 +11,7 @@ func TestDuration(t *testing.T) { t.Errorf("Expected 48000, got %v", a) } - b := toDuration(48000, 48000) + b := ToDuration(48000, 48000) if b != time.Second { t.Errorf("Expected %v, got %v", time.Second, b) } diff --git a/webclient.go b/webclient.go index 5d90094..22fdaff 100644 --- a/webclient.go +++ b/webclient.go @@ -20,8 +20,8 @@ import ( "sfu/estimator" "sfu/jitter" - "sfu/rtptime" "sfu/packetcache" + "sfu/rtptime" "github.com/gorilla/websocket" "github.com/pion/rtcp" @@ -633,6 +633,7 @@ func sendSR(conn *rtpDownConnection) error { now := time.Now() nowNTP := rtptime.TimeToNTP(now) + jiffies := rtptime.TimeToJiffies(now) for _, t := range conn.tracks { clockrate := t.track.Codec().ClockRate @@ -661,6 +662,8 @@ func sendSR(conn *rtpDownConnection) error { PacketCount: p, OctetCount: b, }) + atomic.StoreUint64(&t.srTime, jiffies) + atomic.StoreUint64(&t.srNTPTime, nowNTP) } return conn.pc.WriteRTCP(packets) @@ -876,6 +879,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT } return } + jiffies := rtptime.Jiffies() for _, p := range ps { switch p := p.(type) { @@ -918,25 +922,21 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT log.Printf("sendFIR: %v", err) } case *rtcp.ReceiverEstimatedMaximumBitrate: - track.maxREMBBitrate.Set( - p.Bitrate, rtptime.Jiffies(), - ) + track.maxREMBBitrate.Set(p.Bitrate, jiffies) case *rtcp.ReceiverReport: for _, r := range p.Reports { if r.SSRC == track.track.SSRC() { - handleReport(track, r) + handleReport(track, r, jiffies) } } case *rtcp.SenderReport: for _, r := range p.Reports { if r.SSRC == track.track.SSRC() { - handleReport(track, r) + handleReport(track, r, jiffies) } } case *rtcp.TransportLayerNack: - maxBitrate := track.GetMaxBitrate( - rtptime.Jiffies(), - ) + maxBitrate := track.GetMaxBitrate(jiffies) bitrate := track.rate.Estimate() if uint64(bitrate)*7/8 < maxBitrate { sendRecovery(p, track) @@ -946,10 +946,32 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT } } -func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport) { - jiffies := rtptime.Jiffies() +func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint64) { track.stats.Set(report.FractionLost, report.Jitter, jiffies) track.updateRate(report.FractionLost, jiffies) + + if report.LastSenderReport != 0 { + jiffies := rtptime.Jiffies() + srTime := atomic.LoadUint64(&track.srTime) + if jiffies < srTime || jiffies-srTime > 8*rtptime.JiffiesPerSec { + return + } + srNTPTime := atomic.LoadUint64(&track.srNTPTime) + if report.LastSenderReport == uint32(srNTPTime>>16) { + delay := uint64(report.Delay) * + (rtptime.JiffiesPerSec / 0x10000) + if delay > jiffies-srTime { + return + } + rtt := (jiffies - srTime) - delay + oldrtt := atomic.LoadUint64(&track.rtt) + newrtt := rtt + if oldrtt > 0 { + newrtt = (3*oldrtt + rtt) / 4 + } + atomic.StoreUint64(&track.rtt, newrtt) + } + } } func trackKinds(down *rtpDownConnection) (audio bool, video bool) { @@ -1013,7 +1035,7 @@ func (up *upConnection) sendPLI(track *upTrack) error { } last := atomic.LoadUint64(&track.lastPLI) now := rtptime.Jiffies() - if now >= last && now-last < rtptime.JiffiesPerSec / 5 { + if now >= last && now-last < rtptime.JiffiesPerSec/5 { return ErrRateLimited } atomic.StoreUint64(&track.lastPLI, now) @@ -1041,7 +1063,7 @@ func (up *upConnection) sendFIR(track *upTrack, increment bool) error { } last := atomic.LoadUint64(&track.lastFIR) now := rtptime.Jiffies() - if now >= last && now-last < rtptime.JiffiesPerSec / 5 { + if now >= last && now-last < rtptime.JiffiesPerSec/5 { return ErrRateLimited } atomic.StoreUint64(&track.lastFIR, now) diff --git a/webserver.go b/webserver.go index 872ccee..55a87a9 100644 --- a/webserver.go +++ b/webserver.go @@ -154,11 +154,14 @@ func statsHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "