commit f5a518a448ce0d65e2a67e45ff44483d6aa2578f Author: Juliusz Chroboczek Date: Fri Apr 24 19:38:21 2020 +0200 Initial commit. diff --git a/client.go b/client.go new file mode 100644 index 0000000..af4f166 --- /dev/null +++ b/client.go @@ -0,0 +1,841 @@ +// Copyright (c) 2020 by Juliusz Chroboczek. + +// This is not open source software. Copy it, and I'll break into your +// house and tell your three year-old that Santa doesn't exist. + +package main + +import ( + "encoding/json" + "errors" + "io" + "log" + "math" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + "github.com/pion/rtcp" + "github.com/pion/sdp" + "github.com/pion/webrtc/v2" +) + +var iceConf webrtc.Configuration +var iceOnce sync.Once + +func iceConfiguration() webrtc.Configuration { + iceOnce.Do(func() { + var iceServers []webrtc.ICEServer + file, err := os.Open(iceFilename) + if err != nil { + log.Printf("Open %v: %v", iceFilename, err) + return + } + defer file.Close() + d := json.NewDecoder(file) + err = d.Decode(&iceServers) + if err != nil { + log.Printf("Get ICE configuration: %v", err) + return + } + iceConf = webrtc.Configuration{ + ICEServers: iceServers, + } + }) + + return iceConf +} + +type protocolError string + +func (err protocolError) Error() string { + return string(err) +} + +func errorToWSCloseMessage(err error) []byte { + var code int + var text string + switch e := err.(type) { + case *websocket.CloseError: + code = websocket.CloseNormalClosure + case protocolError: + code = websocket.CloseProtocolError + text = string(e) + default: + code = websocket.CloseInternalServerErr + } + return websocket.FormatCloseMessage(code, text) +} + +func isWSNormalError(err error) bool { + return websocket.IsCloseError(err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway) +} + +type clientMessage struct { + Type string `json:"type"` + Id string `json:"id,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + Group string `json:"group,omitempty"` + Value string `json:"value,omitempty"` + Me *bool `json:"me,omitempty"` + Offer *webrtc.SessionDescription `json:"offer,omitempty"` + Answer *webrtc.SessionDescription `json:"answer,omitempty"` + Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"` + Del bool `json:"del,omitempty"` + AudioRate int `json:"audiorate,omitempty"` + VideoRate int `json:"audiorate,omitempty"` +} + +type closeMessage struct { + data []byte +} + +func startClient(conn *websocket.Conn) (err error) { + var m clientMessage + err = conn.ReadJSON(&m) + if err != nil { + return + } + if m.Type != "handshake" { + err = protocolError("expected handshake") + return + } + + c := &client{ + id: m.Id, + username: m.Username, + actionCh: make(chan interface{}, 10), + done: make(chan struct{}), + } + + defer close(c.done) + + c.writeCh = make(chan interface{}, 1) + defer func() { + if isWSNormalError(err) { + err = nil + } else { + select { + case c.writeCh <- closeMessage{ + errorToWSCloseMessage(err), + }: + case <-c.writerDone: + } + } + close(c.writeCh) + c.writeCh = nil + }() + + c.writerDone = make(chan struct{}) + go clientWriter(conn, c.writeCh, c.writerDone) + + g, users, err := addClient(m.Group, c) + if err != nil { + return + } + c.group = g + defer delClient(c) + + for _, u := range users { + c.write(clientMessage{ + Type: "user", + Id: u.id, + Username: u.username, + }) + } + + clients := g.getClients(nil) + u := clientMessage{ + Type: "user", + Id: c.id, + Username: c.username, + } + for _, c := range clients { + c.write(u) + } + + defer func() { + clients := g.getClients(c) + u := clientMessage{ + Type: "user", + Id: c.id, + Username: c.username, + Del: true, + } + for _, c := range clients { + c.write(u) + } + }() + + return clientLoop(c, conn) +} + +func getUpConn(c *client, id string) *upConnection { + c.group.mu.Lock() + defer c.group.mu.Unlock() + + if c.up == nil { + return nil + } + conn := c.up[id] + if conn == nil { + return nil + } + return conn +} + +func addUpConn(c *client, id string) (*upConnection, error) { + pc, err := groups.api.NewPeerConnection(iceConfiguration()) + if err != nil { + return nil, err + } + + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + webrtc.RtpTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }, + ) + if err != nil { + pc.Close() + return nil, err + } + + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + webrtc.RtpTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }, + ) + if err != nil { + pc.Close() + return nil, err + } + + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + sendICE(c, id, candidate) + }) + + pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) { + local, err := pc.NewTrack( + remote.PayloadType(), + remote.SSRC(), + remote.ID(), + remote.Label()) + if err != nil { + log.Printf("%v", err) + return + } + + c.group.mu.Lock() + u, ok := c.up[id] + if !ok { + log.Printf("Unknown connection") + c.group.mu.Unlock() + return + } + u.pairs = append(u.pairs, trackPair{ + remote: remote, + local: local, + }) + done := len(u.pairs) >= u.streamCount + c.group.mu.Unlock() + + clients := c.group.getClients(c) + for _, cc := range clients { + cc.action(addTrackAction{id, local, u, done}) + if(done && u.label != "") { + cc.action(addLabelAction{id, u.label}) + } + } + + go func() { + buf := make([]byte, 1500) + for { + i, err := remote.Read(buf) + if err != nil { + if err != io.EOF { + log.Printf("%v", err) + } + break + } + + _, err = local.Write(buf[:i]) + if err != nil && err != io.ErrClosedPipe { + log.Printf("%v", err) + } + } + }() + }) + + conn := &upConnection{id: id, pc: pc} + + c.group.mu.Lock() + defer c.group.mu.Unlock() + + if c.up == nil { + c.up = make(map[string]*upConnection) + } + if c.up[id] != nil { + conn.pc.Close() + return nil, errors.New("Adding duplicate connection") + } + c.up[id] = conn + return conn, nil +} + +func delUpConn(c *client, id string) { + c.group.mu.Lock() + defer c.group.mu.Unlock() + + if c.up == nil { + log.Printf("Deleting unknown connection") + return + } + + conn := c.up[id] + if conn == nil { + log.Printf("Deleting unknown connection") + return + } + + type clientId struct { + client *client + id string + } + cids := make([]clientId, 0) + for _, cc := range c.group.clients { + for _, otherconn := range cc.down { + if otherconn.remote == conn { + cids = append(cids, clientId{cc, otherconn.id}) + } + } + } + + for _, cid := range cids { + cid.client.action(delPCAction{cid.id}) + } + + conn.pc.Close() + delete(c.up, id) +} + +func getDownConn(c *client, id string) *downConnection { + if c.down == nil { + return nil + } + + c.group.mu.Lock() + defer c.group.mu.Unlock() + conn := c.down[id] + if conn == nil { + return nil + } + return conn +} + +func addDownConn(c *client, id string, remote *upConnection) (*downConnection, error) { + pc, err := groups.api.NewPeerConnection(iceConfiguration()) + if err != nil { + return nil, err + } + + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + sendICE(c, id, candidate) + }) + + pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) { + log.Printf("Got track on downstream connection") + }) + + if c.down == nil { + c.down = make(map[string]*downConnection) + } + conn := &downConnection{id: id, pc: pc, remote: remote} + + c.group.mu.Lock() + defer c.group.mu.Unlock() + if c.down[id] != nil { + conn.pc.Close() + return nil, errors.New("Adding duplicate connection") + } + c.down[id] = conn + return conn, nil +} + +func delDownConn(c *client, id string) { + c.group.mu.Lock() + defer c.group.mu.Unlock() + + if c.down == nil { + log.Printf("Deleting unknown connection") + return + } + conn := c.down[id]; + if conn == nil { + log.Printf("Deleting unknown connection") + return + } + conn.pc.Close() + delete(c.down, id) +} + +func addDownTrack(c *client, id string, track *webrtc.Track, remote *upConnection) (*downConnection, *webrtc.RTPSender, error) { + conn := getDownConn(c, id) + if conn == nil { + var err error + conn, err = addDownConn(c, id, remote) + if err != nil { + return nil, nil, err + } + } + + s, err := conn.pc.AddTrack(track) + if err != nil { + return nil, nil, err + } + + go rtcpListener(c.group, conn, s) + + return conn, s, nil +} + +func rtcpListener(g *group, c *downConnection, s *webrtc.RTPSender) { + for { + ps, err := s.ReadRTCP() + if err != nil { + if err != io.EOF { + log.Printf("ReadRTCP: %v", err) + } + return + } + + for _, p := range ps { + switch p := p.(type) { + case *rtcp.PictureLossIndication: + err := sendPli(g, s.Track(), c.remote) + if err != nil { + log.Printf("sendPli: %v", err) + } + case *rtcp.ReceiverEstimatedMaximumBitrate: + bitrate := uint32(math.MaxInt32) + if p.Bitrate < math.MaxInt32 { + bitrate = uint32(p.Bitrate) + } + atomic.StoreUint32(&c.maxBitrate, bitrate) + case *rtcp.ReceiverReport: + default: + log.Printf("RTCP: %T", p) + } + } + } +} + +func trackKinds(down *downConnection) (audio bool, video bool) { + if down.pc == nil { + return + } + + for _, s := range down.pc.GetSenders() { + track := s.Track() + if track == nil { + continue + } + switch track.Kind() { + case webrtc.RTPCodecTypeAudio: + audio = true + case webrtc.RTPCodecTypeVideo: + video = true + } + } + return +} + +func splitBitrate(bitrate uint32, audio, video bool) (uint32, uint32) { + if audio && !video { + return bitrate, 0 + } + if !audio && video { + return 0, bitrate + } + + if bitrate < 6000 { + return 6000, 0 + } + + if bitrate < 12000 { + return bitrate, 0 + } + audioRate := 8000 + (bitrate-8000)/4 + if audioRate > 96000 { + audioRate = 96000 + } + return audioRate, bitrate - audioRate +} + +func updateBitrate(g *group, up *upConnection) (uint32, uint32) { + audio := uint32(math.MaxInt32) + video := uint32(math.MaxInt32) + g.Range(func(c *client) bool { + for _, down := range c.down { + if down.remote == up { + bitrate := atomic.LoadUint32(&down.maxBitrate) + if bitrate == 0 { + bitrate = 256000 + } else if bitrate < 6000 { + bitrate = 6000 + } + hasAudio, hasVideo := trackKinds(down) + a, v := splitBitrate(bitrate, hasAudio, hasVideo) + if a < audio { + audio = a + } + if v < video { + video = v + } + } + } + return true + }) + up.maxAudioBitrate = audio + up.maxVideoBitrate = video + return audio, video +} + +func sendPli(g *group, local *webrtc.Track, up *upConnection) error { + var track *webrtc.Track + + for _, p := range up.pairs { + if p.local == local { + track = p.remote + break + } + } + + if track == nil { + return errors.New("attempted to send PLI for unknown track") + } + + return up.pc.WriteRTCP([]rtcp.Packet{ + &rtcp.PictureLossIndication{MediaSSRC: track.SSRC()}, + }) +} + +func countMediaStreams(data string) (int, error) { + desc := sdp.NewJSEPSessionDescription(false) + err := desc.Unmarshal(data) + if err != nil { + return 0, err + } + return len(desc.MediaDescriptions), nil +} + +func negotiate(c *client, id string, pc *webrtc.PeerConnection) error { + offer, err := pc.CreateOffer(nil) + if err != nil { + return err + } + err = pc.SetLocalDescription(offer) + if err != nil { + return err + } + return c.write(clientMessage{ + Type: "offer", + Id: id, + Offer: &offer, + }) +} + +func sendICE(c *client, id string, candidate *webrtc.ICECandidate) error { + if candidate == nil { + return nil + } + cand := candidate.ToJSON() + return c.write(clientMessage{ + Type: "ice", + Id: id, + Candidate: &cand, + }) +} + +func gotOffer(c *client, offer webrtc.SessionDescription, id string) error { + var err error + up, ok := c.up[id] + if !ok { + up, err = addUpConn(c, id) + if err != nil { + return err + } + } + if c.username != "" { + up.label = c.username + } + n, err := countMediaStreams(offer.SDP) + if err != nil { + log.Printf("Couldn't parse SDP: %v", err) + n = 2 + } + up.streamCount = n + err = up.pc.SetRemoteDescription(offer) + if err != nil { + return err + } + + answer, err := up.pc.CreateAnswer(nil) + if err != nil { + return err + } + + err = up.pc.SetLocalDescription(answer) + if err != nil { + return err + } + + return c.write(clientMessage{ + Type: "answer", + Id: id, + Answer: &answer, + }) +} + +func gotAnswer(c *client, answer webrtc.SessionDescription, id string) error { + conn := getDownConn(c, id) + if conn == nil { + return protocolError("unknown id in answer") + } + err := conn.pc.SetRemoteDescription(answer) + if err != nil { + return err + } + return nil +} + +func gotICE(c *client, candidate *webrtc.ICECandidateInit, id string) error { + var pc *webrtc.PeerConnection + down := getDownConn(c, id) + if down != nil { + pc = down.pc + } else { + up := getUpConn(c, id) + if up == nil { + return errors.New("unknown id in ICE") + } + pc = up.pc + } + return pc.AddICECandidate(*candidate) +} + +func clientLoop(c *client, conn *websocket.Conn) error { + read := make(chan interface{}, 1) + go clientReader(conn, read, c.done) + + defer func() { + if c.down != nil { + for id := range c.down { + c.write(clientMessage{ + Type: "close", + Id: id, + }) + delDownConn(c, id) + } + } + + if c.up != nil { + for id := range c.up { + delUpConn(c, id) + } + } + }() + + g := c.group + + for _, cc := range g.getClients(c) { + cc.action(pushTracksAction{c}) + } + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case m, ok := <-read: + if !ok { + return errors.New("reader died") + } + switch m := m.(type) { + case clientMessage: + err := handleClientMessage(c, m) + if err != nil { + return err + } + case error: + return m + } + case a := <-c.actionCh: + switch a := a.(type) { + case addTrackAction: + down, _, err := + addDownTrack( + c, a.id, a.track, + a.remote) + if err != nil { + return err + } + if a.done { + err = negotiate(c, a.id, down.pc) + if err != nil { + return err + } + } + case delPCAction: + c.write(clientMessage{ + Type: "close", + Id: a.id, + }) + delDownConn(c, a.id) + case addLabelAction: + c.write(clientMessage{ + Type: "label", + Id: a.id, + Value: a.label, + }) + case pushTracksAction: + for _, u := range c.up { + var done bool + for i, p := range u.pairs { + done = i >= u.streamCount - 1 + a.c.action(addTrackAction{ + u.id, p.local, u, + done, + }) + } + if done && u.label != "" { + a.c.action(addLabelAction{ + u.id, u.label, + }) + + } + } + default: + log.Printf("unexpected action %T", a) + return errors.New("unexpected action") + } + case <-ticker.C: + sendRateUpdate(c) + } + } +} + +func handleClientMessage(c *client, m clientMessage) error { + switch m.Type { + case "offer": + if m.Offer == nil { + return protocolError("null offer") + } + err := gotOffer(c, *m.Offer, m.Id) + if err != nil { + return err + } + case "answer": + if m.Answer == nil { + return protocolError("null answer") + } + err := gotAnswer(c, *m.Answer, m.Id) + if err != nil { + return err + } + case "close": + delUpConn(c, m.Id) + case "ice": + if m.Candidate == nil { + return protocolError("null candidate") + } + err := gotICE(c, m.Candidate, m.Id) + if err != nil { + log.Printf("ICE: %v", err) + } + case "chat": + clients := c.group.getClients(c) + for _, cc := range clients { + cc.write(m) + } + default: + log.Printf("unexpected message: %v", m.Type) + return protocolError("unexpected message") + } + return nil +} + +func sendRateUpdate(c *client) { + for _, u := range c.up { + oldaudio := u.maxAudioBitrate + oldvideo := u.maxVideoBitrate + audio, video := updateBitrate(c.group, u) + if audio != oldaudio || video != oldvideo { + c.write(clientMessage{ + Type: "maxbitrate", + Id: u.id, + AudioRate: int(audio), + VideoRate: int(video), + }) + } + } +} + +func clientReader(conn *websocket.Conn, read chan<- interface{}, done <-chan struct{}) { + defer close(read) + for { + var m clientMessage + err := conn.ReadJSON(&m) + if err != nil { + select { + case read <- err: + return + case <-done: + return + } + } + select { + case read <- m: + case <-done: + return + } + } +} + +func clientWriter(conn *websocket.Conn, ch <-chan interface{}, done chan<- struct{}) { + defer func() { + close(done) + conn.Close() + }() + + for { + m, ok := <-ch + if !ok { + break + } + err := conn.SetWriteDeadline( + time.Now().Add(2 * time.Second)) + if err != nil { + return + } + switch m := m.(type) { + case clientMessage: + err := conn.WriteJSON(m) + if err != nil { + return + } + case closeMessage: + err := conn.WriteMessage(websocket.CloseMessage, m.data) + if err != nil { + return + } + default: + log.Printf("clientWiter: unexpected message %T", m) + return + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9a2b2a2 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module sfu + +go 1.13 + +require ( + github.com/gorilla/websocket v1.4.2 + github.com/pion/rtcp v1.2.1 + github.com/pion/sdp v1.3.0 + github.com/pion/webrtc/v2 v2.2.5 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9f5f1a6 --- /dev/null +++ b/go.sum @@ -0,0 +1,115 @@ +github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= +github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= +github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw= +github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA= +github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pion/datachannel v1.4.16 h1:dvuDC0IBMUDQvwO+gRu0Dv+W5j7rrgNpCmtheb6iYnc= +github.com/pion/datachannel v1.4.16/go.mod h1:gRGhxZv7X2/30Qxes4WEXtimKBXcwj/3WsDtBlHnvJY= +github.com/pion/dtls/v2 v2.0.0-rc.9 h1:wPb0JKmYoleAM2o8vQSPaUM+geJq7l0AdeUlPsg19ec= +github.com/pion/dtls/v2 v2.0.0-rc.9/go.mod h1:6eFkFvpo0T+odQ+39HFEtOO7LX5cUlFqXdSo4ucZtGg= +github.com/pion/dtls/v2 v2.0.0-rc.10 h1:WM+LVyR3f7hfxMLE0zhydwxSesboH/TXDnqv+32uiHo= +github.com/pion/dtls/v2 v2.0.0-rc.10/go.mod h1:VkY5VL2wtsQQOG60xQ4lkV5pdn0wwBBTzCfRJqXhp3A= +github.com/pion/ice v0.7.12 h1:Lsh4f0Uvh/vOCXSyj+w5C736LrKt66qAKeA2LFwSkn0= +github.com/pion/ice v0.7.12/go.mod h1:yLt/9LAJEZXFtnOBdpq5YGaOF9SsDjVGCvzF3MF4k5k= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY= +github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0= +github.com/pion/quic v0.1.1 h1:D951FV+TOqI9A0rTF7tHx0Loooqz+nyzjEyj8o3PuMA= +github.com/pion/quic v0.1.1/go.mod h1:zEU51v7ru8Mp4AUBJvj6psrSth5eEFNnVQK5K48oV3k= +github.com/pion/rtcp v1.2.1 h1:S3yG4KpYAiSmBVqKAfgRa5JdwBNj4zK3RLUa8JYdhak= +github.com/pion/rtcp v1.2.1/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM= +github.com/pion/rtp v1.3.2 h1:Yfzf1mU4Zmg7XWHitzYe2i+l+c68iO+wshzIUW44p1c= +github.com/pion/rtp v1.3.2/go.mod h1:q9wPnA96pu2urCcW/sK/RiDn597bhGoAQQ+y2fDwHuY= +github.com/pion/rtp v1.4.0 h1:EkeHEXKuJhZoRUxtL2Ie80vVg9gBH+poT9UoL8M14nw= +github.com/pion/rtp v1.4.0/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE= +github.com/pion/sctp v1.7.6 h1:8qZTdJtbKfAns/Hv5L0PAj8FyXcsKhMH1pKUCGisQg4= +github.com/pion/sctp v1.7.6/go.mod h1:ichkYQ5tlgCQwEwvgfdcAolqx1nHbYCxo4D7zK/K0X8= +github.com/pion/sdp v1.3.0 h1:21lpgEILHyolpsIrbCBagZaAPj4o057cFjzaFebkVOs= +github.com/pion/sdp v1.3.0/go.mod h1:ceA2lTyftydQTuCIbUNoH77aAt6CiQJaRpssA4Gee8I= +github.com/pion/sdp/v2 v2.3.6 h1:jmhawd6iJy6HeHlAhlXJAGYxnDCighvgeCexm4c3UVk= +github.com/pion/sdp/v2 v2.3.6/go.mod h1:+ZZf35r1+zbaWYiZLfPutWfx58DAWcGb2QsS3D/s9M8= +github.com/pion/srtp v1.3.1 h1:WNDLN41ST0P6cXRpzx97JJW//vChAEo1+Etdqo+UMnM= +github.com/pion/srtp v1.3.1/go.mod h1:nxEytDDGTN+eNKJ1l5gzOCWQFuksgijorsSlgEjc40Y= +github.com/pion/stun v0.3.3 h1:brYuPl9bN9w/VM7OdNzRSLoqsnwlyNvD9MVeJrHjDQw= +github.com/pion/stun v0.3.3/go.mod h1:xrCld6XM+6GWDZdvjPlLMsTU21rNxnO6UO8XsAvHr/M= +github.com/pion/transport v0.6.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= +github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8= +github.com/pion/transport v0.10.0 h1:9M12BSneJm6ggGhJyWpDveFOstJsTiQjkLf4M44rm80= +github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE= +github.com/pion/turn/v2 v2.0.3 h1:SJUUIbcPoehlyZgMyIUbBBDhI03sBx32x3JuSIBKBWA= +github.com/pion/turn/v2 v2.0.3/go.mod h1:kl1hmT3NxcLynpXVnwJgObL8C9NaCyPTeqI2DcCpSZs= +github.com/pion/webrtc/v2 v2.2.5 h1:JHHv4fKeBJlVAziqq9QByCiH+g9Jnru9epmanRQqFG4= +github.com/pion/webrtc/v2 v2.2.5/go.mod h1:uiygdBNqK4PfZu2BxjzVV5xkzqiAMlKT4r4NAFXCyqc= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d h1:1ZiEyfaQIg3Qh0EoqpwAakHVhecoE5wlSg5GjnafJGw= +golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200301022130-244492dfa37a h1:GuSPYbZzB5/dcLNCwLQLsg3obCJtX9IJhpXkvY7kzk0= +golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/group.go b/group.go new file mode 100644 index 0000000..533c916 --- /dev/null +++ b/group.go @@ -0,0 +1,239 @@ +// Copyright (c) 2020 by Juliusz Chroboczek. + +// This is not open source software. Copy it, and I'll break into your +// house and tell your three year-old that Santa doesn't exist. + +package main + +import ( + "log" + "sync" + + "github.com/pion/webrtc/v2" +) + +type trackPair struct { + remote, local *webrtc.Track +} + +type upConnection struct { + id string + label string + pc *webrtc.PeerConnection + maxAudioBitrate uint32 + maxVideoBitrate uint32 + streamCount int + pairs []trackPair +} + +type downConnection struct { + id string + pc *webrtc.PeerConnection + remote *upConnection + maxBitrate uint32 +} + +type client struct { + group *group + id string + username string + done chan struct{} + writeCh chan interface{} + writerDone chan struct{} + actionCh chan interface{} + down map[string]*downConnection + up map[string]*upConnection +} + +type group struct { + name string + public bool + + mu sync.Mutex + clients []*client +} + +type delPCAction struct { + id string +} + +type addTrackAction struct { + id string + track *webrtc.Track + remote *upConnection + done bool +} + +type addLabelAction struct { + id string + label string +} + +type getUpAction struct { + ch chan<- string +} + +type pushTracksAction struct { + c *client +} + +var groups struct { + mu sync.Mutex + groups map[string]*group + api *webrtc.API +} + +func addGroup(name string) (*group, error) { + groups.mu.Lock() + defer groups.mu.Unlock() + + if groups.groups == nil { + groups.groups = make(map[string]*group) + m := webrtc.MediaEngine{} + m.RegisterCodec(webrtc.NewRTPVP8Codec( + webrtc.DefaultPayloadTypeVP8, 90000)) + m.RegisterCodec(webrtc.NewRTPOpusCodec( + webrtc.DefaultPayloadTypeOpus, 48000)) + groups.api = webrtc.NewAPI( + webrtc.WithMediaEngine(m), + ) + } + + g := groups.groups[name] + + if g == nil { + g = &group{ + name: name, + } + groups.groups[name] = g + } + + return g, nil +} + +func delGroup(name string) bool { + groups.mu.Lock() + defer groups.mu.Unlock() + + g := groups.groups[name] + if g == nil { + return true + } + + if len(g.clients) != 0 { + return false + } + + delete(groups.groups, name) + return true +} + +type userid struct { + id string + username string +} + +func addClient(name string, client *client) (*group, []userid, error) { + g, err := addGroup(name) + if err != nil { + return nil, nil, err + } + + var users []userid + g.mu.Lock() + defer g.mu.Unlock() + for _, c := range g.clients { + users = append(users, userid{c.id, c.username}) + } + g.clients = append(g.clients, client) + return g, users, nil +} + +func delClient(c *client) { + c.group.mu.Lock() + defer c.group.mu.Unlock() + g := c.group + for i, cc := range g.clients { + if cc == c { + g.clients = + append(g.clients[:i], g.clients[i+1:]...) + c.group = nil + return + } + } + log.Printf("Deleting unknown client") + c.group = nil +} + +func (g *group) getClients(except *client) []*client { + g.mu.Lock() + defer g.mu.Unlock() + clients := make([]*client, 0, len(g.clients)) + for _, c := range g.clients { + if c != except { + clients = append(clients, c) + } + } + return clients +} + +func (g *group) Range(f func(c *client) bool) { + g.mu.Lock() + defer g.mu.Unlock() + for _, c := range g.clients { + ok := f(c) + if(!ok){ + break; + } + } +} + +type writerDeadError int + +func (err writerDeadError) Error() string { + return "client writer died" +} + +func (c *client) write(m clientMessage) error { + select { + case c.writeCh <- m: + return nil + case <-c.writerDone: + return writerDeadError(0) + } +} + +type clientDeadError int + +func (err clientDeadError) Error() string { + return "client dead" +} + +func (c *client) action(m interface{}) error { + select { + case c.actionCh <- m: + return nil + case <-c.done: + return clientDeadError(0) + } +} + +type publicGroup struct { + Name string `json:"name"` + ClientCount int `json:"clientCount"` +} + +func getPublicGroups() []publicGroup { + gs := make([]publicGroup, 0) + groups.mu.Lock() + defer groups.mu.Unlock() + for _, g := range groups.groups { + if g.public { + gs = append(gs, publicGroup{ + Name: g.name, + ClientCount: len(g.clients), + }) + } + } + return gs +} diff --git a/sfu.go b/sfu.go new file mode 100644 index 0000000..b0a720a --- /dev/null +++ b/sfu.go @@ -0,0 +1,108 @@ +// Copyright (c) 2020 by Juliusz Chroboczek. + +// This is not open source software. Copy it, and I'll break into your +// house and tell your three year-old that Santa doesn't exist. + +package main + +import ( + "encoding/json" + "flag" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/gorilla/websocket" +) + +var httpAddr string +var staticRoot string +var dataDir string +var iceFilename string + +func main() { + flag.StringVar(&httpAddr, "http", ":8443", "web server `address`") + flag.StringVar(&staticRoot, "static", "./static/", + "web server root `directory`") + flag.StringVar(&dataDir, "data", "./data/", + "data `directory`") + flag.Parse() + iceFilename = filepath.Join(staticRoot, "ice-servers.json") + + http.Handle("/", mungeHandler{http.FileServer(http.Dir(staticRoot))}) + http.HandleFunc("/group/", + func(w http.ResponseWriter, r *http.Request) { + mungeHeader(w) + http.ServeFile(w, r, staticRoot+"/sfu.html") + }) + http.HandleFunc("/ws", wsHandler) + http.HandleFunc("/public-groups.json", publicHandler) + + go func() { + server := &http.Server{ + Addr: httpAddr, + ReadTimeout: 60 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 120 * time.Second, + } + var err error + log.Printf("Listening on %v", httpAddr) + err = server.ListenAndServeTLS( + filepath.Join(dataDir, "cert.pem"), + filepath.Join(dataDir, "key.pem"), + ) + log.Fatalf("ListenAndServeTLS: %v", err) + }() + + terminate := make(chan os.Signal, 1) + signal.Notify(terminate, syscall.SIGINT) + <-terminate +} + +func mungeHeader(w http.ResponseWriter) { + w.Header().Add("Content-Security-Policy", + "connect-src ws: wss: 'self'; img-src data: 'self'; default-src 'self'") +} + +type mungeHandler struct { + h http.Handler +} + +func (h mungeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + mungeHeader(w) + h.h.ServeHTTP(w, r) +} + +func publicHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + w.Header().Set("cache-control", "no-cache") + + if r.Method == "HEAD" { + return + } + + g := getPublicGroups() + e := json.NewEncoder(w) + e.Encode(g) + return +} + +var upgrader websocket.Upgrader + +func wsHandler(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Websocket upgrade: %v", err) + return + } + go func() { + err := startClient(conn) + if err != nil { + log.Printf("client: %v", err) + } + }() +} diff --git a/static/common.css b/static/common.css new file mode 100644 index 0000000..5222644 --- /dev/null +++ b/static/common.css @@ -0,0 +1,15 @@ +body { + font: 14px "Lato", Arial, sans-serif; +} + +h1 { + font-size: 160%; +} + +.signature { + border-top: solid; + margin-top: 2em; + padding-top: 0em; + border-width: thin; + clear: both; +} diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..cc03697 --- /dev/null +++ b/static/index.html @@ -0,0 +1,32 @@ + + + + SFU + + + + + + +

SFU

+ +
+ + +
+
+ +
+

Public groups

+ +
+
+ + + + + + + + + diff --git a/static/mainpage.css b/static/mainpage.css new file mode 100644 index 0000000..34eeb50 --- /dev/null +++ b/static/mainpage.css @@ -0,0 +1,8 @@ +.groups { +} + +.nogroups { + display: none; +} + + diff --git a/static/mainpage.js b/static/mainpage.js new file mode 100644 index 0000000..2d07971 --- /dev/null +++ b/static/mainpage.js @@ -0,0 +1,54 @@ +// Copyright (c) 2019-2020 by Juliusz Chroboczek. + +// This is not open source software. Copy it, and I'll break into your +// house and tell your three year-old that Santa doesn't exist. + +'use strict'; + +document.getElementById('groupform').onsubmit = function(e) { + e.preventDefault(); + let group = document.getElementById('group').value.trim(); + + location.href = '/group/' + group; +} + +async function listPublicGroups() { + let div = document.getElementById('public-groups'); + let table = document.getElementById('public-groups-table'); + + let l; + try { + l = await (await fetch('/public-groups.json')).json(); + } catch(e) { + console.error(e); + l = []; + } + + if (l.length === 0) { + table.textContent = '(No groups found.)'; + div.classList.remove('groups'); + div.classList.add('nogroups'); + return; + } + + div.classList.remove('nogroups'); + div.classList.add('groups'); + + for(let i = 0; i < l.length; i++) { + let group = l[i]; + let tr = document.createElement('tr'); + let td = document.createElement('td'); + let a = document.createElement('a'); + a.textContent = group.name; + a.href = '/group/' + encodeURIComponent(group.name); + td.appendChild(a); + tr.appendChild(td); + let td2 = document.createElement('td'); + td2.textContent = `(${group.clientCount} clients)`; + tr.appendChild(td2); + table.appendChild(tr); + } +} + + +listPublicGroups(); diff --git a/static/sfu.css b/static/sfu.css new file mode 100644 index 0000000..900bb1b --- /dev/null +++ b/static/sfu.css @@ -0,0 +1,181 @@ +#title { + text-align: center; +} + +h1 { + white-space: nowrap; +} + +#header { + margin-left: 2%; +} + +#statdiv { + white-space: nowrap; + margin-bottom: 2pt; +} + +#errspan { + margin-left: 1em; +} + +.connected { + color: green; +} + +.disconnected { + background-color: red; + font-weight: bold; +} + +.userform { + display: inline +} + +.userform-invisible { + display: none; +} + +.disconnect-invisible { + display: none; +} + +.error { + color: red; + font-weight: bold; +} + +.noerror { + display: none; +} + +#main { + display: flex; +} + +#users { + width: 5%; + margin-left: 2%; + border: 1px solid; +} + +#anonymous-users { + white-space: nowrap; +} + +#chatbox { + width: 100%; +} + +#chat { + display: flex; + width: 20%; + margin-left: 0.3em; + border: 1px solid; + height: 85vh; +} + +#inputform { + display: flex; +} + +#box { + height: 95%; + overflow: auto; +} + +.message, message-me { + margin: 0 0.5em 0 0.5em; +} + +.message-user { + font-weight: bold; +} + +.message-content { + line-height: 1.5; + margin-left: 1em; +} + +.message-me-asterisk { + margin-right: 0.5em; +} + +.message-me-user { + font-weight: bold; + margin-right: 0.5em; +} + +.message-me-content { +} + +#input { + width: 100%; + border: none; + resize: none; + overflow-y: hidden; +} + +#input:focus { + outline: none; +} + +#inputbutton { + background-color: white; + border: none; + margin-right: 0.2em; + font-size: 1.5em; +} + +#inputbutton:focus { + outline: none; +} + +#resizer { + width: 8px; + margin-left: -8px; + z-index: 1000; +} + +#resizer:hover { + cursor: ew-resize; +} + +#peers { + margin-left: 1%; + margin-right: 1%; + white-space: nowrap; + display: flex; + flex-wrap: wrap; + margin-bottom: 4px; +} + +.peer { + display: flex; + flex-direction: column; + margin-right: 5px; + margin-left: 5px; + margin-bottom: 10px; + max-height: 50%; +} + +.media { + height: 400px; + margin: auto; + min-width: 4em; +} + +.label { + text-align: center; + height: 2em; + margin-top: 5px; +} + +#inputform { + width: 100%; +} + +#input { + width: 85%; + border: 1px solid; +} diff --git a/static/sfu.html b/static/sfu.html new file mode 100644 index 0000000..a331a2e --- /dev/null +++ b/static/sfu.html @@ -0,0 +1,57 @@ + + + + SFU + + + + + + +

SFU

+ + + +
+
+
+
+
+
+ + +
+
+
+
+
+ +
+
+ + + + diff --git a/static/sfu.js b/static/sfu.js new file mode 100644 index 0000000..4e88e2a --- /dev/null +++ b/static/sfu.js @@ -0,0 +1,794 @@ +// Copyright (c) 2020 by Juliusz Chroboczek. + +// This is not open source software. Copy it, and I'll break into your +// house and tell your three year-old that Santa doesn't exist. + +'use strict'; + +let myid; + +let group; + +let socket; + +let up = {}, down = {}; + +let iceServers = []; + +function toHex(array) { + let a = new Uint8Array(array); + let s = ''; + function hex(x) { + let h = x.toString(16); + if(h.length < 2) + h = '0' + h; + return h; + } + return a.reduce((x, y) => x + hex(y), ''); +} + +function randomid() { + let a = new Uint8Array(16); + crypto.getRandomValues(a); + return toHex(a); +} + +function Connection(id, pc) { + this.id = id; + this.label = null; + this.pc = pc; + this.stream = null; + this.iceCandidates = []; +} + +Connection.prototype.close = function() { + this.pc.close(); + send({ + type: 'close', + id: this.id, + }); +} + +function setUserPass(username, password) { + window.sessionStorage.setItem( + 'userpass', + JSON.stringify({username: username, password: password}), + ); +} + +function getUserPass() { + let userpass = window.sessionStorage.getItem('userpass'); + if(!userpass) + return null; + return JSON.parse(userpass); +} + +function getUsername() { + let userpass = getUserPass(); + if(!userpass) + return null; + return userpass.username; +} + +function setConnected(connected) { + let statspan = document.getElementById('statspan'); + let userform = document.getElementById('userform'); + let diconnect = document.getElementById('disconnectbutton'); + if(connected) { + statspan.textContent = 'Connected'; + statspan.classList.remove('disconnected'); + statspan.classList.add('connected'); + userform.classList.add('userform-invisible'); + userform.classList.remove('userform'); + disconnectbutton.classList.remove('disconnect-invisible'); + } else { + let userpass = getUserPass(); + document.getElementById('username').value = + userpass ? userpass.username : ''; + document.getElementById('password').value = + userpass ? userpass.password : ''; + statspan.textContent = 'Disconnected'; + statspan.classList.remove('connected'); + statspan.classList.add('disconnected'); + userform.classList.add('userform'); + userform.classList.remove('userform-invisible'); + disconnectbutton.classList.add('disconnect-invisible'); + } +} + +document.getElementById('presenterbox').onchange = function(e) { + e.preventDefault(); + setLocalMedia(); +} + +document.getElementById('sharebox').onchange = function(e) { + e.preventDefault(); + setShareMedia(); +} + +let localMediaId = null; + +async function setLocalMedia() { + if(!getUserPass()) + return; + + if(!document.getElementById('presenterbox').checked) { + if(localMediaId) { + up[localMediaId].close(); + delete(up[localMediaId]); + delMedia(localMediaId) + localMediaId = null; + } + return; + } + + if(!localMediaId) { + let constraints = {audio: true, video: true}; + let opts = {video: true, audio: true}; + let stream = null; + try { + stream = await navigator.mediaDevices.getUserMedia(constraints); + } catch(e) { + console.error(e); + return; + } + localMediaId = await newUpStream(); + + let c = up[localMediaId]; + c.stream = stream; + stream.getTracks().forEach(t => { + c.pc.addTrack(t, stream); + }); + await setMedia(localMediaId); + } +} + +let shareMediaId = null; + +async function setShareMedia() { + if(!getUserPass()) + return; + + if(!document.getElementById('sharebox').checked) { + if(shareMediaId) { + up[shareMediaId].close(); + delete(up[shareMediaId]); + delMedia(shareMediaId) + shareMediaId = null; + } + return; + } + if(!shareMediaId) { + let constraints = {audio: true, video: true}; + let opts = {video: true, audio: true}; + let stream = null; + try { + stream = await navigator.mediaDevices.getDisplayMedia({}); + } catch(e) { + console.error(e); + return; + } + shareMediaId = await newUpStream(); + + let c = up[shareMediaId]; + c.stream = stream; + stream.getTracks().forEach(t => { + c.pc.addTrack(t, stream); + }); + await setMedia(shareMediaId); + } +} + +function setMedia(id) { + let mine = true; + let c = up[id]; + if(!c) { + c = down[id]; + mine = false; + } + if(!c) + throw new Error('Unknown connection'); + + let peersdiv = document.getElementById('peers'); + + let div = document.getElementById('peer-' + id); + if(!div) { + div = document.createElement('div'); + div.id = 'peer-' + id; + div.classList.add('peer'); + peersdiv.appendChild(div); + } + + let media = document.getElementById('media-' + id); + if(!media) { + media = document.createElement('video'); + media.id = 'media-' + id; + media.classList.add('media'); + media.autoplay = true; + media.playsinline = true; + media.controls = true; + if(mine) + media.muted = true; + div.appendChild(media); + } + + let label = document.getElementById('label-' + id); + if(!label) { + label = document.createElement('div'); + label.id = 'label-' + id; + label.classList.add('label'); + div.appendChild(label) + } + + media.srcObject = c.stream; + setLabel(id); +} + +function delMedia(id) { + let mediadiv = document.getElementById('peers'); + let peer = document.getElementById('peer-' + id); + let media = document.getElementById('media-' + id); + + media.srcObject = null; + mediadiv.removeChild(peer); +} + +function setLabel(id) { + let label = document.getElementById('label-' + id); + if(!label) + return; + let l = down[id] ? down[id].label : null; + label.textContent = l ? l : ''; +} + +function serverConnect() { + if(socket) { + socket.close(1000, 'Reconnecting'); + socket = null; + setConnected(false); + } + + try { + socket = new WebSocket( + `ws${location.protocol === 'https:' ? 's' : ''}://${location.host}/ws`, + ); + } catch(e) { + console.error(e); + setConnected(false); + return Promise.reject(e); + } + + return new Promise((resolve, reject) => { + socket.onerror = function(e) { + console.error(e); + reject(e.error ? e.error : e); + }; + socket.onopen = function(e) { + resetUsers(); + setConnected(true); + let up = getUserPass(); + send({ + type: 'handshake', + id: myid, + group: group, + username: up.username, + password: up.password, + }); + resolve(); + }; + socket.onclose = function(e) { + setConnected(false); + document.getElementById('presenterbox').checked = false; + setLocalMedia(); + document.getElementById('sharebox').checked = false; + setShareMedia(); + reject(new Error('websocket close ' + e.code + ' ' + e.reason)); + }; + socket.onmessage = function(e) { + let m = JSON.parse(e.data); + switch(m.type) { + case 'offer': + gotOffer(m.id, m.offer); + break; + case 'answer': + gotAnswer(m.id, m.answer); + break; + case 'close': + gotClose(m.id); + break; + case 'ice': + gotICE(m.id, m.candidate); + break; + case 'maxbitrate': + setMaxBitrate(m.id, m.audiorate, m.videorate); + break; + case 'label': + gotLabel(m.id, m.value); + break; + case 'user': + gotUser(m.id, m.username, m.del); + break; + case 'chat': + addToChatbox(m.id, m.username, m.value, m.me); + break; + default: + console.warn('Unexpected server message', m.type); + return; + } + }; + }); +} + +async function gotOffer(id, offer) { + let c = down[id]; + if(!c) { + let pc = new RTCPeerConnection({ + iceServers: iceServers, + }); + c = new Connection(id, pc); + down[id] = c; + + c.pc.onicecandidate = function(e) { + if(!e.candidate) + return; + send({type: 'ice', + id: id, + candidate: e.candidate, + }); + } + + c.pc.ontrack = function(e) { + c.stream = e.streams[0]; + setMedia(id); + } + } + + await c.pc.setRemoteDescription(offer); + await addIceCandidates(c); + let answer = await c.pc.createAnswer(); + if(!answer) + throw new Error("Didn't create answer") + await c.pc.setLocalDescription(answer); + send({ + type: 'answer', + id: id, + answer: answer, + }); +} + +function gotLabel(id, label) { + let c = down[id]; + if(!c) + throw new Error('Got label for unknown id'); + + c.label = label; + setLabel(id); +} + +async function gotAnswer(id, answer) { + let c = up[id]; + if(!c) + throw new Error('unknown up stream'); + await c.pc.setRemoteDescription(answer); + await addIceCandidates(c); +} + +function gotClose(id) { + let c = down[id]; + if(!c) + throw new Error('unknown down stream'); + delete(down[id]); + c.close(); + delMedia(id); +} + +async function gotICE(id, candidate) { + let conn = up[id]; + if(!conn) + conn = down[id]; + if(!conn) + throw new Error('unknown stream'); + if(conn.pc.remoteDescription) + await conn.pc.addIceCandidate(candidate).catch(console.warn); + else + conn.iceCandidates.push(candidate) +} + +let maxaudiorate, maxvideorate; + +async function setMaxBitrate(id, audio, video) { + let conn = up[id]; + if(!conn) + throw new Error("Setting bitrate of unknown id"); + + let senders = conn.pc.getSenders(); + for(let i = 0; i < senders.length; i++) { + let s = senders[i]; + if(!s.track) + return; + let p = s.getParameters(); + let bitrate; + if(s.track.kind == 'audio') + bitrate = audio; + else if(s.track.kind == 'video') + bitrate = video; + for(let j = 0; j < p.encodings.length; j++) { + let e = p.encodings[j]; + if(bitrate) + e.maxBitrate = bitrate; + else + delete(e.maxBitrate); + await s.setParameters(p); + } + } +} + +async function addIceCandidates(conn) { + let promises = [] + conn.iceCandidates.forEach(c => { + promises.push(conn.pc.addIceCandidate(c).catch(console.warn)); + }); + conn.iceCandidates = []; + return await Promise.all(promises); +} + +function send(m) { + if(!m) + throw(new Error('Sending null message')); + return socket.send(JSON.stringify(m)) +} + +let users = {}; + +function addUser(id, name) { + if(!name) + name = null; + if(id in users) + throw new Error('Duplicate user id'); + users[id] = name; + + let div = document.getElementById('users'); + let anon = document.getElementById('anonymous-users'); + let user = document.createElement('div'); + user.id = 'user-' + id; + user.textContent = name ? name : '(anon)'; + div.appendChild(user); +} + +function delUser(id, name) { + if(!name) + name = null; + if(!id in users) + throw new Error('Unknown user id'); + if(users[id] !== name) + throw new Error('Inconsistent user name'); + delete(users[id]); + let div = document.getElementById('users'); + let user = document.getElementById('user-' + id); + div.removeChild(user); +} + +function resetUsers() { + for(let id in users) + delUser(id, users[id]); +} + +function gotUser(id, name, del) { + if(del) + delUser(id, name); + else + addUser(id, name); +} + +const urlRegexp = /https?:\/\/[-a-zA-Z0-9@:%/._\+~#=?]+[-a-zA-Z0-9@:%/_\+~#=]/g; + +function formatLine(line) { + let r = new RegExp(urlRegexp); + let result = []; + let pos = 0; + while(true) { + let m = r.exec(line); + if(!m) + break; + result.push(document.createTextNode(line.slice(pos, m.index))); + let a = document.createElement('a'); + a.href = m[0]; + a.textContent = m[0]; + a.target = '_blank'; + a.rel = 'noreferrer noopener'; + result.push(a); + pos = m.index + m[0].length; + } + result.push(document.createTextNode(line.slice(pos))); + return result; +} + +function formatLines(lines) { + let elts = []; + if(lines.length > 0) + elts = formatLine(lines[0]); + for(let i = 1; i < lines.length; i++) { + elts.push(document.createElement('br')); + elts = elts.concat(formatLine(lines[i])); + } + let elt = document.createElement('p'); + elts.forEach(e => elt.appendChild(e)); + return elt; +} + +let lastMessage = {}; + +function addToChatbox(peerId, nick, message, me){ + let container = document.createElement('div'); + container.classList.add('message'); + if(!me) { + let p = formatLines(message.split('\n')); + if (lastMessage.nick !== nick || lastMessage.peerId !== peerId) { + let user = document.createElement('p'); + user.textContent = nick; + user.classList.add('message-user'); + container.appendChild(user); + } + p.classList.add('message-content'); + container.appendChild(p); + lastMessage.nick = nick; + lastMessage.peerId = peerId; + } else { + let asterisk = document.createElement('span'); + asterisk.textContent = '*'; + asterisk.classList.add('message-me-asterisk'); + let user = document.createElement('span'); + user.textContent = nick; + user.classList.add('message-me-user'); + let content = document.createElement('span'); + formatLine(message).forEach(elt => { + content.appendChild(elt); + }); + content.classList.add('message-me-content'); + container.appendChild(asterisk); + container.appendChild(user); + container.appendChild(content); + container.classList.add('message-me'); + delete(lastMessage.nick); + delete(lastMessage.peerId); + } + + document.getElementById('box').appendChild(container); + + if(box.scrollHeight > box.clientHeight) { + box.scrollTop = box.scrollHeight - box.clientHeight; + } + + return message; +} + +function handleInput() { + let username = getUsername(); + if(!username) { + displayError("Sorry, you're anonymous, you cannot chat"); + return; + } + + let input = document.getElementById('input'); + let data = input.value; + input.value = ''; + + let message, me; + + if(data === '') + return; + + if(data.charAt(0) === '/') { + if(data.charAt(1) === '/') { + message = data.substring(1); + me = false; + } else { + let space, cmd, rest; + space = data.indexOf(' '); + if(space < 0) { + cmd = data; + rest = ''; + } else { + cmd = data.slice(0, space); + rest = data.slice(space + 1).trim(); + } + + switch(cmd) { + case '/nick': + setNick(rest); + storeNick(rest); + return; + case '/me': + message = rest; + me = true; + break; + default: + displayError('Uknown command ' + cmd); + return; + } + } + } else { + message = data; + me = false; + } + + addToChatbox(myid, username, message, me); + send({ + type: 'chat', + username: username, + value: message, + me: me, + }); +} + +document.getElementById('inputform').onsubmit = function(e) { + e.preventDefault(); + handleInput(); +}; + +document.getElementById('input').onkeypress = function(e) { + if(e.key === 'Enter' && !e.ctrlKey && !e.shiftKey && !e.metaKey) { + e.preventDefault(); + handleInput(); + } +} + +function chatResizer(e) { + e.preventDefault(); + let chat = document.getElementById('chat'); + let start_x = e.clientX; + let start_width = parseFloat( + document.defaultView.getComputedStyle(chat).width.replace('px', ''), + ); + let inputbutton = document.getElementById('inputbutton'); + function start_drag(e) { + let width = start_width + e.clientX - start_x; + if(width < 40) + inputbutton.style.display = 'none'; + else + inputbutton.style.display = 'inline'; + chat.style.width = width + 'px'; + } + function stop_drag(e) { + document.documentElement.removeEventListener( + 'mousemove', start_drag, false, + ); + document.documentElement.removeEventListener( + 'mouseup', stop_drag, false, + ); + } + + document.documentElement.addEventListener( + 'mousemove', start_drag, false, + ); + document.documentElement.addEventListener( + 'mouseup', stop_drag, false, + ); +} + +document.getElementById('resizer').addEventListener('mousedown', chatResizer, false); + +async function newUpStream() { + let id = randomid(); + if(up[id]) + throw new Error('Eek!'); + let pc = new RTCPeerConnection({ + iceServers: iceServers, + }); + if(!pc) + throw new Error("Couldn't create peer connection") + up[id] = new Connection(id, pc); + + pc.onnegotiationneeded = e => negotiate(id); + + pc.onicecandidate = function(e) { + if(!e.candidate) + return; + send({type: 'ice', + id: id, + candidate: e.candidate, + }); + } + + pc.ontrack = console.error; + + return id; +} + +async function negotiate(id) { + let c = up[id]; + if(!c) + throw new Error('unknown connection'); + let offer = await c.pc.createOffer({}); + if(!offer) + throw(new Error("Didn't create offer")); + await c.pc.setLocalDescription(offer); + send({ + type: 'offer', + id: id, + offer: offer, + }); +} + +let errorTimeout = null; + +function setErrorTimeout(ms) { + if(errorTimeout) { + clearTimeout(errorTimeout); + errorTimeout = null; + } + if(ms) { + errorTimeout = setTimeout(clearError, ms); + } +} + +function displayError(message) { + let errspan = document.getElementById('errspan'); + errspan.textContent = message; + errspan.classList.remove('noerror'); + errspan.classList.add('error'); + setErrorTimeout(8000); +} + +function displayWarning(message) { + // don't overwrite real errors + if(!errorTimeout) + return displayError(message); +} + +function clearError() { + let errspan = document.getElementById('errspan'); + errspan.textContent = ''; + errspan.classList.remove('error'); + errspan.classList.add('noerror'); + setErrorTimeout(null); +} + +async function getIceServers() { + let r = await fetch('/ice-servers.json'); + if(!r.ok) + throw new Error("Couldn't fetch ICE servers: " + + r.status + ' ' + r.statusText); + let servers = await r.json(); + if(!(servers instanceof Array)) + throw new Error("couldn't parse ICE servers"); + iceServers = servers; +} + +async function doConnect() { + await serverConnect(); + await setLocalMedia(); + await setShareMedia(); +} + +document.getElementById('userform').onsubmit = async function(e) { + e.preventDefault(); + let username = document.getElementById('username').value.trim(); + let password = document.getElementById('password').value; + setUserPass(username, password); + await doConnect(); +} + +document.getElementById('disconnectbutton').onclick = function(e) { + socket.close(); +} + + +function start() { + group = decodeURIComponent(location.pathname.replace(/^\/[a-z]*\//, '')); + let title = document.getElementById('title'); + if(group !== '') + title.textContent = group.charAt(0).toUpperCase() + group.slice(1); + + myid = randomid(); + + getIceServers().catch(console.error).then(c => { + document.getElementById('connectbutton').disabled = false; + }).then(c => { + let userpass = getUserPass(); + if(userpass) + doConnect(); + }); +} + +start();