diff --git a/disk.go b/disk.go index fd364e3..8b85090 100644 --- a/disk.go +++ b/disk.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "time" "github.com/at-wat/ebml-go/webm" @@ -15,8 +16,10 @@ import ( ) type diskClient struct { - group *group - id string + group *group + id string + + mu sync.Mutex down []*diskConn closed bool } @@ -34,6 +37,9 @@ func (client *diskClient) getUsername() string { } func (client *diskClient) Close() error { + client.mu.Lock() + defer client.mu.Unlock() + for _, down := range client.down { down.Close() } @@ -43,6 +49,9 @@ func (client *diskClient) Close() error { } func (client *diskClient) pushConn(conn *upConnection, tracks []*upTrack, label string) error { + client.mu.Lock() + defer client.mu.Unlock() + if client.closed { return errors.New("disk client is closed") } @@ -65,14 +74,17 @@ func (client *diskClient) pushConn(conn *upConnection, tracks []*upTrack, label var _ client = &diskClient{} type diskConn struct { - directory string - label string + directory string + label string + + mu sync.Mutex file *os.File remote *upConnection tracks []*diskTrack width, height uint32 } +// called locked func (conn *diskConn) reopen() error { for _, t := range conn.tracks { if t.writer != nil { @@ -94,11 +106,18 @@ func (conn *diskConn) reopen() error { func (conn *diskConn) Close() error { conn.remote.delLocal(conn) + conn.mu.Lock() + tracks := make([]*diskTrack, 0, len(conn.tracks)) for _, t := range conn.tracks { if t.writer != nil { t.writer.Close() t.writer = nil } + tracks = append(tracks, t) + } + conn.mu.Unlock() + + for _, t := range tracks { t.remote.delLocal(t) } return nil @@ -131,11 +150,12 @@ func openDiskFile(directory, label string) (*os.File, error) { } type diskTrack struct { - remote *upTrack + remote *upTrack + conn *diskConn + writer webm.BlockWriteCloser builder *samplebuilder.SampleBuilder timestamp uint32 - conn *diskConn } func newDiskConn(directory, label string, up *upConnection, remoteTracks []*upTrack) (*diskConn, error) { @@ -174,7 +194,6 @@ func newDiskConn(directory, label string, up *upConnection, remoteTracks []*upTr } } - up.addLocal(&conn) return &conn, nil @@ -194,6 +213,10 @@ func clonePacket(packet *rtp.Packet) *rtp.Packet { } func (t *diskTrack) WriteRTP(packet *rtp.Packet) error { + // since we call initWriter, we take the connection lock for simplicity. + t.conn.mu.Lock() + defer t.conn.mu.Unlock() + if t.builder == nil { return nil } @@ -243,6 +266,7 @@ func (t *diskTrack) WriteRTP(packet *rtp.Packet) error { } } +// called locked func (t *diskTrack) initWriter(data []byte) error { switch t.remote.track.Codec().Name { case webrtc.VP8: @@ -262,6 +286,7 @@ func (t *diskTrack) initWriter(data []byte) error { return nil } +// called locked func (conn *diskConn) initWriter(width, height uint32) error { if conn.file != nil && width == conn.width && height == conn.height { return nil diff --git a/group.go b/group.go index 3b745b3..ccbcdd9 100644 --- a/group.go +++ b/group.go @@ -352,6 +352,7 @@ type groupDescription struct { Public bool `json:"public,omitempty"` MaxClients int `json:"max-clients,omitempty"` AllowAnonymous bool `json:"allow-anonymous,omitempty"` + AllowRecording bool `json:"allow-recording,omitempty"` Op []groupUser `json:"op,omitempty"` Presenter []groupUser `json:"presenter,omitempty"` Other []groupUser `json:"other,omitempty"` @@ -402,6 +403,7 @@ func getDescription(name string) (*groupDescription, error) { type userPermission struct { Op bool `json:"op,omitempty"` Present bool `json:"present,omitempty"` + Record bool `json:"record,omitempty"` } func getPermission(desc *groupDescription, user, pass string) (userPermission, error) { @@ -413,6 +415,9 @@ func getPermission(desc *groupDescription, user, pass string) (userPermission, e if good { p.Op = true p.Present = true + if desc.AllowRecording { + p.Record = true + } return p, nil } return p, userError("not authorised") diff --git a/static/sfu.js b/static/sfu.js index 7825152..0fe9793 100644 --- a/static/sfu.js +++ b/static/sfu.js @@ -933,7 +933,17 @@ function handleInput() { return; } send({ - type: cmd === '/lock' ? 'lock' : 'unlock', + type: cmd.slice(1), + }); + return; + case '/record': + case '/unrecord': + if(!permissions.record) { + displayError("You're not allowed to record"); + return; + } + send({ + type: cmd.slice(1), }); return; case '/op': diff --git a/webclient.go b/webclient.go index 9bca1fd..dca06a6 100644 --- a/webclient.go +++ b/webclient.go @@ -1223,19 +1223,20 @@ func (c *webClient) setRequested(requested map[string]uint32) error { c.requested = requested - go func() { - clients := c.group.getClients(c) - for _, cc := range clients { - ccc, ok := cc.(*webClient) - if ok { - ccc.action(pushConnsAction{c}) - } - } - }() - + go pushConns(c) return nil } +func pushConns(c client) { + clients := c.getGroup().getClients(c) + for _, cc := range clients { + ccc, ok := cc.(*webClient) + if ok { + ccc.action(pushConnsAction{c}) + } + } +} + func (c *webClient) isRequested(label string) bool { return c.requested[label] != 0 } @@ -1473,8 +1474,12 @@ func setPermissions(g *group, id string, perm string) error { switch perm { case "op": c.permissions.Op = true + if g.description.AllowRecording { + c.permissions.Record = true + } case "unop": c.permissions.Op = false + c.permissions.Record = false case "present": c.permissions.Present = true case "unpresent": @@ -1582,6 +1587,37 @@ func handleClientMessage(c *webClient, m clientMessage) error { locked = 1 } atomic.StoreUint32(&c.group.locked, locked) + case "record": + if !c.permissions.Record { + return c.error(userError("not authorised")) + } + for _, cc := range c.group.getClients(c) { + _, ok := cc.(*diskClient) + if ok { + return c.error(userError("already recording")) + } + } + disk := &diskClient{ + group: c.group, + id: "recording", + } + _, _, err := addClient(c.group.name, disk, "", "") + if err != nil { + disk.Close() + return c.error(err) + } + go pushConns(disk) + case "unrecord": + if !c.permissions.Record { + return c.error(userError("not authorised")) + } + for _, cc := range c.group.getClients(c) { + disk, ok := cc.(*diskClient) + if ok { + disk.Close() + delClient(disk) + } + } case "kick": if !c.permissions.Op { return c.error(userError("not authorised"))