diff --git a/client.go b/client.go index af5d72a..7dc00f1 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,7 @@ import ( "time" "sfu/estimator" + "sfu/jitter" "sfu/mono" "sfu/packetcache" @@ -294,6 +295,7 @@ func addUpConn(c *client, id string) (*upConnection, error) { track: remote, cache: packetcache.New(96), rate: estimator.New(time.Second), + jitter: jitter.New(remote.Codec().ClockRate), maxBitrate: ^uint64(0), } u.tracks = append(u.tracks, track) @@ -323,7 +325,7 @@ func upLoop(conn *upConnection, track *upTrack) { var localTime uint64 for { now := mono.Microseconds() - if now < localTime || now > localTime + 500000 { + if now < localTime || now > localTime+500000 { local = track.getLocal() localTime = now } @@ -343,6 +345,8 @@ func upLoop(conn *upConnection, track *upTrack) { continue } + track.jitter.Accumulate(packet.Timestamp) + first := track.cache.Store(packet.SequenceNumber, buf[:bytes]) if packet.SequenceNumber-first > 24 { first, bitmap := track.cache.BitmapGet() @@ -379,6 +383,8 @@ func rtcpUpListener(conn *upConnection, track *upTrack, r *webrtc.RTPReceiver) { case *rtcp.SenderReport: atomic.StoreUint32(&track.lastSenderReport, uint32(p.NTPTime>>16)) + atomic.StoreUint32(&track.lastSenderReportTime, + uint32(mono.Now(0x10000))) case *rtcp.SourceDescription: } } @@ -392,7 +398,7 @@ func sendRR(c *client, conn *upConnection) error { return nil } - ssrc := conn.tracks[0].track.SSRC() + now := uint32(mono.Now(0x10000)) reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) for _, t := range conn.tracks { @@ -403,19 +409,24 @@ func sendRR(c *client, conn *upConnection) error { if lost >= expected { lost = expected - 1 } + lastSR := atomic.LoadUint32(&t.lastSenderReport) + delay := now - atomic.LoadUint32(&t.lastSenderReportTime) + reports = append(reports, rtcp.ReceptionReport{ SSRC: t.track.SSRC(), - LastSenderReport: atomic.LoadUint32(&t.lastSenderReport), FractionLost: uint8((lost * 256) / expected), TotalLost: totalLost, LastSequenceNumber: eseqno, + Jitter: t.jitter.Jitter(), + LastSenderReport: lastSR, + Delay: delay, }) } c.mu.Unlock() return conn.pc.WriteRTCP([]rtcp.Packet{ &rtcp.ReceiverReport{ - SSRC: ssrc, + SSRC: 1, Reports: reports, }, }) @@ -609,6 +620,9 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt &track.loss, uint32(r.FractionLost), ) + atomic.StoreUint32( + &track.jitter, + r.Jitter) } } case *rtcp.TransportLayerNack: diff --git a/group.go b/group.go index f8a73ba..ae0430e 100644 --- a/group.go +++ b/group.go @@ -17,6 +17,7 @@ import ( "time" "sfu/estimator" + "sfu/jitter" "sfu/mono" "sfu/packetcache" @@ -24,12 +25,14 @@ import ( ) type upTrack struct { - track *webrtc.Track - rate *estimator.Estimator - cache *packetcache.Cache - maxBitrate uint64 - lastPLI uint64 - lastSenderReport uint32 + track *webrtc.Track + rate *estimator.Estimator + cache *packetcache.Cache + jitter *jitter.Estimator + maxBitrate uint64 + lastPLI uint64 + lastSenderReport uint32 + lastSenderReportTime uint32 mu sync.Mutex local []*downTrack @@ -96,6 +99,7 @@ type downTrack struct { maxBitrate *timeStampedBitrate rate *estimator.Estimator loss uint32 + jitter uint32 } type downConnection struct { diff --git a/jitter/jitter.go b/jitter/jitter.go new file mode 100644 index 0000000..06ee5b8 --- /dev/null +++ b/jitter/jitter.go @@ -0,0 +1,49 @@ +package jitter + +import ( + "sync/atomic" + + "sfu/mono" +) + +type Estimator struct { + hz uint32 + timestamp uint32 + time uint32 + + jitter uint32 // atomic +} + +func New(hz uint32) *Estimator { + return &Estimator{hz: hz} +} + +func (e *Estimator) accumulate(timestamp, now uint32) { + if e.time == 0 { + e.timestamp = timestamp + e.time = now + } + + d := uint32((e.time - now) - (e.timestamp - timestamp)) + if d & 0x80000000 != 0 { + d = uint32(-int32(d)) + } + oldjitter := atomic.LoadUint32(&e.jitter) + jitter := (oldjitter * 15 + d) / 16 + atomic.StoreUint32(&e.jitter, jitter) + + e.timestamp = timestamp + e.time = now +} + +func (e *Estimator) Accumulate(timestamp uint32) { + e.accumulate(timestamp, uint32(mono.Now(e.hz))) +} + +func (e *Estimator) Jitter() uint32 { + return atomic.LoadUint32(&e.jitter) +} + +func (e *Estimator) HZ() uint32 { + return e.hz +} diff --git a/jitter/jitter_test.go b/jitter/jitter_test.go new file mode 100644 index 0000000..5f22bed --- /dev/null +++ b/jitter/jitter_test.go @@ -0,0 +1,37 @@ +package jitter + +import ( + "testing" +) + +func TestJitter(t *testing.T) { + e := New(48000) + e.accumulate(0, 0) + e.accumulate(1000, 1000) + e.accumulate(2000, 2000) + e.accumulate(3000, 3000) + + if e.Jitter() != 0 { + t.Errorf("Expected 0, got %v", e.Jitter()) + } + + e = New(48000) + e.accumulate(0, 0) + e.accumulate(1000, 1000) + e.accumulate(2000, 2200) + e.accumulate(3000, 3000) + + if e.Jitter() != 23 { + t.Errorf("Expected 23, got %v", e.Jitter()) + } + + e = New(48000) + e.accumulate(0, 0) + e.accumulate(1000, 1000) + e.accumulate(2000, 1800) + e.accumulate(3000, 3000) + + if e.Jitter() != 23 { + t.Errorf("Expected 23, got %v", e.Jitter()) + } +}