From 795a40ceafa7e6dd0cbefba69742732d18f42622 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Thu, 29 Apr 2021 17:03:25 +0200 Subject: [PATCH] Simulcast. --- README.PROTOCOL | 25 +++++++-- conn/conn.go | 3 +- diskwriter/diskwriter.go | 2 +- group/group.go | 10 +++- rtpconn/rtpconn.go | 106 ++++++++++++++++++++------------------- rtpconn/rtpreader.go | 10 ++++ rtpconn/rtpstats.go | 1 - rtpconn/webclient.go | 66 +++++++++++++++++------- static/galene.html | 2 + static/galene.js | 84 ++++++++++++++++++++++--------- static/protocol.js | 21 ++++---- 11 files changed, 218 insertions(+), 112 deletions(-) diff --git a/README.PROTOCOL b/README.PROTOCOL index 5d05c44..298aaa3 100644 --- a/README.PROTOCOL +++ b/README.PROTOCOL @@ -135,8 +135,17 @@ A peer must explicitly request the streams that it wants to receive. ``` The field `request` is a dictionary that maps the labels of requested -streams to a list containing either 'audio', 'video' or both. An entry -with an empty key `''` serves as default. +streams to a list containing either 'audio', or one of 'video' or +'video-low'. The empty key `''` serves as default. For example: + +```javascript +{ + type: 'request', + request: { + camera: ['audio', 'video-low'], + '': ['audio', 'video'] + } +} ## Pushing streams @@ -157,16 +166,22 @@ A stream is created by the sender with the `offer` message: If a stream with the same id exists, then this is a renegotation; otherwise this message creates a new stream. If the field `replace` is not empty, then this request additionally requests that an existing stream -with the given id should be closed, and the new stream should replace it. +with the given id should be closed, and the new stream should replace it; +this is used most notably when changing the simulcast envelope. -The field `label` is one of `camera`, `screenshare` or `video`, as in the -`request` message. +The field `label` is one of `camera`, `screenshare` or `video`, and will +be matched against the keys sent by the receiver in their `request` message. The field `sdp` contains the raw SDP string (i.e. the `sdp` field of a JSEP session description). Galène will interpret the `nack`, `nack pli`, `ccm fir` and `goog-remb` RTCP feedback types, and act accordingly. +The sender may either send a single stream per media section in the SDP, +or use rid-based simulcasting. In the latter case, it should send two +video streams, one with rid 'h' and high throughput, and one with rid 'l' +and throughput limited to roughly 100kbit/s. + The receiver may either abort the stream immediately (see below), or send an answer. diff --git a/conn/conn.go b/conn/conn.go index 1e7feda..33c893d 100644 --- a/conn/conn.go +++ b/conn/conn.go @@ -25,6 +25,7 @@ type UpTrack interface { AddLocal(DownTrack) error DelLocal(DownTrack) bool Kind() webrtc.RTPCodecType + Label() string Codec() webrtc.RTPCodecCapability // get a recent packet. Returns 0 if the packet is not in cache. GetRTP(seqno uint16, result []byte) uint16 @@ -33,7 +34,6 @@ type UpTrack interface { // Type Down represents a connection in the server to client direction. type Down interface { - GetMaxBitrate(now uint64) uint64 } // Type DownTrack represents a track in the server to client direction. @@ -42,4 +42,5 @@ type DownTrack interface { Accumulate(bytes uint32) SetTimeOffset(ntp uint64, rtp uint32) SetCname(string) + GetMaxBitrate() uint64 } diff --git a/diskwriter/diskwriter.go b/diskwriter/diskwriter.go index 4847b89..03c4970 100644 --- a/diskwriter/diskwriter.go +++ b/diskwriter/diskwriter.go @@ -600,7 +600,7 @@ func (conn *diskConn) initWriter(width, height uint32) error { return nil } -func (down *diskConn) GetMaxBitrate(now uint64) uint64 { +func (t *diskTrack) GetMaxBitrate() uint64 { return ^uint64(0) } diff --git a/group/group.go b/group/group.go index e6cbb74..8d7a8f0 100644 --- a/group/group.go +++ b/group/group.go @@ -13,6 +13,7 @@ import ( "time" "github.com/pion/ice/v2" + "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" ) @@ -60,7 +61,8 @@ type ChatHistoryEntry struct { } const ( - MinBitrate = 200000 + LowBitrate = 100000 + MinBitrate = 2 * LowBitrate ) type Group struct { @@ -252,6 +254,12 @@ func APIFromCodecs(codecs []webrtc.RTPCodecCapability) (*webrtc.API, error) { if UDPMin > 0 && UDPMax > 0 { s.SetEphemeralUDPPortRange(UDPMin, UDPMax) } + m.RegisterHeaderExtension( + webrtc.RTPHeaderExtensionCapability{sdp.SDESMidURI}, + webrtc.RTPCodecTypeVideo) + m.RegisterHeaderExtension( + webrtc.RTPHeaderExtensionCapability{sdp.SDESRTPStreamIDURI}, + webrtc.RTPCodecTypeVideo) return webrtc.NewAPI( webrtc.WithSettingEngine(s), diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index 837cd44..d512793 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -77,15 +77,16 @@ type downTrackAtomics struct { } type rtpDownTrack struct { - track *webrtc.TrackLocalStaticRTP - sender *webrtc.RTPSender - remote conn.UpTrack - ssrc webrtc.SSRC - maxBitrate *bitrate - rate *estimator.Estimator - stats *receiverStats - atomics *downTrackAtomics - cname atomic.Value + track *webrtc.TrackLocalStaticRTP + sender *webrtc.RTPSender + remote conn.UpTrack + ssrc webrtc.SSRC + maxBitrate *bitrate + maxREMBBitrate *bitrate + rate *estimator.Estimator + stats *receiverStats + atomics *downTrackAtomics + cname atomic.Value } func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error { @@ -140,7 +141,6 @@ type rtpDownConnection struct { id string pc *webrtc.PeerConnection remote conn.Up - maxREMBBitrate *bitrate iceCandidates []*webrtc.ICECandidateInit negotiationNeeded int @@ -174,31 +174,22 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, id: id, pc: pc, remote: remote, - maxREMBBitrate: new(bitrate), } return conn, nil } -func (down *rtpDownConnection) GetMaxBitrate(now uint64) uint64 { - rate := down.maxREMBBitrate.Get(now) - var trackRate uint64 - tracks := down.getTracks() - for _, t := range tracks { - r := t.maxBitrate.Get(now) - if r == ^uint64(0) { - if t.track.Kind() == webrtc.RTPCodecTypeAudio { - r = 128 * 1024 - } else { - r = 512 * 1024 - } - } - trackRate += r +func (t *rtpDownTrack) GetMaxBitrate() uint64 { + now := rtptime.Jiffies() + r := t.maxBitrate.Get(now) + if r == ^uint64(0) { + r = 512 * 1024 } - if trackRate < rate { - return trackRate + rr := t.maxREMBBitrate.Get(now) + if rr == 0 || r < rr { + return r } - return rate + return rr } func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { @@ -311,6 +302,10 @@ func (up *rtpUpTrack) GetRTP(seqno uint16, result []byte) uint16 { return up.cache.Get(seqno, result) } +func (up *rtpUpTrack) Label() string { + return up.track.RID() +} + func (up *rtpUpTrack) Kind() webrtc.RTPCodecType { return up.track.Kind() } @@ -687,7 +682,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei for { firstSR := false - n, _, err := r.Read(buf) + n, _, err := r.ReadSimulcast(buf, track.track.RID()) if err != nil { if err != io.EOF && err != io.ErrClosedPipe { log.Printf("Read RTCP: %v", err) @@ -752,11 +747,11 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei } } -func sendUpRTCP(conn *rtpUpConnection) error { - tracks := conn.getTracks() +func sendUpRTCP(up *rtpUpConnection) error { + tracks := up.getTracks() - if len(conn.tracks) == 0 { - state := conn.pc.ConnectionState() + if len(up.tracks) == 0 { + state := up.pc.ConnectionState() if state == webrtc.PeerConnectionStateClosed { return io.ErrClosedPipe } @@ -765,7 +760,7 @@ func sendUpRTCP(conn *rtpUpConnection) error { now := rtptime.Jiffies() - reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) + reports := make([]rtcp.ReceptionReport, 0, len(up.tracks)) for _, t := range tracks { updateUpTrack(t) stats := t.cache.GetStats(true) @@ -810,29 +805,38 @@ func sendUpRTCP(conn *rtpUpConnection) error { }, } - rate := ^uint64(0) - - local := conn.getLocal() - for _, l := range local { - r := l.GetMaxBitrate(now) - if r < rate { - rate = r - } - } - - if rate < group.MinBitrate { - rate = group.MinBitrate - } - var ssrcs []uint32 + var rate uint64 for _, t := range tracks { if !t.hasRtcpFb("goog-remb", "") { continue } ssrcs = append(ssrcs, uint32(t.track.SSRC())) + var r uint64 + if t.Kind() == webrtc.RTPCodecTypeAudio { + r = 100 * 1024 + } else if t.Label() == "l" { + r = group.LowBitrate + } else { + local := t.getLocal() + r = ^uint64(0) + for _, down := range local { + rr := down.GetMaxBitrate() + if rr < group.MinBitrate { + rr = group.MinBitrate + } + if r > rr { + r = rr + } + } + if r == ^uint64(0) { + r = 512 * 1024 + } + } + rate += r } - if len(ssrcs) > 0 { + if rate < ^uint64(0) && len(ssrcs) > 0 { packets = append(packets, &rtcp.ReceiverEstimatedMaximumBitrate{ Bitrate: rate, @@ -840,7 +844,7 @@ func sendUpRTCP(conn *rtpUpConnection) error { }, ) } - return conn.pc.WriteRTCP(packets) + return up.pc.WriteRTCP(packets) } func rtcpUpSender(conn *rtpUpConnection) { @@ -1049,7 +1053,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT log.Printf("sendFIR: %v", err) } case *rtcp.ReceiverEstimatedMaximumBitrate: - conn.maxREMBBitrate.Set(p.Bitrate, jiffies) + track.maxREMBBitrate.Set(p.Bitrate, jiffies) case *rtcp.ReceiverReport: for _, r := range p.Reports { if r.SSRC == uint32(track.ssrc) { diff --git a/rtpconn/rtpreader.go b/rtpconn/rtpreader.go index d405d9b..f7e5990 100644 --- a/rtpconn/rtpreader.go +++ b/rtpconn/rtpreader.go @@ -149,6 +149,16 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { kf, _ := isKeyframe(codec.MimeType, &packet) + if packet.Extension { + packet.Extension = false + packet.Extensions = nil + bytes, err = packet.MarshalTo(buf) + if err != nil { + log.Printf("%v", err) + continue + } + } + first, index := track.cache.Store( packet.SequenceNumber, packet.Timestamp, kf, packet.Marker, buf[:bytes], diff --git a/rtpconn/rtpstats.go b/rtpconn/rtpstats.go index 16e934b..f5ef174 100644 --- a/rtpconn/rtpstats.go +++ b/rtpconn/rtpstats.go @@ -47,7 +47,6 @@ func (c *webClient) GetStats() *stats.Client { for _, down := range c.down { conns := stats.Conn{ Id: down.id, - MaxBitrate: down.GetMaxBitrate(jiffies), } for _, t := range down.tracks { rate, _ := t.rate.Estimate() diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index 4e6595d..b50bac3 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -380,14 +380,15 @@ func addDownTrackUnlocked(conn *rtpDownConnection, remoteTrack *rtpUpTrack, remo } track := &rtpDownTrack{ - track: local, - sender: sender, - ssrc: parms.Encodings[0].SSRC, - remote: remoteTrack, - maxBitrate: new(bitrate), - stats: new(receiverStats), - rate: estimator.New(time.Second), - atomics: &downTrackAtomics{}, + track: local, + sender: sender, + ssrc: parms.Encodings[0].SSRC, + remote: remoteTrack, + maxBitrate: new(bitrate), + maxREMBBitrate: new(bitrate), + stats: new(receiverStats), + rate: estimator.New(time.Second), + atomics: &downTrackAtomics{}, } conn.tracks = append(conn.tracks, track) @@ -646,33 +647,60 @@ func requestedTracks(c *webClient, up conn.Up, tracks []conn.UpTrack) []conn.UpT return nil } - var audio, video bool + var audio, video, videoLow bool for _, s := range r { switch s { case "audio": audio = true case "video": video = true + case "video-low": + videoLow = true default: log.Printf("client requested unknown value %v", s) } } + find := func(kind webrtc.RTPCodecType, labels ...string) conn.UpTrack { + for _, l := range labels { + for _, t := range tracks { + if t.Kind() != kind { + continue + } + if t.Label() == l { + return t + } + } + } + for _, t := range tracks { + if t.Kind() != kind { + continue + } + return t + } + return nil + } + var ts []conn.UpTrack if audio { - for _, t := range tracks { - if t.Kind() == webrtc.RTPCodecTypeAudio { - ts = append(ts, t) - break - } + t := find(webrtc.RTPCodecTypeAudio) + if t != nil { + ts = append(ts, t) } } if video { - for _, t := range tracks { - if t.Kind() == webrtc.RTPCodecTypeVideo { - ts = append(ts, t) - break - } + t := find( + webrtc.RTPCodecTypeVideo, "h", "m", "video", + ) + if t != nil { + ts = append(ts, t) + } + } else if videoLow { + t := find( + webrtc.RTPCodecTypeVideo, "l", "m", "video", + ) + if t != nil { + ts = append(ts, t) } } diff --git a/static/galene.html b/static/galene.html index 23b3585..7d44ff1 100644 --- a/static/galene.html +++ b/static/galene.html @@ -213,7 +213,9 @@ diff --git a/static/galene.js b/static/galene.js index 84d1787..3d4acb6 100644 --- a/static/galene.js +++ b/static/galene.js @@ -78,6 +78,7 @@ function getUserPass() { * @property {boolean} [localMute] * @property {string} [video] * @property {string} [audio] + * @property {boolean} [simulcast] * @property {string} [send] * @property {string} [request] * @property {boolean} [activityDetection] @@ -550,9 +551,15 @@ function mapRequest(what) { case 'audio': return {'': ['audio']}; break; + case 'screenshare-low': + return {screenshare: ['audio','video-low'], '': ['audio']}; + break; case 'screenshare': return {screenshare: ['audio','video'], '': ['audio']}; break; + case 'everything-low': + return {'': ['audio','video-low']}; + break; case 'everything': return {'': ['audio','video']} break; @@ -611,20 +618,25 @@ getInputElement('fileinput').onchange = function(e) { function gotUpStats(stats) { let c = this; - let text = ''; + let values = []; - c.pc.getSenders().forEach(s => { - let tid = s.track && s.track.id; - let stats = tid && c.stats[tid]; - let rate = stats && stats['outbound-rtp'] && stats['outbound-rtp'].rate; - if(typeof rate === 'number') { - if(text) - text = text + ' + '; - text = text + Math.round(rate / 1000) + 'kbps'; + for(let id in stats) { + if(stats[id] && stats[id]['outbound-rtp']) { + let rate = stats[id]['outbound-rtp'].rate; + if(typeof rate === 'number') { + values.push(rate); + } } - }); + } - setLabel(c, text); + if(values.length === 0) { + setLabel(c, ''); + } else { + values.sort((x,y) => x - y); + setLabel(c, values + .map(x => Math.round(x / 1000).toString()) + .reduce((x, y) => x + '+' + y)); + } } /** @@ -800,6 +812,7 @@ function newUpStream(localId) { * @param {number} [bps] */ async function setMaxVideoThroughput(c, bps) { + let simulcast = doSimulcast(); let senders = c.pc.getSenders(); for(let i = 0; i < senders.length; i++) { let s = senders[i]; @@ -808,17 +821,17 @@ async function setMaxVideoThroughput(c, bps) { let p = s.getParameters(); if(!p.encodings) p.encodings = [{}]; - p.encodings.forEach(e => { - if(bps > 0) - e.maxBitrate = bps; - else - delete e.maxBitrate; - }); - try { - await s.setParameters(p); - } catch(e) { - console.error(e); + if((!simulcast && p.encodings.length != 1) || + (simulcast && p.encodings.length != 2)) { + // change the simulcast envelope + await replaceUpStream(c); + return; } + p.encodings.forEach(e => { + if(!e.rid || e.rid === 'h') + e.maxBitrate = bps || unlimitedRate; + }); + await s.setParameters(p); } } @@ -1022,6 +1035,19 @@ function isSafari() { return ua.indexOf('safari') >= 0 && ua.indexOf('chrome') < 0; } +const unlimitedRate = 1000000000; +const simulcastRate = 100000; + +/** + * @returns {boolean} + */ +function doSimulcast() { + if(!getSettings().simulcast) + return false; + let bps = getMaxVideoThroughput(); + return bps <= 0 || bps >= 2 * simulcastRate; +} + /** * Sets up c to send the given stream. Some extra parameters are stored * in c.userdata. @@ -1029,6 +1055,7 @@ function isSafari() { * @param {Stream} c * @param {MediaStream} stream */ + function setUpStream(c, stream) { if(c.stream != null) throw new Error("Setting nonempty stream"); @@ -1073,11 +1100,20 @@ function setUpStream(c, stream) { c.close(); }; - let encodings = [{}]; + let encodings = []; if(t.kind === 'video') { + let simulcast = doSimulcast(); let bps = getMaxVideoThroughput(); - if(bps > 0) - encodings[0].maxBitrate = bps; + encodings.push({ + rid: 'h', + maxBitrate: bps || unlimitedRate, + }); + if(simulcast) + encodings.push({ + rid: 'l', + scaleResolutionDownBy: 2, + maxBitrate: simulcastRate, + }); } c.pc.addTransceiver(t, { direction: 'sendonly', diff --git a/static/protocol.js b/static/protocol.js index 8e220f2..9ac2994 100644 --- a/static/protocol.js +++ b/static/protocol.js @@ -1246,17 +1246,20 @@ Stream.prototype.updateStats = async function() { if(report) { for(let r of report.values()) { if(stid && r.type === 'outbound-rtp') { + let id = stid; + if(r.rid) + id = id + '-' + r.rid if(!('bytesSent' in r)) continue; - if(!stats[stid]) - stats[stid] = {}; - stats[stid][r.type] = {}; - stats[stid][r.type].timestamp = r.timestamp; - stats[stid][r.type].bytesSent = r.bytesSent; - if(old[stid] && old[stid][r.type]) - stats[stid][r.type].rate = - ((r.bytesSent - old[stid][r.type].bytesSent) * 1000 / - (r.timestamp - old[stid][r.type].timestamp)) * 8; + if(!stats[id]) + stats[id] = {}; + stats[id][r.type] = {}; + stats[id][r.type].timestamp = r.timestamp; + stats[id][r.type].bytesSent = r.bytesSent; + if(old[id] && old[id][r.type]) + stats[id][r.type].rate = + ((r.bytesSent - old[id][r.type].bytesSent) * 1000 / + (r.timestamp - old[id][r.type].timestamp)) * 8; } } }