1
Fork 0

Signal new tracks explicitly.

This commit is contained in:
Juliusz Chroboczek 2020-05-20 22:28:30 +02:00
parent 5916028edd
commit 5a2dbf36b9
2 changed files with 50 additions and 31 deletions

View File

@ -353,6 +353,8 @@ func addUpConn(c *client, id string) (*upConnection, error) {
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate), jitter: jitter.New(remote.Codec().ClockRate),
maxBitrate: ^uint64(0), maxBitrate: ^uint64(0),
localCh: make(chan struct{}, 2),
writerDone: make(chan struct{}),
} }
u.tracks = append(u.tracks, track) u.tracks = append(u.tracks, track)
var tracks []*upTrack var tracks []*upTrack
@ -432,41 +434,40 @@ func readLoop(conn *upConnection, track *upTrack) {
} }
func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) { func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) {
var localTime uint64 defer close(track.writerDone)
var local []*downTrack
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
local := track.getLocal()
for { for {
now := mono.Microseconds() select {
if now < localTime || now > localTime+500000 { case <-track.localCh:
local = track.getLocal() local = track.getLocal()
localTime = now case pi, ok := <-ch:
} if !ok {
return
pi, ok := <-ch }
if !ok {
return bytes := track.cache.GetAt(pi.seqno, pi.index, buf)
} if bytes == 0 {
continue
bytes := track.cache.GetAt(pi.seqno, pi.index, buf) }
if bytes == 0 {
continue err := packet.Unmarshal(buf[:bytes])
} if err != nil {
log.Printf("%v", err)
err := packet.Unmarshal(buf[:bytes]) continue
if err != nil { }
log.Printf("%v", err)
continue for _, l := range local {
} err := l.track.WriteRTP(&packet)
if err != nil && err != io.ErrClosedPipe {
for _, l := range local { log.Printf("%v", err)
err := l.track.WriteRTP(&packet) }
if err != nil && err != io.ErrClosedPipe { l.rate.Add(uint32(bytes))
log.Printf("%v", err)
} }
l.rate.Add(uint32(bytes))
} }
} }
} }
@ -637,7 +638,11 @@ func addDownConn(c *client, id string, remote *upConnection) (*downConnection, e
if c.down == nil { if c.down == nil {
c.down = make(map[string]*downConnection) c.down = make(map[string]*downConnection)
} }
conn := &downConnection{id: id, pc: pc, remote: remote} conn := &downConnection{
id: id,
pc: pc,
remote: remote,
}
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()

View File

@ -35,25 +35,39 @@ type upTrack struct {
lastSenderReport uint32 lastSenderReport uint32
lastSenderReportTime uint32 lastSenderReportTime uint32
localCh chan struct{} // signals that local has changed
writerDone chan struct{} // closed when the loop dies
mu sync.Mutex mu sync.Mutex
local []*downTrack local []*downTrack
} }
func (up *upTrack) notifyLocal() {
var s struct{}
select {
case up.localCh <- s:
case <-up.writerDone:
}
}
func (up *upTrack) addLocal(local *downTrack) { func (up *upTrack) addLocal(local *downTrack) {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock()
up.local = append(up.local, local) up.local = append(up.local, local)
up.mu.Unlock()
up.notifyLocal()
} }
func (up *upTrack) delLocal(local *downTrack) bool { func (up *upTrack) delLocal(local *downTrack) bool {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock()
for i, l := range up.local { for i, l := range up.local {
if l == local { if l == local {
up.local = append(up.local[:i], up.local[i+1:]...) up.local = append(up.local[:i], up.local[i+1:]...)
up.mu.Unlock()
up.notifyLocal()
return true return true
} }
} }
up.mu.Unlock()
return false return false
} }