From 28b7c6d54d9aee361e15335e08c71fd3bfc0e082 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Sat, 3 Oct 2020 12:56:16 +0200 Subject: [PATCH] Move RTP writer to separate file. --- rtpconn/rtpconn.go | 60 ------------------------------------ rtpconn/rtpreader.go | 72 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 60 deletions(-) create mode 100644 rtpconn/rtpreader.go diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index 2cfaf63..f9c9784 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -455,66 +455,6 @@ func newUpConn(c group.Client, id string) (*rtpUpConnection, error) { return up, nil } -func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { - writers := rtpWriterPool{conn: conn, track: track} - defer func() { - writers.close() - close(track.readerDone) - }() - - isvideo := track.track.Kind() == webrtc.RTPCodecTypeVideo - buf := make([]byte, packetcache.BufSize) - var packet rtp.Packet - for { - bytes, err := track.track.Read(buf) - if err != nil { - if err != io.EOF { - log.Printf("%v", err) - } - break - } - track.rate.Accumulate(uint32(bytes)) - - err = packet.Unmarshal(buf[:bytes]) - if err != nil { - log.Printf("%v", err) - continue - } - - track.jitter.Accumulate(packet.Timestamp) - - first, index := - track.cache.Store(packet.SequenceNumber, buf[:bytes]) - if packet.SequenceNumber-first > 24 { - found, first, bitmap := track.cache.BitmapGet() - if found { - err := conn.sendNACK(track, first, bitmap) - if err != nil { - log.Printf("%v", err) - } - } - } - - _, rate := track.rate.Estimate() - delay := uint32(rtptime.JiffiesPerSec / 1024) - if rate > 512 { - delay = rtptime.JiffiesPerSec / rate / 2 - } - - writers.write(packet.SequenceNumber, index, delay, - isvideo, packet.Marker) - - select { - case action := <-track.localCh: - err := writers.add(action.track, action.add) - if err != nil { - log.Printf("add/remove track: %v", err) - } - default: - } - } -} - var ErrUnsupportedFeedback = errors.New("unsupported feedback type") var ErrRateLimited = errors.New("rate limited") diff --git a/rtpconn/rtpreader.go b/rtpconn/rtpreader.go new file mode 100644 index 0000000..bdb56f9 --- /dev/null +++ b/rtpconn/rtpreader.go @@ -0,0 +1,72 @@ +package rtpconn + +import ( + "io" + "log" + + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" + + "sfu/packetcache" + "sfu/rtptime" +) + +func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { + writers := rtpWriterPool{conn: conn, track: track} + defer func() { + writers.close() + close(track.readerDone) + }() + + isvideo := track.track.Kind() == webrtc.RTPCodecTypeVideo + buf := make([]byte, packetcache.BufSize) + var packet rtp.Packet + for { + bytes, err := track.track.Read(buf) + if err != nil { + if err != io.EOF { + log.Printf("%v", err) + } + break + } + track.rate.Accumulate(uint32(bytes)) + + err = packet.Unmarshal(buf[:bytes]) + if err != nil { + log.Printf("%v", err) + continue + } + + track.jitter.Accumulate(packet.Timestamp) + + first, index := + track.cache.Store(packet.SequenceNumber, buf[:bytes]) + if packet.SequenceNumber-first > 24 { + found, first, bitmap := track.cache.BitmapGet() + if found { + err := conn.sendNACK(track, first, bitmap) + if err != nil { + log.Printf("%v", err) + } + } + } + + _, rate := track.rate.Estimate() + delay := uint32(rtptime.JiffiesPerSec / 1024) + if rate > 512 { + delay = rtptime.JiffiesPerSec / rate / 2 + } + + writers.write(packet.SequenceNumber, index, delay, + isvideo, packet.Marker) + + select { + case action := <-track.localCh: + err := writers.add(action.track, action.add) + if err != nil { + log.Printf("add/remove track: %v", err) + } + default: + } + } +}