1
Fork 0

Implement recording.

This commit is contained in:
Juliusz Chroboczek 2020-05-30 00:23:54 +02:00
parent c1ab839f02
commit 0a2c4eb381
4 changed files with 94 additions and 18 deletions

39
disk.go
View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time" "time"
"github.com/at-wat/ebml-go/webm" "github.com/at-wat/ebml-go/webm"
@ -15,8 +16,10 @@ import (
) )
type diskClient struct { type diskClient struct {
group *group group *group
id string id string
mu sync.Mutex
down []*diskConn down []*diskConn
closed bool closed bool
} }
@ -34,6 +37,9 @@ func (client *diskClient) getUsername() string {
} }
func (client *diskClient) Close() error { func (client *diskClient) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
for _, down := range client.down { for _, down := range client.down {
down.Close() down.Close()
} }
@ -43,6 +49,9 @@ func (client *diskClient) Close() error {
} }
func (client *diskClient) pushConn(conn *upConnection, tracks []*upTrack, label string) error { func (client *diskClient) pushConn(conn *upConnection, tracks []*upTrack, label string) error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closed { if client.closed {
return errors.New("disk client is closed") return errors.New("disk client is closed")
} }
@ -65,14 +74,17 @@ func (client *diskClient) pushConn(conn *upConnection, tracks []*upTrack, label
var _ client = &diskClient{} var _ client = &diskClient{}
type diskConn struct { type diskConn struct {
directory string directory string
label string label string
mu sync.Mutex
file *os.File file *os.File
remote *upConnection remote *upConnection
tracks []*diskTrack tracks []*diskTrack
width, height uint32 width, height uint32
} }
// called locked
func (conn *diskConn) reopen() error { func (conn *diskConn) reopen() error {
for _, t := range conn.tracks { for _, t := range conn.tracks {
if t.writer != nil { if t.writer != nil {
@ -94,11 +106,18 @@ func (conn *diskConn) reopen() error {
func (conn *diskConn) Close() error { func (conn *diskConn) Close() error {
conn.remote.delLocal(conn) conn.remote.delLocal(conn)
conn.mu.Lock()
tracks := make([]*diskTrack, 0, len(conn.tracks))
for _, t := range conn.tracks { for _, t := range conn.tracks {
if t.writer != nil { if t.writer != nil {
t.writer.Close() t.writer.Close()
t.writer = nil t.writer = nil
} }
tracks = append(tracks, t)
}
conn.mu.Unlock()
for _, t := range tracks {
t.remote.delLocal(t) t.remote.delLocal(t)
} }
return nil return nil
@ -131,11 +150,12 @@ func openDiskFile(directory, label string) (*os.File, error) {
} }
type diskTrack struct { type diskTrack struct {
remote *upTrack remote *upTrack
conn *diskConn
writer webm.BlockWriteCloser writer webm.BlockWriteCloser
builder *samplebuilder.SampleBuilder builder *samplebuilder.SampleBuilder
timestamp uint32 timestamp uint32
conn *diskConn
} }
func newDiskConn(directory, label string, up *upConnection, remoteTracks []*upTrack) (*diskConn, error) { func newDiskConn(directory, label string, up *upConnection, remoteTracks []*upTrack) (*diskConn, error) {
@ -174,7 +194,6 @@ func newDiskConn(directory, label string, up *upConnection, remoteTracks []*upTr
} }
} }
up.addLocal(&conn) up.addLocal(&conn)
return &conn, nil return &conn, nil
@ -194,6 +213,10 @@ func clonePacket(packet *rtp.Packet) *rtp.Packet {
} }
func (t *diskTrack) WriteRTP(packet *rtp.Packet) error { func (t *diskTrack) WriteRTP(packet *rtp.Packet) error {
// since we call initWriter, we take the connection lock for simplicity.
t.conn.mu.Lock()
defer t.conn.mu.Unlock()
if t.builder == nil { if t.builder == nil {
return nil return nil
} }
@ -243,6 +266,7 @@ func (t *diskTrack) WriteRTP(packet *rtp.Packet) error {
} }
} }
// called locked
func (t *diskTrack) initWriter(data []byte) error { func (t *diskTrack) initWriter(data []byte) error {
switch t.remote.track.Codec().Name { switch t.remote.track.Codec().Name {
case webrtc.VP8: case webrtc.VP8:
@ -262,6 +286,7 @@ func (t *diskTrack) initWriter(data []byte) error {
return nil return nil
} }
// called locked
func (conn *diskConn) initWriter(width, height uint32) error { func (conn *diskConn) initWriter(width, height uint32) error {
if conn.file != nil && width == conn.width && height == conn.height { if conn.file != nil && width == conn.width && height == conn.height {
return nil return nil

View File

@ -352,6 +352,7 @@ type groupDescription struct {
Public bool `json:"public,omitempty"` Public bool `json:"public,omitempty"`
MaxClients int `json:"max-clients,omitempty"` MaxClients int `json:"max-clients,omitempty"`
AllowAnonymous bool `json:"allow-anonymous,omitempty"` AllowAnonymous bool `json:"allow-anonymous,omitempty"`
AllowRecording bool `json:"allow-recording,omitempty"`
Op []groupUser `json:"op,omitempty"` Op []groupUser `json:"op,omitempty"`
Presenter []groupUser `json:"presenter,omitempty"` Presenter []groupUser `json:"presenter,omitempty"`
Other []groupUser `json:"other,omitempty"` Other []groupUser `json:"other,omitempty"`
@ -402,6 +403,7 @@ func getDescription(name string) (*groupDescription, error) {
type userPermission struct { type userPermission struct {
Op bool `json:"op,omitempty"` Op bool `json:"op,omitempty"`
Present bool `json:"present,omitempty"` Present bool `json:"present,omitempty"`
Record bool `json:"record,omitempty"`
} }
func getPermission(desc *groupDescription, user, pass string) (userPermission, error) { func getPermission(desc *groupDescription, user, pass string) (userPermission, error) {
@ -413,6 +415,9 @@ func getPermission(desc *groupDescription, user, pass string) (userPermission, e
if good { if good {
p.Op = true p.Op = true
p.Present = true p.Present = true
if desc.AllowRecording {
p.Record = true
}
return p, nil return p, nil
} }
return p, userError("not authorised") return p, userError("not authorised")

View File

@ -933,7 +933,17 @@ function handleInput() {
return; return;
} }
send({ send({
type: cmd === '/lock' ? 'lock' : 'unlock', type: cmd.slice(1),
});
return;
case '/record':
case '/unrecord':
if(!permissions.record) {
displayError("You're not allowed to record");
return;
}
send({
type: cmd.slice(1),
}); });
return; return;
case '/op': case '/op':

View File

@ -1223,19 +1223,20 @@ func (c *webClient) setRequested(requested map[string]uint32) error {
c.requested = requested c.requested = requested
go func() { go pushConns(c)
clients := c.group.getClients(c)
for _, cc := range clients {
ccc, ok := cc.(*webClient)
if ok {
ccc.action(pushConnsAction{c})
}
}
}()
return nil return nil
} }
func pushConns(c client) {
clients := c.getGroup().getClients(c)
for _, cc := range clients {
ccc, ok := cc.(*webClient)
if ok {
ccc.action(pushConnsAction{c})
}
}
}
func (c *webClient) isRequested(label string) bool { func (c *webClient) isRequested(label string) bool {
return c.requested[label] != 0 return c.requested[label] != 0
} }
@ -1473,8 +1474,12 @@ func setPermissions(g *group, id string, perm string) error {
switch perm { switch perm {
case "op": case "op":
c.permissions.Op = true c.permissions.Op = true
if g.description.AllowRecording {
c.permissions.Record = true
}
case "unop": case "unop":
c.permissions.Op = false c.permissions.Op = false
c.permissions.Record = false
case "present": case "present":
c.permissions.Present = true c.permissions.Present = true
case "unpresent": case "unpresent":
@ -1582,6 +1587,37 @@ func handleClientMessage(c *webClient, m clientMessage) error {
locked = 1 locked = 1
} }
atomic.StoreUint32(&c.group.locked, locked) atomic.StoreUint32(&c.group.locked, locked)
case "record":
if !c.permissions.Record {
return c.error(userError("not authorised"))
}
for _, cc := range c.group.getClients(c) {
_, ok := cc.(*diskClient)
if ok {
return c.error(userError("already recording"))
}
}
disk := &diskClient{
group: c.group,
id: "recording",
}
_, _, err := addClient(c.group.name, disk, "", "")
if err != nil {
disk.Close()
return c.error(err)
}
go pushConns(disk)
case "unrecord":
if !c.permissions.Record {
return c.error(userError("not authorised"))
}
for _, cc := range c.group.getClients(c) {
disk, ok := cc.(*diskClient)
if ok {
disk.Close()
delClient(disk)
}
}
case "kick": case "kick":
if !c.permissions.Op { if !c.permissions.Op {
return c.error(userError("not authorised")) return c.error(userError("not authorised"))