package rtpconn import ( crand "crypto/rand" "encoding/base64" "encoding/json" "errors" "fmt" "log" "net" "os" "strings" "sync" "time" "github.com/gorilla/websocket" "github.com/pion/webrtc/v4" "github.com/jech/galene/conn" "github.com/jech/galene/diskwriter" "github.com/jech/galene/estimator" "github.com/jech/galene/group" "github.com/jech/galene/ice" "github.com/jech/galene/token" "github.com/jech/galene/unbounded" ) func errorToWSCloseMessage(id string, err error) (*clientMessage, []byte) { var code int var m *clientMessage var text string switch e := err.(type) { case *websocket.CloseError: code = websocket.CloseNormalClosure case group.ProtocolError: code = websocket.CloseProtocolError m = &clientMessage{ Type: "usermessage", Kind: "error", Dest: id, Privileged: true, Value: e.Error(), } text = e.Error() case group.UserError, group.KickError: code = websocket.CloseNormalClosure m = errorMessage(id, err) text = e.Error() default: code = websocket.CloseInternalServerErr } return m, websocket.FormatCloseMessage(code, text) } func isWSNormalError(err error) bool { return websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) } type webClient struct { group *group.Group addr net.Addr id string username string permissions []string data map[string]interface{} requested map[string][]string done chan struct{} writeCh chan interface{} writerDone chan struct{} actions *unbounded.Channel[any] mu sync.Mutex down map[string]*rtpDownConnection up map[string]*rtpUpConnection } func (c *webClient) Group() *group.Group { return c.group } func (c *webClient) Addr() net.Addr { return c.addr } func (c *webClient) Id() string { return c.id } func (c *webClient) Username() string { return c.username } func (c *webClient) SetUsername(username string) { c.username = username } func (c *webClient) Permissions() []string { return c.permissions } func (c *webClient) Data() map[string]interface{} { return c.data } func (c *webClient) SetPermissions(perms []string) { c.permissions = perms } func (c *webClient) PushClient(group, kind, id string, username string, perms []string, data map[string]interface{}) error { c.action(pushClientAction{ group, kind, id, username, perms, data, }) return nil } type clientMessage struct { Type string `json:"type"` Version []string `json:"version,omitempty"` Kind string `json:"kind,omitempty"` Error string `json:"error,omitempty"` Id string `json:"id,omitempty"` Replace string `json:"replace,omitempty"` Source string `json:"source,omitempty"` Dest string `json:"dest,omitempty"` Username *string `json:"username,omitempty"` Password string `json:"password,omitempty"` Token string `json:"token,omitempty"` Privileged bool `json:"privileged,omitempty"` Permissions []string `json:"permissions,omitempty"` Status *group.Status `json:"status,omitempty"` Data map[string]interface{} `json:"data,omitempty"` Group string `json:"group,omitempty"` Value interface{} `json:"value,omitempty"` NoEcho bool `json:"noecho,omitempty"` Time string `json:"time,omitempty"` SDP string `json:"sdp,omitempty"` Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"` Label string `json:"label,omitempty"` Request interface{} `json:"request,omitempty"` RTCConfiguration *webrtc.Configuration `json:"rtcConfiguration,omitempty"` } type closeMessage struct { data []byte } func getUpConn(c *webClient, id string) *rtpUpConnection { c.mu.Lock() defer c.mu.Unlock() if c.up == nil { return nil } return c.up[id] } func getUpConns(c *webClient) []*rtpUpConnection { c.mu.Lock() defer c.mu.Unlock() up := make([]*rtpUpConnection, 0, len(c.up)) for _, u := range c.up { up = append(up, u) } return up } func addUpConn(c *webClient, id, label string, offer string) (*rtpUpConnection, bool, error) { c.mu.Lock() defer c.mu.Unlock() if c.up == nil { c.up = make(map[string]*rtpUpConnection) } if c.down != nil && c.down[id] != nil { return nil, false, errors.New("adding duplicate connection") } old := c.up[id] if old != nil { return old, false, nil } conn, err := newUpConn(c, id, label, offer) if err != nil { return nil, false, err } c.up[id] = conn conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { sendICE(c, id, candidate) }) conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { if state == webrtc.ICEConnectionStateFailed { c.action(connectionFailedAction{id: id}) } }) return conn, true, nil } var ErrUserMismatch = errors.New("user id mismatch") // delUpConn deletes an up connection. If push is closed, the close is // pushed to all corresponding down connections. func delUpConn(c *webClient, id string, userId string, push bool) error { c.mu.Lock() if c.up == nil { c.mu.Unlock() return os.ErrNotExist } conn := c.up[id] if conn == nil { c.mu.Unlock() return os.ErrNotExist } if userId != "" { id, _ := conn.User() if id != userId { c.mu.Unlock() return ErrUserMismatch } } replace := conn.getReplace(false) delete(c.up, id) g := c.group c.mu.Unlock() conn.mu.Lock() conn.closed = true conn.mu.Unlock() conn.pc.Close() if push && g != nil { for _, c := range g.GetClients(c) { err := c.PushConn(g, id, nil, nil, replace) if err != nil { log.Printf("PushConn: %v", err) } } } return nil } func getDownConn(c *webClient, id string) *rtpDownConnection { if c.down == nil { return nil } c.mu.Lock() defer c.mu.Unlock() conn := c.down[id] if conn == nil { return nil } return conn } func getConn(c *webClient, id string) iceConnection { up := getUpConn(c, id) if up != nil { return up } down := getDownConn(c, id) if down != nil { return down } return nil } func addDownConn(c *webClient, remote conn.Up) (*rtpDownConnection, bool, error) { id := remote.Id() c.mu.Lock() defer c.mu.Unlock() if c.up != nil && c.up[id] != nil { return nil, false, errors.New("adding duplicate connection") } if c.down == nil { c.down = make(map[string]*rtpDownConnection) } if down := c.down[id]; down != nil { return down, false, nil } down, err := newDownConn(c, id, remote) if err != nil { return nil, false, err } down.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { sendICE(c, down.id, candidate) }) down.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { if state == webrtc.ICEConnectionStateFailed { c.action(connectionFailedAction{id: down.id}) } }) err = remote.AddLocal(down) if err != nil { down.pc.Close() return nil, false, err } c.down[down.id] = down go rtcpDownSender(down) return down, true, nil } func delDownConn(c *webClient, id string) error { conn := delDownConnHelper(c, id) if conn != nil { conn.pc.Close() return nil } return os.ErrNotExist } func delDownConnHelper(c *webClient, id string) *rtpDownConnection { c.mu.Lock() defer c.mu.Unlock() if c.down == nil { return nil } conn := c.down[id] if conn == nil { return nil } conn.remote.DelLocal(conn) for _, track := range conn.tracks { // we only insert the track after we get an answer, so // ignore errors here. track.remote.DelLocal(track) } delete(c.down, id) return conn } var errUnexpectedTrackType = errors.New("unexpected track type, this shouldn't happen") func addDownTrackUnlocked(conn *rtpDownConnection, remoteTrack *rtpUpTrack) error { for _, t := range conn.tracks { tt, ok := t.remote.(*rtpUpTrack) if !ok { return errUnexpectedTrackType } if tt == remoteTrack { return os.ErrExist } } id := remoteTrack.track.ID() if id == "" { log.Println("Got track with empty id") id = remoteTrack.track.RID() } if id == "" { id = remoteTrack.track.Kind().String() } msid := remoteTrack.track.StreamID() if msid == "" || msid == "-" { log.Println("Got track with empty msid") msid = remoteTrack.conn.Label() } if msid == "" { msid = "dummy" } // replace the RTCP feedback types with the ones we understand remoteCodec := remoteTrack.Codec() if strings.HasPrefix(strings.ToLower(remoteCodec.MimeType), "video/") { remoteCodec.RTCPFeedback = group.VideoRTCPFeedback } else { remoteCodec.RTCPFeedback = group.AudioRTCPFeedback } local, err := webrtc.NewTrackLocalStaticRTP( remoteCodec, id, msid, ) if err != nil { return err } transceiver, err := conn.pc.AddTransceiverFromTrack(local, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionSendonly, }, ) if err != nil { return err } codec := local.Codec() ptype, err := group.CodecPayloadType(local.Codec()) if err != nil { log.Printf("Couldn't determine ptype for codec %v: %v", codec.MimeType, err) } else { err := transceiver.SetCodecPreferences( []webrtc.RTPCodecParameters{ { RTPCodecCapability: codec, PayloadType: ptype, }, }, ) if err != nil { log.Printf("Couldn't set ptype for codec %v: %v", codec.MimeType, err) } } parms := transceiver.Sender().GetParameters() if len(parms.Encodings) != 1 { return errors.New("got multiple encodings") } track := &rtpDownTrack{ track: local, sender: transceiver.Sender(), ssrc: parms.Encodings[0].SSRC, conn: conn, remote: remoteTrack, maxBitrate: new(bitrate), maxREMBBitrate: new(bitrate), stats: new(receiverStats), rate: estimator.New(time.Second), atomics: &downTrackAtomics{}, } conn.tracks = append(conn.tracks, track) go rtcpDownListener(track) return nil } func delDownTrackUnlocked(conn *rtpDownConnection, track *rtpDownTrack) error { for i := range conn.tracks { if conn.tracks[i] == track { track.remote.DelLocal(track) conn.tracks = append(conn.tracks[:i], conn.tracks[i+1:]...) return conn.pc.RemoveTrack(track.sender) } } return os.ErrNotExist } func replaceTracks(conn *rtpDownConnection, remote []conn.UpTrack, limitSid bool) (bool, error) { conn.mu.Lock() defer conn.mu.Unlock() var add []*rtpUpTrack var del []*rtpDownTrack outer: for _, rtrack := range remote { rt, ok := rtrack.(*rtpUpTrack) if !ok { return false, errUnexpectedTrackType } for _, track := range conn.tracks { rt2, ok := track.remote.(*rtpUpTrack) if !ok { return false, errUnexpectedTrackType } if rt == rt2 { continue outer } } add = append(add, rt) } outer2: for _, track := range conn.tracks { rt, ok := track.remote.(*rtpUpTrack) if !ok { return false, errUnexpectedTrackType } for _, rtrack := range remote { rt2, ok := rtrack.(*rtpUpTrack) if !ok { return false, errUnexpectedTrackType } if rt == rt2 { continue outer2 } } del = append(del, track) } defer func() { for _, t := range conn.tracks { layer := t.getLayerInfo() layer.limitSid = limitSid if limitSid { layer.wantedSid = 0 } t.setLayerInfo(layer) } }() if len(del) == 0 && len(add) == 0 { return false, nil } for _, t := range del { err := delDownTrackUnlocked(conn, t) if err != nil { return false, err } } for _, rt := range add { err := addDownTrackUnlocked(conn, rt) if err != nil { return false, err } } return true, nil } func negotiate(c *webClient, down *rtpDownConnection, restartIce bool, replace string) error { if down.pc.SignalingState() == webrtc.SignalingStateHaveLocalOffer { // avoid sending multiple offers back-to-back if restartIce { down.negotiationNeeded = negotiationRestartIce } else if down.negotiationNeeded == negotiationUnneeded { down.negotiationNeeded = negotiationNeeded } return nil } down.negotiationNeeded = negotiationUnneeded options := webrtc.OfferOptions{ICERestart: restartIce} offer, err := down.pc.CreateOffer(&options) if err != nil { return err } err = down.pc.SetLocalDescription(offer) if err != nil { return err } source, username := down.remote.User() return c.write(clientMessage{ Type: "offer", Id: down.id, Label: down.remote.Label(), Replace: replace, Source: source, Username: &username, SDP: down.pc.LocalDescription().SDP, }) } func sendICE(c *webClient, id string, candidate *webrtc.ICECandidate) error { if candidate == nil { return nil } cand := candidate.ToJSON() return c.write(clientMessage{ Type: "ice", Id: id, Candidate: &cand, }) } func gotOffer(c *webClient, id, label string, sdp string, replace string) error { up, _, err := addUpConn(c, id, label, sdp) if err != nil { return err } if replace != "" { up.replace = replace delUpConn(c, replace, c.Id(), false) } err = up.pc.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: sdp, }) if err != nil { return err } answer, err := up.pc.CreateAnswer(nil) if err != nil { return err } err = up.pc.SetLocalDescription(answer) if err != nil { return err } err = up.flushICECandidates() if err != nil { log.Printf("ICE: %v", err) } return c.write(clientMessage{ Type: "answer", Id: id, SDP: up.pc.LocalDescription().SDP, }) } var ErrUnknownId = errors.New("unknown id") func gotAnswer(c *webClient, id string, sdp string) error { down := getDownConn(c, id) if down == nil { return ErrUnknownId } err := down.pc.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeAnswer, SDP: sdp, }) if err != nil { return err } err = down.flushICECandidates() if err != nil { log.Printf("ICE: %v", err) } add := func() { down.pc.OnConnectionStateChange(nil) for _, t := range down.tracks { err := t.remote.AddLocal(t) if err != nil && err != os.ErrClosed { log.Printf("Add track: %v", err) } } } down.pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { if state == webrtc.PeerConnectionStateConnected { add() } }) if down.pc.ConnectionState() == webrtc.PeerConnectionStateConnected { add() } return nil } func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error { conn := getConn(c, id) if conn == nil { return errors.New("unknown id in ICE") } return conn.addICECandidate(candidate) } var errBadType = errors.New("bad type") func toStringArray(r interface{}) ([]string, error) { if r == nil { return nil, nil } rr, ok := r.([]interface{}) if !ok { return nil, errBadType } if rr == nil { return nil, nil } rrr := make([]string, len(rr)) for i, s := range rr { rrr[i], ok = s.(string) if !ok { return nil, errBadType } } return rrr, nil } func parseRequested(r interface{}) (map[string][]string, error) { if r == nil { return nil, nil } rr, ok := r.(map[string]interface{}) if !ok { return nil, errBadType } if rr == nil { return nil, nil } rrr := make(map[string][]string) for k, v := range rr { vv, err := toStringArray(v) if err != nil { return nil, err } rrr[k] = vv } return rrr, nil } func (c *webClient) setRequested(requested map[string][]string) error { if c.group == nil { return errors.New("attempted to request with no group joined") } c.requested = requested requestConns(c, c.group, "") return nil } func (c *webClient) setRequestedStream(down *rtpDownConnection, requested []string) error { var remoteClient group.Client remote, ok := down.remote.(*rtpUpConnection) if ok { remoteClient = remote.client } down.requested = requested return remoteClient.RequestConns(c, c.group, remote.id) } func (c *webClient) RequestConns(target group.Client, g *group.Group, id string) error { c.action(requestConnsAction{g, target, id}) return nil } func requestConns(target group.Client, g *group.Group, id string) { clients := g.GetClients(target) for _, c := range clients { c.RequestConns(target, g, id) } } func requestedTracks(c *webClient, requested []string, tracks []conn.UpTrack) ([]conn.UpTrack, bool) { if len(requested) == 0 { return nil, false } var audio, video, videoLow bool for _, s := range requested { switch s { case "audio": audio = true case "video": video = true case "video-low": videoLow = true default: log.Printf("client requested unknown value %v", s) } } find := func(kind webrtc.RTPCodecType, last bool) (conn.UpTrack, int) { var track conn.UpTrack count := 0 for _, t := range tracks { if t.Kind() != kind { continue } track = t count++ if !last { break } } return track, count } var ts []conn.UpTrack limitSid := false if audio { t, _ := find(webrtc.RTPCodecTypeAudio, false) if t != nil { ts = append(ts, t) } } if video { t, _ := find(webrtc.RTPCodecTypeVideo, false) if t != nil { ts = append(ts, t) } } else if videoLow { t, count := find(webrtc.RTPCodecTypeVideo, true) if t != nil { ts = append(ts, t) } if count < 2 { limitSid = true } } return ts, limitSid } func (c *webClient) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, replace string) error { c.action(pushConnAction{g, id, up, tracks, replace}) return nil } func readMessage(conn *websocket.Conn, m *clientMessage) error { err := conn.SetReadDeadline(time.Now().Add(15 * time.Second)) if err != nil { return err } defer conn.SetReadDeadline(time.Time{}) return conn.ReadJSON(&m) } const protocolVersion = "2" func StartClient(conn *websocket.Conn, addr net.Addr) (err error) { var m clientMessage err = readMessage(conn, &m) if err != nil { conn.Close() return } if m.Type != "handshake" { conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage( websocket.CloseProtocolError, "you must handshake first", ), ) conn.Close() err = group.ProtocolError("client didn't handshake") return } versionError := true if m.Version != nil { for _, v := range m.Version { if v == protocolVersion { versionError = false } } } c := &webClient{ addr: addr, id: m.Id, actions: unbounded.New[any](), done: make(chan struct{}), } defer close(c.done) c.writeCh = make(chan interface{}, 100) c.writerDone = make(chan struct{}) go clientWriter(conn, c.writeCh, c.writerDone) defer func() { m, e := errorToWSCloseMessage(c.id, err) if isWSNormalError(err) { err = nil } else if _, ok := err.(group.KickError); ok { err = nil } if m != nil { c.write(*m) } c.close(e) }() return clientLoop(c, conn, versionError) } type pushConnAction struct { group *group.Group id string conn conn.Up tracks []conn.UpTrack replace string } type requestConnsAction struct { group *group.Group target group.Client id string } type connectionFailedAction struct { id string } type pushClientAction struct { group string kind string id string username string permissions []string data map[string]interface{} } type permissionsChangedAction struct{} type joinedAction struct { group string kind string } type kickAction struct { id string username *string message string } var errEmptyId = group.ProtocolError("empty id") func member(v string, l []string) bool { for _, w := range l { if v == w { return true } } return false } func remove(v string, l []string) []string { for i, w := range l { if v == w { l = append(l[:i], l[i+1:]...) return l } } return l } func addnew(v string, l []string) []string { if member(v, l) { return l } l = append(l, v) return l } func clientLoop(c *webClient, ws *websocket.Conn, versionError bool) error { read := make(chan interface{}, 1) go clientReader(ws, read, c.done) defer leaveGroup(c) readTime := time.Now() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() err := c.write(clientMessage{ Type: "handshake", Version: []string{protocolVersion}, }) if err != nil { return err } if versionError { c.write(clientMessage{ Type: "usermessage", Kind: "warning", Dest: c.id, Privileged: true, Value: "This client is using an unknown protocol version.\n" + "Perhaps it needs upgrading?\n" + "Trying to continue, things may break.", }) } for { select { case m, ok := <-read: if !ok { return errors.New("reader died") } switch m := m.(type) { case clientMessage: readTime = time.Now() err := handleClientMessage(c, m) if err != nil { return err } case error: return m } case <-c.actions.Ch: actions := c.actions.Get() for _, a := range actions { err := handleAction(c, a) if err != nil { return err } } case <-ticker.C: if time.Since(readTime) > 45*time.Second { return errors.New("client is dead") } // Some reverse proxies timeout connexions at 60 // seconds, make sure we generate some activity if time.Since(readTime) > 20*time.Second { err := c.write(clientMessage{ Type: "ping", }) if err != nil { return err } } } } } func pushDownConn(c *webClient, id string, up conn.Up, tracks []conn.UpTrack, replace string) error { var requested []conn.UpTrack limitSid := false if up != nil { var old *rtpDownConnection if replace != "" { old = getDownConn(c, replace) } else { old = getDownConn(c, up.Id()) } var req []string if old != nil { req = old.requested } if req == nil { var ok bool req, ok = c.requested[up.Label()] if !ok { req = c.requested[""] } } requested, limitSid = requestedTracks(c, req, tracks) } if replace != "" { err := delDownConn(c, replace) if err != nil { log.Printf("Replace: %v", err) } } // closes over replace, which will be modified below defer func() { if replace != "" { closeDownConn(c, replace, "") } }() if len(requested) == 0 { closeDownConn(c, id, "") return nil } down, _, err := addDownConn(c, up) if err != nil { if errors.Is(err, os.ErrClosed) { return nil } return err } done, err := replaceTracks(down, requested, limitSid) if err != nil || !done { return err } err = negotiate(c, down, false, replace) if err != nil { log.Printf("Negotiation failed: %v", err) closeDownConn(c, down.id, err.Error()) return err } replace = "" return nil } func handleAction(c *webClient, a any) error { switch a := a.(type) { case pushConnAction: if c.group == nil || c.group != a.group { log.Printf("Got connectsions for wrong group") return nil } return pushDownConn(c, a.id, a.conn, a.tracks, a.replace) case requestConnsAction: g := c.group if g == nil || a.group != g { log.Printf("Misdirected pushConns") return nil } for _, u := range c.up { if a.id != "" && a.id != u.id { continue } tracks := u.getTracks() replace := u.getReplace(false) ts := make([]conn.UpTrack, len(tracks)) for i, t := range tracks { ts[i] = t } err := a.target.PushConn(g, u.id, u, ts, replace) if err != nil { log.Printf("PushConn: %v", err) } } case connectionFailedAction: if down := getDownConn(c, a.id); down != nil { err := negotiate(c, down, true, "") if err != nil { return err } tracks := make( []conn.UpTrack, len(down.tracks), ) for i, t := range down.tracks { tracks[i] = t.remote } c.PushConn( c.group, down.remote.Id(), down.remote, tracks, "", ) } else if up := getUpConn(c, a.id); up != nil { c.write(clientMessage{ Type: "renegotiate", Id: a.id, }) } else { log.Printf("Attempting to renegotiate " + "unknown connection") } case pushClientAction: if a.group != c.group.Name() { log.Printf("got client for wrong group") return nil } perms := append([]string(nil), a.permissions...) username := a.username return c.write(clientMessage{ Type: "user", Kind: a.kind, Id: a.id, Username: &username, Permissions: perms, Data: a.data, }) case joinedAction: var status *group.Status var data map[string]interface{} var g *group.Group if a.group != "" { g = group.Get(a.group) if g != nil { s := g.Status(true, nil) status = &s data = g.Data() } } perms := append([]string(nil), c.permissions...) username := c.username err := c.write(clientMessage{ Type: "joined", Kind: a.kind, Group: a.group, Username: &username, Permissions: perms, Status: status, Data: data, RTCConfiguration: ice.ICEConfiguration(), }) if err != nil { return err } if a.kind == "join" { if g == nil { log.Println("g is null when joining" + "this shouldn't happen") return nil } h := g.GetChatHistory() for _, m := range h { err := c.write(clientMessage{ Type: "chathistory", Id: m.Id, Source: m.Source, Username: m.User, Time: m.Time.Format(time.RFC3339), Value: m.Value, Kind: m.Kind, }) if err != nil { return err } } } case permissionsChangedAction: g := c.Group() if g == nil { return errors.New("Permissions changed in no group") } perms := append([]string(nil), c.permissions...) status := g.Status(true, nil) username := c.username c.write(clientMessage{ Type: "joined", Kind: "change", Group: g.Name(), Username: &username, Permissions: perms, Status: &status, RTCConfiguration: ice.ICEConfiguration(), }) if !member("present", c.permissions) { up := getUpConns(c) for _, u := range up { err := delUpConn( c, u.id, c.id, true, ) if err == nil { failUpConnection( c, u.id, "permission denied", ) } } } id := c.Id() user := c.Username() d := c.Data() clients := g.GetClients(nil) go func(clients []group.Client) { for _, cc := range clients { cc.PushClient( g.Name(), "change", id, user, perms, d, ) } }(clients) case kickAction: return group.KickError{ a.id, a.username, a.message, } default: log.Printf("unexpected action %T", a) return errors.New("unexpected action") } return nil } func failUpConnection(c *webClient, id string, message string) error { if id != "" { err := c.write(clientMessage{ Type: "abort", Id: id, }) if err != nil { return err } } if message != "" { err := c.error(group.UserError(message)) if err != nil { return err } } return nil } func leaveGroup(c *webClient) { if c.group == nil { return } if c.up != nil { for id := range c.up { delUpConn(c, id, c.id, true) } } if c.down != nil { for id := range c.down { delDownConn(c, id) } } group.DelClient(c) c.permissions = nil c.data = nil c.requested = make(map[string][]string) c.group = nil } func closeDownConn(c *webClient, id string, message string) error { err := delDownConn(c, id) if err != nil && !errors.Is(err, os.ErrNotExist) { log.Printf("Close down connection: %v", err) } err = c.write(clientMessage{ Type: "close", Id: id, }) if err != nil { return err } if message != "" { err := c.error(group.UserError(message)) if err != nil { return err } } return nil } func setPermissions(g *group.Group, id string, perm string) error { client := g.GetClient(id) if client == nil { return group.UserError("no such user") } c, ok := client.(*webClient) if !ok { return group.UserError("this is not a real user") } switch perm { case "op": c.permissions = addnew("op", c.permissions) if g.Description().AllowRecording { c.permissions = addnew("record", c.permissions) } case "unop": c.permissions = remove("op", c.permissions) c.permissions = remove("record", c.permissions) case "present": c.permissions = addnew("present", c.permissions) case "unpresent": c.permissions = remove("present", c.permissions) case "shutup": c.permissions = remove("message", c.permissions) case "unshutup": c.permissions = addnew("message", c.permissions) default: return group.UserError("unknown permission") } c.action(permissionsChangedAction{}) return nil } func (c *webClient) Kick(id string, user *string, message string) error { c.action(kickAction{id, user, message}) return nil } func (c *webClient) Joined(group, kind string) error { c.action(joinedAction{group, kind}) return nil } func kickClient(g *group.Group, id string, user *string, dest string, message string) error { client := g.GetClient(dest) if client == nil { return group.UserError("no such user") } return client.Kick(id, user, message) } func handleClientMessage(c *webClient, m clientMessage) error { if m.Source != "" { if m.Source != c.Id() { return group.ProtocolError("spoofed client id") } } if m.Type != "join" { if m.Username != nil { if *m.Username != c.Username() { return group.ProtocolError("spoofed username") } } } switch m.Type { case "join": if m.Kind == "leave" { if c.group == nil || c.group.Name() != m.Group { return group.UserError("you are not joined") } leaveGroup(c) return nil } if m.Kind != "join" { return group.ProtocolError("unknown kind") } if c.group != nil { return group.ProtocolError( "cannot join multiple groups", ) } c.data = m.Data g, err := group.AddClient(m.Group, c, group.ClientCredentials{ Username: m.Username, Password: m.Password, Token: m.Token, }, ) if err != nil { var e, s string var autherr *group.NotAuthorisedError if errors.Is(err, token.ErrUsernameRequired) { s = err.Error() e = "need-username" } else if errors.Is(err, group.ErrDuplicateUsername) { s = err.Error() e = "duplicate-username" } else if errors.As(err, &autherr) { s = "not authorised" time.Sleep(200 * time.Millisecond) log.Printf("Join group: %v", err) } else if errors.Is(err, os.ErrNotExist) { s = "group does not exist" } else if _, ok := err.(group.UserError); ok { s = err.Error() } else { s = "internal server error" log.Printf("Join group: %v", err) } username := c.username return c.write(clientMessage{ Type: "joined", Kind: "fail", Error: e, Group: m.Group, Username: &username, Value: s, }) } if redirect := g.Description().Redirect; redirect != "" { // We normally redirect at the HTTP level, but the group // description could have been edited in the meantime. username := c.username return c.write(clientMessage{ Type: "joined", Kind: "redirect", Group: m.Group, Username: &username, Value: redirect, }) } c.group = g case "request": requested, err := parseRequested(m.Request) if err != nil { return err } return c.setRequested(requested) case "requestStream": down := getDownConn(c, m.Id) if down == nil { return ErrUnknownId } requested, err := toStringArray(m.Request) if err != nil { return err } c.setRequestedStream(down, requested) case "offer": if m.Id == "" { return errEmptyId } if !member("present", c.permissions) { if m.Replace != "" { delUpConn(c, m.Replace, c.id, true) } c.write(clientMessage{ Type: "abort", Id: m.Id, }) return c.error(group.UserError("not authorised")) } err := gotOffer(c, m.Id, m.Label, m.SDP, m.Replace) if err != nil { log.Printf("gotOffer: %v", err) return failUpConnection(c, m.Id, err.Error()) } case "answer": if m.Id == "" { return errEmptyId } err := gotAnswer(c, m.Id, m.SDP) if err != nil { log.Printf("gotAnswer: %v", err) message := "" if err != ErrUnknownId { message = err.Error() } return closeDownConn(c, m.Id, message) } down := getDownConn(c, m.Id) if down == nil { return ErrUnknownId } if down.negotiationNeeded > negotiationUnneeded { err := negotiate( c, down, down.negotiationNeeded == negotiationRestartIce, "", ) if err != nil { return closeDownConn(c, m.Id, err.Error()) } } case "renegotiate": if m.Id == "" { return errEmptyId } down := getDownConn(c, m.Id) if down != nil { err := negotiate(c, down, true, "") if err != nil { return closeDownConn(c, m.Id, err.Error()) } } else { log.Printf("Trying to renegotiate unknown connection") } case "close": if m.Id == "" { return errEmptyId } err := delUpConn(c, m.Id, c.id, true) if err != nil { log.Printf("Deleting up connection %v: %v", m.Id, err) return nil } case "abort": if m.Id == "" { return errEmptyId } return closeDownConn(c, m.Id, "") case "ice": if m.Id == "" { return errEmptyId } if m.Candidate == nil { return group.ProtocolError("null candidate") } err := gotICE(c, m.Candidate, m.Id) if err != nil { log.Printf("ICE: %v", err) } case "chat", "usermessage": g := c.group if g == nil { return c.error(group.UserError("join a group first")) } required := "message" if m.Type == "chat" && m.Kind == "caption" { required = "caption" } if !member(required, c.permissions) { return c.error(group.UserError("not authorised")) } id := m.Id if m.Type == "chat" && m.Dest == "" && id == "" { buf := make([]byte, 8) crand.Read(buf) id = base64.RawURLEncoding.EncodeToString(buf) } now := time.Now() if m.Type == "chat" { if m.Dest == "" { g.AddToChatHistory( id, m.Source, m.Username, now, m.Kind, m.Value, ) } } mm := clientMessage{ Type: m.Type, Id: id, Source: m.Source, Dest: m.Dest, Username: m.Username, Privileged: member("op", c.permissions), Time: now.Format(time.RFC3339), Kind: m.Kind, NoEcho: m.NoEcho, Value: m.Value, } if m.Dest == "" { var except group.Client if m.NoEcho { except = c } err := broadcast(g.GetClients(except), mm) if err != nil { log.Printf("broadcast(chat): %v", err) } } else { cc := g.GetClient(m.Dest) if cc == nil { return c.error(group.UserError("user unknown")) } ccc, ok := cc.(*webClient) if !ok { return c.error(group.UserError( "this user doesn't chat", )) } ccc.write(mm) } case "groupaction": g := c.group if g == nil { return c.error(group.UserError("join a group first")) } switch m.Kind { case "clearchat": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } var id, userId string if m.Value != nil { value, ok := m.Value.(map[string]any) if !ok { return c.error(group.UserError( "bad value in clearchat", )) } id, _ = value["id"].(string) userId, _ = value["userId"].(string) if userId == "" && id != "" { return c.error(group.UserError( "bad value in clearchat", )) } } g.ClearChatHistory(id, userId) m := clientMessage{ Type: "usermessage", Kind: "clearchat", Value: m.Value, Privileged: true, } err := broadcast(g.GetClients(nil), m) if err != nil { log.Printf("broadcast(clearchat): %v", err) } case "lock", "unlock": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } message := "" v, ok := m.Value.(string) if ok { message = v } g.SetLocked(m.Kind == "lock", message) case "record": if !member("record", c.permissions) { return c.error(group.UserError("not authorised")) } for _, cc := range g.GetClients(c) { _, ok := cc.(*diskwriter.Client) if ok { return c.error(group.UserError("already recording")) } } disk := diskwriter.New(g) _, err := group.AddClient(g.Name(), disk, group.ClientCredentials{ System: true, }, ) if err != nil { disk.Close() return c.error(err) } requestConns(disk, c.group, "") case "unrecord": if !member("record", c.permissions) { return c.error(group.UserError("not authorised")) } for _, cc := range g.GetClients(c) { disk, ok := cc.(*diskwriter.Client) if ok { disk.Close() group.DelClient(disk) } } case "subgroups": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } s := "" for _, sg := range group.GetSubGroups(g.Name()) { plural := "" if sg.Clients > 1 { plural = "s" } s = s + fmt.Sprintf("%v (%v client%v)\n", sg.Name, sg.Clients, plural) } username := "Server" c.write(clientMessage{ Type: "chat", Dest: c.id, Username: &username, Time: time.Now().Format(time.RFC3339), Value: s, }) case "setdata": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } data, ok := m.Value.(map[string]interface{}) if !ok { return c.error(group.UserError( "Bad value in setdata", )) } g.UpdateData(data) case "maketoken": terror := func(e, m string) error { return c.write(clientMessage{ Type: "usermessage", Kind: "token", Privileged: true, Error: e, Value: m, }) } if !member("token", c.permissions) { return terror("not-authorised", "not authorised") } tok, err := parseStatefulToken(m.Value) if err != nil { return terror("error", err.Error()) } if tok.Token == "" { buf := make([]byte, 8) crand.Read(buf) tok.Token = base64.RawURLEncoding.EncodeToString(buf) } else { return terror("error", "client specified token") } if tok.Group != c.group.Name() { return terror("error", "wrong group in token") } if tok.Expires == nil { return terror("error", "token doesn't expire") } if tok.Username != nil && c.group.UserExists(*tok.Username) { return terror("error", "that username is taken") } for _, p := range tok.Permissions { if !member(p, c.permissions) { return terror( "not-authorised", "not authorised", ) } } user := c.username if user != "" { tok.IssuedBy = &user } now := time.Now().UTC() tok.IssuedAt = &now new, err := token.Update(tok, "") if err != nil { return terror("error", err.Error()) } c.write(clientMessage{ Type: "usermessage", Kind: "token", Privileged: true, Value: new, }) case "edittoken": terror := func(e, m string) error { return c.write(clientMessage{ Type: "usermessage", Kind: "token", Privileged: true, Error: e, Value: m, }) } if !member("op", c.permissions) || !member("token", c.permissions) { return terror("not-authorised", "not authorised") } tok, err := parseStatefulToken(m.Value) if err != nil { return terror("error", err.Error()) } if tok.Group != "" || tok.Username != nil || tok.Permissions != nil || tok.IssuedBy != nil || tok.IssuedAt != nil { return terror( "error", "this field cannot be edited", ) } old, etag, err := token.Get(tok.Token) if err != nil { return terror("error", err.Error()) } t := old.Clone() if tok.Expires != nil { t.Expires = tok.Expires } if tok.NotBefore != nil { t.NotBefore = tok.NotBefore } new, err := token.Update(t, etag) if err != nil { return terror("error", err.Error()) } c.write(clientMessage{ Type: "usermessage", Kind: "token", Privileged: true, Value: new, }) case "listtokens": terror := func(e, m string) error { return c.write(clientMessage{ Type: "usermessage", Kind: "tokenlist", Privileged: true, Error: e, Value: m, }) } if !member("op", c.permissions) || !member("token", c.permissions) { return terror("not-authorised", "not authorised") } tokens, _, err := token.List(c.group.Name()) if err != nil { return terror("error", err.Error()) } c.write(clientMessage{ Type: "usermessage", Kind: "tokenlist", Privileged: true, Value: tokens, }) default: return group.UserError("unknown group action") } case "useraction": g := c.group if g == nil { return c.error(group.UserError("join a group first")) } switch m.Kind { case "op", "unop", "present", "unpresent", "shutup", "unshutup": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } err := setPermissions(g, m.Dest, m.Kind) if err != nil { return c.error(err) } case "identify": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } d := g.GetClient(m.Dest) if d == nil { return c.error( group.UserError("client not found"), ) } value := make(map[string]any) value["id"] = d.Id() if username := d.Username(); username != "" { value["username"] = username } if addr := d.Addr(); addr != nil { value["address"] = addr.String() } type warner interface { Warn(bool, string) error } w, ok := d.(warner) if ok { w.Warn(false, "Your IP address has been "+ "communicated to user "+ c.Username()+".") } c.write(clientMessage{ Type: "usermessage", Kind: "userinfo", Privileged: true, Value: value, }) case "kick": if !member("op", c.permissions) { return c.error(group.UserError("not authorised")) } message := "" v, ok := m.Value.(string) if ok { message = v } err := kickClient(g, m.Source, m.Username, m.Dest, message) if err != nil { return c.error(err) } case "setdata": if m.Dest != c.Id() { return c.error(group.UserError("not authorised")) } data, ok := m.Value.(map[string]interface{}) if !ok { return c.error(group.UserError( "Bad value in setdata", )) } if c.data == nil { c.data = make(map[string]interface{}) } for k, v := range data { if v == nil { delete(c.data, k) } else { c.data[k] = v } } id := c.Id() user := c.Username() perms := c.Permissions() data = c.Data() go func(clients []group.Client) { for _, cc := range clients { cc.PushClient( g.Name(), "change", id, user, perms, data, ) } }(g.GetClients(nil)) default: return group.UserError("unknown user action") } case "pong": // nothing case "ping": return c.write(clientMessage{ Type: "pong", }) default: log.Printf("unexpected message: %v", m.Type) return group.ProtocolError("unexpected message") } return nil } func parseStatefulToken(value interface{}) (*token.Stateful, error) { data, ok := value.(map[string]interface{}) if !ok || data == nil { return nil, errors.New("bad token value") } parseString := func(key string) (*string, error) { v := data[key] if v == nil { return nil, nil } vv, ok := v.(string) if !ok { return nil, errors.New("bad string value") } return &vv, nil } parseStringList := func(key string) ([]string, error) { v := data[key] if v == nil { return nil, nil } vv, ok := v.([]interface{}) if !ok { return nil, errors.New("bad string list") } vvv := make([]string, 0, len(vv)) for _, s := range vv { ss, ok := s.(string) if !ok { return nil, errors.New("bad string list") } vvv = append(vvv, ss) } return vvv, nil } parseTime := func(key string) (*time.Time, error) { v := data[key] if v == nil { return nil, nil } switch v := v.(type) { case string: vv, err := time.Parse(time.RFC3339, v) if err != nil { return nil, errors.New("bad time value") } return &vv, nil case float64: // relative time vv := time.Now().Add(time.Duration(v) * time.Millisecond) return &vv, nil default: return nil, errors.New("bad time value") } } t, err := parseString("token") if err != nil { return nil, err } tt := "" if t != nil { tt = *t } u, err := parseString("username") if err != nil { return nil, err } g, err := parseString("group") if err != nil { return nil, err } gg := "" if g != nil { gg = *g } p, err := parseStringList("permissions") if err != nil { return nil, err } e, err := parseTime("expires") if err != nil { return nil, err } n, err := parseTime("not-before") if err != nil { return nil, err } return &token.Stateful{ Token: tt, Group: gg, Username: u, Permissions: p, Expires: e, NotBefore: n, }, nil } func clientReader(conn *websocket.Conn, read chan<- interface{}, done <-chan struct{}) { defer close(read) for { var m clientMessage err := conn.ReadJSON(&m) if err != nil { select { case read <- err: return case <-done: return } } select { case read <- m: case <-done: return } } } func clientWriter(conn *websocket.Conn, ch <-chan interface{}, done chan<- struct{}) { defer func() { close(done) conn.Close() }() for { m, ok := <-ch if !ok { break } err := conn.SetWriteDeadline( time.Now().Add(500 * time.Millisecond), ) if err != nil { return } switch m := m.(type) { case clientMessage: err := conn.WriteJSON(m) if err != nil { return } case []byte: err := conn.WriteMessage(websocket.TextMessage, m) if err != nil { return } case closeMessage: if m.data != nil { conn.WriteMessage( websocket.CloseMessage, m.data, ) } return default: log.Printf("clientWriter: unexpected message %T", m) return } } } func (c *webClient) Warn(oponly bool, message string) error { if oponly && !member("op", c.permissions) { return nil } return c.write(clientMessage{ Type: "usermessage", Kind: "warning", Dest: c.id, Privileged: true, Value: message, }) } var ErrClientDead = errors.New("client is dead") func (c *webClient) action(a interface{}) { c.actions.Put(a) } func (c *webClient) write(m clientMessage) error { select { case c.writeCh <- m: return nil case <-c.writerDone: return ErrClientDead } } func broadcast(cs []group.Client, m clientMessage) error { b, err := json.Marshal(m) if err != nil { return err } for _, c := range cs { cc, ok := c.(*webClient) if !ok { continue } select { case cc.writeCh <- b: case <-cc.writerDone: } } return nil } func (c *webClient) close(data []byte) error { select { case c.writeCh <- closeMessage{data}: return nil case <-c.writerDone: return ErrClientDead } } func errorMessage(id string, err error) *clientMessage { switch e := err.(type) { case group.UserError: return &clientMessage{ Type: "usermessage", Kind: "error", Dest: id, Privileged: true, Value: e.Error(), } case group.KickError: message := e.Message if message == "" { message = "you have been kicked out" } return &clientMessage{ Type: "usermessage", Kind: "kicked", Id: e.Id, Username: e.Username, Dest: id, Privileged: true, Value: message, } default: return nil } } func (c *webClient) error(err error) error { m := errorMessage(c.id, err) if m == nil { return err } return c.write(*m) }