From 5916028edd91baabafbd888bcd395a125413e0b4 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Wed, 20 May 2020 21:45:48 +0200 Subject: [PATCH] Split the main up loop into two threads. The reader and the writer now communicate through a channel and the packet cache. If the writer thread drops behind, we drop packets after inserting in the packet cache, which avoids building a backlog. --- client.go | 63 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 9d86b74..efa0659 100644 --- a/client.go +++ b/client.go @@ -372,7 +372,7 @@ func addUpConn(c *client, id string) (*upConnection, error) { } } - go upLoop(conn, track) + go readLoop(conn, track) go rtcpUpListener(conn, track, receiver) }) @@ -380,18 +380,19 @@ func addUpConn(c *client, id string) (*upConnection, error) { return conn, nil } -func upLoop(conn *upConnection, track *upTrack) { +type packetIndex struct { + seqno uint16 + index uint16 +} + +func readLoop(conn *upConnection, track *upTrack) { + ch := make(chan packetIndex, 32) + defer close(ch) + go writeLoop(conn, track, ch) + buf := make([]byte, packetcache.BufSize) var packet rtp.Packet - var local []*downTrack - var localTime uint64 for { - now := mono.Microseconds() - if now < localTime || now > localTime+500000 { - local = track.getLocal() - localTime = now - } - bytes, err := track.track.Read(buf) if err != nil { if err != io.EOF { @@ -409,7 +410,8 @@ func upLoop(conn *upConnection, track *upTrack) { track.jitter.Accumulate(packet.Timestamp) - first, _ := track.cache.Store(packet.SequenceNumber, buf[:bytes]) + first, index := + track.cache.Store(packet.SequenceNumber, buf[:bytes]) if packet.SequenceNumber-first > 24 { found, first, bitmap := track.cache.BitmapGet() if found { @@ -420,6 +422,45 @@ func upLoop(conn *upConnection, track *upTrack) { } } + select { + case ch <- packetIndex{packet.SequenceNumber, index}: + default: + // The writer is congested. Drop the packet, and + // leave it to NACK recovery if possible. + } + } +} + +func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) { + var localTime uint64 + var local []*downTrack + + buf := make([]byte, packetcache.BufSize) + var packet rtp.Packet + + for { + now := mono.Microseconds() + if now < localTime || now > localTime+500000 { + local = track.getLocal() + localTime = now + } + + pi, ok := <-ch + if !ok { + return + } + + bytes := track.cache.GetAt(pi.seqno, pi.index, buf) + if bytes == 0 { + continue + } + + err := packet.Unmarshal(buf[:bytes]) + if err != nil { + log.Printf("%v", err) + continue + } + for _, l := range local { err := l.track.WriteRTP(&packet) if err != nil && err != io.ErrClosedPipe {