From 5205c0773b547a629eb10ef609e4ce16ba461db3 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Sat, 2 May 2020 23:41:47 +0200 Subject: [PATCH] Delete upstream connections on ICE failure. --- client.go | 62 +++++++++++++++++++++++++++++++++++++++++-------------- group.go | 4 ++++ 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/client.go b/client.go index 93101fe..bf5c8bd 100644 --- a/client.go +++ b/client.go @@ -281,6 +281,12 @@ func addUpConn(c *client, id string) (*upConnection, error) { sendICE(c, id, candidate) }) + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateFailed { + c.action(connectionFailedAction{id: id}) + } + }) + go rtcpUpSender(c, conn) pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) { @@ -445,19 +451,17 @@ func rtcpUpSender(c *client, conn *upConnection) { } } -func delUpConn(c *client, id string) { +func delUpConn(c *client, id string) bool { c.mu.Lock() defer c.mu.Unlock() if c.up == nil { - log.Printf("Deleting unknown connection") - return + return false } conn := c.up[id] if conn == nil { - log.Printf("Deleting unknown connection") - return + return false } type clientId struct { @@ -483,6 +487,7 @@ func delUpConn(c *client, id string) { conn.pc.Close() delete(c.up, id) + return true } func getDownConn(c *client, id string) *downConnection { @@ -513,6 +518,12 @@ func addDownConn(c *client, id string, remote *upConnection) (*downConnection, e log.Printf("Got track on downstream connection") }) + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateFailed { + c.action(connectionFailedAction{id: id}) + } + }) + if c.down == nil { c.down = make(map[string]*downConnection) } @@ -528,18 +539,16 @@ func addDownConn(c *client, id string, remote *upConnection) (*downConnection, e return conn, nil } -func delDownConn(c *client, id string) { +func delDownConn(c *client, id string) bool { c.mu.Lock() defer c.mu.Unlock() if c.down == nil { - log.Printf("Deleting unknown connection") - return + return false } conn := c.down[id] if conn == nil { - log.Printf("Deleting unknown connection") - return + return false } for _, track := range conn.tracks { @@ -551,6 +560,7 @@ func delDownConn(c *client, id string) { } conn.pc.Close() delete(c.down, id) + return true } func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConnection) (*downConnection, *webrtc.RTPSender, error) { @@ -956,11 +966,13 @@ func clientLoop(c *client, conn *websocket.Conn) error { } } case delConnAction: - c.write(clientMessage{ - Type: "close", - Id: a.id, - }) - delDownConn(c, a.id) + found := delDownConn(c, a.id) + if found { + c.write(clientMessage{ + Type: "close", + Id: a.id, + }) + } case addLabelAction: c.write(clientMessage{ Type: "label", @@ -983,6 +995,21 @@ func clientLoop(c *client, conn *websocket.Conn) error { } } + case connectionFailedAction: + found := delUpConn(c, a.id) + if found { + c.write(clientMessage{ + Type: "error", + Value: "connection failed", + }) + c.write(clientMessage{ + Type: "abort", + Id: a.id, + }) + continue + } + // What should we do if a downstream + // connection fails? Renegotiate? case permissionsChangedAction: c.write(clientMessage{ Type: "permissions", @@ -1048,7 +1075,10 @@ func handleClientMessage(c *client, m clientMessage) error { return err } case "close": - delUpConn(c, m.Id) + found := delUpConn(c, m.Id) + if !found { + log.Printf("Deleting unknown up connection %v", m.Id) + } case "ice": if m.Candidate == nil { return protocolError("null candidate") diff --git a/group.go b/group.go index 32390d8..5a195ef 100644 --- a/group.go +++ b/group.go @@ -164,6 +164,10 @@ type pushTracksAction struct { c *client } +type connectionFailedAction struct { + id string +} + type permissionsChangedAction struct{} type kickAction struct{}