diff --git a/conn.go b/conn.go index fa8dc23..13e4e53 100644 --- a/conn.go +++ b/conn.go @@ -41,4 +41,5 @@ type downTrack interface { WriteRTP(packat *rtp.Packet) error Accumulate(bytes uint32) GetMaxBitrate(now uint64) uint64 + setTimeOffset(ntp uint64, rtp uint32) } diff --git a/disk.go b/disk.go index 0a099cf..7e3e6cf 100644 --- a/disk.go +++ b/disk.go @@ -204,6 +204,9 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac return &conn, nil } +func (t *diskTrack) setTimeOffset(ntp uint64, rtp uint32) { +} + func clonePacket(packet *rtp.Packet) *rtp.Packet { buf, err := packet.Marshal() if err != nil { diff --git a/rtpconn.go b/rtpconn.go index d008ada..f32fb06 100644 --- a/rtpconn.go +++ b/rtpconn.go @@ -78,6 +78,8 @@ type rtpDownTrack struct { stats *receiverStats srTime uint64 srNTPTime uint64 + remoteNTPTime uint64 + remoteRTPTime uint32 rtt uint64 } @@ -98,6 +100,11 @@ func (down *rtpDownTrack) GetMaxBitrate(now uint64) uint64 { return br2 } +func (down *rtpDownTrack) setTimeOffset(ntp uint64, rtp uint32) { + atomic.StoreUint64(&down.remoteNTPTime, ntp) + atomic.StoreUint32(&down.remoteRTPTime, rtp) +} + type rtpDownConnection struct { id string pc *webrtc.PeerConnection @@ -554,11 +561,19 @@ func writeLoop(conn *rtpUpConnection, track *rtpUpTrack, ch <-chan packetIndex) if action.add { local = append(local, action.track) firSent = false + track.mu.Lock() + ntp := track.srNTPTime + rtp := track.srRTPTime + track.mu.Unlock() + if ntp != 0 { + action.track.setTimeOffset(ntp, rtp) + } } else { found := false for i, t := range local { if t == action.track { - local = append(local[:i], local[i+1:]...) + local = append(local[:i], + local[i+1:]...) found = true break } @@ -754,6 +769,10 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei track.srNTPTime = p.NTPTime track.srRTPTime = p.RTPTime track.mu.Unlock() + local := track.getLocal() + for _, l := range local { + l.setTimeOffset(p.NTPTime, p.RTPTime) + } case *rtcp.SourceDescription: } } @@ -848,37 +867,22 @@ func sendSR(conn *rtpDownConnection) error { for _, t := range conn.tracks { clockrate := t.track.Codec().ClockRate - remote := t.remote var nowRTP uint32 - switch r := remote.(type) { - case *rtpUpTrack: - r.mu.Lock() - lastTime := r.srTime - srNTPTime := r.srNTPTime - srRTPTime := r.srRTPTime - r.mu.Unlock() - if lastTime == 0 { - // we never got a remote SR, skip this track - continue - } - if srNTPTime != 0 { - srTime := rtptime.NTPToTime(srNTPTime) - d := now.Sub(srTime) - if d > 0 && d < time.Hour { - delay := rtptime.FromDuration( - d, clockrate, - ) - nowRTP = srRTPTime + uint32(delay) - } - } - default: - ts, ok := remote.getTimestamp() - if !ok { - continue - } - nowRTP = ts + remoteNTP := atomic.LoadUint64(&t.remoteNTPTime) + remoteRTP := atomic.LoadUint32(&t.remoteRTPTime) + if remoteNTP == 0 { + // we never got a remote SR for this track + continue + } + srTime := rtptime.NTPToTime(remoteNTP) + d := now.Sub(srTime) + if d > 0 && d < time.Hour { + delay := rtptime.FromDuration( + d, clockrate, + ) + nowRTP = remoteRTP + uint32(delay) } p, b := t.rate.Totals()