From 3b505a89fe5867adca6cd142cbc206fa873ddcb8 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Sun, 14 Feb 2021 16:34:43 +0100 Subject: [PATCH] Factor out handling actions. --- rtpconn/webclient.go | 301 ++++++++++++++++++++++--------------------- 1 file changed, 154 insertions(+), 147 deletions(-) diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index f532108..29899e2 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -827,153 +827,9 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { return m } case a := <-c.actionCh: - switch a := a.(type) { - case pushConnAction: - g := c.group - if g == nil || a.group != g { - return nil - } - var tracks []conn.UpTrack - if a.conn != nil { - tracks = make([]conn.UpTrack, - 0, len(a.tracks), - ) - for _, t := range a.tracks { - if c.isRequested(t.Label()) { - tracks = append( - tracks, t, - ) - } - } - } - - if len(tracks) == 0 { - closeDownConn(c, a.id, "") - if a.replace != "" { - closeDownConn( - c, a.replace, "", - ) - } - continue - } - - down, _, err := addDownConn(c, a.conn) - if err != nil { - return err - } - err = replaceTracks(down, tracks, a.conn) - if err != nil { - return err - } - if a.replace != "" { - err := delDownConn(c, a.replace) - if err != nil { - log.Printf("Replace: %v", err) - } - } - err = negotiate( - c, down, false, a.replace, - ) - if err != nil { - log.Printf( - "Negotiation failed: %v", - err) - closeDownConn(c, down.id, - "negotiation failed") - continue - } - case pushConnsAction: - g := c.group - if g == nil || a.group != g { - return nil - } - for _, u := range c.up { - if !u.complete() { - continue - } - tracks := u.getTracks() - replace := u.getReplace(false) - - ts := make([]conn.UpTrack, len(tracks)) - for i, t := range tracks { - ts[i] = t - } - go func(u *rtpUpConnection, - ts []conn.UpTrack, - replace string) { - err := a.client.PushConn( - g, u.id, u, ts, replace, - ) - if err != nil { - log.Printf( - "PushConn: %v", - err, - ) - } - }(u, ts, replace) - } - case connectionFailedAction: - if down := getDownConn(c, a.id); down != nil { - err := negotiate(c, down, true, "") - if err != nil { - return err - } - tracks := make( - []conn.UpTrack, len(down.tracks), - ) - for i, t := range down.tracks { - tracks[i] = t.remote - } - go c.PushConn( - c.group, - down.remote.Id(), down.remote, - tracks, "", - ) - } else if up := getUpConn(c, a.id); up != nil { - c.write(clientMessage{ - Type: "renegotiate", - Id: a.id, - }) - } else { - log.Printf("Attempting to renegotiate " + - "unknown connection") - } - - case permissionsChangedAction: - g := c.Group() - if g == nil { - return errors.New("Permissions changed in no group") - } - perms := c.permissions - c.write(clientMessage{ - Type: "joined", - Kind: "change", - Group: g.Name(), - Username: c.username, - Permissions: &perms, - RTCConfiguration: ice.ICEConfiguration(), - }) - if !c.permissions.Present { - up := getUpConns(c) - for _, u := range up { - err := delUpConn( - c, u.id, c.id, true, - ) - if err == nil { - failUpConnection( - c, u.id, - "permission denied", - ) - } - } - } - case kickAction: - return group.KickError{ - a.id, a.username, a.message, - } - default: - log.Printf("unexpected action %T", a) - return errors.New("unexpected action") + err := handleAction(c, a) + if err != nil { + return err } case <-ticker.C: if time.Since(readTime) > 75*time.Second { @@ -994,6 +850,157 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { } } +func handleAction(c *webClient, a interface{}) error { + switch a := a.(type) { + case pushConnAction: + g := c.group + if g == nil || a.group != g { + return nil + } + var tracks []conn.UpTrack + if a.conn != nil { + tracks = make([]conn.UpTrack, + 0, len(a.tracks), + ) + for _, t := range a.tracks { + if c.isRequested(t.Label()) { + tracks = append( + tracks, t, + ) + } + } + } + + if len(tracks) == 0 { + closeDownConn(c, a.id, "") + if a.replace != "" { + closeDownConn( + c, a.replace, "", + ) + } + return nil + } + + down, _, err := addDownConn(c, a.conn) + if err != nil { + return err + } + err = replaceTracks(down, tracks, a.conn) + if err != nil { + return err + } + if a.replace != "" { + err := delDownConn(c, a.replace) + if err != nil { + log.Printf("Replace: %v", err) + } + } + err = negotiate( + c, down, false, a.replace, + ) + if err != nil { + log.Printf( + "Negotiation failed: %v", + err) + closeDownConn(c, down.id, + "negotiation failed") + } + case pushConnsAction: + g := c.group + if g == nil || a.group != g { + return nil + } + for _, u := range c.up { + if !u.complete() { + continue + } + tracks := u.getTracks() + replace := u.getReplace(false) + + ts := make([]conn.UpTrack, len(tracks)) + for i, t := range tracks { + ts[i] = t + } + go func(u *rtpUpConnection, + ts []conn.UpTrack, + replace string) { + err := a.client.PushConn( + g, u.id, u, ts, replace, + ) + if err != nil { + log.Printf( + "PushConn: %v", + err, + ) + } + }(u, ts, replace) + } + case connectionFailedAction: + if down := getDownConn(c, a.id); down != nil { + err := negotiate(c, down, true, "") + if err != nil { + return err + } + tracks := make( + []conn.UpTrack, len(down.tracks), + ) + for i, t := range down.tracks { + tracks[i] = t.remote + } + go c.PushConn( + c.group, + down.remote.Id(), down.remote, + tracks, "", + ) + } else if up := getUpConn(c, a.id); up != nil { + c.write(clientMessage{ + Type: "renegotiate", + Id: a.id, + }) + } else { + log.Printf("Attempting to renegotiate " + + "unknown connection") + } + + case permissionsChangedAction: + g := c.Group() + if g == nil { + return errors.New("Permissions changed in no group") + } + perms := c.permissions + c.write(clientMessage{ + Type: "joined", + Kind: "change", + Group: g.Name(), + Username: c.username, + Permissions: &perms, + RTCConfiguration: ice.ICEConfiguration(), + }) + if !c.permissions.Present { + up := getUpConns(c) + for _, u := range up { + err := delUpConn( + c, u.id, c.id, true, + ) + if err == nil { + failUpConnection( + c, u.id, + "permission denied", + ) + } + } + } + case kickAction: + return group.KickError{ + a.id, a.username, a.message, + } + default: + log.Printf("unexpected action %T", a) + return errors.New("unexpected action") + } + return nil +} + func failUpConnection(c *webClient, id string, message string) error { if id != "" { err := c.write(clientMessage{