1
Fork 0

Use unbounded buffer for track actions.

Without that, we might deadlock if the reader is blocked in read.
This commit is contained in:
Juliusz Chroboczek 2021-07-16 02:57:48 +02:00
parent 0d2ca28ae1
commit 6ae79f21d6
3 changed files with 27 additions and 18 deletions

View File

@ -359,7 +359,7 @@ type rtpUpTrack struct {
jitter *jitter.Estimator jitter *jitter.Estimator
cname atomic.Value cname atomic.Value
localCh chan trackAction actionCh chan struct{}
readerDone chan struct{} readerDone chan struct{}
mu sync.Mutex mu sync.Mutex
@ -369,6 +369,7 @@ type rtpUpTrack struct {
maxLayer uint8 maxLayer uint8
local []conn.DownTrack local []conn.DownTrack
bufferedNACKs []uint16 bufferedNACKs []uint16
actions []trackAction
} }
const ( const (
@ -383,22 +384,30 @@ type trackAction struct {
} }
func (up *rtpUpTrack) action(action int, track conn.DownTrack) { func (up *rtpUpTrack) action(action int, track conn.DownTrack) {
up.mu.Lock()
empty := len(up.actions) == 0
up.actions = append(up.actions, trackAction{action, track})
up.mu.Unlock()
if empty {
select { select {
case up.localCh <- trackAction{action, track}: case up.actionCh <- struct{}{}:
case <-up.readerDone: default:
}
} }
} }
func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error { func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock()
for _, t := range up.local { for _, t := range up.local {
if t == local { if t == local {
up.mu.Unlock()
return nil return nil
} }
} }
up.local = append(up.local, local) up.local = append(up.local, local)
up.mu.Unlock()
up.action(trackActionAdd, local) up.action(trackActionAdd, local)
return nil return nil
} }
@ -410,16 +419,15 @@ func (up *rtpUpTrack) RequestKeyframe() error {
func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool { func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock()
for i, l := range up.local { for i, l := range up.local {
if l == local { if l == local {
up.local = append(up.local[:i], up.local[i+1:]...) up.local = append(up.local[:i], up.local[i+1:]...)
// do this asynchronously, to avoid deadlocking when up.mu.Unlock()
// multiple clients call this simultaneously. up.action(trackActionDel, l)
go up.action(trackActionDel, l)
return true return true
} }
} }
up.mu.Unlock()
return false return false
} }
@ -625,7 +633,7 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon
cache: packetcache.New(minPacketCache(remote)), cache: packetcache.New(minPacketCache(remote)),
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate), jitter: jitter.New(remote.Codec().ClockRate),
localCh: make(chan trackAction, 2), actionCh: make(chan struct{}, 1),
readerDone: make(chan struct{}), readerDone: make(chan struct{}),
} }

View File

@ -29,10 +29,12 @@ func readLoop(track *rtpUpTrack) {
var packet rtp.Packet var packet rtp.Packet
for { for {
inner:
for {
select { select {
case action := <-track.localCh: case <-track.actionCh:
track.mu.Lock()
actions := track.actions
track.mu.Unlock()
for _, action := range actions {
switch action.action { switch action.action {
case trackActionAdd, trackActionDel: case trackActionAdd, trackActionDel:
err := writers.add( err := writers.add(
@ -50,8 +52,6 @@ func readLoop(track *rtpUpTrack) {
default: default:
log.Printf("Unknown action") log.Printf("Unknown action")
} }
default:
break inner
} }
} }

View File

@ -1710,9 +1710,10 @@ var ErrClientDead = errors.New("client is dead")
func (c *webClient) action(a interface{}) error { func (c *webClient) action(a interface{}) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
empty := len(c.actions) == 0 empty := len(c.actions) == 0
c.actions = append(c.actions, a) c.actions = append(c.actions, a)
c.mu.Unlock()
if empty { if empty {
select { select {
case c.actionCh <- struct{}{}: case c.actionCh <- struct{}{}: