1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-22 16:45:58 +01:00

Move group and client to their own package.

This commit is contained in:
Juliusz Chroboczek 2020-09-13 11:56:35 +02:00
parent d9cf32eda7
commit c608723394
9 changed files with 258 additions and 255 deletions

View file

@ -1,29 +0,0 @@
package main
import (
"sfu/conn"
)
type clientCredentials struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
type clientPermissions struct {
Op bool `json:"op,omitempty"`
Present bool `json:"present,omitempty"`
Record bool `json:"record,omitempty"`
}
type client interface {
Group() *group
Id() string
Credentials() clientCredentials
SetPermissions(clientPermissions)
pushConn(id string, conn conn.Up, tracks []conn.UpTrack, label string) error
pushClient(id, username string, add bool) error
}
type kickable interface {
kick(message string) error
}

21
disk.go
View file

@ -16,10 +16,11 @@ import (
"github.com/pion/webrtc/v3/pkg/media/samplebuilder" "github.com/pion/webrtc/v3/pkg/media/samplebuilder"
"sfu/conn" "sfu/conn"
"sfu/group"
) )
type diskClient struct { type diskClient struct {
group *group group *group.Group
id string id string
mu sync.Mutex mu sync.Mutex
@ -41,11 +42,11 @@ func newId() string {
return s return s
} }
func NewDiskClient(g *group) *diskClient { func NewDiskClient(g *group.Group) *diskClient {
return &diskClient{group: g, id: newId()} return &diskClient{group: g, id: newId()}
} }
func (client *diskClient) Group() *group { func (client *diskClient) Group() *group.Group {
return client.group return client.group
} }
@ -53,15 +54,15 @@ func (client *diskClient) Id() string {
return client.id return client.id
} }
func (client *diskClient) Credentials() clientCredentials { func (client *diskClient) Credentials() group.ClientCredentials {
return clientCredentials{"RECORDING", ""} return group.ClientCredentials{"RECORDING", ""}
} }
func (client *diskClient) SetPermissions(perms clientPermissions) { func (client *diskClient) SetPermissions(perms group.ClientPermissions) {
return return
} }
func (client *diskClient) pushClient(id, username string, add bool) error { func (client *diskClient) PushClient(id, username string, add bool) error {
return nil return nil
} }
@ -79,11 +80,11 @@ func (client *diskClient) Close() error {
func (client *diskClient) kick(message string) error { func (client *diskClient) kick(message string) error {
err := client.Close() err := client.Close()
delClient(client) group.DelClient(client)
return err return err
} }
func (client *diskClient) pushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error { func (client *diskClient) PushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error {
client.mu.Lock() client.mu.Lock()
defer client.mu.Unlock() defer client.mu.Unlock()
@ -101,7 +102,7 @@ func (client *diskClient) pushConn(id string, up conn.Up, tracks []conn.UpTrack,
return nil return nil
} }
directory := filepath.Join(recordingsDir, client.group.name) directory := filepath.Join(recordingsDir, client.group.Name())
err := os.MkdirAll(directory, 0700) err := os.MkdirAll(directory, 0700)
if err != nil { if err != nil {
return err return err

29
group/client.go Normal file
View file

@ -0,0 +1,29 @@
package group
import (
"sfu/conn"
)
type ClientCredentials struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
type ClientPermissions struct {
Op bool `json:"op,omitempty"`
Present bool `json:"present,omitempty"`
Record bool `json:"record,omitempty"`
}
type Client interface {
Group() *Group
Id() string
Credentials() ClientCredentials
SetPermissions(ClientPermissions)
PushConn(id string, conn conn.Up, tracks []conn.UpTrack, label string) error
PushClient(id, username string, add bool) error
}
type Kickable interface {
Kick(message string) error
}

View file

@ -3,7 +3,7 @@
// This is not open source software. Copy it, and I'll break into your // This is not open source software. Copy it, and I'll break into your
// house and tell your three year-old that Santa doesn't exist. // house and tell your three year-old that Santa doesn't exist.
package main package group
import ( import (
"encoding/json" "encoding/json"
@ -18,18 +18,60 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
) )
type chatHistoryEntry struct { var Directory string
id string
user string type UserError string
kind string
value string func (err UserError) Error() string {
return string(err)
}
type ProtocolError string
func (err ProtocolError) Error() string {
return string(err)
}
var IceFilename string
var iceConf webrtc.Configuration
var iceOnce sync.Once
func IceConfiguration() webrtc.Configuration {
iceOnce.Do(func() {
var iceServers []webrtc.ICEServer
file, err := os.Open(IceFilename)
if err != nil {
log.Printf("Open %v: %v", IceFilename, err)
return
}
defer file.Close()
d := json.NewDecoder(file)
err = d.Decode(&iceServers)
if err != nil {
log.Printf("Get ICE configuration: %v", err)
return
}
iceConf = webrtc.Configuration{
ICEServers: iceServers,
}
})
return iceConf
}
type ChatHistoryEntry struct {
Id string
User string
Kind string
Value string
} }
const ( const (
minBitrate = 200000 MinBitrate = 200000
) )
type group struct { type Group struct {
name string name string
mu sync.Mutex mu sync.Mutex
@ -37,64 +79,60 @@ type group struct {
// indicates that the group no longer exists, but it still has clients // indicates that the group no longer exists, but it still has clients
dead bool dead bool
locked bool locked bool
clients map[string]client clients map[string]Client
history []chatHistoryEntry history []ChatHistoryEntry
} }
func (g *group) Locked() bool { func (g *Group) Name() string {
return g.name
}
func (g *Group) Locked() bool {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
return g.locked return g.locked
} }
func (g *group) SetLocked(locked bool) { func (g *Group) SetLocked(locked bool) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
g.locked = locked g.locked = locked
} }
func (g *group) Public() bool { func (g *Group) Public() bool {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
return g.description.Public return g.description.Public
} }
func (g *group) Redirect() string { func (g *Group) Redirect() string {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
return g.description.Redirect return g.description.Redirect
} }
func (g *group) AllowRecording() bool { func (g *Group) AllowRecording() bool {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
return g.description.AllowRecording return g.description.AllowRecording
} }
func (g *group) Redirect() string {
return g.description.Redirect
}
func (g *group) AllowRecording() bool {
return g.description.AllowRecording
}
var groups struct { var groups struct {
mu sync.Mutex mu sync.Mutex
groups map[string]*group groups map[string]*Group
api *webrtc.API api *webrtc.API
} }
func (g *group) API() *webrtc.API { func (g *Group) API() *webrtc.API {
return groups.api return groups.api
} }
func addGroup(name string, desc *groupDescription) (*group, error) { func Add(name string, desc *groupDescription) (*Group, error) {
groups.mu.Lock() groups.mu.Lock()
defer groups.mu.Unlock() defer groups.mu.Unlock()
if groups.groups == nil { if groups.groups == nil {
groups.groups = make(map[string]*group) groups.groups = make(map[string]*Group)
s := webrtc.SettingEngine{} s := webrtc.SettingEngine{}
m := webrtc.MediaEngine{} m := webrtc.MediaEngine{}
m.RegisterCodec(webrtc.NewRTPVP8CodecExt( m.RegisterCodec(webrtc.NewRTPVP8CodecExt(
@ -121,15 +159,15 @@ func addGroup(name string, desc *groupDescription) (*group, error) {
g := groups.groups[name] g := groups.groups[name]
if g == nil { if g == nil {
if desc == nil { if desc == nil {
desc, err = getDescription(name) desc, err = GetDescription(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
g = &group{ g = &Group{
name: name, name: name,
description: desc, description: desc,
clients: make(map[string]client), clients: make(map[string]Client),
} }
groups.groups[name] = g groups.groups[name] = g
return g, nil return g, nil
@ -155,7 +193,7 @@ func addGroup(name string, desc *groupDescription) (*group, error) {
return nil, err return nil, err
} }
if changed { if changed {
desc, err := getDescription(name) desc, err := GetDescription(name)
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
log.Printf("Reading group %v: %v", log.Printf("Reading group %v: %v",
@ -175,7 +213,7 @@ func addGroup(name string, desc *groupDescription) (*group, error) {
return g, nil return g, nil
} }
func rangeGroups(f func(g *group) bool) { func Range(f func(g *Group) bool) {
groups.mu.Lock() groups.mu.Lock()
defer groups.mu.Unlock() defer groups.mu.Unlock()
@ -187,17 +225,17 @@ func rangeGroups(f func(g *group) bool) {
} }
} }
func getGroupNames() []string { func GetNames() []string {
names := make([]string, 0) names := make([]string, 0)
rangeGroups(func(g *group) bool { Range(func(g *Group) bool {
names = append(names, g.name) names = append(names, g.name)
return true return true
}) })
return names return names
} }
func getGroup(name string) *group { func Get(name string) *Group {
groups.mu.Lock() groups.mu.Lock()
defer groups.mu.Unlock() defer groups.mu.Unlock()
@ -218,8 +256,8 @@ func delGroupUnlocked(name string) bool {
return true return true
} }
func addClient(name string, c client) (*group, error) { func AddClient(name string, c Client) (*Group, error) {
g, err := addGroup(name, nil) g, err := Add(name, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -227,7 +265,7 @@ func addClient(name string, c client) (*group, error) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
perms, err := getPermission(g.description, c.Credentials()) perms, err := g.description.GetPermission(c.Credentials())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -235,37 +273,34 @@ func addClient(name string, c client) (*group, error) {
c.SetPermissions(perms) c.SetPermissions(perms)
if !perms.Op && g.locked { if !perms.Op && g.locked {
return nil, userError("group is locked") return nil, UserError("group is locked")
} }
if !perms.Op && g.description.MaxClients > 0 { if !perms.Op && g.description.MaxClients > 0 {
if len(g.clients) >= g.description.MaxClients { if len(g.clients) >= g.description.MaxClients {
return nil, userError("too many users") return nil, UserError("too many users")
} }
} }
if g.clients[c.Id()] != nil { if g.clients[c.Id()] != nil {
return nil, protocolError("duplicate client id") return nil, ProtocolError("duplicate client id")
} }
g.clients[c.Id()] = c g.clients[c.Id()] = c
go func(clients []client) { go func(clients []Client) {
u := c.Credentials().Username u := c.Credentials().Username
c.pushClient(c.Id(), u, true) c.PushClient(c.Id(), u, true)
for _, cc := range clients { for _, cc := range clients {
uu := cc.Credentials().Username uu := cc.Credentials().Username
err := c.pushClient(cc.Id(), uu, true) c.PushClient(cc.Id(), uu, true)
if err == ErrClientDead { cc.PushClient(c.Id(), u, true)
return
}
cc.pushClient(c.Id(), u, true)
} }
}(g.getClientsUnlocked(c)) }(g.getClientsUnlocked(c))
return g, nil return g, nil
} }
func delClient(c client) { func DelClient(c Client) {
g := c.Group() g := c.Group()
if g == nil { if g == nil {
return return
@ -279,21 +314,21 @@ func delClient(c client) {
} }
delete(g.clients, c.Id()) delete(g.clients, c.Id())
go func(clients []client) { go func(clients []Client) {
for _, cc := range clients { for _, cc := range clients {
cc.pushClient(c.Id(), c.Credentials().Username, false) cc.PushClient(c.Id(), c.Credentials().Username, false)
} }
}(g.getClientsUnlocked(nil)) }(g.getClientsUnlocked(nil))
} }
func (g *group) getClients(except client) []client { func (g *Group) GetClients(except Client) []Client {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
return g.getClientsUnlocked(except) return g.getClientsUnlocked(except)
} }
func (g *group) getClientsUnlocked(except client) []client { func (g *Group) getClientsUnlocked(except Client) []Client {
clients := make([]client, 0, len(g.clients)) clients := make([]Client, 0, len(g.clients))
for _, c := range g.clients { for _, c := range g.clients {
if c != except { if c != except {
clients = append(clients, c) clients = append(clients, c)
@ -302,13 +337,13 @@ func (g *group) getClientsUnlocked(except client) []client {
return clients return clients
} }
func (g *group) getClient(id string) client { func (g *Group) GetClient(id string) Client {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
return g.getClientUnlocked(id) return g.getClientUnlocked(id)
} }
func (g *group) getClientUnlocked(id string) client { func (g *Group) getClientUnlocked(id string) Client {
for idd, c := range g.clients { for idd, c := range g.clients {
if idd == id { if idd == id {
return c return c
@ -317,7 +352,7 @@ func (g *group) getClientUnlocked(id string) client {
return nil return nil
} }
func (g *group) Range(f func(c client) bool) { func (g *Group) Range(f func(c Client) bool) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
for _, c := range g.clients { for _, c := range g.clients {
@ -328,11 +363,11 @@ func (g *group) Range(f func(c client) bool) {
} }
} }
func (g *group) shutdown(message string) { func (g *Group) Shutdown(message string) {
g.Range(func(c client) bool { g.Range(func(c Client) bool {
cc, ok := c.(kickable) cc, ok := c.(Kickable)
if ok { if ok {
cc.kick(message) cc.Kick(message)
} }
return true return true
}) })
@ -340,13 +375,13 @@ func (g *group) shutdown(message string) {
const maxChatHistory = 20 const maxChatHistory = 20
func (g *group) clearChatHistory() { func (g *Group) ClearChatHistory() {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
g.history = nil g.history = nil
} }
func (g *group) addToChatHistory(id, user, kind, value string) { func (g *Group) AddToChatHistory(id, user, kind, value string) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
@ -355,20 +390,20 @@ func (g *group) addToChatHistory(id, user, kind, value string) {
g.history = g.history[:len(g.history)-1] g.history = g.history[:len(g.history)-1]
} }
g.history = append(g.history, g.history = append(g.history,
chatHistoryEntry{id: id, user: user, kind: kind, value: value}, ChatHistoryEntry{Id: id, User: user, Kind: kind, Value: value},
) )
} }
func (g *group) getChatHistory() []chatHistoryEntry { func (g *Group) GetChatHistory() []ChatHistoryEntry {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
h := make([]chatHistoryEntry, len(g.history)) h := make([]ChatHistoryEntry, len(g.history))
copy(h, g.history) copy(h, g.history)
return h return h
} }
func matchUser(user clientCredentials, users []clientCredentials) (bool, bool) { func matchUser(user ClientCredentials, users []ClientCredentials) (bool, bool) {
for _, u := range users { for _, u := range users {
if u.Username == "" { if u.Username == "" {
if u.Password == "" || u.Password == user.Password { if u.Password == "" || u.Password == user.Password {
@ -391,13 +426,13 @@ type groupDescription struct {
MaxClients int `json:"max-clients,omitempty"` MaxClients int `json:"max-clients,omitempty"`
AllowAnonymous bool `json:"allow-anonymous,omitempty"` AllowAnonymous bool `json:"allow-anonymous,omitempty"`
AllowRecording bool `json:"allow-recording,omitempty"` AllowRecording bool `json:"allow-recording,omitempty"`
Op []clientCredentials `json:"op,omitempty"` Op []ClientCredentials `json:"op,omitempty"`
Presenter []clientCredentials `json:"presenter,omitempty"` Presenter []ClientCredentials `json:"presenter,omitempty"`
Other []clientCredentials `json:"other,omitempty"` Other []ClientCredentials `json:"other,omitempty"`
} }
func descriptionChanged(name string, old *groupDescription) (bool, error) { func descriptionChanged(name string, old *groupDescription) (bool, error) {
fi, err := os.Stat(filepath.Join(groupsDir, name+".json")) fi, err := os.Stat(filepath.Join(Directory, name+".json"))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -407,8 +442,8 @@ func descriptionChanged(name string, old *groupDescription) (bool, error) {
return false, err return false, err
} }
func getDescription(name string) (*groupDescription, error) { func GetDescription(name string) (*groupDescription, error) {
r, err := os.Open(filepath.Join(groupsDir, name+".json")) r, err := os.Open(filepath.Join(Directory, name+".json"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -432,10 +467,10 @@ func getDescription(name string) (*groupDescription, error) {
return &desc, nil return &desc, nil
} }
func getPermission(desc *groupDescription, creds clientCredentials) (clientPermissions, error) { func (desc *groupDescription) GetPermission (creds ClientCredentials) (ClientPermissions, error) {
var p clientPermissions var p ClientPermissions
if !desc.AllowAnonymous && creds.Username == "" { if !desc.AllowAnonymous && creds.Username == "" {
return p, userError("anonymous users not allowed in this group, please choose a username") return p, UserError("anonymous users not allowed in this group, please choose a username")
} }
if found, good := matchUser(creds, desc.Op); found { if found, good := matchUser(creds, desc.Op); found {
if good { if good {
@ -446,34 +481,34 @@ func getPermission(desc *groupDescription, creds clientCredentials) (clientPermi
} }
return p, nil return p, nil
} }
return p, userError("not authorised") return p, UserError("not authorised")
} }
if found, good := matchUser(creds, desc.Presenter); found { if found, good := matchUser(creds, desc.Presenter); found {
if good { if good {
p.Present = true p.Present = true
return p, nil return p, nil
} }
return p, userError("not authorised") return p, UserError("not authorised")
} }
if found, good := matchUser(creds, desc.Other); found { if found, good := matchUser(creds, desc.Other); found {
if good { if good {
return p, nil return p, nil
} }
return p, userError("not authorised") return p, UserError("not authorised")
} }
return p, userError("not authorised") return p, UserError("not authorised")
} }
type publicGroup struct { type Public struct {
Name string `json:"name"` Name string `json:"name"`
ClientCount int `json:"clientCount"` ClientCount int `json:"clientCount"`
} }
func getPublicGroups() []publicGroup { func GetPublic() []Public {
gs := make([]publicGroup, 0) gs := make([]Public, 0)
rangeGroups(func(g *group) bool { Range(func(g *Group) bool {
if g.Public() { if g.Public() {
gs = append(gs, publicGroup{ gs = append(gs, Public{
Name: g.name, Name: g.name,
ClientCount: len(g.clients), ClientCount: len(g.clients),
}) })
@ -486,8 +521,8 @@ func getPublicGroups() []publicGroup {
return gs return gs
} }
func readPublicGroups() { func ReadPublicGroups() {
dir, err := os.Open(groupsDir) dir, err := os.Open(Directory)
if err != nil { if err != nil {
return return
} }
@ -504,7 +539,7 @@ func readPublicGroups() {
continue continue
} }
name := fi.Name()[:len(fi.Name())-5] name := fi.Name()[:len(fi.Name())-5]
desc, err := getDescription(name) desc, err := GetDescription(name)
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
log.Printf("Reading group %v: %v", name, err) log.Printf("Reading group %v: %v", name, err)
@ -512,7 +547,7 @@ func readPublicGroups() {
continue continue
} }
if desc.Public { if desc.Public {
addGroup(name, desc) Add(name, desc)
} }
} }
} }

View file

@ -20,6 +20,7 @@ import (
"sfu/conn" "sfu/conn"
"sfu/estimator" "sfu/estimator"
"sfu/group"
"sfu/jitter" "sfu/jitter"
"sfu/packetcache" "sfu/packetcache"
"sfu/rtptime" "sfu/rtptime"
@ -110,8 +111,8 @@ type rtpDownConnection struct {
iceCandidates []*webrtc.ICECandidateInit iceCandidates []*webrtc.ICECandidateInit
} }
func newDownConn(c client, id string, remote conn.Up) (*rtpDownConnection, error) { func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) {
pc, err := c.Group().API().NewPeerConnection(iceConfiguration()) pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -371,8 +372,8 @@ func (up *rtpUpConnection) complete() bool {
return true return true
} }
func newUpConn(c client, id string) (*rtpUpConnection, error) { func newUpConn(c group.Client, id string) (*rtpUpConnection, error) {
pc, err := c.Group().API().NewPeerConnection(iceConfiguration()) pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -448,9 +449,9 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) {
up.mu.Unlock() up.mu.Unlock()
if complete { if complete {
clients := c.Group().getClients(c) clients := c.Group().GetClients(c)
for _, cc := range clients { for _, cc := range clients {
cc.pushConn(up.id, up, tracks, up.label) cc.PushConn(up.id, up, tracks, up.label)
} }
go rtcpUpSender(up) go rtcpUpSender(up)
} }
@ -750,8 +751,8 @@ func sendUpRTCP(conn *rtpUpConnection) error {
rate = r rate = r
} }
} }
if rate < minBitrate { if rate < group.MinBitrate {
rate = minBitrate rate = group.MinBitrate
} }
var ssrcs []uint32 var ssrcs []uint32

10
sfu.go
View file

@ -14,14 +14,14 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"syscall" "syscall"
"sfu/group"
) )
var httpAddr string var httpAddr string
var staticRoot string var staticRoot string
var dataDir string var dataDir string
var groupsDir string
var recordingsDir string var recordingsDir string
var iceFilename string
func main() { func main() {
var cpuprofile, memprofile, mutexprofile string var cpuprofile, memprofile, mutexprofile string
@ -31,7 +31,7 @@ func main() {
"web server root `directory`") "web server root `directory`")
flag.StringVar(&dataDir, "data", "./data/", flag.StringVar(&dataDir, "data", "./data/",
"data `directory`") "data `directory`")
flag.StringVar(&groupsDir, "groups", "./groups/", flag.StringVar(&group.Directory, "groups", "./groups/",
"group description `directory`") "group description `directory`")
flag.StringVar(&recordingsDir, "recordings", "./recordings/", flag.StringVar(&recordingsDir, "recordings", "./recordings/",
"recordings `directory`") "recordings `directory`")
@ -81,9 +81,9 @@ func main() {
}() }()
} }
iceFilename = filepath.Join(dataDir, "ice-servers.json") group.IceFilename = filepath.Join(dataDir, "ice-servers.json")
go readPublicGroups() go group.ReadPublicGroups()
webserver() webserver()
terminate := make(chan os.Signal, 1) terminate := make(chan os.Signal, 1)

View file

@ -5,6 +5,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"sfu/group"
"sfu/rtptime" "sfu/rtptime"
) )
@ -33,15 +34,15 @@ type trackStats struct {
} }
func getGroupStats() []groupStats { func getGroupStats() []groupStats {
names := getGroupNames() names := group.GetNames()
gs := make([]groupStats, 0, len(names)) gs := make([]groupStats, 0, len(names))
for _, name := range names { for _, name := range names {
g := getGroup(name) g := group.Get(name)
if g == nil { if g == nil {
continue continue
} }
clients := g.getClients(nil) clients := g.GetClients(nil)
stats := groupStats{ stats := groupStats{
name: name, name: name,
clients: make([]clientStats, 0, len(clients)), clients: make([]clientStats, 0, len(clients)),

View file

@ -19,56 +19,19 @@ import (
"sfu/conn" "sfu/conn"
"sfu/estimator" "sfu/estimator"
"sfu/group"
) )
var iceConf webrtc.Configuration
var iceOnce sync.Once
func iceConfiguration() webrtc.Configuration {
iceOnce.Do(func() {
var iceServers []webrtc.ICEServer
file, err := os.Open(iceFilename)
if err != nil {
log.Printf("Open %v: %v", iceFilename, err)
return
}
defer file.Close()
d := json.NewDecoder(file)
err = d.Decode(&iceServers)
if err != nil {
log.Printf("Get ICE configuration: %v", err)
return
}
iceConf = webrtc.Configuration{
ICEServers: iceServers,
}
})
return iceConf
}
type protocolError string
func (err protocolError) Error() string {
return string(err)
}
type userError string
func (err userError) Error() string {
return string(err)
}
func errorToWSCloseMessage(err error) (string, []byte) { func errorToWSCloseMessage(err error) (string, []byte) {
var code int var code int
var text string var text string
switch e := err.(type) { switch e := err.(type) {
case *websocket.CloseError: case *websocket.CloseError:
code = websocket.CloseNormalClosure code = websocket.CloseNormalClosure
case protocolError: case group.ProtocolError:
code = websocket.CloseProtocolError code = websocket.CloseProtocolError
text = string(e) text = string(e)
case userError: case group.UserError:
code = websocket.CloseNormalClosure code = websocket.CloseNormalClosure
text = string(e) text = string(e)
default: default:
@ -84,10 +47,10 @@ func isWSNormalError(err error) bool {
} }
type webClient struct { type webClient struct {
group *group group *group.Group
id string id string
credentials clientCredentials credentials group.ClientCredentials
permissions clientPermissions permissions group.ClientPermissions
requested map[string]uint32 requested map[string]uint32
done chan struct{} done chan struct{}
writeCh chan interface{} writeCh chan interface{}
@ -99,7 +62,7 @@ type webClient struct {
up map[string]*rtpUpConnection up map[string]*rtpUpConnection
} }
func (c *webClient) Group() *group { func (c *webClient) Group() *group.Group {
return c.group return c.group
} }
@ -107,15 +70,15 @@ func (c *webClient) Id() string {
return c.id return c.id
} }
func (c *webClient) Credentials() clientCredentials { func (c *webClient) Credentials() group.ClientCredentials {
return c.credentials return c.credentials
} }
func (c *webClient) SetPermissions(perms clientPermissions) { func (c *webClient) SetPermissions(perms group.ClientPermissions) {
c.permissions = perms c.permissions = perms
} }
func (c *webClient) pushClient(id, username string, add bool) error { func (c *webClient) PushClient(id, username string, add bool) error {
kind := "add" kind := "add"
if !add { if !add {
kind = "delete" kind = "delete"
@ -181,7 +144,7 @@ type clientMessage struct {
Id string `json:"id,omitempty"` Id string `json:"id,omitempty"`
Username string `json:"username,omitempty"` Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
Permissions clientPermissions `json:"permissions,omitempty"` Permissions group.ClientPermissions `json:"permissions,omitempty"`
Group string `json:"group,omitempty"` Group string `json:"group,omitempty"`
Value string `json:"value,omitempty"` Value string `json:"value,omitempty"`
Offer *webrtc.SessionDescription `json:"offer,omitempty"` Offer *webrtc.SessionDescription `json:"offer,omitempty"`
@ -265,11 +228,11 @@ func delUpConn(c *webClient, id string) bool {
delete(c.up, id) delete(c.up, id)
c.mu.Unlock() c.mu.Unlock()
go func(clients []client) { go func(clients []group.Client) {
for _, c := range clients { for _, c := range clients {
c.pushConn(conn.id, nil, nil, "") c.PushConn(conn.id, nil, nil, "")
} }
}(c.Group().getClients(c)) }(c.Group().GetClients(c))
conn.pc.Close() conn.pc.Close()
return true return true
@ -512,7 +475,7 @@ func gotOffer(c *webClient, id string, offer webrtc.SessionDescription, renegoti
func gotAnswer(c *webClient, id string, answer webrtc.SessionDescription) error { func gotAnswer(c *webClient, id string, answer webrtc.SessionDescription) error {
down := getDownConn(c, id) down := getDownConn(c, id)
if down == nil { if down == nil {
return protocolError("unknown id in answer") return group.ProtocolError("unknown id in answer")
} }
err := down.pc.SetRemoteDescription(answer) err := down.pc.SetRemoteDescription(answer)
if err != nil { if err != nil {
@ -555,8 +518,8 @@ func (c *webClient) setRequested(requested map[string]uint32) error {
return nil return nil
} }
func pushConns(c client) { func pushConns(c group.Client) {
clients := c.Group().getClients(c) clients := c.Group().GetClients(c)
for _, cc := range clients { for _, cc := range clients {
ccc, ok := cc.(*webClient) ccc, ok := cc.(*webClient)
if ok { if ok {
@ -602,7 +565,7 @@ func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rt
return down, nil return down, nil
} }
func (c *webClient) pushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error { func (c *webClient) PushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error {
err := c.action(pushConnAction{id, up, tracks}) err := c.action(pushConnAction{id, up, tracks})
if err != nil { if err != nil {
return err return err
@ -666,7 +629,7 @@ func startClient(conn *websocket.Conn) (err error) {
c := &webClient{ c := &webClient{
id: m.Id, id: m.Id,
credentials: clientCredentials{ credentials: group.ClientCredentials{
m.Username, m.Username,
m.Password, m.Password,
}, },
@ -703,24 +666,24 @@ func startClient(conn *websocket.Conn) (err error) {
} }
if m.Type != "join" { if m.Type != "join" {
return protocolError("you must join a group first") return group.ProtocolError("you must join a group first")
} }
g, err := addClient(m.Group, c) g, err := group.AddClient(m.Group, c)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
err = userError("group does not exist") err = group.UserError("group does not exist")
} }
return return
} }
if redirect := g.Redirect(); redirect != "" { if redirect := g.Redirect(); redirect != "" {
// We normally redirect at the HTTP level, but the group // We normally redirect at the HTTP level, but the group
// description could have been edited in the meantime. // description could have been edited in the meantime.
err = userError("group is now at " + redirect) err = group.UserError("group is now at " + redirect)
return return
} }
c.group = g c.group = g
defer delClient(c) defer group.DelClient(c)
return clientLoop(c, conn) return clientLoop(c, conn)
} }
@ -737,7 +700,7 @@ type addLabelAction struct {
} }
type pushConnsAction struct { type pushConnsAction struct {
c client c group.Client
} }
type connectionFailedAction struct { type connectionFailedAction struct {
@ -768,14 +731,14 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
Permissions: c.permissions, Permissions: c.permissions,
}) })
h := c.group.getChatHistory() h := c.group.GetChatHistory()
for _, m := range h { for _, m := range h {
err := c.write(clientMessage{ err := c.write(clientMessage{
Type: "chat", Type: "chat",
Id: m.id, Id: m.Id,
Username: m.user, Username: m.User,
Value: m.value, Value: m.Value,
Kind: m.kind, Kind: m.Kind,
}) })
if err != nil { if err != nil {
return err return err
@ -853,7 +816,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
for i, t := range tracks { for i, t := range tracks {
ts[i] = t ts[i] = t
} }
go a.c.pushConn(u.id, u, ts, u.label) go a.c.PushConn(u.id, u, ts, u.label)
} }
case connectionFailedAction: case connectionFailedAction:
if down := getDownConn(c, a.id); down != nil { if down := getDownConn(c, a.id); down != nil {
@ -867,7 +830,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
for i, t := range down.tracks { for i, t := range down.tracks {
tracks[i] = t.remote tracks[i] = t.remote
} }
go c.pushConn( go c.PushConn(
down.remote.Id(), down.remote, down.remote.Id(), down.remote,
tracks, down.remote.Label(), tracks, down.remote.Label(),
) )
@ -899,7 +862,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
} }
} }
case kickAction: case kickAction:
return userError(a.message) return group.UserError(a.message)
default: default:
log.Printf("unexpected action %T", a) log.Printf("unexpected action %T", a)
return errors.New("unexpected action") return errors.New("unexpected action")
@ -931,7 +894,7 @@ func failConnection(c *webClient, id string, message string) error {
} }
} }
if message != "" { if message != "" {
err := c.error(userError(message)) err := c.error(group.UserError(message))
if err != nil { if err != nil {
return err return err
} }
@ -939,15 +902,15 @@ func failConnection(c *webClient, id string, message string) error {
return nil return nil
} }
func setPermissions(g *group, id string, perm string) error { func setPermissions(g *group.Group, id string, perm string) error {
client := g.getClient(id) client := g.GetClient(id)
if client == nil { if client == nil {
return userError("no such user") return group.UserError("no such user")
} }
c, ok := client.(*webClient) c, ok := client.(*webClient)
if !ok { if !ok {
return userError("this is not a real user") return group.UserError("this is not a real user")
} }
switch perm { switch perm {
@ -964,7 +927,7 @@ func setPermissions(g *group, id string, perm string) error {
case "unpresent": case "unpresent":
c.permissions.Present = false c.permissions.Present = false
default: default:
return userError("unknown permission") return group.UserError("unknown permission")
} }
return c.action(permissionsChangedAction{}) return c.action(permissionsChangedAction{})
} }
@ -973,18 +936,18 @@ func (c *webClient) kick(message string) error {
return c.action(kickAction{message}) return c.action(kickAction{message})
} }
func kickClient(g *group, id string, message string) error { func kickClient(g *group.Group, id string, message string) error {
client := g.getClient(id) client := g.GetClient(id)
if client == nil { if client == nil {
return userError("no such user") return group.UserError("no such user")
} }
c, ok := client.(kickable) c, ok := client.(group.Kickable)
if !ok { if !ok {
return userError("this client is not kickable") return group.UserError("this client is not kickable")
} }
return c.kick(message) return c.Kick(message)
} }
func handleClientMessage(c *webClient, m clientMessage) error { func handleClientMessage(c *webClient, m clientMessage) error {
@ -1000,10 +963,10 @@ func handleClientMessage(c *webClient, m clientMessage) error {
Type: "abort", Type: "abort",
Id: m.Id, Id: m.Id,
}) })
return c.error(userError("not authorised")) return c.error(group.UserError("not authorised"))
} }
if m.Offer == nil { if m.Offer == nil {
return protocolError("null offer") return group.ProtocolError("null offer")
} }
err := gotOffer( err := gotOffer(
c, m.Id, *m.Offer, m.Kind == "renegotiate", m.Labels, c, m.Id, *m.Offer, m.Kind == "renegotiate", m.Labels,
@ -1014,7 +977,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
} }
case "answer": case "answer":
if m.Answer == nil { if m.Answer == nil {
return protocolError("null answer") return group.ProtocolError("null answer")
} }
err := gotAnswer(c, m.Id, *m.Answer) err := gotAnswer(c, m.Id, *m.Answer)
if err != nil { if err != nil {
@ -1037,15 +1000,15 @@ func handleClientMessage(c *webClient, m clientMessage) error {
} }
case "ice": case "ice":
if m.Candidate == nil { if m.Candidate == nil {
return protocolError("null candidate") return group.ProtocolError("null candidate")
} }
err := gotICE(c, m.Candidate, m.Id) err := gotICE(c, m.Candidate, m.Id)
if err != nil { if err != nil {
log.Printf("ICE: %v", err) log.Printf("ICE: %v", err)
} }
case "chat": case "chat":
c.group.addToChatHistory(m.Id, m.Username, m.Kind, m.Value) c.group.AddToChatHistory(m.Id, m.Username, m.Kind, m.Value)
clients := c.group.getClients(nil) clients := c.group.GetClients(nil)
for _, cc := range clients { for _, cc := range clients {
cc, ok := cc.(*webClient) cc, ok := cc.(*webClient)
if ok { if ok {
@ -1055,9 +1018,9 @@ func handleClientMessage(c *webClient, m clientMessage) error {
case "groupaction": case "groupaction":
switch m.Kind { switch m.Kind {
case "clearchat": case "clearchat":
c.group.clearChatHistory() c.group.ClearChatHistory()
m := clientMessage{Type: "clearchat"} m := clientMessage{Type: "clearchat"}
clients := c.group.getClients(nil) clients := c.group.GetClients(nil)
for _, cc := range clients { for _, cc := range clients {
cc, ok := cc.(*webClient) cc, ok := cc.(*webClient)
if ok { if ok {
@ -1066,21 +1029,21 @@ func handleClientMessage(c *webClient, m clientMessage) error {
} }
case "lock", "unlock": case "lock", "unlock":
if !c.permissions.Op { if !c.permissions.Op {
return c.error(userError("not authorised")) return c.error(group.UserError("not authorised"))
} }
c.group.SetLocked(m.Kind == "lock") c.group.SetLocked(m.Kind == "lock")
case "record": case "record":
if !c.permissions.Record { if !c.permissions.Record {
return c.error(userError("not authorised")) return c.error(group.UserError("not authorised"))
} }
for _, cc := range c.group.getClients(c) { for _, cc := range c.group.GetClients(c) {
_, ok := cc.(*diskClient) _, ok := cc.(*diskClient)
if ok { if ok {
return c.error(userError("already recording")) return c.error(group.UserError("already recording"))
} }
} }
disk := NewDiskClient(c.group) disk := NewDiskClient(c.group)
_, err := addClient(c.group.name, disk) _, err := group.AddClient(c.group.Name(), disk)
if err != nil { if err != nil {
disk.Close() disk.Close()
return c.error(err) return c.error(err)
@ -1088,23 +1051,23 @@ func handleClientMessage(c *webClient, m clientMessage) error {
go pushConns(disk) go pushConns(disk)
case "unrecord": case "unrecord":
if !c.permissions.Record { if !c.permissions.Record {
return c.error(userError("not authorised")) return c.error(group.UserError("not authorised"))
} }
for _, cc := range c.group.getClients(c) { for _, cc := range c.group.GetClients(c) {
disk, ok := cc.(*diskClient) disk, ok := cc.(*diskClient)
if ok { if ok {
disk.Close() disk.Close()
delClient(disk) group.DelClient(disk)
} }
} }
default: default:
return protocolError("unknown group action") return group.ProtocolError("unknown group action")
} }
case "useraction": case "useraction":
switch m.Kind { switch m.Kind {
case "op", "unop", "present", "unpresent": case "op", "unop", "present", "unpresent":
if !c.permissions.Op { if !c.permissions.Op {
return c.error(userError("not authorised")) return c.error(group.UserError("not authorised"))
} }
err := setPermissions(c.group, m.Id, m.Kind) err := setPermissions(c.group, m.Id, m.Kind)
if err != nil { if err != nil {
@ -1112,7 +1075,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
} }
case "kick": case "kick":
if !c.permissions.Op { if !c.permissions.Op {
return c.error(userError("not authorised")) return c.error(group.UserError("not authorised"))
} }
message := m.Value message := m.Value
if message == "" { if message == "" {
@ -1123,7 +1086,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
return c.error(err) return c.error(err)
} }
default: default:
return protocolError("unknown user action") return group.ProtocolError("unknown user action")
} }
case "pong": case "pong":
// nothing // nothing
@ -1133,7 +1096,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
}) })
default: default:
log.Printf("unexpected message: %v", m.Type) log.Printf("unexpected message: %v", m.Type)
return protocolError("unexpected message") return group.ProtocolError("unexpected message")
} }
return nil return nil
} }
@ -1227,7 +1190,7 @@ func (c *webClient) close(data []byte) error {
func (c *webClient) error(err error) error { func (c *webClient) error(err error) error {
switch e := err.(type) { switch e := err.(type) {
case userError: case group.UserError:
return c.write(clientMessage{ return c.write(clientMessage{
Type: "usermessage", Type: "usermessage",
Kind: "error", Kind: "error",

View file

@ -18,6 +18,8 @@ import (
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"sfu/group"
) )
var server *http.Server var server *http.Server
@ -47,8 +49,8 @@ func webserver() {
IdleTimeout: 120 * time.Second, IdleTimeout: 120 * time.Second,
} }
server.RegisterOnShutdown(func() { server.RegisterOnShutdown(func() {
rangeGroups(func (g *group) bool { group.Range(func (g *group.Group) bool {
go g.shutdown("server is shutting down") go g.Shutdown("server is shutting down")
return true return true
}) })
}) })
@ -139,7 +141,7 @@ func groupHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
g, err := addGroup(name, nil) g, err := group.Add(name, nil)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
notFound(w) notFound(w)
@ -168,7 +170,7 @@ func publicHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
g := getPublicGroups() g := group.GetPublic()
e := json.NewEncoder(w) e := json.NewEncoder(w)
e.Encode(g) e.Encode(g)
return return
@ -409,8 +411,8 @@ func handleGroupAction(w http.ResponseWriter, r *http.Request, group string) {
} }
} }
func checkGroupPermissions(w http.ResponseWriter, r *http.Request, group string) bool { func checkGroupPermissions(w http.ResponseWriter, r *http.Request, groupname string) bool {
desc, err := getDescription(group) desc, err := group.GetDescription(groupname)
if err != nil { if err != nil {
return false return false
} }
@ -420,7 +422,7 @@ func checkGroupPermissions(w http.ResponseWriter, r *http.Request, group string)
return false return false
} }
p, err := getPermission(desc, clientCredentials{user, pass}) p, err := desc.GetPermission(group.ClientCredentials{user, pass})
if err != nil || !p.Record { if err != nil || !p.Record {
return false return false
} }