diff --git a/README.FRONTEND b/README.FRONTEND index f7cf64c..af858b4 100644 --- a/README.FRONTEND +++ b/README.FRONTEND @@ -29,7 +29,7 @@ let sc = new ServerConnection() serverConnection.onconnected = ...; serverConnection.onclose = ...; serverConnection.onusermessage = ...; -serverConnection.onpermissions = ...; +serverConnection.onjoined = ...; serverConnection.onuser = ...; serverConnection.onchat = ...; serverConnection.onclearchat = ...; @@ -55,18 +55,17 @@ You may now connect to the server. serverConnection.connect(`wss://${location.host}/ws`); ``` -You log-in, join a group and request media in the `onconnected` callback. +You typically join a group and request media in the `onconnected` callback: ```javascript serverConnection.onconnected = function() { - this.login(username, password); - this.join(group); + this.join(group, 'join', username, password); this.request('everything'); } ``` You should not attempt to push a stream to the server until it has granted -you the `present` permission through the `onpermissions` callback. +you the `present` permission through the `onjoined` callback. ## Managing groups and users diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index c65b2a8..faae688 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -252,11 +252,16 @@ func delUpConn(c *webClient, id string) bool { delete(c.up, id) c.mu.Unlock() - go func(clients []group.Client) { - for _, c := range clients { - c.PushConn(conn.id, nil, nil, "") - } - }(c.Group().GetClients(c)) + g := c.group + if g != nil { + go func(clients []group.Client) { + for _, c := range clients { + c.PushConn(conn.id, nil, nil, "") + } + }(g.GetClients(c)) + } else { + log.Printf("Deleting connection for client with no group") + } conn.pc.Close() return true @@ -577,7 +582,12 @@ func (c *webClient) setRequested(requested map[string]uint32) error { } func pushConns(c group.Client) { - clients := c.Group().GetClients(c) + group := c.Group() + if group == nil { + log.Printf("Pushing connections to unjoined client") + return + } + clients := group.GetClients(c) for _, cc := range clients { ccc, ok := cc.(*webClient) if ok { @@ -637,40 +647,38 @@ func (c *webClient) PushConn(id string, up conn.Up, tracks []conn.UpTrack, label return nil } -func StartClient(conn *websocket.Conn) (err error) { +func readMessage(conn *websocket.Conn, m *clientMessage) error { + err := conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + if err != nil { + return err + } + defer conn.SetReadDeadline(time.Time{}) + + return conn.ReadJSON(&m) +} + +func StartClient(conn *websocket.Conn) error { var m clientMessage - err = conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + err := readMessage(conn, &m) if err != nil { conn.Close() - return - } - err = conn.ReadJSON(&m) - if err != nil { - conn.Close() - return - } - err = conn.SetReadDeadline(time.Time{}) - if err != nil { - conn.Close() - return + return err } - if m.Type != "login" { + if m.Type != "handshake" { conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage( websocket.CloseProtocolError, - "you must login first", + "you must handshake first", ), ) conn.Close() - return + return group.ProtocolError("client didn't handshake") } c := &webClient{ id: m.Id, - username: m.Username, - password: m.Password, actionCh: make(chan interface{}, 10), done: make(chan struct{}), } @@ -678,50 +686,20 @@ func StartClient(conn *websocket.Conn) (err error) { defer close(c.done) c.writeCh = make(chan interface{}, 25) + c.writerDone = make(chan struct{}) + go clientWriter(conn, c.writeCh, c.writerDone) defer func() { - if isWSNormalError(err) { - err = nil - c.close(nil) - } else { - m, e := errorToWSCloseMessage(c.id, err) + var e []byte + if !isWSNormalError(err) { + var m *clientMessage + m, e = errorToWSCloseMessage(c.id, err) if m != nil { c.write(*m) } - c.close(e) } + c.close(e) }() - c.writerDone = make(chan struct{}) - go clientWriter(conn, c.writeCh, c.writerDone) - - err = conn.ReadJSON(&m) - if err != nil { - return err - } - - if m.Type != "join" { - return group.ProtocolError("you must join a group first") - } - - g, err := group.AddClient(m.Group, c) - if err != nil { - if os.IsNotExist(err) { - err = group.UserError("group does not exist") - } else if err == group.ErrNotAuthorised { - err = group.UserError("not authorised") - time.Sleep(200 * time.Millisecond) - } - return - } - if redirect := g.Redirect(); redirect != "" { - // We normally redirect at the HTTP level, but the group - // description could have been edited in the meantime. - err = group.UserError("group is now at " + redirect) - return - } - c.group = g - defer group.DelClient(c) - return clientLoop(c, conn) } @@ -753,6 +731,13 @@ type kickAction struct { } func clientLoop(c *webClient, ws *websocket.Conn) error { + defer func() { + if c.group != nil { + group.DelClient(c) + c.group = nil + } + }() + read := make(chan interface{}, 1) go clientReader(ws, read, c.done) @@ -765,27 +750,6 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { } }() - perms := c.permissions - c.write(clientMessage{ - Type: "permissions", - Permissions: &perms, - }) - - h := c.group.GetChatHistory() - for _, m := range h { - err := c.write(clientMessage{ - Type: "chat", - Id: m.Id, - Username: m.User, - Time: m.Time, - Value: m.Value, - Kind: m.Kind, - }) - if err != nil { - return err - } - } - readTime := time.Now() ticker := time.NewTicker(10 * time.Second) @@ -889,9 +853,15 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { } case permissionsChangedAction: + group := c.Group() + if group == nil { + return errors.New("Permissions changed in no group") + } perms := c.permissions c.write(clientMessage{ - Type: "permissions", + Type: "joined", + Kind: "change", + Group: group.Name(), Permissions: &perms, }) if !c.permissions.Present { @@ -1018,11 +988,66 @@ func kickClient(g *group.Group, id, user, dest string, message string) error { func handleClientMessage(c *webClient, m clientMessage) error { switch m.Type { - case "request": - err := c.setRequested(m.Request) + case "join": + if m.Kind == "leave" { + if c.group == nil || c.group.Name() != m.Group { + return group.ProtocolError("you are not joined") + } + c.group = nil + c.permissions = group.ClientPermissions{} + perms := c.permissions + return c.write(clientMessage{ + Type: "joined", + Kind: "leave", + Group: m.Group, + Permissions: &perms, + }) + } + + if m.Kind != "join" { + return group.ProtocolError("unknown kind") + } + + if c.group != nil { + return group.ProtocolError("cannot join multiple groups") + } + c.username = m.Username + c.password = m.Password + g, err := group.AddClient(m.Group, c) if err != nil { + if os.IsNotExist(err) { + return c.error( + group.UserError("group does not exist"), + ) + } else if err == group.ErrNotAuthorised { + time.Sleep(200 * time.Millisecond) + return c.write(clientMessage{ + Type: "joined", + Kind: "fail", + Group: m.Group, + Permissions: &group.ClientPermissions{}, + Value: "not authorised", + }) + } return err } + if redirect := g.Redirect(); redirect != "" { + // We normally redirect at the HTTP level, but the group + // description could have been edited in the meantime. + return c.error( + group.UserError("group is now at " + redirect), + ) + } + c.group = g + perms := c.permissions + return c.write(clientMessage{ + Type: "joined", + Kind: "join", + Group: m.Group, + Permissions: &perms, + }) + case "request": + return c.setRequested(m.Request) case "offer": if !c.permissions.Present { c.write(clientMessage{ @@ -1080,16 +1105,20 @@ func handleClientMessage(c *webClient, m clientMessage) error { } case "chat", "usermessage": if m.Id != c.id { - return group.UserError("wrong sender id") + return group.ProtocolError("wrong sender id") } if m.Username != "" && m.Username != c.username { - return group.UserError("wrong sender username") + return group.ProtocolError("wrong sender username") + } + g := c.group + if g == nil { + return c.error(group.UserError("join a group first")) } tm := group.ToJSTime(time.Now()) if m.Type == "chat" { if m.Dest == "" { - c.group.AddToChatHistory( + g.AddToChatHistory( m.Id, m.Username, tm, m.Kind, m.Value, ) } @@ -1105,7 +1134,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { Value: m.Value, } if m.Dest == "" { - clients := c.group.GetClients(nil) + clients := g.GetClients(nil) for _, cc := range clients { ccc, ok := cc.(*webClient) if ok { @@ -1113,7 +1142,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { } } } else { - cc := c.group.GetClient(m.Dest) + cc := g.GetClient(m.Dest) if cc == nil { return c.error(group.UserError("user unknown")) } @@ -1125,16 +1154,20 @@ func handleClientMessage(c *webClient, m clientMessage) error { } case "groupaction": if m.Id != c.id { - return group.UserError("wrong sender id") + return group.ProtocolError("wrong sender id") } if m.Username != "" && m.Username != c.username { - return group.UserError("wrong sender username") + return group.ProtocolError("wrong sender username") + } + g := c.group + if g == nil { + return c.error(group.UserError("join a group first")) } switch m.Kind { case "clearchat": - c.group.ClearChatHistory() + g.ClearChatHistory() m := clientMessage{Type: "clearchat"} - clients := c.group.GetClients(nil) + clients := g.GetClients(nil) for _, cc := range clients { cc, ok := cc.(*webClient) if ok { @@ -1145,19 +1178,19 @@ func handleClientMessage(c *webClient, m clientMessage) error { if !c.permissions.Op { return c.error(group.UserError("not authorised")) } - c.group.SetLocked(m.Kind == "lock", m.Value) + g.SetLocked(m.Kind == "lock", m.Value) case "record": if !c.permissions.Record { return c.error(group.UserError("not authorised")) } - for _, cc := range c.group.GetClients(c) { + for _, cc := range g.GetClients(c) { _, ok := cc.(*diskwriter.Client) if ok { return c.error(group.UserError("already recording")) } } - disk := diskwriter.New(c.group) - _, err := group.AddClient(c.group.Name(), disk) + disk := diskwriter.New(g) + _, err := group.AddClient(g.Name(), disk) if err != nil { disk.Close() return c.error(err) @@ -1167,7 +1200,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { if !c.permissions.Record { return c.error(group.UserError("not authorised")) } - for _, cc := range c.group.GetClients(c) { + for _, cc := range g.GetClients(c) { disk, ok := cc.(*diskwriter.Client) if ok { disk.Close() @@ -1179,17 +1212,21 @@ func handleClientMessage(c *webClient, m clientMessage) error { } case "useraction": if m.Id != c.id { - return group.UserError("wrong sender id") + return group.ProtocolError("wrong sender id") } if m.Username != "" && m.Username != c.username { - return group.UserError("wrong sender username") + return group.ProtocolError("wrong sender username") + } + g := c.group + if g == nil { + return c.error(group.UserError("join a group first")) } switch m.Kind { case "op", "unop", "present", "unpresent": if !c.permissions.Op { return c.error(group.UserError("not authorised")) } - err := setPermissions(c.group, m.Dest, m.Kind) + err := setPermissions(g, m.Dest, m.Kind) if err != nil { return c.error(err) } @@ -1197,7 +1234,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { if !c.permissions.Op { return c.error(group.UserError("not authorised")) } - err := kickClient(c.group, m.Id, m.Username, m.Dest, m.Value) + err := kickClient(g, m.Id, m.Username, m.Dest, m.Value) if err != nil { return c.error(err) } @@ -1207,7 +1244,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { case "pong": // nothing case "ping": - c.write(clientMessage{ + return c.write(clientMessage{ Type: "pong", }) default: diff --git a/static/protocol.js b/static/protocol.js index 1a9e5c8..c22c4cc 100644 --- a/static/protocol.js +++ b/static/protocol.js @@ -108,11 +108,14 @@ function ServerConnection() { */ this.onuser = null; /** - * onpermissions is called whenever the current user's permissions change + * onjoined is called whenever we join or leave a group or whenever the + * permissions we have in a group change. * - * @type{(this: ServerConnection, permissions: Object) => void} + * kind is one of 'join', 'fail', 'change' or 'leave'. + * + * @type{(this: ServerConnection, kind: string, group: string, permissions: Object, message: string) => void} */ - this.onpermissions = null; + this.onjoined = null; /** * ondownstream is called whenever a new down stream is added. It * should set up the stream's callbacks; actually setting up the UI @@ -237,14 +240,16 @@ ServerConnection.prototype.connect = async function(url) { reject(e); }; this.socket.onopen = function(e) { + sc.send({ + type: 'handshake', + id: sc.id, + }); if(sc.onconnected) sc.onconnected.call(sc); resolve(sc); }; this.socket.onclose = function(e) { sc.permissions = {}; - if(sc.onpermissions) - sc.onpermissions.call(sc, {}); for(let id in sc.down) { let c = sc.down[id]; delete(sc.down[id]); @@ -252,6 +257,9 @@ ServerConnection.prototype.connect = async function(url) { if(c.onclose) c.onclose.call(c); } + if(sc.group && sc.onjoined) + sc.onjoined.call(sc, 'leave', sc.group, {}, ''); + sc.group = null; if(sc.onclose) sc.onclose.call(sc, e.code, e.reason); reject(new Error('websocket close ' + e.code + ' ' + e.reason)); @@ -280,10 +288,19 @@ ServerConnection.prototype.connect = async function(url) { case 'label': sc.gotLabel(m.id, m.value); break; - case 'permissions': + case 'joined': + if(sc.group) { + if(m.group !== sc.group) { + throw new Error('Joined multiple groups'); + } + } else { + sc.group = m.group; + } sc.permissions = m.permissions; - if(sc.onpermissions) - sc.onpermissions.call(sc, m.permissions); + if(sc.onjoined) + sc.onjoined.call(sc, m.kind, m.group, + m.permissions || {}, + m.value || null); break; case 'user': if(sc.onuser) @@ -324,28 +341,33 @@ ServerConnection.prototype.connect = async function(url) { } /** - * login authenticates with the server. + * join requests to join a group. The onjoined callback will be called + * when we've effectively joined. * - * @param {string} username - the username to login as. + * @param {string} group - The name of the group to join. + * @param {string} username - the username to join as. * @param {string} password - the password. */ -ServerConnection.prototype.login = function(username, password) { +ServerConnection.prototype.join = function(group, username, password) { this.send({ - type: 'login', - id: this.id, + type: 'join', + kind: 'join', + group: group, username: username, password: password, }); } /** - * join joins a group. + * leave leaves a group. The onjoined callback will be called when we've + * effectively left. * * @param {string} group - The name of the group to join. */ -ServerConnection.prototype.join = function(group) { +ServerConnection.prototype.leave = function(group) { this.send({ type: 'join', + kind: 'leave', group: group, }); } diff --git a/static/sfu.js b/static/sfu.js index 578783b..c48bc41 100644 --- a/static/sfu.js +++ b/static/sfu.js @@ -281,9 +281,7 @@ function setConnected(connected) { function gotConnected() { setConnected(true); let up = getUserPass(); - this.login(up.username, up.password); - this.join(group); - this.request(getSettings().request); + this.join(group, up.username, up.password); } /** @@ -1409,14 +1407,26 @@ function displayUsername() { let presentRequested = null; /** + * @this {ServerConnection} + * @param {string} group * @param {Object} perms */ -async function gotPermissions(perms) { +async function gotJoined(kind, group, perms, message) { + if(kind === 'fail') { + displayError('The server said: ' + message); + this.close(); + return; + } + displayUsername(); setButtonsVisibility(); + if(kind !== 'leave') + this.request(getSettings().request); + try { - if(serverConnection.permissions.present && !findUpMedia('local')) { + if(kind === 'join' && + serverConnection.permissions.present && !findUpMedia('local')) { if(presentRequested) { if(presentRequested === 'mike') updateSettings({video: ''}); @@ -2172,7 +2182,7 @@ async function serverConnect() { serverConnection.onclose = gotClose; serverConnection.ondownstream = gotDownStream; serverConnection.onuser = gotUser; - serverConnection.onpermissions = gotPermissions; + serverConnection.onjoined = gotJoined; serverConnection.onchat = addToChatbox; serverConnection.onclearchat = clearChat; serverConnection.onusermessage = function(id, dest, username, time, priviledged, kind, message) {