mirror of
https://github.com/jech/galene.git
synced 2024-11-14 12:45:58 +01:00
1460 lines
30 KiB
Go
1460 lines
30 KiB
Go
package rtpconn
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pion/webrtc/v3"
|
|
|
|
"sfu/conn"
|
|
"sfu/diskwriter"
|
|
"sfu/estimator"
|
|
"sfu/group"
|
|
)
|
|
|
|
func errorToWSCloseMessage(id string, err error) (*clientMessage, []byte) {
|
|
var code int
|
|
var m *clientMessage
|
|
var text string
|
|
switch e := err.(type) {
|
|
case *websocket.CloseError:
|
|
code = websocket.CloseNormalClosure
|
|
case group.ProtocolError:
|
|
code = websocket.CloseProtocolError
|
|
s := e.Error()
|
|
m = &clientMessage{
|
|
Type: "usermessage",
|
|
Kind: "error",
|
|
Dest: id,
|
|
Priviledged: true,
|
|
Value: &s,
|
|
}
|
|
text = e.Error()
|
|
case group.UserError, group.KickError:
|
|
code = websocket.CloseNormalClosure
|
|
m = errorMessage(id, err)
|
|
text = e.Error()
|
|
default:
|
|
code = websocket.CloseInternalServerErr
|
|
}
|
|
return m, websocket.FormatCloseMessage(code, text)
|
|
}
|
|
|
|
func isWSNormalError(err error) bool {
|
|
return websocket.IsCloseError(err,
|
|
websocket.CloseNormalClosure,
|
|
websocket.CloseGoingAway)
|
|
}
|
|
|
|
type webClient struct {
|
|
group *group.Group
|
|
id string
|
|
username string
|
|
password string
|
|
permissions group.ClientPermissions
|
|
requested map[string]uint32
|
|
done chan struct{}
|
|
writeCh chan interface{}
|
|
writerDone chan struct{}
|
|
actionCh chan interface{}
|
|
|
|
mu sync.Mutex
|
|
down map[string]*rtpDownConnection
|
|
up map[string]*rtpUpConnection
|
|
}
|
|
|
|
func (c *webClient) Group() *group.Group {
|
|
return c.group
|
|
}
|
|
|
|
func (c *webClient) Id() string {
|
|
return c.id
|
|
}
|
|
|
|
func (c *webClient) Username() string {
|
|
return c.username
|
|
}
|
|
|
|
func (c *webClient) Challenge(group string, creds group.ClientCredentials) bool {
|
|
if creds.Password == nil {
|
|
return true
|
|
}
|
|
m, err := creds.Password.Match(c.password)
|
|
if err != nil {
|
|
log.Printf("Password match: %v", err)
|
|
return false
|
|
}
|
|
return m
|
|
}
|
|
|
|
func (c *webClient) SetPermissions(perms group.ClientPermissions) {
|
|
c.permissions = perms
|
|
}
|
|
|
|
func (c *webClient) OverridePermissions(g *group.Group) bool {
|
|
return false
|
|
}
|
|
|
|
func (c *webClient) PushClient(id, username string, add bool) error {
|
|
kind := "add"
|
|
if !add {
|
|
kind = "delete"
|
|
}
|
|
return c.write(clientMessage{
|
|
Type: "user",
|
|
Kind: kind,
|
|
Id: id,
|
|
Username: username,
|
|
})
|
|
}
|
|
|
|
type rateMap map[string]uint32
|
|
|
|
func (v *rateMap) UnmarshalJSON(b []byte) error {
|
|
var m map[string]interface{}
|
|
|
|
err := json.Unmarshal(b, &m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n := make(map[string]uint32, len(m))
|
|
for k, w := range m {
|
|
switch w := w.(type) {
|
|
case bool:
|
|
if w {
|
|
n[k] = ^uint32(0)
|
|
} else {
|
|
n[k] = 0
|
|
}
|
|
case float64:
|
|
if w < 0 || w >= float64(^uint32(0)) {
|
|
return errors.New("overflow")
|
|
}
|
|
n[k] = uint32(w)
|
|
default:
|
|
return errors.New("unexpected type in JSON map")
|
|
}
|
|
}
|
|
*v = n
|
|
return nil
|
|
}
|
|
|
|
func (v rateMap) MarshalJSON() ([]byte, error) {
|
|
m := make(map[string]interface{}, len(v))
|
|
for k, w := range v {
|
|
switch w {
|
|
case 0:
|
|
m[k] = false
|
|
case ^uint32(0):
|
|
m[k] = true
|
|
default:
|
|
m[k] = w
|
|
}
|
|
}
|
|
return json.Marshal(m)
|
|
}
|
|
|
|
type clientMessage struct {
|
|
Type string `json:"type"`
|
|
Kind string `json:"kind,omitempty"`
|
|
Id string `json:"id,omitempty"`
|
|
Dest string `json:"dest,omitempty"`
|
|
Username string `json:"username,omitempty"`
|
|
Password string `json:"password,omitempty"`
|
|
Priviledged bool `json:"priviledged,omitempty"`
|
|
Permissions *group.ClientPermissions `json:"permissions,omitempty"`
|
|
Group string `json:"group,omitempty"`
|
|
Value *string `json:"value,omitempty"`
|
|
Time int64 `json:"time,omitempty"`
|
|
Offer *webrtc.SessionDescription `json:"offer,omitempty"`
|
|
Answer *webrtc.SessionDescription `json:"answer,omitempty"`
|
|
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
|
|
Labels map[string]string `json:"labels,omitempty"`
|
|
Request rateMap `json:"request,omitempty"`
|
|
}
|
|
|
|
type closeMessage struct {
|
|
data []byte
|
|
}
|
|
|
|
func getUpConn(c *webClient, id string) *rtpUpConnection {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.up == nil {
|
|
return nil
|
|
}
|
|
return c.up[id]
|
|
}
|
|
|
|
func getUpConns(c *webClient) []*rtpUpConnection {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
up := make([]*rtpUpConnection, 0, len(c.up))
|
|
for _, u := range c.up {
|
|
up = append(up, u)
|
|
}
|
|
return up
|
|
}
|
|
|
|
func addUpConn(c *webClient, id string) (*rtpUpConnection, bool, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.up == nil {
|
|
c.up = make(map[string]*rtpUpConnection)
|
|
}
|
|
if c.down != nil && c.down[id] != nil {
|
|
return nil, false, errors.New("Adding duplicate connection")
|
|
}
|
|
|
|
old := c.up[id]
|
|
if old != nil {
|
|
return old, false, nil
|
|
}
|
|
|
|
conn, err := newUpConn(c, id)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
c.up[id] = conn
|
|
|
|
conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
|
sendICE(c, id, candidate)
|
|
})
|
|
|
|
conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
|
if state == webrtc.ICEConnectionStateFailed {
|
|
c.action(connectionFailedAction{id: id})
|
|
}
|
|
})
|
|
|
|
return conn, true, nil
|
|
}
|
|
|
|
func delUpConn(c *webClient, id string) bool {
|
|
c.mu.Lock()
|
|
if c.up == nil {
|
|
c.mu.Unlock()
|
|
return false
|
|
}
|
|
conn := c.up[id]
|
|
if conn == nil {
|
|
c.mu.Unlock()
|
|
return false
|
|
}
|
|
delete(c.up, id)
|
|
c.mu.Unlock()
|
|
|
|
g := c.group
|
|
if g != nil {
|
|
go func(clients []group.Client) {
|
|
for _, c := range clients {
|
|
err := c.PushConn(g, conn.id, nil, nil, "")
|
|
if err != nil {
|
|
log.Printf("PushConn: %v", err)
|
|
}
|
|
}
|
|
}(g.GetClients(c))
|
|
} else {
|
|
log.Printf("Deleting connection for client with no group")
|
|
}
|
|
|
|
conn.pc.Close()
|
|
return true
|
|
}
|
|
|
|
func getDownConn(c *webClient, id string) *rtpDownConnection {
|
|
if c.down == nil {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
conn := c.down[id]
|
|
if conn == nil {
|
|
return nil
|
|
}
|
|
return conn
|
|
}
|
|
|
|
func getConn(c *webClient, id string) iceConnection {
|
|
up := getUpConn(c, id)
|
|
if up != nil {
|
|
return up
|
|
}
|
|
down := getDownConn(c, id)
|
|
if down != nil {
|
|
return down
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func addDownConn(c *webClient, id string, remote conn.Up) (*rtpDownConnection, error) {
|
|
conn, err := newDownConn(c, id, remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = addDownConnHelper(c, conn, remote)
|
|
if err != nil {
|
|
conn.pc.Close()
|
|
return nil, err
|
|
}
|
|
return conn, err
|
|
}
|
|
|
|
func addDownConnHelper(c *webClient, conn *rtpDownConnection, remote conn.Up) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.up != nil && c.up[conn.id] != nil {
|
|
return errors.New("Adding duplicate connection")
|
|
}
|
|
|
|
if c.down == nil {
|
|
c.down = make(map[string]*rtpDownConnection)
|
|
}
|
|
|
|
old := c.down[conn.id]
|
|
if old != nil {
|
|
// Avoid calling Close under a lock
|
|
go old.pc.Close()
|
|
}
|
|
|
|
conn.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
|
sendICE(c, conn.id, candidate)
|
|
})
|
|
|
|
conn.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
|
if state == webrtc.ICEConnectionStateFailed {
|
|
c.action(connectionFailedAction{id: conn.id})
|
|
}
|
|
})
|
|
|
|
err := remote.AddLocal(conn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.down[conn.id] = conn
|
|
return nil
|
|
}
|
|
|
|
func delDownConn(c *webClient, id string) bool {
|
|
conn := delDownConnHelper(c, id)
|
|
if conn != nil {
|
|
conn.pc.Close()
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func delDownConnHelper(c *webClient, id string) *rtpDownConnection {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.down == nil {
|
|
return nil
|
|
}
|
|
conn := c.down[id]
|
|
if conn == nil {
|
|
return nil
|
|
}
|
|
|
|
conn.remote.DelLocal(conn)
|
|
for _, track := range conn.tracks {
|
|
// we only insert the track after we get an answer, so
|
|
// ignore errors here.
|
|
track.remote.DelLocal(track)
|
|
}
|
|
delete(c.down, id)
|
|
return conn
|
|
}
|
|
|
|
func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrack, remoteConn conn.Up) (*webrtc.RTPSender, error) {
|
|
var pt uint8
|
|
var ssrc uint32
|
|
var id, label string
|
|
switch rt := remoteTrack.(type) {
|
|
case *rtpUpTrack:
|
|
pt = rt.track.PayloadType()
|
|
ssrc = rt.track.SSRC()
|
|
id = rt.track.ID()
|
|
label = rt.track.Label()
|
|
default:
|
|
return nil, errors.New("not implemented yet")
|
|
}
|
|
|
|
local, err := conn.pc.NewTrack(pt, ssrc, id, label)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s, err := conn.pc.AddTrack(local)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
track := &rtpDownTrack{
|
|
track: local,
|
|
remote: remoteTrack,
|
|
maxBitrate: new(bitrate),
|
|
stats: new(receiverStats),
|
|
rate: estimator.New(time.Second),
|
|
}
|
|
conn.tracks = append(conn.tracks, track)
|
|
|
|
go rtcpDownListener(conn, track, s)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bool) error {
|
|
options := webrtc.OfferOptions{ICERestart: restartIce}
|
|
offer, err := down.pc.CreateOffer(&options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = down.pc.SetLocalDescription(offer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
labels := make(map[string]string)
|
|
for _, t := range down.pc.GetTransceivers() {
|
|
var track *webrtc.Track
|
|
if t.Sender() != nil {
|
|
track = t.Sender().Track()
|
|
}
|
|
if track == nil {
|
|
continue
|
|
}
|
|
|
|
for _, tr := range down.tracks {
|
|
if tr.track == track {
|
|
labels[t.Mid()] = tr.remote.Label()
|
|
}
|
|
}
|
|
}
|
|
|
|
kind := ""
|
|
if renegotiate {
|
|
kind = "renegotiate"
|
|
}
|
|
|
|
return c.write(clientMessage{
|
|
Type: "offer",
|
|
Kind: kind,
|
|
Id: down.id,
|
|
Offer: &offer,
|
|
Labels: labels,
|
|
})
|
|
}
|
|
|
|
func sendICE(c *webClient, id string, candidate *webrtc.ICECandidate) error {
|
|
if candidate == nil {
|
|
return nil
|
|
}
|
|
cand := candidate.ToJSON()
|
|
return c.write(clientMessage{
|
|
Type: "ice",
|
|
Id: id,
|
|
Candidate: &cand,
|
|
})
|
|
}
|
|
|
|
func gotOffer(c *webClient, id string, offer webrtc.SessionDescription, renegotiate bool, labels map[string]string) error {
|
|
if !renegotiate {
|
|
// unless the client indicates that this is a compatible
|
|
// renegotiation, tear down the existing connection.
|
|
delUpConn(c, id)
|
|
}
|
|
|
|
up, isnew, err := addUpConn(c, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if u := c.Username(); u != "" {
|
|
up.label = u
|
|
}
|
|
err = up.pc.SetRemoteDescription(offer)
|
|
if err != nil {
|
|
if renegotiate && !isnew {
|
|
// create a new PC from scratch
|
|
log.Printf("SetRemoteDescription(offer): %v", err)
|
|
return gotOffer(c, id, offer, false, labels)
|
|
}
|
|
return err
|
|
}
|
|
|
|
answer, err := up.pc.CreateAnswer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = up.pc.SetLocalDescription(answer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
up.labels = labels
|
|
|
|
err = up.flushICECandidates()
|
|
if err != nil {
|
|
log.Printf("ICE: %v", err)
|
|
}
|
|
|
|
return c.write(clientMessage{
|
|
Type: "answer",
|
|
Id: id,
|
|
Answer: &answer,
|
|
})
|
|
}
|
|
|
|
var ErrUnknownId = errors.New("unknown id")
|
|
|
|
func gotAnswer(c *webClient, id string, answer webrtc.SessionDescription) error {
|
|
down := getDownConn(c, id)
|
|
if down == nil {
|
|
return ErrUnknownId
|
|
}
|
|
err := down.pc.SetRemoteDescription(answer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = down.flushICECandidates()
|
|
if err != nil {
|
|
log.Printf("ICE: %v", err)
|
|
}
|
|
|
|
add := func() {
|
|
down.pc.OnConnectionStateChange(nil)
|
|
for _, t := range down.tracks {
|
|
t.remote.AddLocal(t)
|
|
}
|
|
}
|
|
down.pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
|
if state == webrtc.PeerConnectionStateConnected {
|
|
add()
|
|
}
|
|
})
|
|
if down.pc.ConnectionState() == webrtc.PeerConnectionStateConnected {
|
|
add()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error {
|
|
conn := getConn(c, id)
|
|
if conn == nil {
|
|
return errors.New("unknown id in ICE")
|
|
}
|
|
return conn.addICECandidate(candidate)
|
|
}
|
|
|
|
func (c *webClient) setRequested(requested map[string]uint32) error {
|
|
if c.down != nil {
|
|
down := make([]string, 0, len(c.down))
|
|
for id := range c.down {
|
|
down = append(down, id)
|
|
}
|
|
for _, id := range down {
|
|
c.write(clientMessage{
|
|
Type: "close",
|
|
Id: id,
|
|
})
|
|
delDownConn(c, id)
|
|
}
|
|
}
|
|
|
|
c.requested = requested
|
|
|
|
go pushConns(c, c.group)
|
|
return nil
|
|
}
|
|
|
|
func pushConns(c group.Client, g *group.Group) {
|
|
clients := g.GetClients(c)
|
|
for _, cc := range clients {
|
|
ccc, ok := cc.(*webClient)
|
|
if ok {
|
|
ccc.action(pushConnsAction{g, c})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *webClient) isRequested(label string) bool {
|
|
return c.requested[label] != 0
|
|
}
|
|
|
|
func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rtpDownConnection, error) {
|
|
requested := false
|
|
for _, t := range tracks {
|
|
if c.isRequested(t.Label()) {
|
|
requested = true
|
|
break
|
|
}
|
|
}
|
|
if !requested {
|
|
return nil, nil
|
|
}
|
|
|
|
down, err := addDownConn(c, remote.Id(), remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, t := range tracks {
|
|
if !c.isRequested(t.Label()) {
|
|
continue
|
|
}
|
|
_, err = addDownTrack(c, down, t, remote)
|
|
if err != nil {
|
|
delDownConn(c, down.id)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
go rtcpDownSender(down)
|
|
|
|
return down, nil
|
|
}
|
|
|
|
func (c *webClient) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, label string) error {
|
|
err := c.action(pushConnAction{g, id, up, tracks})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if up != nil && label != "" {
|
|
err := c.action(addLabelAction{up.Id(), up.Label()})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func readMessage(conn *websocket.Conn, m *clientMessage) error {
|
|
err := conn.SetReadDeadline(time.Now().Add(15 * time.Second))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.SetReadDeadline(time.Time{})
|
|
|
|
return conn.ReadJSON(&m)
|
|
}
|
|
|
|
func StartClient(conn *websocket.Conn) (err error) {
|
|
var m clientMessage
|
|
|
|
err = readMessage(conn, &m)
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
|
|
if m.Type != "handshake" {
|
|
conn.WriteMessage(websocket.CloseMessage,
|
|
websocket.FormatCloseMessage(
|
|
websocket.CloseProtocolError,
|
|
"you must handshake first",
|
|
),
|
|
)
|
|
conn.Close()
|
|
err = group.ProtocolError("client didn't handshake")
|
|
return
|
|
}
|
|
|
|
c := &webClient{
|
|
id: m.Id,
|
|
actionCh: make(chan interface{}, 10),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
defer close(c.done)
|
|
|
|
c.writeCh = make(chan interface{}, 25)
|
|
c.writerDone = make(chan struct{})
|
|
go clientWriter(conn, c.writeCh, c.writerDone)
|
|
defer func() {
|
|
m, e := errorToWSCloseMessage(c.id, err)
|
|
if isWSNormalError(err) {
|
|
err = nil
|
|
} else if _, ok := err.(group.KickError); ok {
|
|
err = nil
|
|
}
|
|
if m != nil {
|
|
c.write(*m)
|
|
}
|
|
c.close(e)
|
|
}()
|
|
|
|
return clientLoop(c, conn)
|
|
}
|
|
|
|
type pushConnAction struct {
|
|
group *group.Group
|
|
id string
|
|
conn conn.Up
|
|
tracks []conn.UpTrack
|
|
}
|
|
|
|
type addLabelAction struct {
|
|
id string
|
|
label string
|
|
}
|
|
|
|
type pushConnsAction struct {
|
|
group *group.Group
|
|
client group.Client
|
|
}
|
|
|
|
type connectionFailedAction struct {
|
|
id string
|
|
}
|
|
|
|
type permissionsChangedAction struct{}
|
|
|
|
type kickAction struct {
|
|
id string
|
|
username string
|
|
message string
|
|
}
|
|
|
|
func clientLoop(c *webClient, ws *websocket.Conn) error {
|
|
read := make(chan interface{}, 1)
|
|
go clientReader(ws, read, c.done)
|
|
|
|
defer leaveGroup(c)
|
|
|
|
readTime := time.Now()
|
|
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case m, ok := <-read:
|
|
if !ok {
|
|
return errors.New("reader died")
|
|
}
|
|
switch m := m.(type) {
|
|
case clientMessage:
|
|
readTime = time.Now()
|
|
err := handleClientMessage(c, m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case error:
|
|
return m
|
|
}
|
|
case a := <-c.actionCh:
|
|
switch a := a.(type) {
|
|
case pushConnAction:
|
|
g := c.group
|
|
if g == nil || a.group != g {
|
|
return nil
|
|
}
|
|
if a.conn == nil {
|
|
found := delDownConn(c, a.id)
|
|
if found {
|
|
c.write(clientMessage{
|
|
Type: "close",
|
|
Id: a.id,
|
|
})
|
|
} else {
|
|
log.Printf("Deleting unknown " +
|
|
"down connection")
|
|
}
|
|
continue
|
|
}
|
|
down, err := addDownConnTracks(
|
|
c, a.conn, a.tracks,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if down != nil {
|
|
err = negotiate(c, down, false, false)
|
|
if err != nil {
|
|
delDownConn(c, down.id)
|
|
c.error(group.UserError(
|
|
"Negotiation failed",
|
|
))
|
|
continue
|
|
}
|
|
}
|
|
case addLabelAction:
|
|
label := a.label
|
|
c.write(clientMessage{
|
|
Type: "label",
|
|
Id: a.id,
|
|
Value: &label,
|
|
})
|
|
case pushConnsAction:
|
|
g := c.group
|
|
if g == nil || a.group != g {
|
|
return nil
|
|
}
|
|
for _, u := range c.up {
|
|
if !u.complete() {
|
|
continue
|
|
}
|
|
tracks := u.getTracks()
|
|
ts := make([]conn.UpTrack, len(tracks))
|
|
for i, t := range tracks {
|
|
ts[i] = t
|
|
}
|
|
go func() {
|
|
err := a.client.PushConn(
|
|
g, u.id, u, ts, u.label,
|
|
)
|
|
if err != nil {
|
|
log.Printf(
|
|
"PushConn: %v",
|
|
err,
|
|
)
|
|
}
|
|
}()
|
|
}
|
|
case connectionFailedAction:
|
|
if down := getDownConn(c, a.id); down != nil {
|
|
err := negotiate(c, down, true, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tracks := make(
|
|
[]conn.UpTrack, len(down.tracks),
|
|
)
|
|
for i, t := range down.tracks {
|
|
tracks[i] = t.remote
|
|
}
|
|
go c.PushConn(
|
|
c.group,
|
|
down.remote.Id(), down.remote,
|
|
tracks, down.remote.Label(),
|
|
)
|
|
} else if up := getUpConn(c, a.id); up != nil {
|
|
c.write(clientMessage{
|
|
Type: "renegotiate",
|
|
Id: a.id,
|
|
})
|
|
} else {
|
|
log.Printf("Attempting to renegotiate " +
|
|
"unknown connection")
|
|
}
|
|
|
|
case permissionsChangedAction:
|
|
group := c.Group()
|
|
if group == nil {
|
|
return errors.New("Permissions changed in no group")
|
|
}
|
|
perms := c.permissions
|
|
c.write(clientMessage{
|
|
Type: "joined",
|
|
Kind: "change",
|
|
Group: group.Name(),
|
|
Permissions: &perms,
|
|
})
|
|
if !c.permissions.Present {
|
|
up := getUpConns(c)
|
|
for _, u := range up {
|
|
found := delUpConn(c, u.id)
|
|
if found {
|
|
failUpConnection(
|
|
c, u.id,
|
|
"permission denied",
|
|
)
|
|
}
|
|
}
|
|
}
|
|
case kickAction:
|
|
return group.KickError{
|
|
a.id, a.username, a.message,
|
|
}
|
|
default:
|
|
log.Printf("unexpected action %T", a)
|
|
return errors.New("unexpected action")
|
|
}
|
|
case <-ticker.C:
|
|
if time.Since(readTime) > 90*time.Second {
|
|
return errors.New("client is dead")
|
|
}
|
|
if time.Since(readTime) > 60*time.Second {
|
|
err := c.write(clientMessage{
|
|
Type: "ping",
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func failUpConnection(c *webClient, id string, message string) error {
|
|
if id != "" {
|
|
err := c.write(clientMessage{
|
|
Type: "abort",
|
|
Id: id,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if message != "" {
|
|
err := c.error(group.UserError(message))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func leaveGroup(c *webClient) {
|
|
if c.group == nil {
|
|
return
|
|
}
|
|
|
|
c.setRequested(map[string]uint32{})
|
|
if c.up != nil {
|
|
for id := range c.up {
|
|
delUpConn(c, id)
|
|
}
|
|
}
|
|
|
|
group.DelClient(c)
|
|
c.permissions = group.ClientPermissions{}
|
|
c.group = nil
|
|
}
|
|
|
|
|
|
func failDownConnection(c *webClient, id string, message string) error {
|
|
if id != "" {
|
|
err := c.write(clientMessage{
|
|
Type: "close",
|
|
Id: id,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if message != "" {
|
|
err := c.error(group.UserError(message))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func setPermissions(g *group.Group, id string, perm string) error {
|
|
client := g.GetClient(id)
|
|
if client == nil {
|
|
return group.UserError("no such user")
|
|
}
|
|
|
|
c, ok := client.(*webClient)
|
|
if !ok {
|
|
return group.UserError("this is not a real user")
|
|
}
|
|
|
|
switch perm {
|
|
case "op":
|
|
c.permissions.Op = true
|
|
if g.AllowRecording() {
|
|
c.permissions.Record = true
|
|
}
|
|
case "unop":
|
|
c.permissions.Op = false
|
|
c.permissions.Record = false
|
|
case "present":
|
|
c.permissions.Present = true
|
|
case "unpresent":
|
|
c.permissions.Present = false
|
|
default:
|
|
return group.UserError("unknown permission")
|
|
}
|
|
return c.action(permissionsChangedAction{})
|
|
}
|
|
|
|
func (c *webClient) Kick(id, user, message string) error {
|
|
return c.action(kickAction{id, user, message})
|
|
}
|
|
|
|
func kickClient(g *group.Group, id, user, dest string, message string) error {
|
|
client := g.GetClient(dest)
|
|
if client == nil {
|
|
return group.UserError("no such user")
|
|
}
|
|
|
|
c, ok := client.(group.Kickable)
|
|
if !ok {
|
|
return group.UserError("this client is not kickable")
|
|
}
|
|
|
|
return c.Kick(id, user, message)
|
|
}
|
|
|
|
func handleClientMessage(c *webClient, m clientMessage) error {
|
|
switch m.Type {
|
|
case "join":
|
|
if m.Kind == "leave" {
|
|
if c.group == nil || c.group.Name() != m.Group {
|
|
return group.ProtocolError("you are not joined")
|
|
}
|
|
leaveGroup(c)
|
|
perms := c.permissions
|
|
return c.write(clientMessage{
|
|
Type: "joined",
|
|
Kind: "leave",
|
|
Group: m.Group,
|
|
Permissions: &perms,
|
|
})
|
|
}
|
|
|
|
if m.Kind != "join" {
|
|
return group.ProtocolError("unknown kind")
|
|
}
|
|
|
|
if c.group != nil {
|
|
return group.ProtocolError("cannot join multiple groups")
|
|
}
|
|
c.username = m.Username
|
|
c.password = m.Password
|
|
g, err := group.AddClient(m.Group, c)
|
|
if err != nil {
|
|
var s string
|
|
if os.IsNotExist(err) {
|
|
s = "group does not exist"
|
|
} else if err == group.ErrNotAuthorised {
|
|
s = "not authorised"
|
|
time.Sleep(200 * time.Millisecond)
|
|
} else if e, ok := err.(group.UserError); ok {
|
|
s = string(e)
|
|
} else {
|
|
s = "internal server error"
|
|
log.Printf("Join group: %v")
|
|
}
|
|
return c.write(clientMessage{
|
|
Type: "joined",
|
|
Kind: "fail",
|
|
Group: m.Group,
|
|
Permissions: &group.ClientPermissions{},
|
|
Value: &s,
|
|
})
|
|
}
|
|
if redirect := g.Redirect(); redirect != "" {
|
|
// We normally redirect at the HTTP level, but the group
|
|
// description could have been edited in the meantime.
|
|
return c.write(clientMessage{
|
|
Type: "joined",
|
|
Kind: "redirect",
|
|
Group: m.Group,
|
|
Permissions: &group.ClientPermissions{},
|
|
Value: &redirect,
|
|
})
|
|
}
|
|
c.group = g
|
|
perms := c.permissions
|
|
err = c.write(clientMessage{
|
|
Type: "joined",
|
|
Kind: "join",
|
|
Group: m.Group,
|
|
Permissions: &perms,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
h := c.group.GetChatHistory()
|
|
for _, m := range h {
|
|
message := m.Value
|
|
err := c.write(clientMessage{
|
|
Type: "chat",
|
|
Id: m.Id,
|
|
Username: m.User,
|
|
Time: m.Time,
|
|
Value: &message,
|
|
Kind: m.Kind,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
case "request":
|
|
return c.setRequested(m.Request)
|
|
case "offer":
|
|
if !c.permissions.Present {
|
|
c.write(clientMessage{
|
|
Type: "abort",
|
|
Id: m.Id,
|
|
})
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
if m.Offer == nil {
|
|
return group.ProtocolError("null offer")
|
|
}
|
|
err := gotOffer(
|
|
c, m.Id, *m.Offer, m.Kind == "renegotiate", m.Labels,
|
|
)
|
|
if err != nil {
|
|
log.Printf("gotOffer: %v", err)
|
|
return failUpConnection(c, m.Id, "negotiation failed")
|
|
}
|
|
case "answer":
|
|
if m.Answer == nil {
|
|
return group.ProtocolError("null answer")
|
|
}
|
|
err := gotAnswer(c, m.Id, *m.Answer)
|
|
if err != nil {
|
|
log.Printf("gotAnswer: %v", err)
|
|
message := ""
|
|
if err != ErrUnknownId {
|
|
message = "negotiation failed"
|
|
}
|
|
return failDownConnection(c, m.Id, message)
|
|
}
|
|
case "renegotiate":
|
|
down := getDownConn(c, m.Id)
|
|
if down != nil {
|
|
err := negotiate(c, down, true, true)
|
|
if err != nil {
|
|
return failDownConnection(c, m.Id,
|
|
"renegotiation failed")
|
|
}
|
|
} else {
|
|
log.Printf("Trying to renegotiate unknown connection")
|
|
}
|
|
case "close":
|
|
found := delUpConn(c, m.Id)
|
|
if !found {
|
|
log.Printf("Deleting unknown up connection %v", m.Id)
|
|
}
|
|
case "ice":
|
|
if m.Candidate == nil {
|
|
return group.ProtocolError("null candidate")
|
|
}
|
|
err := gotICE(c, m.Candidate, m.Id)
|
|
if err != nil {
|
|
log.Printf("ICE: %v", err)
|
|
}
|
|
case "chat", "usermessage":
|
|
if m.Id != c.id {
|
|
return group.ProtocolError("wrong sender id")
|
|
}
|
|
if m.Username != "" && m.Username != c.username {
|
|
return group.ProtocolError("wrong sender username")
|
|
}
|
|
g := c.group
|
|
if g == nil {
|
|
return c.error(group.UserError("join a group first"))
|
|
}
|
|
|
|
tm := group.ToJSTime(time.Now())
|
|
|
|
if m.Type == "chat" {
|
|
if m.Value == nil {
|
|
return group.ProtocolError("missing value")
|
|
}
|
|
if m.Dest == "" {
|
|
g.AddToChatHistory(
|
|
m.Id, m.Username, tm, m.Kind, *m.Value,
|
|
)
|
|
}
|
|
}
|
|
mm := clientMessage{
|
|
Type: m.Type,
|
|
Id: m.Id,
|
|
Dest: m.Dest,
|
|
Username: m.Username,
|
|
Priviledged: c.permissions.Op,
|
|
Time: tm,
|
|
Kind: m.Kind,
|
|
Value: m.Value,
|
|
}
|
|
if m.Dest == "" {
|
|
clients := g.GetClients(nil)
|
|
for _, cc := range clients {
|
|
ccc, ok := cc.(*webClient)
|
|
if ok {
|
|
ccc.write(mm)
|
|
}
|
|
}
|
|
} else {
|
|
cc := g.GetClient(m.Dest)
|
|
if cc == nil {
|
|
return c.error(group.UserError("user unknown"))
|
|
}
|
|
ccc, ok := cc.(*webClient)
|
|
if !ok {
|
|
return c.error(group.UserError("this user doesn't chat"))
|
|
}
|
|
ccc.write(mm)
|
|
}
|
|
case "groupaction":
|
|
if m.Id != c.id {
|
|
return group.ProtocolError("wrong sender id")
|
|
}
|
|
if m.Username != "" && m.Username != c.username {
|
|
return group.ProtocolError("wrong sender username")
|
|
}
|
|
g := c.group
|
|
if g == nil {
|
|
return c.error(group.UserError("join a group first"))
|
|
}
|
|
switch m.Kind {
|
|
case "clearchat":
|
|
g.ClearChatHistory()
|
|
m := clientMessage{Type: "clearchat"}
|
|
clients := g.GetClients(nil)
|
|
for _, cc := range clients {
|
|
cc, ok := cc.(*webClient)
|
|
if ok {
|
|
cc.write(m)
|
|
}
|
|
}
|
|
case "lock", "unlock":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
message := ""
|
|
if m.Value != nil {
|
|
message = *m.Value
|
|
}
|
|
g.SetLocked(m.Kind == "lock", message)
|
|
case "record":
|
|
if !c.permissions.Record {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
for _, cc := range g.GetClients(c) {
|
|
_, ok := cc.(*diskwriter.Client)
|
|
if ok {
|
|
return c.error(group.UserError("already recording"))
|
|
}
|
|
}
|
|
disk := diskwriter.New(g)
|
|
_, err := group.AddClient(g.Name(), disk)
|
|
if err != nil {
|
|
disk.Close()
|
|
return c.error(err)
|
|
}
|
|
go pushConns(disk, c.group)
|
|
case "unrecord":
|
|
if !c.permissions.Record {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
for _, cc := range g.GetClients(c) {
|
|
disk, ok := cc.(*diskwriter.Client)
|
|
if ok {
|
|
disk.Close()
|
|
group.DelClient(disk)
|
|
}
|
|
}
|
|
case "subgroups":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
s := ""
|
|
for _, sg := range group.GetSubGroups(g.Name()) {
|
|
plural := ""
|
|
if sg.Clients > 1 {
|
|
plural = "s"
|
|
}
|
|
s = s + fmt.Sprintf("%v (%v client%v)",
|
|
sg.Name, sg.Clients, plural)
|
|
}
|
|
c.write(clientMessage{
|
|
Type: "chat",
|
|
Dest: c.id,
|
|
Username: "Server",
|
|
Time: group.ToJSTime(time.Now()),
|
|
Value: &s,
|
|
})
|
|
default:
|
|
return group.ProtocolError("unknown group action")
|
|
}
|
|
case "useraction":
|
|
if m.Id != c.id {
|
|
return group.ProtocolError("wrong sender id")
|
|
}
|
|
if m.Username != "" && m.Username != c.username {
|
|
return group.ProtocolError("wrong sender username")
|
|
}
|
|
g := c.group
|
|
if g == nil {
|
|
return c.error(group.UserError("join a group first"))
|
|
}
|
|
switch m.Kind {
|
|
case "op", "unop", "present", "unpresent":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
err := setPermissions(g, m.Dest, m.Kind)
|
|
if err != nil {
|
|
return c.error(err)
|
|
}
|
|
case "kick":
|
|
if !c.permissions.Op {
|
|
return c.error(group.UserError("not authorised"))
|
|
}
|
|
message := ""
|
|
if m.Value != nil {
|
|
message = *m.Value
|
|
}
|
|
err := kickClient(g, m.Id, m.Username, m.Dest, message)
|
|
if err != nil {
|
|
return c.error(err)
|
|
}
|
|
default:
|
|
return group.ProtocolError("unknown user action")
|
|
}
|
|
case "pong":
|
|
// nothing
|
|
case "ping":
|
|
return c.write(clientMessage{
|
|
Type: "pong",
|
|
})
|
|
default:
|
|
log.Printf("unexpected message: %v", m.Type)
|
|
return group.ProtocolError("unexpected message")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func clientReader(conn *websocket.Conn, read chan<- interface{}, done <-chan struct{}) {
|
|
defer close(read)
|
|
for {
|
|
var m clientMessage
|
|
err := conn.ReadJSON(&m)
|
|
if err != nil {
|
|
select {
|
|
case read <- err:
|
|
return
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
select {
|
|
case read <- m:
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func clientWriter(conn *websocket.Conn, ch <-chan interface{}, done chan<- struct{}) {
|
|
defer func() {
|
|
close(done)
|
|
conn.Close()
|
|
}()
|
|
|
|
for {
|
|
m, ok := <-ch
|
|
if !ok {
|
|
break
|
|
}
|
|
err := conn.SetWriteDeadline(
|
|
time.Now().Add(2 * time.Second))
|
|
if err != nil {
|
|
return
|
|
}
|
|
switch m := m.(type) {
|
|
case clientMessage:
|
|
err := conn.WriteJSON(m)
|
|
if err != nil {
|
|
return
|
|
}
|
|
case closeMessage:
|
|
if m.data != nil {
|
|
conn.WriteMessage(
|
|
websocket.CloseMessage,
|
|
m.data,
|
|
)
|
|
}
|
|
return
|
|
default:
|
|
log.Printf("clientWiter: unexpected message %T", m)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
var ErrClientDead = errors.New("client is dead")
|
|
|
|
func (c *webClient) action(m interface{}) error {
|
|
select {
|
|
case c.actionCh <- m:
|
|
return nil
|
|
case <-c.done:
|
|
return ErrClientDead
|
|
}
|
|
}
|
|
|
|
func (c *webClient) write(m clientMessage) error {
|
|
select {
|
|
case c.writeCh <- m:
|
|
return nil
|
|
case <-c.writerDone:
|
|
return ErrClientDead
|
|
}
|
|
}
|
|
|
|
func (c *webClient) close(data []byte) error {
|
|
select {
|
|
case c.writeCh <- closeMessage{data}:
|
|
return nil
|
|
case <-c.writerDone:
|
|
return ErrClientDead
|
|
}
|
|
}
|
|
|
|
func errorMessage(id string, err error) *clientMessage {
|
|
switch e := err.(type) {
|
|
case group.UserError:
|
|
message := e.Error()
|
|
return &clientMessage{
|
|
Type: "usermessage",
|
|
Kind: "error",
|
|
Dest: id,
|
|
Priviledged: true,
|
|
Value: &message,
|
|
}
|
|
case group.KickError:
|
|
message := e.Message
|
|
if message == "" {
|
|
message = "you have been kicked out"
|
|
}
|
|
return &clientMessage{
|
|
Type: "usermessage",
|
|
Kind: "error",
|
|
Id: e.Id,
|
|
Username: e.Username,
|
|
Dest: id,
|
|
Priviledged: true,
|
|
Value: &message,
|
|
}
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (c *webClient) error(err error) error {
|
|
m := errorMessage(c.id, err)
|
|
if m == nil {
|
|
return err
|
|
}
|
|
return c.write(*m)
|
|
}
|