diff --git a/README.PROTOCOL b/README.PROTOCOL index 298aaa3..f1b09d6 100644 --- a/README.PROTOCOL +++ b/README.PROTOCOL @@ -213,6 +213,16 @@ restart by sending a `renegotiate` message: } ``` +At any time after answering, the client may change the set of streams +being offered by sending a 'requestStream' request: +```javascript +{ + type: 'answerStream' + id: id, + request: [audio, video] +} +``` + ## Closing streams The offerer may close a stream at any time by sending a `close` message. diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index aa6c4c3..dab43fc 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -149,6 +149,7 @@ type rtpDownConnection struct { remote conn.Up iceCandidates []*webrtc.ICECandidateInit negotiationNeeded int + requested []string mu sync.Mutex tracks []*rtpDownTrack @@ -432,6 +433,7 @@ func (up *rtpUpTrack) hasRtcpFb(tpe, parameter string) bool { type rtpUpConnection struct { id string + client group.Client label string userId string username string @@ -585,7 +587,7 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon } } - up := &rtpUpConnection{id: id, label: label, pc: pc} + up := &rtpUpConnection{id: id, client: c, label: label, pc: pc} pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { up.mu.Lock() diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index 8facabf..2921c70 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -141,7 +141,7 @@ type clientMessage struct { SDP string `json:"sdp,omitempty"` Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"` Label string `json:"label,omitempty"` - Request map[string][]string `json:"request,omitempty"` + Request interface{} `json:"request,omitempty"` RTCConfiguration *webrtc.Configuration `json:"rtcConfiguration,omitempty"` } @@ -638,6 +638,52 @@ func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error { return conn.addICECandidate(candidate) } +var errBadType = errors.New("bad type") + +func toStringArray(r interface{}) ([]string, error) { + if r == nil { + return nil, nil + } + rr, ok := r.([]interface{}) + if !ok { + return nil, errBadType + } + if rr == nil { + return nil, nil + } + + rrr := make([]string, len(rr)) + for i, s := range rr { + rrr[i], ok = s.(string) + if !ok { + return nil, errBadType + } + } + return rrr, nil +} + +func parseRequested(r interface{}) (map[string][]string, error) { + if r == nil { + return nil, nil + } + rr, ok := r.(map[string]interface{}) + if !ok { + return nil, errBadType + } + if rr == nil { + return nil, nil + } + rrr := make(map[string][]string) + for k, v := range rr { + vv, err := toStringArray(v) + if err != nil { + return nil, err + } + rrr[k] = vv + } + return rrr, nil +} + func (c *webClient) setRequested(requested map[string][]string) error { if c.group == nil { return errors.New("attempted to request with no group joined") @@ -648,6 +694,16 @@ func (c *webClient) setRequested(requested map[string][]string) error { return nil } +func (c *webClient) setRequestedStream(down *rtpDownConnection, requested []string) error { + var remoteClient group.Client + remote, ok := down.remote.(*rtpUpConnection) + if ok { + remoteClient = remote.client + } + down.requested = requested + return remoteClient.RequestConns(c, c.group, remote.id) +} + func (c *webClient) RequestConns(target group.Client, g *group.Group, id string) error { return c.action(requestConnsAction{g, target, id}) } @@ -659,13 +715,17 @@ func requestConns(target group.Client, g *group.Group, id string) { } } -func requestedTracks(c *webClient, up conn.Up, tracks []conn.UpTrack) []conn.UpTrack { - r, ok := c.requested[up.Label()] - if !ok { - r, ok = c.requested[""] - } - if !ok || len(r) == 0 { - return nil +func requestedTracks(c *webClient, override []string, up conn.Up, tracks []conn.UpTrack) []conn.UpTrack { + r := override + if r == nil { + var ok bool + r, ok = c.requested[up.Label()] + if !ok { + r, ok = c.requested[""] + } + if !ok || len(r) == 0 { + return nil + } } var audio, video, videoLow bool @@ -894,8 +954,18 @@ func handleAction(c *webClient, a interface{}) error { return nil } var tracks []conn.UpTrack + var override []string if a.conn != nil { - tracks = requestedTracks(c, a.conn, a.tracks) + var old *rtpDownConnection + if a.replace != "" { + old = getDownConn(c, a.replace) + } else { + old = getDownConn(c, a.conn.Id()) + } + if old != nil { + override = old.requested + } + tracks = requestedTracks(c, override, a.conn, a.tracks) } if len(tracks) == 0 { @@ -938,6 +1008,7 @@ func handleAction(c *webClient, a interface{}) error { case requestConnsAction: g := c.group if g == nil || a.group != g { + log.Printf("Misdirected pushConns") return nil } for _, u := range c.up { @@ -1241,7 +1312,21 @@ func handleClientMessage(c *webClient, m clientMessage) error { } } case "request": - return c.setRequested(m.Request) + requested, err := parseRequested(m.Request) + if err != nil { + return err + } + return c.setRequested(requested) + case "requestStream": + down := getDownConn(c, m.Id) + if down == nil { + return ErrUnknownId + } + requested, err := toStringArray(m.Request) + if err != nil { + return err + } + c.setRequestedStream(down, requested) case "offer": if m.Id == "" { return errEmptyId diff --git a/static/protocol.js b/static/protocol.js index d2ed182..c6269d9 100644 --- a/static/protocol.js +++ b/static/protocol.js @@ -214,7 +214,7 @@ function ServerConnection() { * @property {string} [sdp] * @property {RTCIceCandidate} [candidate] * @property {string} [label] - * @property {Object>} [request] + * @property {Object>|Array} [request] * @property {Object} [rtcConfiguration] */ @@ -440,11 +440,11 @@ ServerConnection.prototype.leave = function(group) { }; /** - * request sets the list of requested media types. + * request sets the list of requested tracks * * @param {Object>} what - * - A dictionary that maps labels to a sequence of 'audio' and 'video'. - * An entry with an empty label '' provides the default. + * - A dictionary that maps labels to a sequence of 'audio', 'video' + * or 'video-low. An entry with an empty label '' provides the default. */ ServerConnection.prototype.request = function(what) { this.send({ @@ -1219,6 +1219,21 @@ Stream.prototype.restartIce = function () { c.negotiate(true); }; +/** + * request sets the list of tracks. If this is not called, or called with + * a null argument, then the default is provided by ServerConnection.request. + * + * @param {Array} what - a sequence of 'audio', 'video' or 'video-low'. + */ +Stream.prototype.request = function(what) { + let c = this; + c.sc.send({ + type: 'requestStream', + id: c.id, + request: what, + }); +}; + /** * updateStats is called periodically, if requested by setStatsInterval, * in order to recompute stream statistics and invoke the onstats handler.