1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-12 19:55:59 +01:00

Use an unbounded buffer for client actions.

We no longer risk deadlocking when sending actions, which in turn
enables pushing connections synchronously.
This commit is contained in:
Juliusz Chroboczek 2021-02-14 17:24:42 +01:00
parent 3b505a89fe
commit 183ab4fed7

View file

@ -64,9 +64,10 @@ type webClient struct {
writerDone chan struct{} writerDone chan struct{}
actionCh chan interface{} actionCh chan interface{}
mu sync.Mutex mu sync.Mutex
down map[string]*rtpDownConnection down map[string]*rtpDownConnection
up map[string]*rtpUpConnection up map[string]*rtpUpConnection
actions []interface{}
} }
func (c *webClient) Group() *group.Group { func (c *webClient) Group() *group.Group {
@ -276,14 +277,12 @@ func delUpConn(c *webClient, id string, userId string, push bool) error {
conn.pc.Close() conn.pc.Close()
if push && g != nil { if push && g != nil {
go func(clients []group.Client) { for _, c := range g.GetClients(c) {
for _, c := range clients { err := c.PushConn(g, id, nil, nil, replace)
err := c.PushConn(g, id, nil, nil, replace) if err != nil {
if err != nil { log.Printf("PushConn: %v", err)
log.Printf("PushConn: %v", err)
}
} }
}(g.GetClients(c)) }
} }
return nil return nil
@ -681,7 +680,7 @@ func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error {
func (c *webClient) setRequested(requested map[string]uint32) error { func (c *webClient) setRequested(requested map[string]uint32) error {
c.requested = requested c.requested = requested
go pushConns(c, c.group) pushConns(c, c.group)
return nil return nil
} }
@ -811,6 +810,13 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
} }
for { for {
c.mu.Lock()
actions := c.actions
c.actions = nil
c.mu.Unlock()
for _, a := range actions {
handleAction(c, a)
}
select { select {
case m, ok := <-read: case m, ok := <-read:
if !ok { if !ok {
@ -921,19 +927,10 @@ func handleAction(c *webClient, a interface{}) error {
for i, t := range tracks { for i, t := range tracks {
ts[i] = t ts[i] = t
} }
go func(u *rtpUpConnection, err := a.client.PushConn(g, u.id, u, ts, replace)
ts []conn.UpTrack, if err != nil {
replace string) { log.Printf("PushConn: %v", err)
err := a.client.PushConn( }
g, u.id, u, ts, replace,
)
if err != nil {
log.Printf(
"PushConn: %v",
err,
)
}
}(u, ts, replace)
} }
case connectionFailedAction: case connectionFailedAction:
if down := getDownConn(c, a.id); down != nil { if down := getDownConn(c, a.id); down != nil {
@ -947,7 +944,7 @@ func handleAction(c *webClient, a interface{}) error {
for i, t := range down.tracks { for i, t := range down.tracks {
tracks[i] = t.remote tracks[i] = t.remote
} }
go c.PushConn( c.PushConn(
c.group, c.group,
down.remote.Id(), down.remote, down.remote.Id(), down.remote,
tracks, "", tracks, "",
@ -1393,7 +1390,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
disk.Close() disk.Close()
return c.error(err) return c.error(err)
} }
go pushConns(disk, c.group) pushConns(disk, c.group)
case "unrecord": case "unrecord":
if !c.permissions.Record { if !c.permissions.Record {
return c.error(group.UserError("not authorised")) return c.error(group.UserError("not authorised"))
@ -1551,13 +1548,20 @@ func (c *webClient) Warn(oponly bool, message string) error {
var ErrClientDead = errors.New("client is dead") var ErrClientDead = errors.New("client is dead")
func (c *webClient) action(m interface{}) error { func (c *webClient) action(a interface{}) error {
select { c.mu.Lock()
case c.actionCh <- m: defer c.mu.Unlock()
return nil if len(c.actions) == 0 {
case <-c.done: select {
return ErrClientDead case c.actionCh <- a:
return nil
case <-c.done:
return ErrClientDead
default:
}
} }
c.actions = append(c.actions, a)
return nil
} }
func (c *webClient) write(m clientMessage) error { func (c *webClient) write(m clientMessage) error {