1
Fork 0

Include jitter and delay in receiver reports.

This commit is contained in:
Juliusz Chroboczek 2020-05-02 16:21:48 +02:00
parent 7961d7279b
commit 4dd245712f
4 changed files with 114 additions and 10 deletions

View File

@ -18,6 +18,7 @@ import (
"time" "time"
"sfu/estimator" "sfu/estimator"
"sfu/jitter"
"sfu/mono" "sfu/mono"
"sfu/packetcache" "sfu/packetcache"
@ -294,6 +295,7 @@ func addUpConn(c *client, id string) (*upConnection, error) {
track: remote, track: remote,
cache: packetcache.New(96), cache: packetcache.New(96),
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate),
maxBitrate: ^uint64(0), maxBitrate: ^uint64(0),
} }
u.tracks = append(u.tracks, track) u.tracks = append(u.tracks, track)
@ -323,7 +325,7 @@ func upLoop(conn *upConnection, track *upTrack) {
var localTime uint64 var localTime uint64
for { for {
now := mono.Microseconds() now := mono.Microseconds()
if now < localTime || now > localTime + 500000 { if now < localTime || now > localTime+500000 {
local = track.getLocal() local = track.getLocal()
localTime = now localTime = now
} }
@ -343,6 +345,8 @@ func upLoop(conn *upConnection, track *upTrack) {
continue continue
} }
track.jitter.Accumulate(packet.Timestamp)
first := track.cache.Store(packet.SequenceNumber, buf[:bytes]) first := track.cache.Store(packet.SequenceNumber, buf[:bytes])
if packet.SequenceNumber-first > 24 { if packet.SequenceNumber-first > 24 {
first, bitmap := track.cache.BitmapGet() first, bitmap := track.cache.BitmapGet()
@ -379,6 +383,8 @@ func rtcpUpListener(conn *upConnection, track *upTrack, r *webrtc.RTPReceiver) {
case *rtcp.SenderReport: case *rtcp.SenderReport:
atomic.StoreUint32(&track.lastSenderReport, atomic.StoreUint32(&track.lastSenderReport,
uint32(p.NTPTime>>16)) uint32(p.NTPTime>>16))
atomic.StoreUint32(&track.lastSenderReportTime,
uint32(mono.Now(0x10000)))
case *rtcp.SourceDescription: case *rtcp.SourceDescription:
} }
} }
@ -392,7 +398,7 @@ func sendRR(c *client, conn *upConnection) error {
return nil return nil
} }
ssrc := conn.tracks[0].track.SSRC() now := uint32(mono.Now(0x10000))
reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
for _, t := range conn.tracks { for _, t := range conn.tracks {
@ -403,19 +409,24 @@ func sendRR(c *client, conn *upConnection) error {
if lost >= expected { if lost >= expected {
lost = expected - 1 lost = expected - 1
} }
lastSR := atomic.LoadUint32(&t.lastSenderReport)
delay := now - atomic.LoadUint32(&t.lastSenderReportTime)
reports = append(reports, rtcp.ReceptionReport{ reports = append(reports, rtcp.ReceptionReport{
SSRC: t.track.SSRC(), SSRC: t.track.SSRC(),
LastSenderReport: atomic.LoadUint32(&t.lastSenderReport),
FractionLost: uint8((lost * 256) / expected), FractionLost: uint8((lost * 256) / expected),
TotalLost: totalLost, TotalLost: totalLost,
LastSequenceNumber: eseqno, LastSequenceNumber: eseqno,
Jitter: t.jitter.Jitter(),
LastSenderReport: lastSR,
Delay: delay,
}) })
} }
c.mu.Unlock() c.mu.Unlock()
return conn.pc.WriteRTCP([]rtcp.Packet{ return conn.pc.WriteRTCP([]rtcp.Packet{
&rtcp.ReceiverReport{ &rtcp.ReceiverReport{
SSRC: ssrc, SSRC: 1,
Reports: reports, Reports: reports,
}, },
}) })
@ -609,6 +620,9 @@ func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrt
&track.loss, &track.loss,
uint32(r.FractionLost), uint32(r.FractionLost),
) )
atomic.StoreUint32(
&track.jitter,
r.Jitter)
} }
} }
case *rtcp.TransportLayerNack: case *rtcp.TransportLayerNack:

View File

@ -17,6 +17,7 @@ import (
"time" "time"
"sfu/estimator" "sfu/estimator"
"sfu/jitter"
"sfu/mono" "sfu/mono"
"sfu/packetcache" "sfu/packetcache"
@ -24,12 +25,14 @@ import (
) )
type upTrack struct { type upTrack struct {
track *webrtc.Track track *webrtc.Track
rate *estimator.Estimator rate *estimator.Estimator
cache *packetcache.Cache cache *packetcache.Cache
maxBitrate uint64 jitter *jitter.Estimator
lastPLI uint64 maxBitrate uint64
lastSenderReport uint32 lastPLI uint64
lastSenderReport uint32
lastSenderReportTime uint32
mu sync.Mutex mu sync.Mutex
local []*downTrack local []*downTrack
@ -96,6 +99,7 @@ type downTrack struct {
maxBitrate *timeStampedBitrate maxBitrate *timeStampedBitrate
rate *estimator.Estimator rate *estimator.Estimator
loss uint32 loss uint32
jitter uint32
} }
type downConnection struct { type downConnection struct {

49
jitter/jitter.go Normal file
View File

@ -0,0 +1,49 @@
package jitter
import (
"sync/atomic"
"sfu/mono"
)
type Estimator struct {
hz uint32
timestamp uint32
time uint32
jitter uint32 // atomic
}
func New(hz uint32) *Estimator {
return &Estimator{hz: hz}
}
func (e *Estimator) accumulate(timestamp, now uint32) {
if e.time == 0 {
e.timestamp = timestamp
e.time = now
}
d := uint32((e.time - now) - (e.timestamp - timestamp))
if d & 0x80000000 != 0 {
d = uint32(-int32(d))
}
oldjitter := atomic.LoadUint32(&e.jitter)
jitter := (oldjitter * 15 + d) / 16
atomic.StoreUint32(&e.jitter, jitter)
e.timestamp = timestamp
e.time = now
}
func (e *Estimator) Accumulate(timestamp uint32) {
e.accumulate(timestamp, uint32(mono.Now(e.hz)))
}
func (e *Estimator) Jitter() uint32 {
return atomic.LoadUint32(&e.jitter)
}
func (e *Estimator) HZ() uint32 {
return e.hz
}

37
jitter/jitter_test.go Normal file
View File

@ -0,0 +1,37 @@
package jitter
import (
"testing"
)
func TestJitter(t *testing.T) {
e := New(48000)
e.accumulate(0, 0)
e.accumulate(1000, 1000)
e.accumulate(2000, 2000)
e.accumulate(3000, 3000)
if e.Jitter() != 0 {
t.Errorf("Expected 0, got %v", e.Jitter())
}
e = New(48000)
e.accumulate(0, 0)
e.accumulate(1000, 1000)
e.accumulate(2000, 2200)
e.accumulate(3000, 3000)
if e.Jitter() != 23 {
t.Errorf("Expected 23, got %v", e.Jitter())
}
e = New(48000)
e.accumulate(0, 0)
e.accumulate(1000, 1000)
e.accumulate(2000, 1800)
e.accumulate(3000, 3000)
if e.Jitter() != 23 {
t.Errorf("Expected 23, got %v", e.Jitter())
}
}