diff --git a/conn/conn.go b/conn/conn.go index cab6fa2..5e74297 100644 --- a/conn/conn.go +++ b/conn/conn.go @@ -40,5 +40,5 @@ type DownTrack interface { Write(buf []byte) (int, error) SetTimeOffset(ntp uint64, rtp uint32) SetCname(string) - GetMaxBitrate() uint64 + GetMaxBitrate() (uint64, int) } diff --git a/diskwriter/diskwriter.go b/diskwriter/diskwriter.go index 6309d5d..10f8c6e 100644 --- a/diskwriter/diskwriter.go +++ b/diskwriter/diskwriter.go @@ -618,6 +618,6 @@ func (conn *diskConn) initWriter(width, height uint32) error { return nil } -func (t *diskTrack) GetMaxBitrate() uint64 { - return ^uint64(0) +func (t *diskTrack) GetMaxBitrate() (uint64, int) { + return ^uint64(0), -1 } diff --git a/go.mod b/go.mod index 4c50ac2..b627344 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/pion/ice/v2 v2.1.7 github.com/pion/rtcp v1.2.6 - github.com/pion/rtp v1.6.2 + github.com/pion/rtp v1.6.6-0.20210512022946-4e87540a7fe6 github.com/pion/sdp/v3 v3.0.4 github.com/pion/turn/v2 v2.0.5 github.com/pion/webrtc/v3 v3.0.27 diff --git a/go.sum b/go.sum index 52eca9e..d545e0d 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo= github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= -github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.6.6-0.20210512022946-4e87540a7fe6 h1:xAaGxAEYiL96TRp4DhhrrH7JRLBuLM+nGqhOXnWzTBs= +github.com/pion/rtp v1.6.6-0.20210512022946-4e87540a7fe6/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.12 h1:GsatLufywVruXbZZT1CKg+Jr8ZTkwiPnmUC/oO9+uuY= github.com/pion/sctp v1.7.12/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= diff --git a/group/group.go b/group/group.go index 6ac5106..fb9d15f 100644 --- a/group/group.go +++ b/group/group.go @@ -61,8 +61,8 @@ type ChatHistoryEntry struct { } const ( - LowBitrate = 100000 - MinBitrate = 2 * LowBitrate + LowBitrate = 100 * 1024 + MinBitrate = LowBitrate * 2 ) type Group struct { diff --git a/packetmap/packetmap.go b/packetmap/packetmap.go new file mode 100644 index 0000000..6212385 --- /dev/null +++ b/packetmap/packetmap.go @@ -0,0 +1,204 @@ +// Package packetmap implements remapping of sequence numbers and picture ids. +package packetmap + +import ( + "sync" +) + +const maxEntries = 128 + +type Map struct { + mu sync.Mutex + next uint16 + nextPid uint16 + delta uint16 + pidDelta uint16 + lastEntry uint16 + entries []entry +} + +type entry struct { + first, count uint16 + delta uint16 + pidDelta uint16 +} + +// Map maps a seqno, adding the mapping if required. It returns whether +// the seqno could be mapped, the target seqno, and the pid delta to apply. +func (m *Map) Map(seqno uint16, pid uint16) (bool, uint16, uint16) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.delta == 0 && m.entries == nil { + m.next = seqno + 1 + m.nextPid = pid + return true, seqno, 0 + } + + if compare(m.next, seqno) <= 0 { + if uint16(seqno-m.next) > 8*1024 { + m.reset() + m.next = seqno + 1 + m.nextPid = pid + return true, seqno, 0 + } + addMapping(m, seqno, pid, m.delta, m.pidDelta) + m.next = seqno + 1 + m.nextPid = pid + return true, seqno + m.delta, m.pidDelta + } + + if uint16(m.next-seqno) > 8*1024 { + m.reset() + m.next = seqno + 1 + m.nextPid = pid + return true, seqno, 0 + } + + return m.direct(seqno) +} + +func (m *Map) reset() { + m.next = 0 + m.nextPid = 0 + m.delta = 0 + m.pidDelta = 0 + m.lastEntry = 0 + m.entries = nil +} + +func addMapping(m *Map, seqno, pid uint16, delta, pidDelta uint16) { + if len(m.entries) == 0 { + m.entries = []entry{ + entry{ + first: seqno, + count: 1, + delta: delta, + pidDelta: pidDelta, + }, + } + return + } + + i := m.lastEntry + if delta == m.entries[i].delta && pidDelta == m.entries[i].pidDelta { + m.entries[m.lastEntry].count = seqno - m.entries[i].first + 1 + return + } + + e := entry{ + first: seqno, + count: 1, + delta: delta, + pidDelta: pidDelta, + } + + if len(m.entries) < maxEntries { + m.entries = append(m.entries, e) + m.lastEntry = uint16(len(m.entries) - 1) + return + } + + j := (m.lastEntry + 1) % maxEntries + m.entries[j] = e + m.lastEntry = j +} + +// direct maps a seqno to a target seqno. It returns true if the seqno +// could be mapped, the target seqno, and the pid delta to apply. +// Called with the m.mu taken. +func (m *Map) direct(seqno uint16) (bool, uint16, uint16) { + if len(m.entries) == 0 { + return false, 0, 0 + } + i := m.lastEntry + for { + f := m.entries[i].first + if seqno >= f { + if seqno < f+m.entries[i].count { + return true, + seqno + m.entries[i].delta, + m.entries[i].pidDelta + } + return false, 0, 0 + } + if i > 0 { + i-- + } else { + i = uint16(len(m.entries) - 1) + } + if i == m.lastEntry { + break + } + } + return false, 0, 0 +} + +// Reverse maps a target seqno to the original seqno. It returns true if +// the seqno could be mapped, the original seqno, and the pid delta to +// apply in reverse. +func (m *Map) Reverse(seqno uint16) (bool, uint16, uint16) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.delta == 0 && m.entries == nil { + return true, seqno, 0 + } + if m.entries == nil { + if m.delta == 0 { + return true, seqno, 0 + } + return false, 0, 0 + } + + i := m.lastEntry + for { + f := m.entries[i].first + m.entries[i].delta + if seqno >= f { + if seqno < f+m.entries[i].count { + return true, + seqno - m.entries[i].delta, + m.entries[i].pidDelta + } + return false, 0, 0 + } + if i > 0 { + i-- + } else { + i = uint16(len(m.entries) - 1) + } + if i == m.lastEntry { + break + } + } + return false, 0, 0 +} + +// Drop attempts to record a dropped packet. It returns true if the +// packet is safe to drop. +func (m *Map) Drop(seqno uint16, pid uint16) bool { + m.mu.Lock() + defer m.mu.Unlock() + + if seqno != m.next { + return false + } + + m.pidDelta += pid - m.nextPid + m.nextPid = pid + + m.delta-- + m.next = seqno + 1 + return true +} + +// compare performs comparison modulo 2^16. +func compare(s1, s2 uint16) int { + if s1 == s2 { + return 0 + } + if ((s2 - s1) & 0x8000) != 0 { + return 1 + } + return -1 +} diff --git a/packetmap/packetmap_test.go b/packetmap/packetmap_test.go new file mode 100644 index 0000000..e0d69a3 --- /dev/null +++ b/packetmap/packetmap_test.go @@ -0,0 +1,183 @@ +package packetmap + +import ( + "testing" +) + +func TestNoDrops(t *testing.T) { + m := Map{} + + ok, s, p := m.Map(42, 1001) + if !ok || s != 42 || p != 0 { + t.Errorf("Expected 42, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(43, 1001) + if !ok || s != 43 || p != 0 { + t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(44, 1002) + if !ok || s != 44 || p != 0 { + t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(40, 1000) + if !ok || s != 40 || p != 0 { + t.Errorf("Expected 40, 0, got %v, %v, %v", ok, s, p) + } + + if len(m.entries) > 0 || m.delta != 0 || m.pidDelta != 0 { + t.Errorf("Expected 0, got %v %v %v", + len(m.entries), m.delta, m.pidDelta) + } +} + +func TestDrop(t *testing.T) { + m := Map{} + + ok, s, p := m.Map(42, 1001) + if !ok || s != 42 || p != 0 { + t.Errorf("Expected 42, 0, got %v, %v, %v", ok, s, p) + } + + ok = m.Drop(43, 1001) + if !ok || m.pidDelta != 0 { + t.Errorf("Expected 0, got %v, %v", ok, m.pidDelta) + } + + ok, s, p = m.Map(44, 1001) + if !ok || s != 43 || p != 0 { + t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(45, 1002) + if !ok || s != 44 || p != 0 { + t.Errorf("Expected 44, 0, got %v, %v, %v", ok, s, p) + } + + ok = m.Drop(46, 1003) + if !ok || m.pidDelta != 1 { + t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta) + } + + ok, s, p = m.Map(47, 1003) + if !ok || s != 45 || p != 1 { + t.Errorf("Expected 45, 1, got %v, %v, %v", ok, s, p) + } + + ok = m.Drop(48, 1003) + if !ok || m.pidDelta != 1 { + t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta) + } + + ok, s, p = m.Map(49, 1003) + if !ok || s != 46 || p != 1 { + t.Errorf("Expected 45, 1, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(60, 1007) + if !ok || s != 57 || p != 1 { + t.Errorf("Expected 57, 1, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(13, 1000) + if ok { + t.Errorf("Expected not ok") + } + + ok, s, p = m.Map(44, 1001) + if !ok || s != 43 || p != 0 { + t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(45, 1002) + if !ok || s != 44 || p != 0 { + t.Errorf("Expected 44, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(48, 3) + if ok { + t.Errorf("Expected not ok") + } + + ok, s, p = m.direct(1000) + if ok { + t.Errorf("Expected not ok") + } + + ok, s, p = m.direct(13) + if ok { + t.Errorf("Expected not ok") + } + + ok, s, p = m.Reverse(44) + if !ok || s != 45 || p != 0 { + t.Errorf("Expected 45, 0, got %v %v %v", ok, s, p) + } +} + +func TestWraparound(t *testing.T) { + m := Map{} + + ok, s, p := m.Map(0, 0) + if !ok || s != 0 || p != 0 { + t.Errorf("Expected 0, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(1, 0) + if !ok || s != 1 || p != 0 { + t.Errorf("Expected 1, 0, got %v, %v, %v", ok, s, p) + } + + ok = m.Drop(2, 1) + if !ok || m.pidDelta != 1 { + t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta) + } + + ok = m.Drop(3, 1) + if !ok || m.pidDelta != 1 { + t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta) + } + + for i := 4; i < 256000; i++ { + ok, s, p = m.Map(uint16(i), uint16((i/2) & 0x7FFF)) + if !ok || s != uint16(i-2) || p != 1 { + t.Errorf("Expected %v, %v, got %v, %v, %v", + uint16(i-2), 1, ok, s, p) + } + } +} + +func TestReset(t *testing.T) { + m := Map{} + + ok, s, p := m.Map(42, 1001) + if !ok || s != 42 || p != 0 { + t.Errorf("Expected 42, 0, got %v, %v, %v", ok, s, p) + } + + ok = m.Drop(43, 1001) + if !ok || m.pidDelta != 0 { + t.Errorf("Expected 0, got %v, %v", ok, m.pidDelta) + } + + ok, s, p = m.Map(44, 1001) + if !ok || s != 43 || p != 0 { + t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p) + } + + ok, s, p = m.Map(40000, 2001) + if !ok || s != 40000 || p != 0 { + t.Errorf("Expected 32000, 0, got %v, %v, %v", ok, s, p) + } + + if m.delta != 0 || m.entries != nil { + t.Errorf("Expected reset") + } + + ok, s, p = m.Map(40001, 2001) + if !ok || s != 40001 || p != 0 { + t.Errorf("Expected 32001, 0, got %v, %v, %v", ok, s, p) + } +} diff --git a/rtpconn/codec.go b/rtpconn/codec.go index e9e2383..cd3c857 100644 --- a/rtpconn/codec.go +++ b/rtpconn/codec.go @@ -1,6 +1,7 @@ package rtpconn import ( + "errors" "strings" "github.com/pion/rtp" @@ -108,3 +109,101 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) { return false, false } } + +var errTruncated = errors.New("truncated packet") +var errUnsupportedCodec = errors.New("unsupported codec") + +func packetFlags(codec string, buf []byte) (seqno uint16, start bool, pid uint16, tid uint8, sid uint8, layersync bool, discardable bool, err error) { + if len(buf) < 12 { + err = errTruncated + return + } + + seqno = (uint16(buf[2]) << 8) | uint16(buf[3]) + + if strings.EqualFold(codec, "video/vp8") { + var packet rtp.Packet + err = packet.Unmarshal(buf) + if err != nil { + return + } + var vp8 codecs.VP8Packet + _, err = vp8.Unmarshal(packet.Payload) + if err != nil { + return + } + + start = vp8.S == 1 && vp8.PID == 0 + pid = vp8.PictureID + tid = vp8.TID + layersync = vp8.Y == 1 + discardable = vp8.N == 1 + return + } else if strings.EqualFold(codec, "video/vp9") { + var packet rtp.Packet + err = packet.Unmarshal(buf) + if err != nil { + return + } + var vp9 codecs.VP9Packet + _, err = vp9.Unmarshal(packet.Payload) + if err != nil { + return + } + start = vp9.B + tid = vp9.TID + sid = vp9.SID + layersync = vp9.U + return + } + return +} + +func rewritePacket(codec string, data []byte, seqno uint16, delta uint16) error { + if len(data) < 12 { + return errTruncated + } + + data[2] = uint8(seqno >> 8) + data[3] = uint8(seqno) + if delta == 0 { + return nil + } + + offset := 12 + offset += int(data[0]&0x0F) * 4 + if len(data) < offset+4 { + return errTruncated + } + + if (data[0] & 0x10) != 0 { + length := uint16(data[offset+2])<<8 | uint16(data[offset+3]) + offset += 4 + int(length)*4 + if len(data) < offset+4 { + return errTruncated + } + } + + if strings.EqualFold(codec, "video/vp8") { + x := (data[offset] & 0x80) != 0 + if !x { + return nil + } + i := (data[offset+1] & 0x80) != 0 + if !i { + return nil + } + m := (data[offset+2] & 0x80) != 0 + if m { + pid := (uint16(data[offset+2]&0x7F) << 8) | + uint16(data[offset+3]) + pid = (pid + delta) & 0x7FFF + data[offset+2] = 0x80 | byte((pid>>8)&0x7F) + data[offset+3] = byte(pid & 0xFF) + } else { + data[offset+2] = (data[offset+2] + uint8(delta)) & 0x7F + } + return nil + } + return errUnsupportedCodec +} diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index f93fc85..aa6c4c3 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -20,6 +20,7 @@ import ( "github.com/jech/galene/ice" "github.com/jech/galene/jitter" "github.com/jech/galene/packetcache" + "github.com/jech/galene/packetmap" "github.com/jech/galene/rtptime" ) @@ -74,6 +75,7 @@ type downTrackAtomics struct { srNTP uint64 remoteNTP uint64 remoteRTP uint32 + layerInfo uint32 } type rtpDownTrack struct { @@ -81,6 +83,7 @@ type rtpDownTrack struct { sender *webrtc.RTPSender remote conn.UpTrack ssrc webrtc.SSRC + packetmap packetmap.Map maxBitrate *bitrate maxREMBBitrate *bitrate rate *estimator.Estimator @@ -89,14 +92,6 @@ type rtpDownTrack struct { cname atomic.Value } -func (down *rtpDownTrack) Write(buf []byte) (int, error) { - n, err := down.track.Write(buf) - if err == nil { - down.rate.Accumulate(uint32(n)) - } - return n, err -} - func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) { atomic.StoreUint64(&down.atomics.remoteNTP, ntp) atomic.StoreUint32(&down.atomics.remoteRTP, rtp) @@ -131,6 +126,17 @@ func (down *rtpDownTrack) SetCname(cname string) { down.cname.Store(cname) } +func (down *rtpDownTrack) getLayerInfo() (uint8, uint8, uint8) { + info := atomic.LoadUint32(&down.atomics.layerInfo) + return uint8(info >> 16), uint8(info >> 8), uint8(info) +} + +func (down *rtpDownTrack) setLayerInfo(layer, wanted, max uint8) { + atomic.StoreUint32(&down.atomics.layerInfo, + (uint32(layer)<<16)|(uint32(wanted)<<8)|uint32(max), + ) +} + const ( negotiationUnneeded = iota negotiationNeeded @@ -179,17 +185,108 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, return conn, nil } -func (t *rtpDownTrack) GetMaxBitrate() uint64 { +var packetBufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, packetcache.BufSize) + }, +} + +func (down *rtpDownTrack) Write(buf []byte) (int, error) { + codec := down.remote.Codec().MimeType + + seqno, start, pid, tid, _, u, _, err := packetFlags(codec, buf) + if err != nil { + return 0, err + } + + layer, wantedLayer, maxLayer := down.getLayerInfo() + + if tid > maxLayer { + if layer == maxLayer { + wantedLayer = tid + layer = tid + } + maxLayer = tid + if wantedLayer > maxLayer { + wantedLayer = maxLayer + } + down.setLayerInfo(layer, wantedLayer, maxLayer) + down.adjustLayer() + } + if start && layer != wantedLayer { + if u || wantedLayer < layer { + layer = wantedLayer + down.setLayerInfo(layer, wantedLayer, maxLayer) + } + } + + if tid > layer { + ok := down.packetmap.Drop(seqno, pid) + if ok { + return 0, nil + } + } + + ok, newseqno, piddelta := down.packetmap.Map(seqno, pid) + if !ok { + return 0, nil + } + + if newseqno == seqno && piddelta == 0 { + return down.write(buf) + } + + ibuf2 := packetBufPool.Get() + defer packetBufPool.Put(ibuf2) + buf2 := ibuf2.([]byte) + + n := copy(buf2, buf) + err = rewritePacket(codec, buf2[:n], newseqno, piddelta) + if err != nil { + return 0, err + } + return down.write(buf2[:n]) +} + +func (down *rtpDownTrack) write(buf []byte) (int, error) { + n, err := down.track.Write(buf) + if err == nil { + down.rate.Accumulate(uint32(n)) + } + return n, err +} + +func (t *rtpDownTrack) GetMaxBitrate() (uint64, int) { now := rtptime.Jiffies() + layer, _, _ := t.getLayerInfo() r := t.maxBitrate.Get(now) if r == ^uint64(0) { r = 512 * 1024 } rr := t.maxREMBBitrate.Get(now) if rr == 0 || r < rr { - return r + return r, int(layer) + } + return rr, int(layer) +} + +func (t *rtpDownTrack) adjustLayer() { + max, _ := t.GetMaxBitrate() + r, _ := t.rate.Estimate() + rate := uint64(r) * 8 + if rate < max*7/8 { + layer, wanted, max := t.getLayerInfo() + if layer < max { + wanted = layer + 1 + t.setLayerInfo(layer, wanted, max) + } + } else if rate > max*3/2 { + layer, wanted, max := t.getLayerInfo() + if layer > 0 { + wanted = layer - 1 + t.setLayerInfo(layer, wanted, max) + } } - return rr } func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { @@ -240,6 +337,7 @@ type rtpUpTrack struct { srTime uint64 srNTPTime uint64 srRTPTime uint32 + maxLayer uint8 local []conn.DownTrack bufferedNACKs []uint16 } @@ -598,7 +696,11 @@ func gotNACK(conn *rtpDownConnection, track *rtpDownTrack, p *rtcp.TransportLaye var packet rtp.Packet buf := make([]byte, packetcache.BufSize) for _, nack := range p.Nacks { - nack.Range(func(seqno uint16) bool { + nack.Range(func(s uint16) bool { + ok, seqno, _ := track.packetmap.Reverse(s) + if !ok { + return true + } l := track.remote.GetRTP(seqno, buf) if l == 0 { unhandled = append(unhandled, seqno) @@ -785,28 +887,44 @@ func sendUpRTCP(up *rtpUpConnection) error { continue } ssrcs = append(ssrcs, uint32(t.track.SSRC())) - var r uint64 if t.Kind() == webrtc.RTPCodecTypeAudio { - r = 100 * 1024 + rate += 100 * 1024 } else if t.Label() == "l" { - r = group.LowBitrate + rate += group.LowBitrate } else { + minrate := ^uint64(0) + maxrate := uint64(group.MinBitrate) + maxlayer := 0 local := t.getLocal() - r = ^uint64(0) for _, down := range local { - rr := down.GetMaxBitrate() - if rr < group.MinBitrate { - rr = group.MinBitrate + r, l := down.GetMaxBitrate() + if maxlayer < l { + maxlayer = l } - if r > rr { - r = rr + if r < group.MinBitrate { + r = group.MinBitrate + } + if minrate > r { + minrate = r + } + if maxrate < r { + maxrate = r } } - if r == ^uint64(0) { - r = 512 * 1024 + // assume that each layer takes two times less + // throughput than the higher one. Then we've + // got enough slack for a factor of 2^(layers-1). + for i := 0; i < maxlayer; i++ { + if minrate < ^uint64(0)/2 { + minrate *= 2 + } + } + if minrate < maxrate { + rate += minrate + } else { + rate += maxrate } } - rate += r } if rate < ^uint64(0) && len(ssrcs) > 0 { @@ -968,6 +1086,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT continue } + adjust := false jiffies := rtptime.Jiffies() for _, p := range ps { @@ -994,10 +1113,12 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT } case *rtcp.ReceiverEstimatedMaximumBitrate: track.maxREMBBitrate.Set(p.Bitrate, jiffies) + adjust = true case *rtcp.ReceiverReport: for _, r := range p.Reports { if r.SSRC == uint32(track.ssrc) { handleReport(track, r, jiffies) + adjust = true } } case *rtcp.SenderReport: @@ -1010,6 +1131,9 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT gotNACK(conn, track, p) } } + if adjust { + track.adjustLayer() + } } } diff --git a/rtpconn/rtpstats.go b/rtpconn/rtpstats.go index f5ef174..dee64b9 100644 --- a/rtpconn/rtpstats.go +++ b/rtpconn/rtpstats.go @@ -46,9 +46,11 @@ func (c *webClient) GetStats() *stats.Client { jiffies := rtptime.Jiffies() for _, down := range c.down { conns := stats.Conn{ - Id: down.id, + Id: down.id, } for _, t := range down.tracks { + l, _, _ := t.getLayerInfo() + layer := int(l) rate, _ := t.rate.Estimate() rtt := rtptime.ToDuration(t.getRTT(), rtptime.JiffiesPerSec) @@ -56,6 +58,7 @@ func (c *webClient) GetStats() *stats.Client { j := time.Duration(jitter) * time.Second / time.Duration(t.track.Codec().ClockRate) conns.Tracks = append(conns.Tracks, stats.Track{ + Layer: &layer, Bitrate: uint64(rate) * 8, MaxBitrate: t.maxBitrate.Get(jiffies), Loss: float64(loss) / 256.0, diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index 618a08a..1bc0ec4 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -1270,6 +1270,9 @@ func handleClientMessage(c *webClient, m clientMessage) error { return closeDownConn(c, m.Id, message) } down := getDownConn(c, m.Id) + if down == nil { + return ErrUnknownId + } if down.negotiationNeeded > negotiationUnneeded { err := negotiate( c, down, diff --git a/static/stats.js b/static/stats.js index e00a208..6290015 100644 --- a/static/stats.js +++ b/static/stats.js @@ -95,23 +95,26 @@ function formatTrack(table, track) { tr.appendChild(document.createElement('td')); tr.appendChild(document.createElement('td')); let td = document.createElement('td'); - if(track.maxBitrate) - td.textContent = `${track.bitrate||0}/${track.maxBitrate}`; - else - td.textContent = `${track.bitrate||0}`; + td.textContent = track.layer; tr.appendChild(td); let td2 = document.createElement('td'); - td2.textContent = `${Math.round(track.loss * 100)}%`; + if(track.maxBitrate) + td2.textContent = `${track.bitrate||0}/${track.maxBitrate}`; + else + td2.textContent = `${track.bitrate||0}`; tr.appendChild(td2); let td3 = document.createElement('td'); + td3.textContent = `${Math.round(track.loss * 100)}%`; + tr.appendChild(td3); + let td4 = document.createElement('td'); let text = ''; if(track.rtt) { text = text + `${Math.round(track.rtt * 1000) / 1000}ms`; } if(track.jitter) text = text + `±${Math.round(track.jitter * 1000) / 1000}ms`; - td3.textContent = text; - tr.appendChild(td3); + td4.textContent = text; + tr.appendChild(td4); table.appendChild(tr); } diff --git a/stats/stats.go b/stats/stats.go index c812a5d..073e218 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -47,6 +47,7 @@ func (d *Duration) UnmarshalJSON(buf []byte) error { } type Track struct { + Layer *int `json:"layer,omitempty"` Bitrate uint64 `json:"bitrate"` MaxBitrate uint64 `json:"maxBitrate,omitempty"` Loss float64 `json:"loss"`