From 6ae79f21d6401662eaa96de37a853d567f88d043 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Fri, 16 Jul 2021 02:57:48 +0200 Subject: [PATCH] Use unbounded buffer for track actions. Without that, we might deadlock if the reader is blocked in read. --- rtpconn/rtpconn.go | 30 +++++++++++++++++++----------- rtpconn/rtpreader.go | 12 ++++++------ rtpconn/webclient.go | 3 ++- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index 7b7ee12..87c3394 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -359,7 +359,7 @@ type rtpUpTrack struct { jitter *jitter.Estimator cname atomic.Value - localCh chan trackAction + actionCh chan struct{} readerDone chan struct{} mu sync.Mutex @@ -369,6 +369,7 @@ type rtpUpTrack struct { maxLayer uint8 local []conn.DownTrack bufferedNACKs []uint16 + actions []trackAction } const ( @@ -383,22 +384,30 @@ type trackAction struct { } func (up *rtpUpTrack) action(action int, track conn.DownTrack) { - select { - case up.localCh <- trackAction{action, track}: - case <-up.readerDone: + up.mu.Lock() + empty := len(up.actions) == 0 + up.actions = append(up.actions, trackAction{action, track}) + up.mu.Unlock() + + if empty { + select { + case up.actionCh <- struct{}{}: + default: + } } } func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error { up.mu.Lock() - defer up.mu.Unlock() - for _, t := range up.local { if t == local { + up.mu.Unlock() return nil } } up.local = append(up.local, local) + up.mu.Unlock() + up.action(trackActionAdd, local) return nil } @@ -410,16 +419,15 @@ func (up *rtpUpTrack) RequestKeyframe() error { func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool { up.mu.Lock() - defer up.mu.Unlock() for i, l := range up.local { if l == local { up.local = append(up.local[:i], up.local[i+1:]...) - // do this asynchronously, to avoid deadlocking when - // multiple clients call this simultaneously. - go up.action(trackActionDel, l) + up.mu.Unlock() + up.action(trackActionDel, l) return true } } + up.mu.Unlock() return false } @@ -625,7 +633,7 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon cache: packetcache.New(minPacketCache(remote)), rate: estimator.New(time.Second), jitter: jitter.New(remote.Codec().ClockRate), - localCh: make(chan trackAction, 2), + actionCh: make(chan struct{}, 1), readerDone: make(chan struct{}), } diff --git a/rtpconn/rtpreader.go b/rtpconn/rtpreader.go index c4a6886..815f543 100644 --- a/rtpconn/rtpreader.go +++ b/rtpconn/rtpreader.go @@ -29,10 +29,12 @@ func readLoop(track *rtpUpTrack) { var packet rtp.Packet for { - inner: - for { - select { - case action := <-track.localCh: + select { + case <-track.actionCh: + track.mu.Lock() + actions := track.actions + track.mu.Unlock() + for _, action := range actions { switch action.action { case trackActionAdd, trackActionDel: err := writers.add( @@ -50,8 +52,6 @@ func readLoop(track *rtpUpTrack) { default: log.Printf("Unknown action") } - default: - break inner } } diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index 622f024..4fe284f 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -1710,9 +1710,10 @@ var ErrClientDead = errors.New("client is dead") func (c *webClient) action(a interface{}) error { c.mu.Lock() - defer c.mu.Unlock() empty := len(c.actions) == 0 c.actions = append(c.actions, a) + c.mu.Unlock() + if empty { select { case c.actionCh <- struct{}{}: