mirror of
https://github.com/jech/galene.git
synced 2024-11-22 16:45:58 +01:00
1267 lines
25 KiB
Go
1267 lines
25 KiB
Go
package rtpconn
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pion/webrtc/v3"
|
|
|
|
"sfu/conn"
|
|
"sfu/diskwriter"
|
|
"sfu/estimator"
|
|
"sfu/group"
|
|
)
|
|
|
|
func errorToWSCloseMessage(err error) (string, []byte) {
|
|
var code int
|
|
var text string
|
|
switch e := err.(type) {
|
|
case *websocket.CloseError:
|
|
code = websocket.CloseNormalClosure
|
|
case group.ProtocolError:
|
|
code = websocket.CloseProtocolError
|
|
text = string(e)
|
|
case group.UserError:
|
|
code = websocket.CloseNormalClosure
|
|
text = string(e)
|
|
default:
|
|
code = websocket.CloseInternalServerErr
|
|
}
|
|
return text, websocket.FormatCloseMessage(code, text)
|
|
}
|
|
|
|
func isWSNormalError(err error) bool {
|
|
return websocket.IsCloseError(err,
|
|
websocket.CloseNormalClosure,
|
|
websocket.CloseGoingAway)
|
|
}
|
|
|
|
type webClient struct {
|
|
group *group.Group
|
|
id string
|
|
credentials group.ClientCredentials
|
|
permissions group.ClientPermissions
|
|
requested map[string]uint32
|
|
done chan struct{}
|
|
writeCh chan interface{}
|
|
writerDone chan struct{}
|
|
actionCh chan interface{}
|
|
|
|
mu sync.Mutex
|
|
down map[string]*rtpDownConnection
|
|
up map[string]*rtpUpConnection
|
|
}
|
|
|
|
func (c *webClient) Group() *group.Group {
|
|
return c.group
|
|
}
|
|
|
|
func (c *webClient) Id() string {
|
|
return c.id
|
|
}
|
|
|
|
func (c *webClient) Credentials() group.ClientCredentials {
|
|
return c.credentials
|
|
}
|
|
|
|
func (c *webClient) SetPermissions(perms group.ClientPermissions) {
|
|
c.permissions = perms
|
|
}
|
|
|
|
func (c *webClient) OverridePermissions(g *group.Group) bool {
|
|
return false
|
|
}
|
|
|
|
func (c *webClient) PushClient(id, username string, add bool) error {
|
|
kind := "add"
|
|
if !add {
|
|
kind = "delete"
|
|
}
|
|
return c.write(clientMessage{
|
|
Type: "user",
|
|
Kind: kind,
|
|
Id: id,
|
|
Username: username,
|
|
})
|
|
}
|
|
|
|
type rateMap map[string]uint32
|
|
|
|
func (v *rateMap) UnmarshalJSON(b []byte) error {
|
|
var m map[string]interface{}
|
|
|
|
err := json.Unmarshal(b, &m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n := make(map[string]uint32, len(m))
|
|
for k, w := range m {
|
|
switch w := w.(type) {
|
|
case bool:
|
|
if w {
|
|
n[k] = ^uint32(0)
|
|
} else {
|
|
n[k] = 0
|
|
}
|
|
case float64:
|
|
if w < 0 || w >= float64(^uint32(0)) {
|
|
return errors.New("overflow")
|
|
}
|
|
n[k] = uint32(w)
|
|
default:
|
|
return errors.New("unexpected type in JSON map")
|
|
}
|
|
}
|
|
*v = n
|
|
return nil
|
|
}
|
|
|
|
func (v rateMap) MarshalJSON() ([]byte, error) {
|
|
m := make(map[string]interface{}, len(v))
|
|
for k, w := range v {
|
|
switch w {
|
|
case 0:
|
|
m[k] = false
|
|
case ^uint32(0):
|
|
m[k] = true
|
|
default:
|
|
m[k] = w
|
|
}
|
|
}
|
|
return json.Marshal(m)
|
|
}
|
|
|
|
type clientMessage struct {
|
|
Type string `json:"type"`
|
|
Kind string `json:"kind,omitempty"`
|
|
Id string `json:"id,omitempty"`
|
|
Dest string `json:"dest,omitempty"`
|
|
Username string `json:"username,omitempty"`
|
|
Password string `json:"password,omitempty"`
|
|
Permissions group.ClientPermissions `json:"permissions,omitempty"`
|
|
Group string `json:"group,omitempty"`
|
|
Value string `json:"value,omitempty"`
|
|
Time int64 `json:"time,omitempty"`
|
|
Offer *webrtc.SessionDescription `json:"offer,omitempty"`
|
|
Answer *webrtc.SessionDescription `json:"answer,omitempty"`
|
|
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
|
|
Labels map[string]string `json:"labels,omitempty"`
|
|
Request rateMap `json:"request,omitempty"`
|
|
}
|
|
|
|
type closeMessage struct {
|
|
data []byte
|
|
}
|
|
|
|
func getUpConn(c *webClient, id string) *rtpUpConnection {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.up == nil {
|
|
return nil
|
|
}
|
|
return c.up[id]
|
|
}
|
|
|
|
func getUpConns(c *webClient) []*rtpUpConnection {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
up := make([]*rtpUpConnection, 0, len(c.up))
|
|
for _, u := range c.up {
|
|
up = append(up, u)
|
|
}
|
|
return up
|
|
}
|
|
|
|
func addUpConn(c *webClient, id string) (*rtpUpConnection, bool, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.up == nil {
|
|
c.up = make(map[string]*rtpUpConnection)
|
|
}
|
|
if c.down != nil && c.down[id] != nil {
|
|
return nil, false, errors.New("Adding duplicate connection")
|
|
}
|
|
|
|
old := c.up[id]
|
|
if old != nil {
|
|
return old, false, nil
|
|
}
|
|
|
|
conn, err := newUpConn(c, id)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
c.up[id] = conn
|
|
|
|
conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
|
sendICE(c, id, candidate)
|
|
})
|
|
|
|
conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
|
if state == webrtc.ICEConnectionStateFailed {
|
|
c.action(connectionFailedAction{id: id})
|
|
}
|
|
})
|
|
|
|
return conn, true, nil
|
|
}
|
|
|
|
func delUpConn(c *webClient, id string) bool {
|
|
c.mu.Lock()
|
|
if c.up == nil {
|
|
c.mu.Unlock()
|
|
return false
|
|
}
|
|
conn := c.up[id]
|
|
if conn == nil {
|
|
c.mu.Unlock()
|
|
return false
|
|
}
|
|
delete(c.up, id)
|
|
c.mu.Unlock()
|
|
|
|
go func(clients []group.Client) {
|
|
for _, c := range clients {
|
|
c.PushConn(conn.id, nil, nil, "")
|
|
}
|
|
}(c.Group().GetClients(c))
|
|
|
|
conn.pc.Close()
|
|
return true
|
|
}
|
|
|
|
func getDownConn(c *webClient, id string) *rtpDownConnection {
|
|
if c.down == nil {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
conn := c.down[id]
|
|
if conn == nil {
|
|
return nil
|
|
}
|
|
return conn
|
|
}
|
|
|
|
func getConn(c *webClient, id string) iceConnection {
|
|
up := getUpConn(c, id)
|
|
if up != nil {
|
|
return up
|
|
}
|
|
down := getDownConn(c, id)
|
|
if down != nil {
|
|
return down
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func addDownConn(c *webClient, id string, remote conn.Up) (*rtpDownConnection, error) {
|
|
conn, err := newDownConn(c, id, remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = addDownConnHelper(c, conn, remote)
|
|
if err != nil {
|
|
conn.pc.Close()
|
|
return nil, err
|
|
}
|
|
return conn, err
|
|
}
|
|
|
|
func addDownConnHelper(c *webClient, conn *rtpDownConnection, remote conn.Up) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.up != nil && c.up[conn.id] != nil {
|
|
return errors.New("Adding duplicate connection")
|
|
}
|
|
|
|
if c.down == nil {
|
|
c.down = make(map[string]*rtpDownConnection)
|
|
}
|
|
|
|
old := c.down[conn.id]
|
|
if old != nil {
|
|
// Avoid calling Close under a lock
|
|
go old.pc.Close()
|
|
}
|
|
|
|
conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
|
sendICE(c, conn.id, candidate)
|
|
})
|
|
|
|
conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
|
if state == webrtc.ICEConnectionStateFailed {
|
|
c.action(connectionFailedAction{id: conn.id})
|
|
}
|
|
})
|
|
|
|
err := remote.AddLocal(conn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.down[conn.id] = conn
|
|
return nil
|
|
}
|
|
|
|
func delDownConn(c *webClient, id string) bool {
|
|
conn := delDownConnHelper(c, id)
|
|
if conn != nil {
|
|
conn.pc.Close()
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func delDownConnHelper(c *webClient, id string) *rtpDownConnection {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.down == nil {
|
|
return nil
|
|
}
|
|
conn := c.down[id]
|
|
if conn == nil {
|
|
return nil
|
|
}
|
|
|
|
conn.remote.DelLocal(conn)
|
|
for _, track := range conn.tracks {
|
|
// we only insert the track after we get an answer, so
|
|
// ignore errors here.
|
|
track.remote.DelLocal(track)
|
|
}
|
|
delete(c.down, id)
|
|
return conn
|
|
}
|
|
|
|
func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrack, remoteConn conn.Up) (*webrtc.RTPSender, error) {
|
|
var pt uint8
|
|
var ssrc uint32
|
|
var id, label string
|
|
switch rt := remoteTrack.(type) {
|
|
case *rtpUpTrack:
|
|
pt = rt.track.PayloadType()
|
|
ssrc = rt.track.SSRC()
|
|
id = rt.track.ID()
|
|
label = rt.track.Label()
|
|
default:
|
|
return nil, errors.New("not implemented yet")
|
|
}
|
|
|
|
local, err := conn.pc.NewTrack(pt, ssrc, id, label)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s, err := conn.pc.AddTrack(local)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
track := &rtpDownTrack{
|
|
track: local,
|
|
remote: remoteTrack,
|
|
maxBitrate: new(bitrate),
|
|
stats: new(receiverStats),
|
|
rate: estimator.New(time.Second),
|
|
}
|
|
conn.tracks = append(conn.tracks, track)
|
|
|
|
go rtcpDownListener(conn, track, s)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bool) error {
|
|
options := webrtc.OfferOptions{ICERestart: restartIce}
|
|
offer, err := down.pc.CreateOffer(&options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = down.pc.SetLocalDescription(offer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
labels := make(map[string]string)
|
|
for _, t := range down.pc.GetTransceivers() {
|
|
var track *webrtc.Track
|
|
if t.Sender() != nil {
|
|
track = t.Sender().Track()
|
|
}
|
|
if track == nil {
|
|
continue
|
|
}
|
|
|
|
for _, tr := range down.tracks {
|
|
if tr.track == track {
|
|
labels[t.Mid()] = tr.remote.Label()
|
|
}
|
|
}
|
|
}
|
|
|
|
kind := ""
|
|
if renegotiate {
|
|
kind = "renegotiate"
|
|
}
|
|
|
|
return c.write(clientMessage{
|
|
Type: "offer",
|
|
Kind: kind,
|
|
Id: down.id,
|
|
Offer: &offer,
|
|
Labels: labels,
|
|
})
|
|
}
|
|
|
|
func sendICE(c *webClient, id string, candidate *webrtc.ICECandidate) error {
|
|
if candidate == nil {
|
|
return nil
|
|
}
|
|
cand := candidate.ToJSON()
|
|
return c.write(clientMessage{
|
|
Type: "ice",
|
|
Id: id,
|
|
Candidate: &cand,
|
|
})
|
|
}
|
|
|
|
func gotOffer(c *webClient, id string, offer webrtc.SessionDescription, renegotiate bool, labels map[string]string) error {
|
|
if !renegotiate {
|
|
// unless the client indicates that this is a compatible
|
|
// renegotiation, tear down the existing connection.
|
|
delUpConn(c, id)
|
|
}
|
|
|
|
up, isnew, err := addUpConn(c, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if u := c.Credentials().Username; u != "" {
|
|
up.label = u
|
|
}
|
|
err = up.pc.SetRemoteDescription(offer)
|
|
if err != nil {
|
|
if renegotiate && !isnew {
|
|
// create a new PC from scratch
|
|
log.Printf("SetRemoteDescription(offer): %v", err)
|
|
return gotOffer(c, id, offer, false, labels)
|
|
}
|
|
return err
|
|
}
|
|
|
|
answer, err := up.pc.CreateAnswer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = up.pc.SetLocalDescription(answer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
up.labels = labels
|
|
|
|
err = up.flushICECandidates()
|
|
if err != nil {
|
|
log.Printf("ICE: %v", err)
|
|
}
|
|
|
|
return c.write(clientMessage{
|
|
Type: "answer",
|
|
Id: id,
|
|
Answer: &answer,
|
|
})
|
|
}
|
|
|
|
var ErrUnknownId = errors.New("unknown id")
|
|
|
|
func gotAnswer(c *webClient, id string, answer webrtc.SessionDescription) error {
|
|
down := getDownConn(c, id)
|
|
if down == nil {
|
|
return ErrUnknownId
|
|
}
|
|
err := down.pc.SetRemoteDescription(answer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = down.flushICECandidates()
|
|
if err != nil {
|
|
log.Printf("ICE: %v", err)
|
|
}
|
|
|
|
add := func() {
|
|
down.pc.OnConnectionStateChange(nil)
|
|
for _, t := range down.tracks {
|
|
t.remote.AddLocal(t)
|
|
}
|
|
}
|
|
down.pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
|
if state == webrtc.PeerConnectionStateConnected {
|
|
add()
|
|
}
|
|
})
|
|
if down.pc.ConnectionState() == webrtc.PeerConnectionStateConnected {
|
|
add()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error {
|
|
conn := getConn(c, id)
|
|
if conn == nil {
|
|
return errors.New("unknown id in ICE")
|
|
}
|
|
return conn.addICECandidate(candidate)
|
|
}
|
|
|
|
func (c *webClient) setRequested(requested map[string]uint32) error {
|
|
if c.down != nil {
|
|
for id := range c.down {
|
|
c.write(clientMessage{
|
|
Type: "close",
|
|
Id: id,
|
|
})
|
|
delDownConn(c, id)
|
|
}
|
|
}
|
|
|
|
c.requested = requested
|
|
|
|
go pushConns(c)
|
|
return nil
|
|
}
|
|
|
|
func pushConns(c group.Client) {
|
|
clients := c.Group().GetClients(c)
|
|
for _, cc := range clients {
|
|
ccc, ok := cc.(*webClient)
|
|
if ok {
|
|
ccc.action(pushConnsAction{c})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *webClient) isRequested(label string) bool {
|
|
return c.requested[label] != 0
|
|
}
|
|
|
|
func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rtpDownConnection, error) {
|
|
requested := false
|
|
for _, t := range tracks {
|
|
if c.isRequested(t.Label()) {
|
|
requested = true
|
|
break
|
|
}
|
|
}
|
|
if !requested {
|
|
return nil, nil
|
|
}
|
|
|
|
down, err := addDownConn(c, remote.Id(), remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, t := range tracks {
|
|
if !c.isRequested(t.Label()) {
|
|
continue
|
|
}
|
|
_, err = addDownTrack(c, down, t, remote)
|
|
if err != nil {
|
|
delDownConn(c, down.id)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
go rtcpDownSender(down)
|
|
|
|
return down, nil
|
|
}
|
|
|
|
func (c *webClient) PushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error {
|
|
err := c.action(pushConnAction{id, up, tracks})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if up != nil && label != "" {
|
|
err := c.action(addLabelAction{up.Id(), up.Label()})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func StartClient(conn *websocket.Conn) (err error) {
|
|
var m clientMessage
|
|
|
|
err = conn.SetReadDeadline(time.Now().Add(15 * time.Second))
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
err = conn.ReadJSON(&m)
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
err = conn.SetReadDeadline(time.Time{})
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
|
|
if m.Type != "login" {
|
|
conn.WriteMessage(websocket.CloseMessage,
|
|
websocket.FormatCloseMessage(
|
|
websocket.CloseProtocolError,
|
|
"you must login first",
|
|
),
|
|
)
|
|
conn.Close()
|
|
return
|
|
}
|
|
|
|
c := &webClient{
|
|
id: m.Id,
|
|
credentials: group.ClientCredentials{
|
|
m.Username,
|
|
m.Password,
|
|
},
|
|
actionCh: make(chan interface{}, 10),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
defer close(c.done)
|
|
|
|
c.writeCh = make(chan interface{}, 25)
|
|
defer func() {
|
|
if isWSNormalError(err) {
|
|
err = nil
|
|
c.close(nil)
|
|
} else {
|
|
m, e := errorToWSCloseMessage(err)
|
|
if m != "" {
|
|
c.write(clientMessage{
|
|
Type: "usermessage",
|
|
Kind: "error",
|
|
Value: m,
|
|
})
|
|
}
|
|
c.close(e)
|
|
}
|
|
}()
|
|
|
|
c.writerDone = make(chan struct{})
|
|
go clientWriter(conn, c.writeCh, c.writerDone)
|
|
|
|
err = conn.ReadJSON(&m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if m.Type != "join" {
|
|
return group.ProtocolError("you must join a group first")
|
|
}
|
|
|
|
g, err := group.AddClient(m.Group, c)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
err = group.UserError("group does not exist")
|
|
}
|
|
return
|
|
}
|
|
if redirect := g.Redirect(); redirect != "" {
|
|
// We normally redirect at the HTTP level, but the group
|
|
// description could have been edited in the meantime.
|
|
err = group.UserError("group is now at " + redirect)
|
|
return
|
|
}
|
|
c.group = g
|
|
defer group.DelClient(c)
|
|
|
|
return clientLoop(c, conn)
|
|
}
|
|
|
|
type pushConnAction struct {
|
|
id string
|
|
conn conn.Up
|
|
tracks []conn.UpTrack
|
|
}
|
|
|
|
type addLabelAction struct {
|
|
id string
|
|
label string
|
|
}
|
|
|
|
type pushConnsAction struct {
|
|
c group.Client
|
|
}
|
|
|
|
type connectionFailedAction struct {
|
|
id string
|
|
}
|
|
|
|
type permissionsChangedAction struct{}
|
|
|
|
type kickAction struct {
|
|
message string
|
|
}
|
|
|
|
func clientLoop(c *webClient, ws *websocket.Conn) error {
|
|
read := make(chan interface{}, 1)
|
|
go clientReader(ws, read, c.done)
|
|
|
|
defer func() {
|
|
c.setRequested(map[string]uint32{})
|
|
if c.up != nil {
|
|
for id := range c.up {
|
|
delUpConn(c, id)
|
|
}
|
|
}
|
|
}()
|
|
|
|
c.write(clientMessage{
|
|
Type: "permissions",
|
|
Permissions: c.permissions,
|
|
})
|
|
|
|
h := c.group.GetChatHistory()
|
|
for _, m := range h {
|
|
err := c.write(clientMessage{
|
|
Type: "chat",
|
|
Id: m.Id,
|
|
Username: m.User,
|
|
Time: m.Time,
|
|
Value: m.Value,
|
|
Kind: m.Kind,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
readTime := time.Now()
|
|
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case m, ok := <-read:
|
|
if !ok {
|
|
return errors.New("reader died")
|
|
}
|
|
switch m := m.(type) {
|
|
case clientMessage:
|
|
readTime = time.Now()
|
|
err := handleClientMessage(c, m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case error:
|
|
return m
|
|
}
|
|
case a := <-c.actionCh:
|
|
switch a := a.(type) {
|
|
case pushConnAction:
|
|
if a.conn == nil {
|
|
found := delDownConn(c, a.id)
|
|
if found {
|
|
c.write(clientMessage{
|
|
Type: "close",
|
|
Id: a.id,
|
|
})
|
|
} else {
|
|
log.Printf("Deleting unknown " +
|
|
"down connection")
|
|
}
|
|
continue
|
|
}
|
|
down, err := addDownConnTracks(
|
|
c, a.conn, a.tracks,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if down != nil {
|
|
err = negotiate(c, down, false, false)
|
|
if err != nil {
|
|
log.Printf("Negotiate: %v", err)
|
|
delDownConn(c, down.id)
|
|
err = failUpConnection(
|
|
c, down.id,
|
|
"negotiation failed",
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
case addLabelAction:
|
|
c.write(clientMessage{
|
|
Type: "label",
|
|
Id: a.id,
|
|
Value: a.label,
|
|
})
|
|
case pushConnsAction:
|
|
for _, u := range c.up {
|
|
tracks := u.getTracks()
|
|
ts := make([]conn.UpTrack, len(tracks))
|
|
for i, t := range tracks {
|
|
ts[i] = t
|
|
}
|
|
go a.c.PushConn(u.id, u, ts, u.label)
|
|
}
|
|
case connectionFailedAction:
|
|
if down := getDownConn(c, a.id); down != nil {
|
|
err := negotiate(c, down, true, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tracks := make(
|
|
[]conn.UpTrack, len(down.tracks),
|
|
)
|
|
for i, t := range down.tracks {
|
|
tracks[i] = t.remote
|
|
}
|
|
go c.PushConn(
|
|
down.remote.Id(), down.remote,
|
|
tracks, down.remote.Label(),
|
|
)
|
|
} else if up := getUpConn(c, a.id); up != nil {
|
|
c.write(clientMessage{
|
|
Type: "renegotiate",
|
|
Id: a.id,
|
|
})
|
|
} else {
|
|
log.Printf("Attempting to renegotiate " +
|
|
"unknown connection")
|
|
}
|
|
|
|
case permissionsChangedAction:
|
|
c.write(clientMessage{
|
|
Type: "permissions",
|
|
Permissions: c.permissions,
|
|
})
|
|
if !c.permissions.Present {
|
|
up := getUpConns(c)
|
|
for _, u := range up {
|
|
found := delUpConn(c, u.id)
|
|
if found {
|
|
failUpConnection(
|
|
c, u.id,
|
|
"permission denied",
|
|
)
|
|
}
|
|
}
|
|
}
|
|
case kickAction:
|
|
return group.UserError(a.message)
|
|
default:
|
|
log.Printf("unexpected action %T", a)
|
|
return errors.New("unexpected action")
|
|
}
|
|
case <-ticker.C:
|
|
if time.Since(readTime) > 90*time.Second {
|
|
return errors.New("client is dead")
|
|
}
|
|
if time.Since(readTime) > 60*time.Second {
|
|
err := c.write(clientMessage{
|
|
Type: "ping",
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func failUpConnection(c *webClient, id string, message string) error {
|
|
if id != "" {
|
|
err := c.write(clientMessage{
|
|
Type: "abort",
|
|
Id: id,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if message != "" {
|
|
err := c.error(group.UserError(message))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func failDownConnection(c *webClient, id string, message string) error {
|
|
if id != "" {
|
|
err := c.write(clientMessage{
|
|
Type: "close",
|
|
Id: id,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if message != "" {
|
|
err := c.error(group.UserError(message))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func setPermissions(g *group.Group, id string, perm string) error {
|
|
client := g.GetClient(id)
|
|
if client == nil {
|
|
return group.UserError("no such user")
|
|
}
|
|
|
|
c, ok := client.(*webClient)
|
|
if !ok {
|
|
return group.UserError("this is not a real user")
|
|
}
|
|
|
|
switch perm {
|
|
case "op":
|
|
c.permissions.Op = true
|
|
if g.AllowRecording() {
|
|
c.permissions.Record = true
|
|
}
|
|
case "unop":
|
|
c.permissions.Op = false
|
|
c.permissions.Record = false
|
|
case "present":
|
|
c.permissions.Present = true
|
|
case "unpresent":
|
|
c.permissions.Present = false
|
|
default:
|
|
return group.UserError("unknown permission")
|
|
}
|
|
return c.action(permissionsChangedAction{})
|
|
}
|
|
|
|
func (c *webClient) Kick(message string) error {
|
|
return c.action(kickAction{message})
|
|
}
|
|
|
|
func kickClient(g *group.Group, id string, message string) error {
|
|
client := g.GetClient(id)
|
|
if client == nil {
|
|
return group.UserError("no such user")
|
|
}
|
|
|
|
c, ok := client.(group.Kickable)
|
|
if !ok {
|
|
return group.UserError("this client is not kickable")
|
|
}
|
|
|
|
return c.Kick(message)
|
|
}
|
|
|
|
func handleClientMessage(c *webClient, m clientMessage) error {
|
|
switch m.Type {
|
|
case "request":
|
|
err := c.setRequested(m.Request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case "offer":
|
|
if !c.permissions.Present {
|
|
c.write(clientMessage{
|
|
Type: "abort",
|
|
Id: m.Id,
|
|
})
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
if m.Offer == nil {
|
|
return group.ProtocolError("null offer")
|
|
}
|
|
err := gotOffer(
|
|
c, m.Id, *m.Offer, m.Kind == "renegotiate", m.Labels,
|
|
)
|
|
if err != nil {
|
|
log.Printf("gotOffer: %v", err)
|
|
return failUpConnection(c, m.Id, "negotiation failed")
|
|
}
|
|
case "answer":
|
|
if m.Answer == nil {
|
|
return group.ProtocolError("null answer")
|
|
}
|
|
err := gotAnswer(c, m.Id, *m.Answer)
|
|
if err != nil {
|
|
log.Printf("gotAnswer: %v", err)
|
|
message := ""
|
|
if err != ErrUnknownId {
|
|
message = "negotiation failed"
|
|
}
|
|
return failDownConnection(c, m.Id, message)
|
|
}
|
|
case "renegotiate":
|
|
down := getDownConn(c, m.Id)
|
|
if down != nil {
|
|
err := negotiate(c, down, true, true)
|
|
if err != nil {
|
|
return failDownConnection(c, m.Id,
|
|
"renegotiation failed")
|
|
}
|
|
} else {
|
|
log.Printf("Trying to renegotiate unknown connection")
|
|
}
|
|
case "close":
|
|
found := delUpConn(c, m.Id)
|
|
if !found {
|
|
log.Printf("Deleting unknown up connection %v", m.Id)
|
|
}
|
|
case "ice":
|
|
if m.Candidate == nil {
|
|
return group.ProtocolError("null candidate")
|
|
}
|
|
err := gotICE(c, m.Candidate, m.Id)
|
|
if err != nil {
|
|
log.Printf("ICE: %v", err)
|
|
}
|
|
case "chat":
|
|
tm := group.ToJSTime(time.Now())
|
|
if m.Dest == "" {
|
|
c.group.AddToChatHistory(
|
|
m.Id, m.Username, tm, m.Kind, m.Value,
|
|
)
|
|
}
|
|
mm := clientMessage{
|
|
Type: "chat",
|
|
Id: m.Id,
|
|
Dest: m.Dest,
|
|
Username: m.Username,
|
|
Time: tm,
|
|
Kind: m.Kind,
|
|
Value: m.Value,
|
|
}
|
|
if m.Dest == "" {
|
|
clients := c.group.GetClients(nil)
|
|
for _, cc := range clients {
|
|
ccc, ok := cc.(*webClient)
|
|
if ok {
|
|
ccc.write(mm)
|
|
}
|
|
}
|
|
} else {
|
|
cc := c.group.GetClient(m.Dest)
|
|
if cc == nil {
|
|
return c.error(group.UserError("user unknown"))
|
|
}
|
|
ccc, ok := cc.(*webClient)
|
|
if !ok {
|
|
return c.error(group.UserError("this user doesn't chat"))
|
|
}
|
|
ccc.write(mm)
|
|
}
|
|
case "groupaction":
|
|
switch m.Kind {
|
|
case "clearchat":
|
|
c.group.ClearChatHistory()
|
|
m := clientMessage{Type: "clearchat"}
|
|
clients := c.group.GetClients(nil)
|
|
for _, cc := range clients {
|
|
cc, ok := cc.(*webClient)
|
|
if ok {
|
|
cc.write(m)
|
|
}
|
|
}
|
|
case "lock", "unlock":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
c.group.SetLocked(m.Kind == "lock", m.Value)
|
|
case "record":
|
|
if !c.permissions.Record {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
for _, cc := range c.group.GetClients(c) {
|
|
_, ok := cc.(*diskwriter.Client)
|
|
if ok {
|
|
return c.error(group.UserError("already recording"))
|
|
}
|
|
}
|
|
disk := diskwriter.New(c.group)
|
|
_, err := group.AddClient(c.group.Name(), disk)
|
|
if err != nil {
|
|
disk.Close()
|
|
return c.error(err)
|
|
}
|
|
go pushConns(disk)
|
|
case "unrecord":
|
|
if !c.permissions.Record {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
for _, cc := range c.group.GetClients(c) {
|
|
disk, ok := cc.(*diskwriter.Client)
|
|
if ok {
|
|
disk.Close()
|
|
group.DelClient(disk)
|
|
}
|
|
}
|
|
default:
|
|
return group.ProtocolError("unknown group action")
|
|
}
|
|
case "useraction":
|
|
switch m.Kind {
|
|
case "op", "unop", "present", "unpresent":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
err := setPermissions(c.group, m.Id, m.Kind)
|
|
if err != nil {
|
|
return c.error(err)
|
|
}
|
|
case "kick":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
message := m.Value
|
|
if message == "" {
|
|
message = "you have been kicked"
|
|
}
|
|
err := kickClient(c.group, m.Id, message)
|
|
if err != nil {
|
|
return c.error(err)
|
|
}
|
|
default:
|
|
return group.ProtocolError("unknown user action")
|
|
}
|
|
case "pong":
|
|
// nothing
|
|
case "ping":
|
|
c.write(clientMessage{
|
|
Type: "pong",
|
|
})
|
|
default:
|
|
log.Printf("unexpected message: %v", m.Type)
|
|
return group.ProtocolError("unexpected message")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func clientReader(conn *websocket.Conn, read chan<- interface{}, done <-chan struct{}) {
|
|
defer close(read)
|
|
for {
|
|
var m clientMessage
|
|
err := conn.ReadJSON(&m)
|
|
if err != nil {
|
|
select {
|
|
case read <- err:
|
|
return
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
select {
|
|
case read <- m:
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func clientWriter(conn *websocket.Conn, ch <-chan interface{}, done chan<- struct{}) {
|
|
defer func() {
|
|
close(done)
|
|
conn.Close()
|
|
}()
|
|
|
|
for {
|
|
m, ok := <-ch
|
|
if !ok {
|
|
break
|
|
}
|
|
err := conn.SetWriteDeadline(
|
|
time.Now().Add(2 * time.Second))
|
|
if err != nil {
|
|
return
|
|
}
|
|
switch m := m.(type) {
|
|
case clientMessage:
|
|
err := conn.WriteJSON(m)
|
|
if err != nil {
|
|
return
|
|
}
|
|
case closeMessage:
|
|
if m.data != nil {
|
|
conn.WriteMessage(
|
|
websocket.CloseMessage,
|
|
m.data,
|
|
)
|
|
}
|
|
return
|
|
default:
|
|
log.Printf("clientWiter: unexpected message %T", m)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
var ErrClientDead = errors.New("client is dead")
|
|
|
|
func (c *webClient) action(m interface{}) error {
|
|
select {
|
|
case c.actionCh <- m:
|
|
return nil
|
|
case <-c.done:
|
|
return ErrClientDead
|
|
}
|
|
}
|
|
|
|
func (c *webClient) write(m clientMessage) error {
|
|
select {
|
|
case c.writeCh <- m:
|
|
return nil
|
|
case <-c.writerDone:
|
|
return ErrClientDead
|
|
}
|
|
}
|
|
|
|
func (c *webClient) close(data []byte) error {
|
|
select {
|
|
case c.writeCh <- closeMessage{data}:
|
|
return nil
|
|
case <-c.writerDone:
|
|
return ErrClientDead
|
|
}
|
|
}
|
|
|
|
func (c *webClient) error(err error) error {
|
|
switch e := err.(type) {
|
|
case group.UserError:
|
|
return c.write(clientMessage{
|
|
Type: "usermessage",
|
|
Kind: "error",
|
|
Value: string(e),
|
|
})
|
|
default:
|
|
return err
|
|
}
|
|
}
|