1
Fork 0

Add primitive monitoring interface.

This commit is contained in:
Juliusz Chroboczek 2020-04-29 16:08:07 +02:00
parent 17941fa96d
commit 3f7439fc48
2 changed files with 216 additions and 0 deletions

113
group.go
View File

@ -10,6 +10,7 @@ import (
"log"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
@ -241,6 +242,24 @@ func addGroup(name string, desc *groupDescription) (*group, error) {
return g, nil
}
func getGroupNames() []string {
groups.mu.Lock()
defer groups.mu.Unlock()
names := make([]string, 0, len(groups.groups))
for name := range groups.groups {
names = append(names, name)
}
return names
}
func getGroup(name string) *group {
groups.mu.Lock()
defer groups.mu.Unlock()
return groups.groups[name]
}
func delGroupUnlocked(name string) bool {
g := groups.groups[name]
if g == nil {
@ -598,3 +617,97 @@ func readPublicGroups() {
}
}
}
type groupStats struct {
name string
clients []clientStats
}
type clientStats struct {
id string
up, down []connStats
}
type connStats struct {
id string
tracks []trackStats
}
type trackStats struct {
bitrate uint64
loss uint8
}
func getGroupStats() []groupStats {
names := getGroupNames()
gs := make([]groupStats, 0, len(names))
for _, name := range names {
g := getGroup(name)
if g == nil {
continue
}
clients := g.getClients(nil)
stats := groupStats{
name: name,
clients: make([]clientStats, 0, len(clients)),
}
for _, c := range clients {
cs := getClientStats(c)
stats.clients = append(stats.clients, cs)
}
sort.Slice(stats.clients, func(i, j int) bool {
return stats.clients[i].id < stats.clients[j].id
})
gs = append(gs, stats)
}
sort.Slice(gs, func(i, j int) bool {
return gs[i].name < gs[j].name
})
return gs
}
func getClientStats(c *client) clientStats {
c.mu.Lock()
defer c.mu.Unlock()
cs := clientStats{
id: c.id,
}
for _, up := range c.up {
conns := connStats{id: up.id}
for _, t := range up.tracks {
expected, lost, _ := t.cache.GetStats(false)
if expected == 0 {
expected = 1
}
conns.tracks = append(conns.tracks, trackStats{
bitrate: atomic.LoadUint64(&t.maxBitrate),
loss: uint8(lost * 100 / expected),
})
}
cs.up = append(cs.up, conns)
}
sort.Slice(cs.up, func(i, j int) bool {
return cs.up[i].id < cs.up[j].id
})
for _, down := range c.down {
conns := connStats{id: down.id}
for _, t := range down.tracks {
loss := atomic.LoadUint32(&t.loss)
conns.tracks = append(conns.tracks, trackStats{
bitrate: atomic.LoadUint64(&t.maxBitrate.bitrate),
loss: uint8((loss * 100) / 256),
})
}
cs.down = append(cs.down, conns)
}
sort.Slice(cs.down, func(i, j int) bool {
return cs.down[i].id < cs.down[j].id
})
return cs
}

103
sfu.go
View File

@ -6,13 +6,19 @@
package main
import (
"bufio"
"encoding/json"
"errors"
"flag"
"fmt"
"html"
"io"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
@ -44,6 +50,7 @@ func main() {
})
http.HandleFunc("/ws", wsHandler)
http.HandleFunc("/public-groups.json", publicHandler)
http.HandleFunc("/stats", statsHandler)
go readPublicGroups()
@ -96,6 +103,102 @@ func publicHandler(w http.ResponseWriter, r *http.Request) {
return
}
func getPassword() (string, string, error) {
f, err := os.Open(filepath.Join(dataDir, "passwd"))
if err != nil {
return "", "", err
}
defer f.Close()
r := bufio.NewReader(f)
s, err := r.ReadString('\n')
if err != nil {
return "", "", err
}
l := strings.SplitN(strings.TrimSpace(s), ":", 2)
if len(l) != 2 {
return "", "", errors.New("couldn't parse passwords")
}
return l[0], l[1], nil
}
func statsHandler(w http.ResponseWriter, r *http.Request) {
bail := func() {
w.Header().Set("www-authenticate", "basic realm=\"stats\"")
http.Error(w, "Haha!", http.StatusUnauthorized)
}
u, p, err := getPassword()
if err != nil {
log.Printf("Passwd: %v", err)
bail()
return
}
username, password, ok := r.BasicAuth()
if !ok || username != u || password != p {
bail()
return
}
w.Header().Set("content-type", "text/html; charset=utf-8")
w.Header().Set("cache-control", "no-cache")
if r.Method == "HEAD" {
return
}
stats := getGroupStats()
fmt.Fprintf(w, "<!DOCTYPE html>\n<html><head>\n")
fmt.Fprintf(w, "<title>Stats</title>\n")
fmt.Fprintf(w, "<head><body>\n")
printBitrate := func(w io.Writer, rate uint64) error {
var err error
if rate != 0 && rate != ^uint64(0) {
_, err = fmt.Fprintf(w, "%v", rate)
}
return err
}
printTrack := func(w io.Writer, t trackStats) {
fmt.Fprintf(w, "<tr><td></td><td></td><td></td>")
fmt.Fprintf(w, "<td>")
printBitrate(w, t.bitrate)
fmt.Fprintf(w, "</td>")
fmt.Fprintf(w, "<td>%d%%</td></tr>\n",
t.loss,
)
}
for _, gs := range stats {
fmt.Fprintf(w, "<p>%v</p>\n", html.EscapeString(gs.name))
fmt.Fprintf(w, "<table>")
for _, cs := range gs.clients {
fmt.Fprintf(w, "<tr><td>%v</td></tr>\n", cs.id)
for _, up := range cs.up {
fmt.Fprintf(w, "<tr><td></td><td>Up</td><td>%v</td></tr>\n",
up.id)
for _, t := range up.tracks {
printTrack(w, t)
}
}
for _, up := range cs.down {
fmt.Fprintf(w, "<tr><td></td><td>Down</td><td> %v</td></tr>\n",
up.id)
for _, t := range up.tracks {
printTrack(w, t)
}
}
}
fmt.Fprintf(w, "</table>\n")
}
fmt.Fprintf(w, "</body></html>\n")
}
var upgrader websocket.Upgrader
func wsHandler(w http.ResponseWriter, r *http.Request) {