1
Fork 0

Maintain time offsets on the sender side.

This commit is contained in:
Juliusz Chroboczek 2020-06-08 23:54:10 +02:00
parent 917fa33d38
commit f9edde6526
3 changed files with 37 additions and 29 deletions

View File

@ -41,4 +41,5 @@ type downTrack interface {
WriteRTP(packat *rtp.Packet) error
Accumulate(bytes uint32)
GetMaxBitrate(now uint64) uint64
setTimeOffset(ntp uint64, rtp uint32)
}

View File

@ -204,6 +204,9 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac
return &conn, nil
}
func (t *diskTrack) setTimeOffset(ntp uint64, rtp uint32) {
}
func clonePacket(packet *rtp.Packet) *rtp.Packet {
buf, err := packet.Marshal()
if err != nil {

View File

@ -78,6 +78,8 @@ type rtpDownTrack struct {
stats *receiverStats
srTime uint64
srNTPTime uint64
remoteNTPTime uint64
remoteRTPTime uint32
rtt uint64
}
@ -98,6 +100,11 @@ func (down *rtpDownTrack) GetMaxBitrate(now uint64) uint64 {
return br2
}
func (down *rtpDownTrack) setTimeOffset(ntp uint64, rtp uint32) {
atomic.StoreUint64(&down.remoteNTPTime, ntp)
atomic.StoreUint32(&down.remoteRTPTime, rtp)
}
type rtpDownConnection struct {
id string
pc *webrtc.PeerConnection
@ -554,11 +561,19 @@ func writeLoop(conn *rtpUpConnection, track *rtpUpTrack, ch <-chan packetIndex)
if action.add {
local = append(local, action.track)
firSent = false
track.mu.Lock()
ntp := track.srNTPTime
rtp := track.srRTPTime
track.mu.Unlock()
if ntp != 0 {
action.track.setTimeOffset(ntp, rtp)
}
} else {
found := false
for i, t := range local {
if t == action.track {
local = append(local[:i], local[i+1:]...)
local = append(local[:i],
local[i+1:]...)
found = true
break
}
@ -754,6 +769,10 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
track.srNTPTime = p.NTPTime
track.srRTPTime = p.RTPTime
track.mu.Unlock()
local := track.getLocal()
for _, l := range local {
l.setTimeOffset(p.NTPTime, p.RTPTime)
}
case *rtcp.SourceDescription:
}
}
@ -848,37 +867,22 @@ func sendSR(conn *rtpDownConnection) error {
for _, t := range conn.tracks {
clockrate := t.track.Codec().ClockRate
remote := t.remote
var nowRTP uint32
switch r := remote.(type) {
case *rtpUpTrack:
r.mu.Lock()
lastTime := r.srTime
srNTPTime := r.srNTPTime
srRTPTime := r.srRTPTime
r.mu.Unlock()
if lastTime == 0 {
// we never got a remote SR, skip this track
continue
}
if srNTPTime != 0 {
srTime := rtptime.NTPToTime(srNTPTime)
d := now.Sub(srTime)
if d > 0 && d < time.Hour {
delay := rtptime.FromDuration(
d, clockrate,
)
nowRTP = srRTPTime + uint32(delay)
}
}
default:
ts, ok := remote.getTimestamp()
if !ok {
continue
}
nowRTP = ts
remoteNTP := atomic.LoadUint64(&t.remoteNTPTime)
remoteRTP := atomic.LoadUint32(&t.remoteRTPTime)
if remoteNTP == 0 {
// we never got a remote SR for this track
continue
}
srTime := rtptime.NTPToTime(remoteNTP)
d := now.Sub(srTime)
if d > 0 && d < time.Hour {
delay := rtptime.FromDuration(
d, clockrate,
)
nowRTP = remoteRTP + uint32(delay)
}
p, b := t.rate.Totals()