1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-13 20:25:57 +01:00
galene/client.go

1008 lines
20 KiB
Go
Raw Normal View History

2020-04-24 19:38:21 +02:00
// Copyright (c) 2020 by Juliusz Chroboczek.
// This is not open source software. Copy it, and I'll break into your
// house and tell your three year-old that Santa doesn't exist.
package main
import (
"encoding/json"
"errors"
"io"
"log"
"os"
2020-04-25 19:59:53 +02:00
"strings"
2020-04-24 19:38:21 +02:00
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/pion/rtcp"
"github.com/pion/rtp"
2020-04-24 19:38:21 +02:00
"github.com/pion/sdp"
"github.com/pion/webrtc/v2"
)
var iceConf webrtc.Configuration
var iceOnce sync.Once
func iceConfiguration() webrtc.Configuration {
iceOnce.Do(func() {
var iceServers []webrtc.ICEServer
file, err := os.Open(iceFilename)
if err != nil {
log.Printf("Open %v: %v", iceFilename, err)
return
}
defer file.Close()
d := json.NewDecoder(file)
err = d.Decode(&iceServers)
if err != nil {
log.Printf("Get ICE configuration: %v", err)
return
}
iceConf = webrtc.Configuration{
ICEServers: iceServers,
}
})
return iceConf
}
type protocolError string
func (err protocolError) Error() string {
return string(err)
}
type userError string
func (err userError) Error() string {
return string(err)
}
func errorToWSCloseMessage(err error) (string, []byte) {
2020-04-24 19:38:21 +02:00
var code int
var text string
switch e := err.(type) {
case *websocket.CloseError:
code = websocket.CloseNormalClosure
case protocolError:
code = websocket.CloseProtocolError
text = string(e)
case userError:
code = websocket.CloseNormalClosure
text = string(e)
2020-04-24 19:38:21 +02:00
default:
code = websocket.CloseInternalServerErr
}
return text, websocket.FormatCloseMessage(code, text)
2020-04-24 19:38:21 +02:00
}
func isWSNormalError(err error) bool {
return websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway)
}
type clientMessage struct {
Type string `json:"type"`
Id string `json:"id,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
2020-04-25 02:25:51 +02:00
Permissions userPermission `json:"permissions,omitempty"`
Group string `json:"group,omitempty"`
Value string `json:"value,omitempty"`
2020-04-25 20:30:33 +02:00
Me bool `json:"me,omitempty"`
Offer *webrtc.SessionDescription `json:"offer,omitempty"`
Answer *webrtc.SessionDescription `json:"answer,omitempty"`
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
Del bool `json:"del,omitempty"`
2020-04-24 19:38:21 +02:00
}
type closeMessage struct {
data []byte
}
func startClient(conn *websocket.Conn) (err error) {
var m clientMessage
2020-04-25 22:49:07 +02:00
err = conn.SetReadDeadline(time.Now().Add(15 * time.Second))
if err != nil {
conn.Close()
return
}
2020-04-24 19:38:21 +02:00
err = conn.ReadJSON(&m)
if err != nil {
conn.Close()
2020-04-24 19:38:21 +02:00
return
}
2020-04-25 22:49:07 +02:00
err = conn.SetReadDeadline(time.Time{})
if err != nil {
conn.Close()
return
}
2020-04-24 19:38:21 +02:00
if m.Type != "handshake" {
conn.Close()
2020-04-24 19:38:21 +02:00
return
}
c := &client{
id: m.Id,
username: m.Username,
actionCh: make(chan interface{}, 10),
done: make(chan struct{}),
}
defer close(c.done)
2020-04-25 21:16:49 +02:00
c.writeCh = make(chan interface{}, 25)
2020-04-24 19:38:21 +02:00
defer func() {
if isWSNormalError(err) {
err = nil
} else {
m, e := errorToWSCloseMessage(err)
if m != "" {
c.write(clientMessage{
Type: "error",
Value: m,
})
}
2020-04-24 19:38:21 +02:00
select {
case c.writeCh <- closeMessage{e}:
2020-04-24 19:38:21 +02:00
case <-c.writerDone:
}
}
close(c.writeCh)
c.writeCh = nil
}()
c.writerDone = make(chan struct{})
go clientWriter(conn, c.writeCh, c.writerDone)
2020-04-25 19:59:53 +02:00
if strings.ContainsRune(m.Username, ' ') {
err = userError("don't put spaces in your username")
return
}
2020-04-25 02:25:51 +02:00
g, users, err := addClient(m.Group, c, m.Username, m.Password)
2020-04-24 19:38:21 +02:00
if err != nil {
return
}
c.group = g
defer delClient(c)
for _, u := range users {
c.write(clientMessage{
Type: "user",
Id: u.id,
Username: u.username,
})
}
clients := g.getClients(nil)
u := clientMessage{
Type: "user",
Id: c.id,
Username: c.username,
}
for _, c := range clients {
c.write(u)
}
defer func() {
clients := g.getClients(c)
u := clientMessage{
Type: "user",
Id: c.id,
Username: c.username,
Del: true,
}
for _, c := range clients {
c.write(u)
}
}()
return clientLoop(c, conn)
}
func getUpConn(c *client, id string) *upConnection {
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.up == nil {
return nil
}
conn := c.up[id]
if conn == nil {
return nil
}
return conn
}
func getUpConns(c *client) []string {
c.mu.Lock()
defer c.mu.Unlock()
up := make([]string, 0, len(c.up))
for id := range c.up {
up = append(up, id)
}
return up
}
2020-04-24 19:38:21 +02:00
func addUpConn(c *client, id string) (*upConnection, error) {
pc, err := groups.api.NewPeerConnection(iceConfiguration())
if err != nil {
return nil, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
},
)
if err != nil {
pc.Close()
return nil, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
},
)
if err != nil {
pc.Close()
return nil, err
}
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
sendICE(c, id, candidate)
})
pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) {
c.mu.Lock()
2020-04-24 19:38:21 +02:00
u, ok := c.up[id]
if !ok {
log.Printf("Unknown connection")
c.mu.Unlock()
2020-04-24 19:38:21 +02:00
return
}
track := &upTrack{
track: remote,
2020-04-26 01:33:18 +02:00
maxBitrate: ^uint64(0),
}
u.tracks = append(u.tracks, track)
done := len(u.tracks) >= u.trackCount
c.mu.Unlock()
2020-04-24 19:38:21 +02:00
clients := c.group.getClients(c)
for _, cc := range clients {
cc.action(addTrackAction{track, u, done})
2020-04-25 02:25:51 +02:00
if done && u.label != "" {
2020-04-24 19:38:21 +02:00
cc.action(addLabelAction{id, u.label})
}
}
go func() {
buf := make([]byte, 1500)
var packet rtp.Packet
var local []*downTrack
var localTime time.Time
2020-04-24 19:38:21 +02:00
for {
now := time.Now()
if now.Sub(localTime) > time.Second/2 {
local = track.getLocal()
localTime = now
}
2020-04-24 19:38:21 +02:00
i, err := remote.Read(buf)
if err != nil {
if err != io.EOF {
log.Printf("%v", err)
}
break
}
err = packet.Unmarshal(buf[:i])
if err != nil {
log.Printf("%v", err)
continue
}
for _, l := range local {
2020-04-28 15:26:50 +02:00
if l.muted() {
continue
}
err := l.track.WriteRTP(&packet)
if err != nil {
log.Printf("%v", err)
}
2020-04-24 19:38:21 +02:00
}
}
}()
})
conn := &upConnection{id: id, pc: pc}
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.up == nil {
c.up = make(map[string]*upConnection)
}
if c.up[id] != nil {
conn.pc.Close()
return nil, errors.New("Adding duplicate connection")
}
c.up[id] = conn
return conn, nil
}
func delUpConn(c *client, id string) {
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.up == nil {
log.Printf("Deleting unknown connection")
return
}
conn := c.up[id]
if conn == nil {
log.Printf("Deleting unknown connection")
return
}
type clientId struct {
client *client
id string
}
cids := make([]clientId, 0)
clients := c.group.getClients(c)
for _, cc := range clients {
cc.mu.Lock()
2020-04-24 19:38:21 +02:00
for _, otherconn := range cc.down {
if otherconn.remote == conn {
cids = append(cids, clientId{cc, otherconn.id})
}
}
cc.mu.Unlock()
2020-04-24 19:38:21 +02:00
}
for _, cid := range cids {
cid.client.action(delConnAction{cid.id})
2020-04-24 19:38:21 +02:00
}
conn.pc.Close()
delete(c.up, id)
}
func getDownConn(c *client, id string) *downConnection {
if c.down == nil {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
conn := c.down[id]
if conn == nil {
return nil
}
return conn
}
func addDownConn(c *client, id string, remote *upConnection) (*downConnection, error) {
pc, err := groups.api.NewPeerConnection(iceConfiguration())
if err != nil {
return nil, err
}
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
sendICE(c, id, candidate)
})
pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) {
log.Printf("Got track on downstream connection")
})
if c.down == nil {
c.down = make(map[string]*downConnection)
}
conn := &downConnection{id: id, pc: pc, remote: remote}
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.down[id] != nil {
conn.pc.Close()
return nil, errors.New("Adding duplicate connection")
}
c.down[id] = conn
return conn, nil
}
func delDownConn(c *client, id string) {
c.mu.Lock()
defer c.mu.Unlock()
2020-04-24 19:38:21 +02:00
if c.down == nil {
log.Printf("Deleting unknown connection")
return
}
2020-04-25 02:25:51 +02:00
conn := c.down[id]
2020-04-24 19:38:21 +02:00
if conn == nil {
log.Printf("Deleting unknown connection")
return
}
for _, track := range conn.tracks {
found := track.remote.delLocal(track)
if !found {
log.Printf("Couldn't find remote track")
}
track.remote = nil
}
2020-04-24 19:38:21 +02:00
conn.pc.Close()
delete(c.down, id)
}
func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConnection) (*downConnection, *webrtc.RTPSender, error) {
2020-04-24 19:38:21 +02:00
conn := getDownConn(c, id)
if conn == nil {
var err error
conn, err = addDownConn(c, id, remoteConn)
2020-04-24 19:38:21 +02:00
if err != nil {
return nil, nil, err
}
}
local, err := conn.pc.NewTrack(
remoteTrack.track.PayloadType(),
remoteTrack.track.SSRC(),
remoteTrack.track.ID(),
remoteTrack.track.Label(),
)
2020-04-24 19:38:21 +02:00
if err != nil {
return nil, nil, err
}
s, err := conn.pc.AddTrack(local)
if err != nil {
return nil, nil, err
}
2020-04-26 01:33:18 +02:00
2020-04-28 15:26:50 +02:00
track := &downTrack{
track: local,
remote: remoteTrack,
maxBitrate: new(timeStampedBitrate),
}
conn.tracks = append(conn.tracks, track)
remoteTrack.addLocal(track)
2020-04-28 17:17:52 +02:00
go rtcpListener(c.group, conn, track, s)
2020-04-24 19:38:21 +02:00
return conn, s, nil
}
2020-04-26 01:33:18 +02:00
var epoch = time.Now()
func msSinceEpoch() uint64 {
return uint64(time.Since(epoch) / time.Millisecond)
}
2020-04-28 17:17:52 +02:00
func rtcpListener(g *group, conn *downConnection, track *downTrack, s *webrtc.RTPSender) {
2020-04-24 19:38:21 +02:00
for {
ps, err := s.ReadRTCP()
if err != nil {
if err != io.EOF {
log.Printf("ReadRTCP: %v", err)
}
return
}
for _, p := range ps {
switch p := p.(type) {
case *rtcp.PictureLossIndication:
2020-04-28 17:17:52 +02:00
err := sendPLI(conn.remote.pc, p.MediaSSRC)
2020-04-24 19:38:21 +02:00
if err != nil {
log.Printf("sendPLI: %v", err)
2020-04-24 19:38:21 +02:00
}
case *rtcp.ReceiverEstimatedMaximumBitrate:
2020-04-26 01:33:18 +02:00
ms := msSinceEpoch()
// this is racy -- a reader might read the
// data between the two writes. This shouldn't
// matter, we'll recover at the next sample.
atomic.StoreUint64(
2020-04-28 17:17:52 +02:00
&track.maxBitrate.bitrate,
2020-04-26 01:33:18 +02:00
p.Bitrate,
)
atomic.StoreUint64(
2020-04-28 17:17:52 +02:00
&track.maxBitrate.timestamp,
2020-04-26 01:33:18 +02:00
uint64(ms),
)
2020-04-24 19:38:21 +02:00
case *rtcp.ReceiverReport:
default:
log.Printf("RTCP: %T", p)
}
}
}
}
func trackKinds(down *downConnection) (audio bool, video bool) {
if down.pc == nil {
return
}
for _, s := range down.pc.GetSenders() {
track := s.Track()
if track == nil {
continue
}
switch track.Kind() {
case webrtc.RTPCodecTypeAudio:
audio = true
case webrtc.RTPCodecTypeVideo:
video = true
}
}
return
}
func updateUpBitrate(up *upConnection) {
2020-04-26 01:33:18 +02:00
now := msSinceEpoch()
for _, track := range up.tracks {
2020-04-28 15:26:50 +02:00
track.maxBitrate = ^uint64(0)
local := track.getLocal()
for _, l := range local {
ms := atomic.LoadUint64(&l.maxBitrate.timestamp)
bitrate := atomic.LoadUint64(&l.maxBitrate.bitrate)
2020-04-28 15:26:50 +02:00
if now < ms || now > ms + 5000 || bitrate == 0 {
l.setMuted(false)
continue
}
2020-04-28 15:26:50 +02:00
if bitrate < 9600 ||
(l.track.Kind() == webrtc.RTPCodecTypeVideo &&
bitrate < 128000) {
l.setMuted(true)
continue
}
l.setMuted(false)
if track.maxBitrate > bitrate {
track.maxBitrate = bitrate
2020-04-24 19:38:21 +02:00
}
}
}
2020-04-24 19:38:21 +02:00
}
2020-04-26 01:33:18 +02:00
func sendPLI(pc *webrtc.PeerConnection, ssrc uint32) error {
return pc.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: ssrc},
2020-04-24 19:38:21 +02:00
})
}
2020-04-26 01:33:18 +02:00
func sendREMB(pc *webrtc.PeerConnection, ssrc uint32, bitrate uint64) error {
return pc.WriteRTCP([]rtcp.Packet{
&rtcp.ReceiverEstimatedMaximumBitrate{
Bitrate: bitrate,
2020-04-27 03:08:03 +02:00
SSRCs: []uint32{ssrc},
2020-04-26 01:33:18 +02:00
},
})
}
2020-04-24 19:38:21 +02:00
func countMediaStreams(data string) (int, error) {
desc := sdp.NewJSEPSessionDescription(false)
err := desc.Unmarshal(data)
if err != nil {
return 0, err
}
return len(desc.MediaDescriptions), nil
}
func negotiate(c *client, id string, pc *webrtc.PeerConnection) error {
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
err = pc.SetLocalDescription(offer)
if err != nil {
return err
}
return c.write(clientMessage{
Type: "offer",
Id: id,
Offer: &offer,
})
}
func sendICE(c *client, id string, candidate *webrtc.ICECandidate) error {
if candidate == nil {
return nil
}
cand := candidate.ToJSON()
return c.write(clientMessage{
Type: "ice",
Id: id,
Candidate: &cand,
})
}
func gotOffer(c *client, offer webrtc.SessionDescription, id string) error {
var err error
up, ok := c.up[id]
if !ok {
up, err = addUpConn(c, id)
if err != nil {
return err
}
}
if c.username != "" {
up.label = c.username
}
n, err := countMediaStreams(offer.SDP)
if err != nil {
log.Printf("Couldn't parse SDP: %v", err)
n = 2
}
2020-04-26 00:39:07 +02:00
up.trackCount = n
2020-04-24 19:38:21 +02:00
err = up.pc.SetRemoteDescription(offer)
if err != nil {
return err
}
answer, err := up.pc.CreateAnswer(nil)
if err != nil {
return err
}
err = up.pc.SetLocalDescription(answer)
if err != nil {
return err
}
return c.write(clientMessage{
Type: "answer",
Id: id,
Answer: &answer,
})
}
func gotAnswer(c *client, answer webrtc.SessionDescription, id string) error {
conn := getDownConn(c, id)
if conn == nil {
return protocolError("unknown id in answer")
}
err := conn.pc.SetRemoteDescription(answer)
if err != nil {
return err
}
return nil
}
func gotICE(c *client, candidate *webrtc.ICECandidateInit, id string) error {
var pc *webrtc.PeerConnection
down := getDownConn(c, id)
if down != nil {
pc = down.pc
} else {
up := getUpConn(c, id)
if up == nil {
return errors.New("unknown id in ICE")
}
pc = up.pc
}
return pc.AddICECandidate(*candidate)
}
func clientLoop(c *client, conn *websocket.Conn) error {
read := make(chan interface{}, 1)
go clientReader(conn, read, c.done)
defer func() {
if c.down != nil {
for id := range c.down {
c.write(clientMessage{
Type: "close",
Id: id,
})
delDownConn(c, id)
}
}
if c.up != nil {
for id := range c.up {
delUpConn(c, id)
}
}
}()
g := c.group
2020-04-25 02:25:51 +02:00
c.write(clientMessage{
Type: "permissions",
Permissions: c.permissions,
})
2020-04-24 19:38:21 +02:00
for _, cc := range g.getClients(c) {
cc.action(pushTracksAction{c})
}
2020-04-25 21:16:49 +02:00
h := c.group.getChatHistory()
for _, m := range h {
err := c.write(clientMessage{
Type: "chat",
Id: m.id,
Username: m.user,
Value: m.value,
Me: m.me,
})
if err != nil {
return err
}
}
readTime := time.Now()
2020-04-26 01:33:18 +02:00
ticker := time.NewTicker(time.Second)
2020-04-24 19:38:21 +02:00
defer ticker.Stop()
slowTicker := time.NewTicker(10 * time.Second)
defer slowTicker.Stop()
2020-04-24 19:38:21 +02:00
for {
select {
case m, ok := <-read:
if !ok {
return errors.New("reader died")
}
switch m := m.(type) {
case clientMessage:
readTime = time.Now()
2020-04-24 19:38:21 +02:00
err := handleClientMessage(c, m)
if err != nil {
return err
}
case error:
return m
}
case a := <-c.actionCh:
switch a := a.(type) {
case addTrackAction:
down, _, err :=
addDownTrack(
c, a.remote.id, a.track,
2020-04-24 19:38:21 +02:00
a.remote)
if err != nil {
return err
}
if a.done {
err = negotiate(c, a.remote.id, down.pc)
2020-04-24 19:38:21 +02:00
if err != nil {
return err
}
}
case delConnAction:
2020-04-24 19:38:21 +02:00
c.write(clientMessage{
Type: "close",
Id: a.id,
})
delDownConn(c, a.id)
case addLabelAction:
c.write(clientMessage{
Type: "label",
Id: a.id,
Value: a.label,
})
case pushTracksAction:
for _, u := range c.up {
var done bool
for i, t := range u.tracks {
2020-04-26 00:39:07 +02:00
done = i >= u.trackCount-1
2020-04-24 19:38:21 +02:00
a.c.action(addTrackAction{
t, u, done,
2020-04-24 19:38:21 +02:00
})
}
if done && u.label != "" {
a.c.action(addLabelAction{
u.id, u.label,
})
}
}
case permissionsChangedAction:
2020-04-25 17:36:35 +02:00
c.write(clientMessage{
Type: "permissions",
Permissions: c.permissions,
})
if !c.permissions.Present {
ids := getUpConns(c)
for _, id := range ids {
c.write(clientMessage{
Type: "abort",
Id: id,
})
delUpConn(c, id)
}
}
2020-04-25 17:36:35 +02:00
case kickAction:
return userError("you have been kicked")
2020-04-24 19:38:21 +02:00
default:
log.Printf("unexpected action %T", a)
return errors.New("unexpected action")
}
case <-ticker.C:
sendRateUpdate(c)
case <-slowTicker.C:
if time.Since(readTime) > 90*time.Second {
return errors.New("client is dead")
}
if time.Since(readTime) > 60*time.Second {
err := c.write(clientMessage{
Type: "ping",
})
if err != nil {
return err
}
}
2020-04-24 19:38:21 +02:00
}
}
}
func handleClientMessage(c *client, m clientMessage) error {
switch m.Type {
case "offer":
2020-04-25 02:25:51 +02:00
if !c.permissions.Present {
c.write(clientMessage{
Type: "abort",
Id: m.Id,
})
return c.error(userError("not authorised"))
2020-04-25 02:25:51 +02:00
}
2020-04-24 19:38:21 +02:00
if m.Offer == nil {
return protocolError("null offer")
}
err := gotOffer(c, *m.Offer, m.Id)
if err != nil {
return err
}
case "answer":
if m.Answer == nil {
return protocolError("null answer")
}
err := gotAnswer(c, *m.Answer, m.Id)
if err != nil {
return err
}
case "close":
delUpConn(c, m.Id)
case "ice":
if m.Candidate == nil {
return protocolError("null candidate")
}
err := gotICE(c, m.Candidate, m.Id)
if err != nil {
log.Printf("ICE: %v", err)
}
case "chat":
2020-04-25 21:16:49 +02:00
c.group.addToChatHistory(m.Id, m.Username, m.Value, m.Me)
2020-04-24 19:38:21 +02:00
clients := c.group.getClients(c)
for _, cc := range clients {
cc.write(m)
}
2020-04-25 17:36:35 +02:00
case "op", "unop", "present", "unpresent":
2020-04-25 18:09:31 +02:00
if !c.permissions.Op {
2020-04-25 17:36:35 +02:00
c.error(userError("not authorised"))
return nil
}
err := setPermission(c.group, m.Id, m.Type)
if err != nil {
return c.error(err)
}
case "kick":
2020-04-25 18:09:31 +02:00
if !c.permissions.Op {
2020-04-25 17:36:35 +02:00
c.error(userError("not authorised"))
return nil
}
err := kickClient(c.group, m.Id)
if err != nil {
return c.error(err)
}
case "pong":
// nothing
case "ping":
c.write(clientMessage{
Type: "pong",
})
2020-04-24 19:38:21 +02:00
default:
log.Printf("unexpected message: %v", m.Type)
return protocolError("unexpected message")
}
return nil
}
func sendRateUpdate(c *client) {
type remb struct {
pc *webrtc.PeerConnection
ssrc uint32
bitrate uint64
}
rembs := make([]remb, 0)
c.mu.Lock()
2020-04-24 19:38:21 +02:00
for _, u := range c.up {
updateUpBitrate(u)
for _, t := range u.tracks {
bitrate := t.maxBitrate
2020-04-28 15:26:50 +02:00
if bitrate == ^uint64(0) {
continue
2020-04-26 01:33:18 +02:00
}
2020-04-28 15:26:50 +02:00
rembs = append(rembs,
remb{u.pc, t.track.SSRC(), bitrate})
2020-04-24 19:38:21 +02:00
}
}
c.mu.Unlock()
for _, r := range rembs {
err := sendREMB(r.pc, r.ssrc, r.bitrate)
if err != nil {
log.Printf("sendREMB: %v", err)
}
}
2020-04-24 19:38:21 +02:00
}
func clientReader(conn *websocket.Conn, read chan<- interface{}, done <-chan struct{}) {
defer close(read)
for {
var m clientMessage
err := conn.ReadJSON(&m)
if err != nil {
select {
case read <- err:
return
case <-done:
return
}
}
select {
case read <- m:
case <-done:
return
}
}
}
func clientWriter(conn *websocket.Conn, ch <-chan interface{}, done chan<- struct{}) {
defer func() {
close(done)
conn.Close()
}()
for {
m, ok := <-ch
if !ok {
break
}
err := conn.SetWriteDeadline(
time.Now().Add(2 * time.Second))
if err != nil {
return
}
switch m := m.(type) {
case clientMessage:
err := conn.WriteJSON(m)
if err != nil {
return
}
case closeMessage:
err := conn.WriteMessage(websocket.CloseMessage, m.data)
if err != nil {
return
}
default:
log.Printf("clientWiter: unexpected message %T", m)
return
}
}
}