From 7393ca8473691090d5b592f9fc95c40463527817 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Tue, 12 May 2020 18:27:40 +0200 Subject: [PATCH] Make all communication between client threads asynchronous. We used to deadlock with large numbers of tracks. This should fix that. --- client.go | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/client.go b/client.go index d7b0088..1c32b6f 100644 --- a/client.go +++ b/client.go @@ -497,9 +497,11 @@ func delUpConn(c *client, id string) bool { cc.mu.Unlock() } - for _, cid := range cids { - cid.client.action(delConnAction{cid.id}) - } + go func(cids []clientId) { + for _, cid := range cids { + cid.client.action(delConnAction{cid.id}) + } + }(cids) conn.pc.Close() delete(c.up, id) @@ -953,9 +955,12 @@ func (c *client) setRequested(audio, video bool) error { c.requestedAudio = audio c.requestedVideo = video - for _, cc := range c.group.getClients(c) { - cc.action(pushTracksAction{c}) - } + go func() { + clients := c.group.getClients(c) + for _, cc := range clients { + cc.action(pushTracksAction{c}) + } + }() return nil } @@ -971,6 +976,17 @@ func (c *client) requested(kind webrtc.RTPCodecType) bool { } } +func pushTracks(c *client, conn *upConnection, tracks []*upTrack, done bool, label string) { + for i, t := range tracks { + c.action(addTrackAction{t, conn, done && i == len(tracks)-1}) + } + + if done && label != "" { + c.action(addLabelAction{conn.id, conn.label}) + + } +} + func clientLoop(c *client, conn *websocket.Conn) error { read := make(chan interface{}, 1) go clientReader(conn, read, c.done) @@ -1063,19 +1079,13 @@ func clientLoop(c *client, conn *websocket.Conn) error { }) case pushTracksAction: for _, u := range c.up { - var done bool - for i, t := range u.tracks { - done = i >= u.trackCount-1 - a.c.action(addTrackAction{ - t, u, done, - }) - } - if done && u.label != "" { - a.c.action(addLabelAction{ - u.id, u.label, - }) - - } + tracks := make([]*upTrack, len(u.tracks)) + copy(tracks, u.tracks) + go pushTracks( + a.c, u, tracks, + len(tracks) >= u.trackCount-1, + u.label, + ) } case connectionFailedAction: found := delUpConn(c, a.id)