1
Fork 0
galene/group.go

794 lines
16 KiB
Go
Raw Normal View History

2020-04-24 19:38:21 +02:00
// Copyright (c) 2020 by Juliusz Chroboczek.
// This is not open source software. Copy it, and I'll break into your
// house and tell your three year-old that Santa doesn't exist.
package main
import (
2020-04-25 02:25:51 +02:00
"encoding/json"
2020-04-24 19:38:21 +02:00
"log"
2020-04-25 02:25:51 +02:00
"os"
"path/filepath"
2020-04-29 16:08:07 +02:00
"sort"
2020-04-25 04:08:43 +02:00
"strings"
2020-04-25 17:36:35 +02:00
"sync"
2020-04-28 15:26:50 +02:00
"sync/atomic"
"time"
2020-04-24 19:38:21 +02:00
2020-04-30 20:15:52 +02:00
"sfu/estimator"
"sfu/jitter"
"sfu/mono"
"sfu/packetcache"
2020-04-24 19:38:21 +02:00
"github.com/pion/webrtc/v2"
)
type upTrack struct {
track *webrtc.Track
rate *estimator.Estimator
cache *packetcache.Cache
jitter *jitter.Estimator
maxBitrate uint64
lastPLI uint64
lastSenderReport uint32
lastSenderReportTime uint32
mu sync.Mutex
local []*downTrack
}
func (up *upTrack) addLocal(local *downTrack) {
up.mu.Lock()
defer up.mu.Unlock()
up.local = append(up.local, local)
}
func (up *upTrack) delLocal(local *downTrack) bool {
up.mu.Lock()
defer up.mu.Unlock()
for i, l := range up.local {
if l == local {
up.local = append(up.local[:i], up.local[i+1:]...)
return true
}
}
return false
}
func (up *upTrack) getLocal() []*downTrack {
up.mu.Lock()
defer up.mu.Unlock()
local := make([]*downTrack, len(up.local))
copy(local, up.local)
return local
2020-04-24 19:38:21 +02:00
}
func (up *upTrack) hasRtcpFb(tpe, parameter string) bool {
for _, fb := range up.track.Codec().RTCPFeedback {
if fb.Type == tpe && fb.Parameter == parameter {
return true
}
}
return false
}
2020-04-24 19:38:21 +02:00
type upConnection struct {
2020-04-26 01:33:18 +02:00
id string
label string
pc *webrtc.PeerConnection
trackCount int
tracks []*upTrack
2020-04-26 01:33:18 +02:00
}
type bitrate struct {
bitrate uint64
microseconds uint64
2020-04-26 01:33:18 +02:00
}
2020-04-28 15:26:50 +02:00
const receiverReportTimeout = 8000000
func (br *bitrate) Set(bitrate uint64, now uint64) {
// this is racy -- a reader might read the
// data between the two writes. This shouldn't
// matter, we'll recover at the next sample.
atomic.StoreUint64(&br.bitrate, bitrate)
atomic.StoreUint64(&br.microseconds, now)
}
func (br *bitrate) Get(now uint64) uint64 {
ts := atomic.LoadUint64(&br.microseconds)
if now < ts || now > ts+receiverReportTimeout {
return ^uint64(0)
}
return atomic.LoadUint64(&br.bitrate)
}
type receiverStats struct {
loss uint32
jitter uint32
microseconds uint64
}
func (s *receiverStats) Set(loss uint8, jitter uint32, now uint64) {
atomic.StoreUint32(&s.loss, uint32(loss))
atomic.StoreUint32(&s.jitter, jitter)
atomic.StoreUint64(&s.microseconds, now)
}
func (s *receiverStats) Get(now uint64) (uint8, uint32) {
ts := atomic.LoadUint64(&s.microseconds)
if now < ts || now > ts+receiverReportTimeout {
return 0, 0
}
return uint8(atomic.LoadUint32(&s.loss)), atomic.LoadUint32(&s.jitter)
}
2020-04-26 01:33:18 +02:00
type downTrack struct {
track *webrtc.Track
remote *upTrack
maxLossBitrate *bitrate
maxREMBBitrate *bitrate
rate *estimator.Estimator
stats *receiverStats
}
func (down *downTrack) GetMaxBitrate(now uint64) uint64 {
br1 := down.maxLossBitrate.Get(now)
br2 := down.maxREMBBitrate.Get(now)
if br1 < br2 {
return br1
}
return br2
2020-04-24 19:38:21 +02:00
}
type downConnection struct {
2020-04-26 01:33:18 +02:00
id string
pc *webrtc.PeerConnection
remote *upConnection
tracks []*downTrack
2020-04-24 19:38:21 +02:00
}
type client struct {
2020-05-09 19:39:34 +02:00
group *group
id string
username string
permissions userPermission
requestedAudio bool
requestedVideo bool
done chan struct{}
writeCh chan interface{}
writerDone chan struct{}
actionCh chan interface{}
mu sync.Mutex
down map[string]*downConnection
up map[string]*upConnection
2020-04-24 19:38:21 +02:00
}
2020-04-25 21:16:49 +02:00
type chatHistoryEntry struct {
id string
user string
value string
me bool
}
const (
minVideoRate = 38400
minAudioRate = 9600
)
2020-04-24 19:38:21 +02:00
type group struct {
2020-04-25 02:25:51 +02:00
name string
dead bool
2020-04-25 02:25:51 +02:00
description *groupDescription
videoCount uint32
2020-04-24 19:38:21 +02:00
mu sync.Mutex
2020-04-25 21:29:21 +02:00
clients map[string]*client
2020-04-25 21:16:49 +02:00
history []chatHistoryEntry
2020-04-24 19:38:21 +02:00
}
type delConnAction struct {
2020-04-24 19:38:21 +02:00
id string
}
type addTrackAction struct {
track *upTrack
2020-04-24 19:38:21 +02:00
remote *upConnection
done bool
}
type addLabelAction struct {
id string
label string
}
type getUpAction struct {
ch chan<- string
}
type pushTracksAction struct {
c *client
}
type connectionFailedAction struct {
id string
}
type permissionsChangedAction struct{}
2020-04-25 17:36:35 +02:00
type kickAction struct{}
2020-04-24 19:38:21 +02:00
var groups struct {
mu sync.Mutex
groups map[string]*group
api *webrtc.API
}
2020-04-25 04:08:43 +02:00
func addGroup(name string, desc *groupDescription) (*group, error) {
2020-04-24 19:38:21 +02:00
groups.mu.Lock()
defer groups.mu.Unlock()
if groups.groups == nil {
groups.groups = make(map[string]*group)
2020-04-27 03:06:45 +02:00
s := webrtc.SettingEngine{}
2020-04-27 03:08:03 +02:00
s.SetTrickle(true)
2020-04-24 19:38:21 +02:00
m := webrtc.MediaEngine{}
m.RegisterCodec(webrtc.NewRTPVP8CodecExt(
webrtc.DefaultPayloadTypeVP8, 90000,
[]webrtc.RTCPFeedback{
2020-04-27 03:08:03 +02:00
{"goog-remb", ""},
{"nack", ""},
2020-04-27 03:08:03 +02:00
{"nack", "pli"},
},
"",
))
2020-04-24 19:38:21 +02:00
m.RegisterCodec(webrtc.NewRTPOpusCodec(
webrtc.DefaultPayloadTypeOpus, 48000,
))
2020-04-24 19:38:21 +02:00
groups.api = webrtc.NewAPI(
2020-04-27 03:06:45 +02:00
webrtc.WithSettingEngine(s),
2020-04-24 19:38:21 +02:00
webrtc.WithMediaEngine(m),
)
}
2020-04-25 04:08:43 +02:00
var err error
2020-04-24 19:38:21 +02:00
g := groups.groups[name]
if g == nil {
2020-04-25 17:36:35 +02:00
if desc == nil {
2020-04-25 04:08:43 +02:00
desc, err = getDescription(name)
if err != nil {
return nil, err
}
2020-04-25 02:25:51 +02:00
}
2020-04-24 19:38:21 +02:00
g = &group{
2020-04-25 02:25:51 +02:00
name: name,
description: desc,
2020-04-25 21:29:21 +02:00
clients: make(map[string]*client),
2020-04-24 19:38:21 +02:00
}
groups.groups[name] = g
2020-04-25 04:08:43 +02:00
} else if desc != nil {
g.description = desc
} else if g.dead || time.Since(g.description.loadTime) > 5*time.Second {
changed, err := descriptionChanged(name, g.description)
if err != nil {
if !os.IsNotExist(err) {
log.Printf("Reading group %v: %v", name, err)
}
g.dead = true
delGroupUnlocked(name)
return nil, err
}
if changed {
desc, err := getDescription(name)
if err != nil {
if !os.IsNotExist(err) {
log.Printf("Reading group %v: %v",
name, err)
}
g.dead = true
delGroupUnlocked(name)
return nil, err
}
g.dead = false
g.description = desc
} else {
g.description.loadTime = time.Now()
}
2020-04-24 19:38:21 +02:00
}
return g, nil
}
2020-04-29 16:08:07 +02:00
func getGroupNames() []string {
groups.mu.Lock()
defer groups.mu.Unlock()
names := make([]string, 0, len(groups.groups))
for name := range groups.groups {
names = append(names, name)
}
return names
}
func getGroup(name string) *group {
groups.mu.Lock()
defer groups.mu.Unlock()
return groups.groups[name]
}
2020-04-25 02:01:04 +02:00
func delGroupUnlocked(name string) bool {
2020-04-24 19:38:21 +02:00
g := groups.groups[name]
if g == nil {
return true
}
if len(g.clients) != 0 {
return false
}
delete(groups.groups, name)
return true
}
type userid struct {
id string
username string
}
2020-04-25 02:25:51 +02:00
func addClient(name string, client *client, user, pass string) (*group, []userid, error) {
2020-04-25 04:08:43 +02:00
g, err := addGroup(name, nil)
2020-04-24 19:38:21 +02:00
if err != nil {
return nil, nil, err
}
2020-04-25 02:25:51 +02:00
perms, err := getPermission(g.description, user, pass)
if err != nil {
return nil, nil, err
}
client.permissions = perms
2020-04-24 19:38:21 +02:00
g.mu.Lock()
defer g.mu.Unlock()
2020-04-25 21:29:21 +02:00
2020-04-25 18:09:31 +02:00
if !perms.Op && g.description.MaxClients > 0 {
2020-04-25 02:37:41 +02:00
if len(g.clients) >= g.description.MaxClients {
return nil, nil, userError("too many users")
}
}
2020-04-25 21:29:21 +02:00
if g.clients[client.id] != nil {
return nil, nil, protocolError("duplicate client id")
}
var users []userid
2020-04-24 19:38:21 +02:00
for _, c := range g.clients {
users = append(users, userid{c.id, c.username})
}
2020-04-25 21:29:21 +02:00
g.clients[client.id] = client
2020-04-24 19:38:21 +02:00
return g, users, nil
}
func delClient(c *client) {
c.group.mu.Lock()
defer c.group.mu.Unlock()
g := c.group
2020-04-25 21:29:21 +02:00
if g.clients[c.id] != c {
log.Printf("Deleting unknown client")
return
}
delete(g.clients, c.id)
if len(g.clients) == 0 && !g.description.Public {
delGroupUnlocked(g.name)
2020-04-24 19:38:21 +02:00
}
}
func (g *group) getClients(except *client) []*client {
g.mu.Lock()
defer g.mu.Unlock()
clients := make([]*client, 0, len(g.clients))
for _, c := range g.clients {
if c != except {
clients = append(clients, c)
}
}
return clients
}
2020-04-25 17:36:35 +02:00
func (g *group) getClientUnlocked(id string) *client {
for _, c := range g.clients {
if c.id == id {
return c
}
}
return nil
}
2020-04-24 19:38:21 +02:00
func (g *group) Range(f func(c *client) bool) {
g.mu.Lock()
defer g.mu.Unlock()
for _, c := range g.clients {
ok := f(c)
2020-04-25 02:25:51 +02:00
if !ok {
break
2020-04-24 19:38:21 +02:00
}
}
}
2020-04-25 21:16:49 +02:00
const maxChatHistory = 20
2020-04-30 19:13:10 +02:00
func (g *group) clearChatHistory() {
g.mu.Lock()
defer g.mu.Unlock()
g.history = nil
}
2020-04-25 21:16:49 +02:00
func (g *group) addToChatHistory(id, user, value string, me bool) {
g.mu.Lock()
defer g.mu.Unlock()
if len(g.history) >= maxChatHistory {
copy(g.history, g.history[1:])
g.history = g.history[:len(g.history)-1]
}
g.history = append(g.history,
chatHistoryEntry{id: id, user: user, value: value, me: me},
)
}
func (g *group) getChatHistory() []chatHistoryEntry {
g.mu.Lock()
g.mu.Unlock()
h := make([]chatHistoryEntry, len(g.history))
copy(h, g.history)
return h
}
2020-04-24 19:38:21 +02:00
type writerDeadError int
func (err writerDeadError) Error() string {
return "client writer died"
}
func (c *client) write(m clientMessage) error {
select {
case c.writeCh <- m:
return nil
case <-c.writerDone:
return writerDeadError(0)
}
}
2020-04-25 17:30:19 +02:00
func (c *client) error(err error) error {
switch e := err.(type) {
case userError:
return c.write(clientMessage{
Type: "error",
Value: "The server said: " + string(e),
2020-04-25 17:30:19 +02:00
})
default:
return err
}
}
2020-04-24 19:38:21 +02:00
type clientDeadError int
func (err clientDeadError) Error() string {
return "client dead"
}
func (c *client) action(m interface{}) error {
select {
case c.actionCh <- m:
return nil
case <-c.done:
return clientDeadError(0)
}
}
2020-04-25 02:25:51 +02:00
type groupUser struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
func matchUser(user, pass string, users []groupUser) (bool, bool) {
for _, u := range users {
if u.Username == "" {
if u.Password == "" || u.Password == pass {
return true, true
}
} else if u.Username == user {
2020-04-25 02:25:51 +02:00
return true, (u.Password == "" || u.Password == pass)
}
}
return false, false
}
type groupDescription struct {
loadTime time.Time `json:"-"`
modTime time.Time `json:"-"`
fileSize int64 `json:"-"`
2020-04-25 02:25:51 +02:00
Public bool `json:"public,omitempty"`
2020-04-25 02:37:41 +02:00
MaxClients int `json:"max-clients,omitempty"`
2020-04-25 02:25:51 +02:00
AllowAnonymous bool `json:"allow-anonymous,omitempty"`
2020-04-25 18:09:31 +02:00
Op []groupUser `json:"op,omitempty"`
2020-04-25 02:25:51 +02:00
Presenter []groupUser `json:"presenter,omitempty"`
Other []groupUser `json:"other,omitempty"`
}
func descriptionChanged(name string, old *groupDescription) (bool, error) {
fi, err := os.Stat(filepath.Join(groupsDir, name+".json"))
if err != nil {
if os.IsNotExist(err) {
err = userError("group does not exist")
}
return false, err
}
if fi.Size() != old.fileSize || fi.ModTime() != old.modTime {
return true, err
}
return false, err
}
2020-04-25 02:25:51 +02:00
func getDescription(name string) (*groupDescription, error) {
r, err := os.Open(filepath.Join(groupsDir, name+".json"))
if err != nil {
if os.IsNotExist(err) {
err = userError("group does not exist")
}
return nil, err
}
defer r.Close()
var desc groupDescription
fi, err := r.Stat()
if err != nil {
return nil, err
}
desc.fileSize = fi.Size()
desc.modTime = fi.ModTime()
2020-04-25 02:25:51 +02:00
d := json.NewDecoder(r)
err = d.Decode(&desc)
if err != nil {
return nil, err
}
desc.loadTime = time.Now()
2020-04-25 02:25:51 +02:00
return &desc, nil
}
type userPermission struct {
2020-04-25 18:09:31 +02:00
Op bool `json:"op,omitempty"`
2020-04-25 02:25:51 +02:00
Present bool `json:"present,omitempty"`
}
func getPermission(desc *groupDescription, user, pass string) (userPermission, error) {
var p userPermission
if !desc.AllowAnonymous && user == "" {
2020-04-25 16:54:20 +02:00
return p, userError("anonymous users not allowed in this group, please choose a username")
2020-04-25 02:25:51 +02:00
}
2020-04-25 18:09:31 +02:00
if found, good := matchUser(user, pass, desc.Op); found {
2020-04-25 02:25:51 +02:00
if good {
2020-04-25 18:09:31 +02:00
p.Op = true
2020-04-25 02:25:51 +02:00
p.Present = true
return p, nil
}
2020-04-25 17:36:35 +02:00
return p, userError("not authorised")
2020-04-25 02:25:51 +02:00
}
if found, good := matchUser(user, pass, desc.Presenter); found {
if good {
p.Present = true
return p, nil
}
2020-04-25 17:36:35 +02:00
return p, userError("not authorised")
2020-04-25 02:25:51 +02:00
}
if found, good := matchUser(user, pass, desc.Other); found {
if good {
return p, nil
}
2020-04-25 17:36:35 +02:00
return p, userError("not authorised")
2020-04-25 02:25:51 +02:00
}
2020-04-25 17:36:35 +02:00
return p, userError("not authorised")
}
func setPermission(g *group, id string, perm string) error {
g.mu.Lock()
defer g.mu.Unlock()
c := g.getClientUnlocked(id)
if c == nil {
return userError("no such user")
}
switch perm {
case "op":
2020-04-25 18:09:31 +02:00
c.permissions.Op = true
2020-04-25 17:36:35 +02:00
case "unop":
2020-04-25 18:09:31 +02:00
c.permissions.Op = false
2020-04-25 17:36:35 +02:00
case "present":
c.permissions.Present = true
case "unpresent":
c.permissions.Present = false
default:
return userError("unknown permission")
}
return c.action(permissionsChangedAction{})
2020-04-25 17:36:35 +02:00
}
func kickClient(g *group, id string) error {
g.mu.Lock()
defer g.mu.Unlock()
c := g.getClientUnlocked(id)
if c == nil {
return userError("no such user")
}
return c.action(kickAction{})
2020-04-25 02:25:51 +02:00
}
2020-04-24 19:38:21 +02:00
type publicGroup struct {
Name string `json:"name"`
ClientCount int `json:"clientCount"`
}
func getPublicGroups() []publicGroup {
gs := make([]publicGroup, 0)
groups.mu.Lock()
defer groups.mu.Unlock()
for _, g := range groups.groups {
2020-04-25 02:25:51 +02:00
if g.description.Public {
2020-04-24 19:38:21 +02:00
gs = append(gs, publicGroup{
Name: g.name,
ClientCount: len(g.clients),
})
}
}
2020-04-30 22:32:44 +02:00
sort.Slice(gs, func(i, j int) bool {
return gs[i].Name < gs[j].Name
})
2020-04-24 19:38:21 +02:00
return gs
}
2020-04-25 04:08:43 +02:00
func readPublicGroups() {
dir, err := os.Open(groupsDir)
if err != nil {
return
}
defer dir.Close()
fis, err := dir.Readdir(-1)
if err != nil {
log.Printf("readPublicGroups: %v", err)
return
}
for _, fi := range fis {
if !strings.HasSuffix(fi.Name(), ".json") {
continue
}
2020-04-25 17:36:35 +02:00
name := fi.Name()[:len(fi.Name())-5]
2020-04-25 04:08:43 +02:00
desc, err := getDescription(name)
if err != nil {
if !os.IsNotExist(err) {
log.Printf("Reading group %v: %v", name, err)
}
2020-04-25 04:08:43 +02:00
continue
}
if desc.Public {
addGroup(name, desc)
}
}
}
2020-04-29 16:08:07 +02:00
type groupStats struct {
name string
clients []clientStats
}
type clientStats struct {
id string
up, down []connStats
}
type connStats struct {
id string
tracks []trackStats
}
type trackStats struct {
2020-04-30 20:33:04 +02:00
bitrate uint64
maxBitrate uint64
loss uint8
2020-05-02 16:32:34 +02:00
jitter time.Duration
2020-04-29 16:08:07 +02:00
}
func getGroupStats() []groupStats {
names := getGroupNames()
gs := make([]groupStats, 0, len(names))
for _, name := range names {
g := getGroup(name)
if g == nil {
continue
}
clients := g.getClients(nil)
stats := groupStats{
name: name,
clients: make([]clientStats, 0, len(clients)),
}
for _, c := range clients {
cs := getClientStats(c)
stats.clients = append(stats.clients, cs)
}
sort.Slice(stats.clients, func(i, j int) bool {
return stats.clients[i].id < stats.clients[j].id
})
gs = append(gs, stats)
}
sort.Slice(gs, func(i, j int) bool {
return gs[i].name < gs[j].name
})
return gs
}
func getClientStats(c *client) clientStats {
c.mu.Lock()
defer c.mu.Unlock()
cs := clientStats{
id: c.id,
}
for _, up := range c.up {
conns := connStats{id: up.id}
for _, t := range up.tracks {
expected, lost, _, _ := t.cache.GetStats(false)
2020-04-29 16:08:07 +02:00
if expected == 0 {
expected = 1
}
2020-05-02 16:32:34 +02:00
loss := uint8(lost * 100 / expected)
jitter := time.Duration(t.jitter.Jitter()) *
(time.Second / time.Duration(t.jitter.HZ()))
2020-04-29 16:08:07 +02:00
conns.tracks = append(conns.tracks, trackStats{
2020-04-30 20:33:04 +02:00
bitrate: uint64(t.rate.Estimate()) * 8,
maxBitrate: atomic.LoadUint64(&t.maxBitrate),
2020-05-02 16:32:34 +02:00
loss: loss,
jitter: jitter,
2020-04-29 16:08:07 +02:00
})
}
cs.up = append(cs.up, conns)
}
sort.Slice(cs.up, func(i, j int) bool {
return cs.up[i].id < cs.up[j].id
})
for _, down := range c.down {
conns := connStats{id: down.id}
for _, t := range down.tracks {
loss, jitter := t.stats.Get(mono.Microseconds())
j := time.Duration(jitter) * time.Second /
2020-05-02 16:32:34 +02:00
time.Duration(t.track.Codec().ClockRate)
2020-04-29 16:08:07 +02:00
conns.tracks = append(conns.tracks, trackStats{
2020-04-30 20:33:04 +02:00
bitrate: uint64(t.rate.Estimate()) * 8,
maxBitrate: t.GetMaxBitrate(mono.Microseconds()),
loss: uint8(uint32(loss) * 100 / 256),
jitter: j,
2020-04-29 16:08:07 +02:00
})
}
cs.down = append(cs.down, conns)
}
sort.Slice(cs.down, func(i, j int) bool {
return cs.down[i].id < cs.down[j].id
})
return cs
}