From bfdc22ff838f0b622efe7f06a27f5fc9c0693fce Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Sat, 23 May 2020 02:22:43 +0200 Subject: [PATCH] Maintain local connections explicitly. --- client.go | 31 ++++++++++--------------------- conn.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 6d32472..7676e0e 100644 --- a/client.go +++ b/client.go @@ -612,28 +612,12 @@ func delUpConn(c *client, id string) bool { } } - type clientId struct { - client *client - id string - } - cids := make([]clientId, 0) - - clients := c.group.getClients(c) - for _, cc := range clients { - cc.mu.Lock() - for _, otherconn := range cc.down { - if otherconn.remote == conn { - cids = append(cids, clientId{cc, otherconn.id}) - } + local := conn.getLocal() + go func() { + for _, l := range local { + l.Close() } - cc.mu.Unlock() - } - - go func(cids []clientId) { - for _, cid := range cids { - cid.client.action(delConnAction{cid.id}) - } - }(cids) + }() conn.pc.Close() delete(c.up, id) @@ -691,17 +675,21 @@ func addDownConn(c *client, id string, remote *upConnection) (*downConnection, e } conn := &downConnection{ id: id, + client: c, pc: pc, remote: remote, } c.mu.Lock() defer c.mu.Unlock() + if c.down[id] != nil || (c.up != nil && c.up[id] != nil) { conn.pc.Close() return nil, errors.New("Adding duplicate connection") } c.down[id] = conn + + remote.addLocal(conn) return conn, nil } @@ -717,6 +705,7 @@ func delDownConn(c *client, id string) bool { return false } + conn.remote.delLocal(conn) for _, track := range conn.tracks { found := track.remote.delLocal(track) if !found { diff --git a/conn.go b/conn.go index f0b5611..20456f3 100644 --- a/conn.go +++ b/conn.go @@ -100,12 +100,47 @@ type upConnection struct { tracks []*upTrack labels map[string]string iceCandidates []*webrtc.ICECandidateInit + + mu sync.Mutex + local []*downConnection } func (up *upConnection) getPC() *webrtc.PeerConnection { return up.pc } +func (up *upConnection) addLocal(local *downConnection) { + up.mu.Lock() + defer up.mu.Unlock() + for _, t := range up.local { + if t == local { + up.mu.Unlock() + return + } + } + up.local = append(up.local, local) +} + +func (up *upConnection) delLocal(local *downConnection) bool { + up.mu.Lock() + defer up.mu.Unlock() + for i, l := range up.local { + if l == local { + up.local = append(up.local[:i], up.local[i+1:]...) + return true + } + } + return false +} + +func (up *upConnection) getLocal() []*downConnection { + up.mu.Lock() + defer up.mu.Unlock() + local := make([]*downConnection, len(up.local)) + copy(local, up.local) + return local +} + func (up *upConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { if up.pc.RemoteDescription() != nil { return up.pc.AddICECandidate(*candidate) @@ -224,12 +259,17 @@ func (down *downTrack) GetMaxBitrate(now uint64) uint64 { type downConnection struct { id string + client *client pc *webrtc.PeerConnection remote *upConnection tracks []*downTrack iceCandidates []*webrtc.ICECandidateInit } +func (down *downConnection) Close() error { + return down.client.action(delConnAction{down.id}) +} + func (down *downConnection) getPC() *webrtc.PeerConnection { return down.pc }