From 3d2089f40fe38b7ed8412701f5587f95a1d3103e Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Wed, 14 Jul 2021 14:20:52 +0200 Subject: [PATCH] Merge GetRTP and Nack into GetPacket. The two function were always called together. This factors out the NACKing logic into the track. --- conn/conn.go | 6 +++--- diskwriter/diskwriter.go | 21 ++++++++------------- rtpconn/rtpconn.go | 33 ++++++++++++--------------------- 3 files changed, 23 insertions(+), 37 deletions(-) diff --git a/conn/conn.go b/conn/conn.go index d18ef66..ecce3e6 100644 --- a/conn/conn.go +++ b/conn/conn.go @@ -25,9 +25,9 @@ type UpTrack interface { Kind() webrtc.RTPCodecType Label() string Codec() webrtc.RTPCodecCapability - // get a recent packet. Returns 0 if the packet is not in cache. - GetRTP(seqno uint16, result []byte) uint16 - Nack(seqnos []uint16) error + // GetPacket fetches a recent packet. Returns 0 if the packet is + // not in cache, and, in that case, optionally schedules a NACK. + GetPacket(seqno uint16, result []byte, nack bool) uint16 RequestKeyframe() error } diff --git a/diskwriter/diskwriter.go b/diskwriter/diskwriter.go index 8220051..5928f64 100644 --- a/diskwriter/diskwriter.go +++ b/diskwriter/diskwriter.go @@ -494,23 +494,18 @@ func (t *diskTrack) Write(buf []byte) (int, error) { if ((p.SequenceNumber - lastSeqno) & 0x8000) == 0 { count := p.SequenceNumber - lastSeqno if count > 0 && count < 128 { - var nacks []uint16 for i := lastSeqno + 1; i != p.SequenceNumber; i++ { // different buf each time buf := make([]byte, 1504) - n := t.remote.GetRTP(i, buf) - if n > 0 { - p := new(rtp.Packet) - err := p.Unmarshal(buf) - if err == nil { - t.writeRTP(p) - } - } else { - nacks = append(nacks, i) + n := t.remote.GetPacket(i, buf, true) + if n == 0 { + continue + } + p := new(rtp.Packet) + err := p.Unmarshal(buf) + if err == nil { + t.writeRTP(p) } - } - if len(nacks) > 0 { - t.remote.Nack(nacks) } } } diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index 06b9e96..734c031 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -430,10 +430,6 @@ func (up *rtpUpTrack) getLocal() []conn.DownTrack { return local } -func (up *rtpUpTrack) GetRTP(seqno uint16, result []byte) uint16 { - return up.cache.Get(seqno, result) -} - func (up *rtpUpTrack) Label() string { return up.track.RID() } @@ -712,7 +708,6 @@ func sendNACKs(pc *webrtc.PeerConnection, ssrc webrtc.SSRC, nacks []rtcp.NackPai } func gotNACK(track *rtpDownTrack, p *rtcp.TransportLayerNack) { - var unhandled []uint16 buf := make([]byte, packetcache.BufSize) for _, nack := range p.Nacks { nack.Range(func(s uint16) bool { @@ -720,9 +715,8 @@ func gotNACK(track *rtpDownTrack, p *rtcp.TransportLayerNack) { if !ok { return true } - l := track.remote.GetRTP(seqno, buf) + l := track.remote.GetPacket(seqno, buf, true) if l == 0 { - unhandled = append(unhandled, seqno) return true } _, err := track.Write(buf[:l]) @@ -733,33 +727,30 @@ func gotNACK(track *rtpDownTrack, p *rtcp.TransportLayerNack) { return true }) } - if len(unhandled) == 0 { - return - } - - track.remote.Nack(unhandled) } -func (track *rtpUpTrack) Nack(nacks []uint16) error { +func (track *rtpUpTrack) GetPacket(seqno uint16, result []byte, nack bool) uint16 { + n := track.cache.Get(seqno, result) + if n > 0 || !nack { + return n + } + track.mu.Lock() defer track.mu.Unlock() doit := len(track.bufferedNACKs) == 0 -outer: - for _, nack := range nacks { - for _, seqno := range track.bufferedNACKs { - if seqno == nack { - continue outer - } + for _, s := range track.bufferedNACKs { + if s == seqno { + return 0 } - track.bufferedNACKs = append(track.bufferedNACKs, nack) } + track.bufferedNACKs = append(track.bufferedNACKs, seqno) if doit { go nackWriter(track) } - return nil + return 0 } func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPReceiver) {