diff --git a/rtpconn.go b/rtpconn.go index 2802ae0..8c74f22 100644 --- a/rtpconn.go +++ b/rtpconn.go @@ -480,6 +480,7 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) { type packetIndex struct { seqno uint16 index uint16 + delay uint32 } func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { @@ -531,15 +532,34 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { continue } + _, rate := track.rate.Estimate() + delay := uint32(rtptime.JiffiesPerSec / 1024) + if rate > 512 { + delay = rtptime.JiffiesPerSec / rate / 2 + } + + pi := packetIndex{packet.SequenceNumber, index, delay} select { - case ch <- packetIndex{packet.SequenceNumber, index}: + case ch <- pi: default: + // the writer is congested if isvideo { - // the writer is congested. Drop until - // the end of the frame. + // keep dropping until the end of the frame if isvideo && !packet.Marker { drop = 7 } + } else { + // try again with half the delay on our side + timer := time.NewTimer(rtptime.ToDuration( + uint64(delay/2), + rtptime.JiffiesPerSec, + )) + pi.delay = delay / 2 + select { + case ch <- pi: + timer.Stop() + case <-timer.C: + } } } } @@ -598,8 +618,15 @@ func writeLoop(conn *rtpUpConnection, track *rtpUpTrack, ch <-chan packetIndex) continue } - kfNeeded := false + var delay time.Duration + if len(local) > 0 { + delay = rtptime.ToDuration( + uint64(pi.delay / uint32(len(local))), + rtptime.JiffiesPerSec, + ) + } + kfNeeded := false for _, l := range local { err := l.WriteRTP(&packet) if err != nil { @@ -611,6 +638,9 @@ func writeLoop(conn *rtpUpConnection, track *rtpUpTrack, ch <-chan packetIndex) continue } l.Accumulate(uint32(bytes)) + if delay > 0 { + time.Sleep(delay) + } } if kfNeeded {