From c00a21995e7459a3e2e255b921aeb5c2983cd13f Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Mon, 17 May 2021 03:54:20 +0200 Subject: [PATCH] Move PLI rate-limiting into the reader loop. --- rtpconn/rtpconn.go | 12 --------- rtpconn/rtpreader.go | 59 ++++++++++++++++++++++++++++---------------- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index 23498ea..f6551a7 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -319,16 +319,11 @@ func (down *rtpDownConnection) flushICECandidates() error { return err } -type upTrackAtomics struct { - lastPLI uint64 -} - type rtpUpTrack struct { track *webrtc.TrackRemote rate *estimator.Estimator cache *packetcache.Cache jitter *jitter.Estimator - atomics *upTrackAtomics cname atomic.Value localCh chan trackAction @@ -597,7 +592,6 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon cache: packetcache.New(minPacketCache(remote)), rate: estimator.New(time.Second), jitter: jitter.New(remote.Codec().ClockRate), - atomics: &upTrackAtomics{}, localCh: make(chan trackAction, 2), readerDone: make(chan struct{}), } @@ -626,12 +620,6 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error { if !track.hasRtcpFb("nack", "pli") { return ErrUnsupportedFeedback } - last := atomic.LoadUint64(&track.atomics.lastPLI) - now := rtptime.Jiffies() - if now >= last && now-last < rtptime.JiffiesPerSec/2 { - return ErrRateLimited - } - atomic.StoreUint64(&track.atomics.lastPLI, now) return sendPLI(up.pc, track.track.SSRC()) } diff --git a/rtpconn/rtpreader.go b/rtpconn/rtpreader.go index 43ef4e4..ad0ef81 100644 --- a/rtpconn/rtpreader.go +++ b/rtpconn/rtpreader.go @@ -3,6 +3,7 @@ package rtpconn import ( "io" "log" + "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3" @@ -21,10 +22,39 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { isvideo := track.track.Kind() == webrtc.RTPCodecTypeVideo codec := track.track.Codec() sendNACK := track.hasRtcpFb("nack", "") + sendPLI := track.hasRtcpFb("nack", "pli") var kfNeeded bool + var kfRequested time.Time buf := make([]byte, packetcache.BufSize) var packet rtp.Packet for { + + inner: + for { + select { + case action := <-track.localCh: + switch action.action { + case trackActionAdd, trackActionDel: + err := writers.add( + action.track, + action.action == trackActionAdd, + ) + if err != nil { + log.Printf( + "add/remove track: %v", + err, + ) + } + case trackActionKeyframe: + kfNeeded = true + default: + log.Printf("Unknown action") + } + default: + break inner + } + } + bytes, _, err := track.track.Read(buf) if err != nil { if err != io.EOF { @@ -103,31 +133,18 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { writers.write(packet.SequenceNumber, index, delay, isvideo, packet.Marker) - select { - case action := <-track.localCh: - switch action.action { - case trackActionAdd, trackActionDel: - err := writers.add( - action.track, - action.action == trackActionAdd, - ) + now := time.Now() + if kfNeeded && now.Sub(kfRequested) > time.Second/2 { + if sendPLI { + err := conn.sendPLI(track) if err != nil { - log.Printf("add/remove track: %v", err) + log.Printf("sendPLI: %v", err) + kfNeeded = false } - case trackActionKeyframe: - kfNeeded = true - default: - log.Printf("Unknown action %v", action.action) - } - default: - } - - if kfNeeded { - err := conn.sendPLI(track) - if err != nil && err != ErrRateLimited { - log.Printf("sendPLI: %v", err) + } else { kfNeeded = false } + kfRequested = now } } }