1
Fork 0
galene/rtpconn/webclient.go

1254 lines
25 KiB
Go
Raw Normal View History

package rtpconn
2020-04-24 19:38:21 +02:00
import (
"encoding/json"
"errors"
"log"
"os"
"sync"
"time"
"github.com/gorilla/websocket"
2020-07-16 20:17:32 +02:00
"github.com/pion/webrtc/v3"
2020-04-24 19:38:21 +02:00
2020-09-13 11:04:16 +02:00
"sfu/conn"
2020-09-13 14:12:00 +02:00
"sfu/disk"
2020-09-13 11:04:16 +02:00
"sfu/estimator"
"sfu/group"
2020-04-24 19:38:21 +02:00
)
func errorToWSCloseMessage(err error) (string, []byte) {
2020-04-24 19:38:21 +02:00
var code int
var text string
switch e := err.(type) {
case *websocket.CloseError:
code = websocket.CloseNormalClosure
case group.ProtocolError:
2020-04-24 19:38:21 +02:00
code = websocket.CloseProtocolError
text = string(e)
case group.UserError:
code = websocket.CloseNormalClosure
text = string(e)
2020-04-24 19:38:21 +02:00
default:
code = websocket.CloseInternalServerErr
}
return text, websocket.FormatCloseMessage(code, text)
2020-04-24 19:38:21 +02:00
}
// isWSNormalError reports whether err is a WebSocket close error whose
// code is either "normal closure" or "going away".
func isWSNormalError(err error) bool {
	normalCodes := []int{
		websocket.CloseNormalClosure,
		websocket.CloseGoingAway,
	}
	return websocket.IsCloseError(err, normalCodes...)
}
2020-05-27 11:44:49 +02:00
type webClient struct {
group *group.Group
2020-05-27 11:44:49 +02:00
id string
credentials group.ClientCredentials
permissions group.ClientPermissions
2020-05-27 11:44:49 +02:00
requested map[string]uint32
done chan struct{}
writeCh chan interface{}
writerDone chan struct{}
actionCh chan interface{}
mu sync.Mutex
down map[string]*rtpDownConnection
2020-06-08 22:14:28 +02:00
up map[string]*rtpUpConnection
2020-05-27 11:44:49 +02:00
}
func (c *webClient) Group() *group.Group {
2020-05-28 02:35:09 +02:00
return c.group
}
2020-06-08 22:14:28 +02:00
func (c *webClient) Id() string {
2020-05-28 02:35:09 +02:00
return c.id
}
func (c *webClient) Credentials() group.ClientCredentials {
return c.credentials
2020-05-28 02:35:09 +02:00
}
// SetPermissions replaces the client's permissions with perms.  It
// does not notify the client; callers that need to do so queue a
// permissionsChangedAction (see setPermissions).
func (c *webClient) SetPermissions(perms group.ClientPermissions) {
	c.permissions = perms
}
// OverridePermissions always returns false for a web client; the
// group argument is ignored.
func (c *webClient) OverridePermissions(g *group.Group) bool {
	return false
}
func (c *webClient) PushClient(id, username string, add bool) error {
2020-08-12 13:51:31 +02:00
kind := "add"
if !add {
kind = "delete"
}
return c.write(clientMessage{
Type: "user",
2020-08-12 13:51:31 +02:00
Kind: kind,
Id: id,
Username: username,
})
}
// rateMap maps a label to a 32-bit value.  In JSON, the value 0 is
// written as false and the maximum uint32 as true; every other value
// is written as a number.
type rateMap map[string]uint32

// UnmarshalJSON decodes a JSON object whose values are booleans or
// numbers.  true becomes the maximum uint32 and false becomes 0.
// Numbers must satisfy 0 <= x < max uint32 and are truncated towards
// zero; anything else is an error.  On error *v is left untouched.
func (v *rateMap) UnmarshalJSON(b []byte) error {
	const unlimited = ^uint32(0)

	var raw map[string]interface{}
	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}

	rates := make(map[string]uint32, len(raw))
	for label, value := range raw {
		switch val := value.(type) {
		case bool:
			rate := uint32(0)
			if val {
				rate = unlimited
			}
			rates[label] = rate
		case float64:
			// The maximum itself is rejected: it is reserved
			// as the encoding of true.
			if val < 0 || val >= float64(unlimited) {
				return errors.New("overflow")
			}
			rates[label] = uint32(val)
		default:
			return errors.New("unexpected type in JSON map")
		}
	}

	*v = rates
	return nil
}

// MarshalJSON encodes the map as a JSON object, writing 0 as false,
// the maximum uint32 as true, and any other value as a number.
func (v rateMap) MarshalJSON() ([]byte, error) {
	const unlimited = ^uint32(0)

	out := make(map[string]interface{}, len(v))
	for label, rate := range v {
		switch {
		case rate == 0:
			out[label] = false
		case rate == unlimited:
			out[label] = true
		default:
			out[label] = rate
		}
	}
	return json.Marshal(out)
}
2020-04-24 19:38:21 +02:00
type clientMessage struct {
Type string `json:"type"`
2020-08-12 13:56:35 +02:00
Kind string `json:"kind,omitempty"`
Id string `json:"id,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
Permissions group.ClientPermissions `json:"permissions,omitempty"`
Group string `json:"group,omitempty"`
Value string `json:"value,omitempty"`
2020-09-30 00:33:23 +02:00
Time uint64 `json:"time,omitempty"`
Offer *webrtc.SessionDescription `json:"offer,omitempty"`
Answer *webrtc.SessionDescription `json:"answer,omitempty"`
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Request rateMap `json:"request,omitempty"`
2020-04-24 19:38:21 +02:00
}
2020-09-30 00:33:23 +02:00
// fromJSTime converts a timestamp expressed in milliseconds since the
// Unix epoch into a time.Time.  A timestamp of 0 yields the zero
// time.Time rather than the epoch.
func fromJSTime(tm uint64) time.Time {
	if tm == 0 {
		return time.Time{}
	}
	ms := int64(tm)
	sec, rem := ms/1000, ms%1000
	return time.Unix(sec, rem*int64(time.Millisecond))
}

// toJSTime converts tm into milliseconds since the Unix epoch, rounded
// to the nearest millisecond.  Times before the epoch yield 0.
func toJSTime(tm time.Time) uint64 {
	epoch := time.Unix(0, 0)
	if tm.Before(epoch) {
		return 0
	}
	elapsed := tm.Sub(epoch)
	// Adding half a millisecond before dividing rounds to nearest.
	return uint64((elapsed + time.Millisecond/2) / time.Millisecond)
}
2020-04-24 19:38:21 +02:00
// closeMessage is sent on a client's write channel to make
// clientWriter stop.  If data is non-nil, clientWriter first sends it
// as the payload of a WebSocket close message.
type closeMessage struct {
	data []byte
}
2020-06-08 22:14:28 +02:00
func getUpConn(c *webClient, id string) *rtpUpConnection {
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.up == nil {
return nil
}
2020-06-08 22:14:28 +02:00
return c.up[id]
2020-04-24 19:38:21 +02:00
}
2020-06-08 22:14:28 +02:00
func getUpConns(c *webClient) []*rtpUpConnection {
c.mu.Lock()
defer c.mu.Unlock()
2020-06-08 22:14:28 +02:00
up := make([]*rtpUpConnection, 0, len(c.up))
for _, u := range c.up {
up = append(up, u)
}
return up
}
func addUpConn(c *webClient, id string) (*rtpUpConnection, bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.up == nil {
2020-06-08 22:14:28 +02:00
c.up = make(map[string]*rtpUpConnection)
}
if c.down != nil && c.down[id] != nil {
return nil, false, errors.New("Adding duplicate connection")
}
old := c.up[id]
if old != nil {
return old, false, nil
}
conn, err := newUpConn(c, id)
if err != nil {
return nil, false, err
}
c.up[id] = conn
2020-06-08 22:14:28 +02:00
conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
2020-04-24 19:38:21 +02:00
sendICE(c, id, candidate)
})
conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
if state == webrtc.ICEConnectionStateFailed {
c.action(connectionFailedAction{id: id})
}
})
return conn, true, nil
}
2020-04-24 19:38:21 +02:00
2020-05-27 11:44:49 +02:00
func delUpConn(c *webClient, id string) bool {
c.mu.Lock()
2020-04-24 19:38:21 +02:00
if c.up == nil {
c.mu.Unlock()
return false
2020-04-24 19:38:21 +02:00
}
conn := c.up[id]
if conn == nil {
c.mu.Unlock()
return false
2020-04-24 19:38:21 +02:00
}
delete(c.up, id)
c.mu.Unlock()
2020-04-24 19:38:21 +02:00
go func(clients []group.Client) {
for _, c := range clients {
c.PushConn(conn.id, nil, nil, "")
}
}(c.Group().GetClients(c))
conn.pc.Close()
return true
}
2020-05-27 11:44:49 +02:00
func getDownConn(c *webClient, id string) *rtpDownConnection {
2020-04-24 19:38:21 +02:00
if c.down == nil {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
conn := c.down[id]
if conn == nil {
return nil
}
return conn
}
2020-05-27 11:44:49 +02:00
// getConn returns the connection with the given id, looking first
// among the up connections and then among the down connections.  It
// returns a nil interface if neither exists.
func getConn(c *webClient, id string) iceConnection {
	if up := getUpConn(c, id); up != nil {
		return up
	}
	if down := getDownConn(c, id); down != nil {
		return down
	}
	// Return an untyped nil so that callers can compare against nil.
	return nil
}
2020-09-13 11:04:16 +02:00
func addDownConn(c *webClient, id string, remote conn.Up) (*rtpDownConnection, error) {
2020-09-13 13:01:06 +02:00
conn, err := newDownConn(c, id, remote)
2020-04-24 19:38:21 +02:00
if err != nil {
return nil, err
}
err = addDownConnHelper(c, conn, remote)
if err != nil {
conn.pc.Close()
return nil, err
}
return conn, err
}
func addDownConnHelper(c *webClient, conn *rtpDownConnection, remote conn.Up) (error) {
2020-06-08 22:14:28 +02:00
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.up != nil && c.up[conn.id] != nil {
return errors.New("Adding duplicate connection")
}
2020-04-24 19:38:21 +02:00
if c.down == nil {
c.down = make(map[string]*rtpDownConnection)
2020-04-24 19:38:21 +02:00
}
2020-05-23 02:22:43 +02:00
old := c.down[conn.id]
if old != nil {
// Avoid calling Close under a lock
go old.pc.Close()
2020-04-24 19:38:21 +02:00
}
2020-06-08 22:14:28 +02:00
conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
sendICE(c, conn.id, candidate)
2020-06-08 22:14:28 +02:00
})
conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
if state == webrtc.ICEConnectionStateFailed {
c.action(connectionFailedAction{id: conn.id})
}
})
err := remote.AddLocal(conn)
2020-05-30 03:36:15 +02:00
if err != nil {
return err
2020-05-30 03:36:15 +02:00
}
2020-05-23 02:22:43 +02:00
c.down[conn.id] = conn
return nil
2020-04-24 19:38:21 +02:00
}
2020-05-27 11:44:49 +02:00
// delDownConn unregisters and closes the client's down connection
// with the given id.  It reports whether such a connection existed.
func delDownConn(c *webClient, id string) bool {
	removed := delDownConnHelper(c, id)
	if removed == nil {
		return false
	}
	// Close is called here, after the helper has released the lock.
	removed.pc.Close()
	return true
}
func delDownConnHelper(c *webClient, id string) *rtpDownConnection {
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.down == nil {
return nil
2020-04-24 19:38:21 +02:00
}
2020-04-25 02:25:51 +02:00
conn := c.down[id]
2020-04-24 19:38:21 +02:00
if conn == nil {
return nil
2020-04-24 19:38:21 +02:00
}
2020-09-13 11:04:16 +02:00
conn.remote.DelLocal(conn)
for _, track := range conn.tracks {
// we only insert the track after we get an answer, so
// ignore errors here.
2020-09-13 11:04:16 +02:00
track.remote.DelLocal(track)
}
2020-04-24 19:38:21 +02:00
delete(c.down, id)
return conn
2020-04-24 19:38:21 +02:00
}
2020-09-13 11:04:16 +02:00
func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrack, remoteConn conn.Up) (*webrtc.RTPSender, error) {
2020-06-08 22:14:28 +02:00
var pt uint8
var ssrc uint32
var id, label string
switch rt := remoteTrack.(type) {
case *rtpUpTrack:
pt = rt.track.PayloadType()
ssrc = rt.track.SSRC()
id = rt.track.ID()
label = rt.track.Label()
default:
return nil, errors.New("not implemented yet")
}
local, err := conn.pc.NewTrack(pt, ssrc, id, label)
2020-04-24 19:38:21 +02:00
if err != nil {
return nil, err
2020-04-24 19:38:21 +02:00
}
s, err := conn.pc.AddTrack(local)
if err != nil {
return nil, err
}
2020-04-26 01:33:18 +02:00
track := &rtpDownTrack{
track: local,
remote: remoteTrack,
maxBitrate: new(bitrate),
stats: new(receiverStats),
rate: estimator.New(time.Second),
2020-04-28 15:26:50 +02:00
}
conn.tracks = append(conn.tracks, track)
2020-05-07 10:29:48 +02:00
go rtcpDownListener(conn, track, s)
2020-04-24 19:38:21 +02:00
return s, nil
2020-04-24 19:38:21 +02:00
}
func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bool) error {
options := webrtc.OfferOptions{ICERestart: restartIce}
offer, err := down.pc.CreateOffer(&options)
2020-04-24 19:38:21 +02:00
if err != nil {
return err
2020-04-24 19:38:21 +02:00
}
err = down.pc.SetLocalDescription(offer)
2020-04-24 19:38:21 +02:00
if err != nil {
return err
}
labels := make(map[string]string)
for _, t := range down.pc.GetTransceivers() {
var track *webrtc.Track
if t.Sender() != nil {
track = t.Sender().Track()
}
if track == nil {
continue
}
for _, tr := range down.tracks {
if tr.track == track {
2020-06-08 22:14:28 +02:00
labels[t.Mid()] = tr.remote.Label()
}
}
2020-04-24 19:38:21 +02:00
}
2020-08-12 13:56:35 +02:00
kind := ""
if renegotiate {
kind = "renegotiate"
}
2020-04-24 19:38:21 +02:00
return c.write(clientMessage{
2020-08-12 13:56:35 +02:00
Type: "offer",
Kind: kind,
Id: down.id,
Offer: &offer,
Labels: labels,
2020-04-24 19:38:21 +02:00
})
}
2020-05-27 11:44:49 +02:00
func sendICE(c *webClient, id string, candidate *webrtc.ICECandidate) error {
2020-04-24 19:38:21 +02:00
if candidate == nil {
return nil
}
cand := candidate.ToJSON()
return c.write(clientMessage{
Type: "ice",
Id: id,
Candidate: &cand,
})
}
func gotOffer(c *webClient, id string, offer webrtc.SessionDescription, renegotiate bool, labels map[string]string) error {
if !renegotiate {
// unless the client indicates that this is a compatible
// renegotiation, tear down the existing connection.
delUpConn(c, id)
}
up, isnew, err := addUpConn(c, id)
if err != nil {
return err
2020-04-24 19:38:21 +02:00
}
if u := c.Credentials().Username; u != "" {
up.label = u
2020-04-24 19:38:21 +02:00
}
err = up.pc.SetRemoteDescription(offer)
if err != nil {
if renegotiate && !isnew {
// create a new PC from scratch
log.Printf("SetRemoteDescription(offer): %v", err)
return gotOffer(c, id, offer, false, labels)
}
2020-04-24 19:38:21 +02:00
return err
}
answer, err := up.pc.CreateAnswer(nil)
if err != nil {
return err
}
err = up.pc.SetLocalDescription(answer)
if err != nil {
return err
}
up.labels = labels
2020-05-22 23:07:38 +02:00
err = up.flushICECandidates()
if err != nil {
log.Printf("ICE: %v", err)
}
2020-04-24 19:38:21 +02:00
return c.write(clientMessage{
Type: "answer",
Id: id,
Answer: &answer,
})
}
var ErrUnknownId = errors.New("unknown id")
2020-05-27 11:44:49 +02:00
func gotAnswer(c *webClient, id string, answer webrtc.SessionDescription) error {
2020-05-22 23:07:38 +02:00
down := getDownConn(c, id)
if down == nil {
return ErrUnknownId
2020-04-24 19:38:21 +02:00
}
2020-05-22 23:07:38 +02:00
err := down.pc.SetRemoteDescription(answer)
2020-04-24 19:38:21 +02:00
if err != nil {
return err
}
2020-05-21 00:55:00 +02:00
2020-05-22 23:07:38 +02:00
err = down.flushICECandidates()
if err != nil {
log.Printf("ICE: %v", err)
}
for _, t := range down.tracks {
2020-09-13 11:04:16 +02:00
t.remote.AddLocal(t)
2020-05-21 00:55:00 +02:00
}
2020-04-24 19:38:21 +02:00
return nil
}
2020-05-27 11:44:49 +02:00
func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error {
conn := getConn(c, id)
if conn == nil {
return errors.New("unknown id in ICE")
2020-04-24 19:38:21 +02:00
}
2020-05-22 23:07:38 +02:00
return conn.addICECandidate(candidate)
2020-04-24 19:38:21 +02:00
}
2020-05-27 11:44:49 +02:00
func (c *webClient) setRequested(requested map[string]uint32) error {
2020-05-09 19:39:34 +02:00
if c.down != nil {
for id := range c.down {
c.write(clientMessage{
Type: "close",
Id: id,
})
delDownConn(c, id)
}
}
c.requested = requested
2020-05-09 19:39:34 +02:00
2020-05-30 00:23:54 +02:00
go pushConns(c)
2020-05-09 19:39:34 +02:00
return nil
}
func pushConns(c group.Client) {
clients := c.Group().GetClients(c)
2020-05-30 00:23:54 +02:00
for _, cc := range clients {
ccc, ok := cc.(*webClient)
if ok {
ccc.action(pushConnsAction{c})
}
}
}
2020-05-27 11:44:49 +02:00
func (c *webClient) isRequested(label string) bool {
return c.requested[label] != 0
2020-05-09 19:39:34 +02:00
}
2020-09-13 11:04:16 +02:00
func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rtpDownConnection, error) {
requested := false
for _, t := range tracks {
2020-06-08 22:14:28 +02:00
if c.isRequested(t.Label()) {
requested = true
break
}
}
if !requested {
return nil, nil
}
2020-06-08 22:14:28 +02:00
down, err := addDownConn(c, remote.Id(), remote)
if err != nil {
return nil, err
}
for _, t := range tracks {
2020-06-08 22:14:28 +02:00
if !c.isRequested(t.Label()) {
continue
}
_, err = addDownTrack(c, down, t, remote)
if err != nil {
delDownConn(c, down.id)
return nil, err
}
}
2020-05-31 23:35:56 +02:00
go rtcpDownSender(down)
return down, nil
}
func (c *webClient) PushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error {
2020-09-13 11:04:16 +02:00
err := c.action(pushConnAction{id, up, tracks})
2020-05-28 02:35:09 +02:00
if err != nil {
return err
}
2020-09-13 11:04:16 +02:00
if up != nil && label != "" {
err := c.action(addLabelAction{up.Id(), up.Label()})
2020-05-28 02:35:09 +02:00
if err != nil {
return err
}
}
2020-05-28 02:35:09 +02:00
return nil
}
func StartClient(conn *websocket.Conn) (err error) {
2020-06-08 22:14:28 +02:00
var m clientMessage
err = conn.SetReadDeadline(time.Now().Add(15 * time.Second))
if err != nil {
conn.Close()
return
}
err = conn.ReadJSON(&m)
if err != nil {
conn.Close()
return
}
err = conn.SetReadDeadline(time.Time{})
if err != nil {
conn.Close()
return
}
if m.Type != "login" {
conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(
websocket.CloseProtocolError,
"you must login first",
),
)
2020-06-08 22:14:28 +02:00
conn.Close()
return
}
c := &webClient{
id: m.Id,
credentials: group.ClientCredentials{
m.Username,
m.Password,
},
2020-06-08 22:14:28 +02:00
actionCh: make(chan interface{}, 10),
done: make(chan struct{}),
}
defer close(c.done)
c.writeCh = make(chan interface{}, 25)
defer func() {
if isWSNormalError(err) {
err = nil
c.close(nil)
2020-06-08 22:14:28 +02:00
} else {
m, e := errorToWSCloseMessage(err)
if m != "" {
c.write(clientMessage{
2020-08-12 12:17:56 +02:00
Type: "usermessage",
Kind: "error",
2020-06-08 22:14:28 +02:00
Value: m,
})
}
c.close(e)
2020-06-08 22:14:28 +02:00
}
}()
c.writerDone = make(chan struct{})
go clientWriter(conn, c.writeCh, c.writerDone)
err = conn.ReadJSON(&m)
if err != nil {
return err
}
if m.Type != "join" {
return group.ProtocolError("you must join a group first")
}
g, err := group.AddClient(m.Group, c)
2020-06-08 22:14:28 +02:00
if err != nil {
2020-09-10 13:39:38 +02:00
if os.IsNotExist(err) {
err = group.UserError("group does not exist")
2020-09-10 13:39:38 +02:00
}
2020-06-08 22:14:28 +02:00
return
}
if redirect := g.Redirect(); redirect != "" {
2020-09-10 13:55:57 +02:00
// We normally redirect at the HTTP level, but the group
// description could have been edited in the meantime.
err = group.UserError("group is now at " + redirect)
2020-09-10 13:55:57 +02:00
return
}
2020-06-08 22:14:28 +02:00
c.group = g
defer group.DelClient(c)
2020-06-08 22:14:28 +02:00
return clientLoop(c, conn)
}
type pushConnAction struct {
id string
2020-09-13 11:04:16 +02:00
conn conn.Up
tracks []conn.UpTrack
}
type addLabelAction struct {
id string
label string
}
type pushConnsAction struct {
c group.Client
}
type connectionFailedAction struct {
id string
}
type permissionsChangedAction struct{}
type kickAction struct {
message string
}
2020-09-13 11:04:16 +02:00
func clientLoop(c *webClient, ws *websocket.Conn) error {
2020-04-24 19:38:21 +02:00
read := make(chan interface{}, 1)
2020-09-13 11:04:16 +02:00
go clientReader(ws, read, c.done)
2020-04-24 19:38:21 +02:00
defer func() {
c.setRequested(map[string]uint32{})
2020-04-24 19:38:21 +02:00
if c.up != nil {
for id := range c.up {
delUpConn(c, id)
}
}
}()
2020-04-25 02:25:51 +02:00
c.write(clientMessage{
Type: "permissions",
Permissions: c.permissions,
})
h := c.group.GetChatHistory()
2020-04-25 21:16:49 +02:00
for _, m := range h {
err := c.write(clientMessage{
Type: "chat",
Id: m.Id,
Username: m.User,
2020-09-30 00:33:23 +02:00
Time: m.Time,
Value: m.Value,
Kind: m.Kind,
2020-04-25 21:16:49 +02:00
})
if err != nil {
return err
}
}
readTime := time.Now()
ticker := time.NewTicker(10 * time.Second)
2020-04-24 19:38:21 +02:00
defer ticker.Stop()
for {
select {
case m, ok := <-read:
if !ok {
return errors.New("reader died")
}
switch m := m.(type) {
case clientMessage:
readTime = time.Now()
2020-04-24 19:38:21 +02:00
err := handleClientMessage(c, m)
if err != nil {
return err
}
case error:
return m
}
case a := <-c.actionCh:
switch a := a.(type) {
case pushConnAction:
if a.conn == nil {
found := delDownConn(c, a.id)
if found {
c.write(clientMessage{
Type: "close",
Id: a.id,
})
} else {
log.Printf("Deleting unknown " +
"down connection")
}
continue
}
down, err := addDownConnTracks(
c, a.conn, a.tracks,
)
if err != nil {
return err
2020-04-24 19:38:21 +02:00
}
if down != nil {
err = negotiate(c, down, false, false)
2020-04-24 19:38:21 +02:00
if err != nil {
2020-05-24 13:36:42 +02:00
log.Printf("Negotiate: %v", err)
delDownConn(c, down.id)
err = failUpConnection(
2020-05-24 13:36:42 +02:00
c, down.id,
"negotiation failed",
)
if err != nil {
return err
}
continue
2020-04-24 19:38:21 +02:00
}
}
case addLabelAction:
c.write(clientMessage{
Type: "label",
Id: a.id,
Value: a.label,
})
case pushConnsAction:
2020-04-24 19:38:21 +02:00
for _, u := range c.up {
tracks := u.getTracks()
2020-09-13 11:04:16 +02:00
ts := make([]conn.UpTrack, len(tracks))
2020-06-08 22:14:28 +02:00
for i, t := range tracks {
ts[i] = t
}
go a.c.PushConn(u.id, u, ts, u.label)
2020-04-24 19:38:21 +02:00
}
case connectionFailedAction:
if down := getDownConn(c, a.id); down != nil {
err := negotiate(c, down, true, true)
2020-08-13 13:48:17 +02:00
if err != nil {
return err
}
tracks := make(
2020-09-13 11:04:16 +02:00
[]conn.UpTrack, len(down.tracks),
)
for i, t := range down.tracks {
tracks[i] = t.remote
}
go c.PushConn(
down.remote.Id(), down.remote,
tracks, down.remote.Label(),
)
} else if up := getUpConn(c, a.id); up != nil {
c.write(clientMessage{
Type: "renegotiate",
Id: a.id,
})
} else {
log.Printf("Attempting to renegotiate " +
"unknown connection")
}
case permissionsChangedAction:
2020-04-25 17:36:35 +02:00
c.write(clientMessage{
Type: "permissions",
Permissions: c.permissions,
})
if !c.permissions.Present {
up := getUpConns(c)
for _, u := range up {
found := delUpConn(c, u.id)
2020-05-24 13:36:42 +02:00
if found {
failUpConnection(
c, u.id,
2020-05-24 13:36:42 +02:00
"permission denied",
)
}
}
}
2020-04-25 17:36:35 +02:00
case kickAction:
return group.UserError(a.message)
2020-04-24 19:38:21 +02:00
default:
log.Printf("unexpected action %T", a)
return errors.New("unexpected action")
}
case <-ticker.C:
if time.Since(readTime) > 90*time.Second {
return errors.New("client is dead")
}
if time.Since(readTime) > 60*time.Second {
err := c.write(clientMessage{
Type: "ping",
})
if err != nil {
return err
}
}
2020-04-24 19:38:21 +02:00
}
}
}
func failUpConnection(c *webClient, id string, message string) error {
2020-05-24 13:36:42 +02:00
if id != "" {
err := c.write(clientMessage{
Type: "abort",
Id: id,
})
if err != nil {
return err
}
}
if message != "" {
err := c.error(group.UserError(message))
2020-05-24 13:36:42 +02:00
if err != nil {
return err
}
}
return nil
}
// failDownConnection tells the client that its down connection id
// has failed: it sends a "close" message for id (if id is non-empty)
// followed by message as a user error (if message is non-empty).  It
// returns the first write error.
func failDownConnection(c *webClient, id string, message string) error {
	if id != "" {
		closeMsg := clientMessage{Type: "close", Id: id}
		if err := c.write(closeMsg); err != nil {
			return err
		}
	}
	if message == "" {
		return nil
	}
	if err := c.error(group.UserError(message)); err != nil {
		return err
	}
	return nil
}
func setPermissions(g *group.Group, id string, perm string) error {
client := g.GetClient(id)
2020-05-28 02:35:09 +02:00
if client == nil {
return group.UserError("no such user")
2020-05-28 02:35:09 +02:00
}
c, ok := client.(*webClient)
if !ok {
return group.UserError("this is not a real user")
2020-05-28 02:35:09 +02:00
}
switch perm {
case "op":
c.permissions.Op = true
if g.AllowRecording() {
2020-05-30 00:23:54 +02:00
c.permissions.Record = true
}
2020-05-28 02:35:09 +02:00
case "unop":
c.permissions.Op = false
2020-05-30 00:23:54 +02:00
c.permissions.Record = false
2020-05-28 02:35:09 +02:00
case "present":
c.permissions.Present = true
case "unpresent":
c.permissions.Present = false
default:
return group.UserError("unknown permission")
2020-05-28 02:35:09 +02:00
}
return c.action(permissionsChangedAction{})
}
2020-09-18 14:12:51 +02:00
// Kick queues a kickAction carrying message on the client; when the
// client loop handles it, the loop returns message as a user error.
// It returns the error from c.action.
func (c *webClient) Kick(message string) error {
	return c.action(kickAction{message})
}
func kickClient(g *group.Group, id string, message string) error {
client := g.GetClient(id)
2020-05-28 02:35:09 +02:00
if client == nil {
return group.UserError("no such user")
2020-05-28 02:35:09 +02:00
}
c, ok := client.(group.Kickable)
2020-05-28 02:35:09 +02:00
if !ok {
return group.UserError("this client is not kickable")
2020-05-28 02:35:09 +02:00
}
return c.Kick(message)
2020-05-28 02:35:09 +02:00
}
2020-05-27 11:44:49 +02:00
func handleClientMessage(c *webClient, m clientMessage) error {
2020-04-24 19:38:21 +02:00
switch m.Type {
2020-05-09 19:39:34 +02:00
case "request":
err := c.setRequested(m.Request)
2020-05-09 19:39:34 +02:00
if err != nil {
return err
}
2020-04-24 19:38:21 +02:00
case "offer":
2020-04-25 02:25:51 +02:00
if !c.permissions.Present {
c.write(clientMessage{
Type: "abort",
Id: m.Id,
})
return c.error(group.UserError("not authorised"))
2020-04-25 02:25:51 +02:00
}
2020-04-24 19:38:21 +02:00
if m.Offer == nil {
return group.ProtocolError("null offer")
2020-04-24 19:38:21 +02:00
}
2020-08-12 13:56:35 +02:00
err := gotOffer(
c, m.Id, *m.Offer, m.Kind == "renegotiate", m.Labels,
)
2020-04-24 19:38:21 +02:00
if err != nil {
2020-05-24 13:36:42 +02:00
log.Printf("gotOffer: %v", err)
return failUpConnection(c, m.Id, "negotiation failed")
2020-04-24 19:38:21 +02:00
}
case "answer":
if m.Answer == nil {
return group.ProtocolError("null answer")
2020-04-24 19:38:21 +02:00
}
err := gotAnswer(c, m.Id, *m.Answer)
2020-04-24 19:38:21 +02:00
if err != nil {
log.Printf("gotAnswer: %v", err)
message := ""
if err != ErrUnknownId {
message = "negotiation failed"
}
return failDownConnection(c, m.Id, message)
2020-04-24 19:38:21 +02:00
}
case "renegotiate":
down := getDownConn(c, m.Id)
if down != nil {
err := negotiate(c, down, true, true)
if err != nil {
return failDownConnection(c, m.Id,
"renegotiation failed")
}
} else {
log.Printf("Trying to renegotiate unknown connection")
}
2020-04-24 19:38:21 +02:00
case "close":
found := delUpConn(c, m.Id)
if !found {
log.Printf("Deleting unknown up connection %v", m.Id)
}
2020-04-24 19:38:21 +02:00
case "ice":
if m.Candidate == nil {
return group.ProtocolError("null candidate")
2020-04-24 19:38:21 +02:00
}
err := gotICE(c, m.Candidate, m.Id)
if err != nil {
log.Printf("ICE: %v", err)
}
case "chat":
2020-09-30 00:33:23 +02:00
tm := toJSTime(time.Now())
c.group.AddToChatHistory(
m.Id, m.Username, tm, m.Kind, m.Value,
)
mm := clientMessage{
Type: "chat",
Id: m.Id,
Username: m.Username,
Time: tm,
Kind: m.Kind,
Value: m.Value,
}
clients := c.group.GetClients(nil)
2020-04-24 19:38:21 +02:00
for _, cc := range clients {
2020-05-28 02:35:09 +02:00
cc, ok := cc.(*webClient)
if ok {
2020-09-30 00:33:23 +02:00
cc.write(mm)
2020-05-28 02:35:09 +02:00
}
2020-04-24 19:38:21 +02:00
}
case "groupaction":
switch m.Kind {
case "clearchat":
c.group.ClearChatHistory()
m := clientMessage{Type: "clearchat"}
clients := c.group.GetClients(nil)
for _, cc := range clients {
cc, ok := cc.(*webClient)
if ok {
cc.write(m)
}
2020-05-28 02:35:09 +02:00
}
case "lock", "unlock":
if !c.permissions.Op {
return c.error(group.UserError("not authorised"))
2020-05-30 00:23:54 +02:00
}
2020-09-18 11:40:00 +02:00
c.group.SetLocked(m.Kind == "lock", m.Value)
case "record":
if !c.permissions.Record {
return c.error(group.UserError("not authorised"))
}
for _, cc := range c.group.GetClients(c) {
2020-09-13 14:12:00 +02:00
_, ok := cc.(*disk.Client)
if ok {
return c.error(group.UserError("already recording"))
}
}
2020-09-13 14:12:00 +02:00
disk := disk.New(c.group)
_, err := group.AddClient(c.group.Name(), disk)
if err != nil {
2020-05-30 00:23:54 +02:00
disk.Close()
return c.error(err)
2020-05-30 00:23:54 +02:00
}
go pushConns(disk)
case "unrecord":
if !c.permissions.Record {
return c.error(group.UserError("not authorised"))
}
for _, cc := range c.group.GetClients(c) {
2020-09-13 14:12:00 +02:00
disk, ok := cc.(*disk.Client)
if ok {
disk.Close()
group.DelClient(disk)
}
}
default:
return group.ProtocolError("unknown group action")
2020-05-30 00:23:54 +02:00
}
case "useraction":
switch m.Kind {
case "op", "unop", "present", "unpresent":
if !c.permissions.Op {
return c.error(group.UserError("not authorised"))
}
err := setPermissions(c.group, m.Id, m.Kind)
if err != nil {
return c.error(err)
}
case "kick":
if !c.permissions.Op {
return c.error(group.UserError("not authorised"))
}
message := m.Value
if message == "" {
message = "you have been kicked"
}
err := kickClient(c.group, m.Id, message)
if err != nil {
return c.error(err)
}
default:
return group.ProtocolError("unknown user action")
2020-04-25 17:36:35 +02:00
}
case "pong":
// nothing
case "ping":
c.write(clientMessage{
Type: "pong",
})
2020-04-24 19:38:21 +02:00
default:
log.Printf("unexpected message: %v", m.Type)
return group.ProtocolError("unexpected message")
2020-04-24 19:38:21 +02:00
}
return nil
}
// clientReader reads JSON messages from conn and forwards each one on
// read as a clientMessage.  On a read error it forwards the error and
// stops.  It also stops as soon as done is closed.  The read channel
// is closed when the function returns.
func clientReader(conn *websocket.Conn, read chan<- interface{}, done <-chan struct{}) {
	defer close(read)

	for {
		// A fresh message is decoded on every iteration.
		var m clientMessage
		err := conn.ReadJSON(&m)

		var item interface{} = m
		if err != nil {
			item = err
		}

		select {
		case read <- item:
			if err != nil {
				return
			}
		case <-done:
			return
		}
	}
}
func clientWriter(conn *websocket.Conn, ch <-chan interface{}, done chan<- struct{}) {
defer func() {
close(done)
conn.Close()
}()
for {
m, ok := <-ch
if !ok {
break
}
err := conn.SetWriteDeadline(
time.Now().Add(2 * time.Second))
if err != nil {
return
}
switch m := m.(type) {
case clientMessage:
err := conn.WriteJSON(m)
if err != nil {
return
}
case closeMessage:
if m.data != nil {
conn.WriteMessage(
websocket.CloseMessage,
m.data,
)
2020-04-24 19:38:21 +02:00
}
return
2020-04-24 19:38:21 +02:00
default:
log.Printf("clientWiter: unexpected message %T", m)
return
}
}
}
2020-05-28 02:35:09 +02:00
var ErrClientDead = errors.New("client is dead")
2020-05-30 12:38:13 +02:00
2020-05-28 02:35:09 +02:00
func (c *webClient) action(m interface{}) error {
select {
case c.actionCh <- m:
return nil
case <-c.done:
2020-05-30 12:38:13 +02:00
return ErrClientDead
2020-05-28 02:35:09 +02:00
}
}
func (c *webClient) write(m clientMessage) error {
select {
case c.writeCh <- m:
return nil
case <-c.writerDone:
return ErrClientDead
2020-05-28 02:35:09 +02:00
}
}
// close queues a closeMessage carrying data, which makes the writer
// goroutine send data as a WebSocket close message (if non-nil) and
// stop.  It returns ErrClientDead if the writer has already exited.
func (c *webClient) close(data []byte) error {
	request := closeMessage{data}
	select {
	case <-c.writerDone:
		return ErrClientDead
	case c.writeCh <- request:
		return nil
	}
}
2020-05-28 02:35:09 +02:00
func (c *webClient) error(err error) error {
switch e := err.(type) {
case group.UserError:
2020-05-28 02:35:09 +02:00
return c.write(clientMessage{
2020-08-12 12:17:56 +02:00
Type: "usermessage",
Kind: "error",
2020-05-28 02:35:09 +02:00
Value: string(e),
})
default:
return err
}
}