diff --git a/conn.go b/conn.go index 5e998fd..c11aebf 100644 --- a/conn.go +++ b/conn.go @@ -50,17 +50,19 @@ func (up *upTrack) notifyLocal(add bool, track downTrack) { } } -func (up *upTrack) addLocal(local downTrack) { +func (up *upTrack) addLocal(local downTrack) error { up.mu.Lock() for _, t := range up.local { if t == local { up.mu.Unlock() - return + return nil } } up.local = append(up.local, local) up.mu.Unlock() + up.notifyLocal(true, local) + return nil } func (up *upTrack) delLocal(local downTrack) bool { @@ -107,20 +109,26 @@ type upConnection struct { labels map[string]string iceCandidates []*webrtc.ICECandidateInit - mu sync.Mutex - local []downConnection + mu sync.Mutex + closed bool + local []downConnection } -func (up *upConnection) addLocal(local downConnection) { +var ErrConnectionClosed = errors.New("connection is closed") + +func (up *upConnection) addLocal(local downConnection) error { up.mu.Lock() defer up.mu.Unlock() + if up.closed { + return ErrConnectionClosed + } for _, t := range up.local { if t == local { - up.mu.Unlock() - return + return nil } } up.local = append(up.local, local) + return nil } func (up *upConnection) delLocal(local downConnection) bool { @@ -143,6 +151,21 @@ func (up *upConnection) getLocal() []downConnection { return local } +func (up *upConnection) Close() error { + up.mu.Lock() + defer up.mu.Unlock() + + go func(local []downConnection) { + for _, l := range local { + l.Close() + } + }(up.local) + + up.local = nil + up.closed = true + return up.pc.Close() +} + func (up *upConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { if up.pc.RemoteDescription() != nil { return up.pc.AddICECandidate(*candidate) diff --git a/disk.go b/disk.go index 8b85090..097dd72 100644 --- a/disk.go +++ b/disk.go @@ -194,7 +194,10 @@ func newDiskConn(directory, label string, up *upConnection, remoteTracks []*upTr } } - up.addLocal(&conn) + err := up.addLocal(&conn) + if err != nil { + return nil, err + } return &conn, nil } diff --git a/webclient.go b/webclient.go index dca06a6..346fe54 100644 --- a/webclient.go +++ b/webclient.go @@ -675,14 +675,7 @@ func delUpConn(c *webClient, id string) bool { } } - local := conn.getLocal() - go func() { - for _, l := range local { - l.Close() - } - }() - - conn.pc.Close() + conn.Close() delete(c.up, id) return true } @@ -744,9 +737,13 @@ func addDownConn(c *webClient, id string, remote *upConnection) (*rtpDownConnect conn.pc.Close() return nil, errors.New("Adding duplicate connection") } - c.down[id] = conn + err = remote.addLocal(conn) + if err != nil { + conn.pc.Close() + return nil, err + } - remote.addLocal(conn) + c.down[id] = conn return conn, nil }