From 5a2dbf36b9e3f055a0d3f87da5cd7d5723fe4ac7 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Wed, 20 May 2020 22:28:30 +0200 Subject: [PATCH] Signal new tracks explicitly. --- client.go | 63 ++++++++++++++++++++++++++++++------------------------- group.go | 18 ++++++++++++++-- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/client.go b/client.go index efa0659..5f3c6ce 100644 --- a/client.go +++ b/client.go @@ -353,6 +353,8 @@ func addUpConn(c *client, id string) (*upConnection, error) { rate: estimator.New(time.Second), jitter: jitter.New(remote.Codec().ClockRate), maxBitrate: ^uint64(0), + localCh: make(chan struct{}, 2), + writerDone: make(chan struct{}), } u.tracks = append(u.tracks, track) var tracks []*upTrack @@ -432,41 +434,40 @@ func readLoop(conn *upConnection, track *upTrack) { } func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) { - var localTime uint64 - var local []*downTrack + defer close(track.writerDone) buf := make([]byte, packetcache.BufSize) var packet rtp.Packet + local := track.getLocal() + for { - now := mono.Microseconds() - if now < localTime || now > localTime+500000 { + select { + case <-track.localCh: local = track.getLocal() - localTime = now - } - - pi, ok := <-ch - if !ok { - return - } - - bytes := track.cache.GetAt(pi.seqno, pi.index, buf) - if bytes == 0 { - continue - } - - err := packet.Unmarshal(buf[:bytes]) - if err != nil { - log.Printf("%v", err) - continue - } - - for _, l := range local { - err := l.track.WriteRTP(&packet) - if err != nil && err != io.ErrClosedPipe { - log.Printf("%v", err) + case pi, ok := <-ch: + if !ok { + return + } + + bytes := track.cache.GetAt(pi.seqno, pi.index, buf) + if bytes == 0 { + continue + } + + err := packet.Unmarshal(buf[:bytes]) + if err != nil { + log.Printf("%v", err) + continue + } + + for _, l := range local { + err := l.track.WriteRTP(&packet) + if err != nil && err != io.ErrClosedPipe { + log.Printf("%v", err) + } + l.rate.Add(uint32(bytes)) } - l.rate.Add(uint32(bytes)) } } } @@ -637,7 +638,11 @@ func addDownConn(c *client, id string, remote *upConnection) (*downConnection, e if c.down == nil { c.down = make(map[string]*downConnection) } - conn := &downConnection{id: id, pc: pc, remote: remote} + conn := &downConnection{ + id: id, + pc: pc, + remote: remote, + } c.mu.Lock() defer c.mu.Unlock() diff --git a/group.go b/group.go index 41ded51..421f411 100644 --- a/group.go +++ b/group.go @@ -35,25 +35,39 @@ type upTrack struct { lastSenderReport uint32 lastSenderReportTime uint32 + localCh chan struct{} // signals that local has changed + writerDone chan struct{} // closed when the loop dies + mu sync.Mutex local []*downTrack } +func (up *upTrack) notifyLocal() { + var s struct{} + select { + case up.localCh <- s: + case <-up.writerDone: + } +} + func (up *upTrack) addLocal(local *downTrack) { up.mu.Lock() - defer up.mu.Unlock() up.local = append(up.local, local) + up.mu.Unlock() + up.notifyLocal() } func (up *upTrack) delLocal(local *downTrack) bool { up.mu.Lock() - defer up.mu.Unlock() for i, l := range up.local { if l == local { up.local = append(up.local[:i], up.local[i+1:]...) + up.mu.Unlock() + up.notifyLocal() return true } } + up.mu.Unlock() return false }