diff --git a/group.go b/group.go index fcfe359..343b827 100644 --- a/group.go +++ b/group.go @@ -10,6 +10,7 @@ import ( "log" "os" "path/filepath" + "sort" "strings" "sync" "sync/atomic" @@ -241,6 +242,24 @@ func addGroup(name string, desc *groupDescription) (*group, error) { return g, nil } +func getGroupNames() []string { + groups.mu.Lock() + defer groups.mu.Unlock() + + names := make([]string, 0, len(groups.groups)) + for name := range groups.groups { + names = append(names, name) + } + return names +} + +func getGroup(name string) *group { + groups.mu.Lock() + defer groups.mu.Unlock() + + return groups.groups[name] +} + func delGroupUnlocked(name string) bool { g := groups.groups[name] if g == nil { @@ -598,3 +617,97 @@ func readPublicGroups() { } } } + +type groupStats struct { + name string + clients []clientStats +} + +type clientStats struct { + id string + up, down []connStats +} + +type connStats struct { + id string + tracks []trackStats +} + +type trackStats struct { + bitrate uint64 + loss uint8 +} + +func getGroupStats() []groupStats { + names := getGroupNames() + + gs := make([]groupStats, 0, len(names)) + for _, name := range names { + g := getGroup(name) + if g == nil { + continue + } + clients := g.getClients(nil) + stats := groupStats{ + name: name, + clients: make([]clientStats, 0, len(clients)), + } + for _, c := range clients { + cs := getClientStats(c) + stats.clients = append(stats.clients, cs) + } + sort.Slice(stats.clients, func(i, j int) bool { + return stats.clients[i].id < stats.clients[j].id + }) + gs = append(gs, stats) + } + sort.Slice(gs, func(i, j int) bool { + return gs[i].name < gs[j].name + }) + + return gs +} + +func getClientStats(c *client) clientStats { + c.mu.Lock() + defer c.mu.Unlock() + + cs := clientStats{ + id: c.id, + } + + for _, up := range c.up { + conns := connStats{id: up.id} + for _, t := range up.tracks { + expected, lost, _ := t.cache.GetStats(false) + if expected == 0 { + expected = 1 + } + conns.tracks = append(conns.tracks, trackStats{ + bitrate: atomic.LoadUint64(&t.maxBitrate), + loss: uint8(lost * 100 / expected), + }) + } + cs.up = append(cs.up, conns) + } + sort.Slice(cs.up, func(i, j int) bool { + return cs.up[i].id < cs.up[j].id + }) + + for _, down := range c.down { + conns := connStats{id: down.id} + for _, t := range down.tracks { + loss := atomic.LoadUint32(&t.loss) + conns.tracks = append(conns.tracks, trackStats{ + bitrate: atomic.LoadUint64(&t.maxBitrate.bitrate), + loss: uint8((loss * 100) / 256), + }) + } + cs.down = append(cs.down, conns) + } + sort.Slice(cs.down, func(i, j int) bool { + return cs.down[i].id < cs.down[j].id + }) + + return cs +} diff --git a/sfu.go b/sfu.go index d6ceece..edb2c13 100644 --- a/sfu.go +++ b/sfu.go @@ -6,13 +6,19 @@ package main import ( + "bufio" "encoding/json" + "errors" "flag" + "fmt" + "html" + "io" "log" "net/http" "os" "os/signal" "path/filepath" + "strings" "syscall" "time" @@ -44,6 +50,7 @@ func main() { }) http.HandleFunc("/ws", wsHandler) http.HandleFunc("/public-groups.json", publicHandler) + http.HandleFunc("/stats", statsHandler) go readPublicGroups() @@ -96,6 +103,102 @@ func publicHandler(w http.ResponseWriter, r *http.Request) { return } +func getPassword() (string, string, error) { + f, err := os.Open(filepath.Join(dataDir, "passwd")) + if err != nil { + return "", "", err + } + defer f.Close() + + r := bufio.NewReader(f) + + s, err := r.ReadString('\n') + if err != nil { + return "", "", err + } + + l := strings.SplitN(strings.TrimSpace(s), ":", 2) + if len(l) != 2 { + return "", "", errors.New("couldn't parse passwords") + } + + return l[0], l[1], nil +} + +func statsHandler(w http.ResponseWriter, r *http.Request) { + bail := func() { + w.Header().Set("www-authenticate", "basic realm=\"stats\"") + http.Error(w, "Haha!", http.StatusUnauthorized) + } + + u, p, err := getPassword() + if err != nil { + log.Printf("Passwd: %v", err) + bail() + return + } + + username, password, ok := r.BasicAuth() + if !ok || username != u || password != p { + bail() + return + } + + w.Header().Set("content-type", "text/html; charset=utf-8") + w.Header().Set("cache-control", "no-cache") + if r.Method == "HEAD" { + return + } + + stats := getGroupStats() + + fmt.Fprintf(w, "\n\n") + fmt.Fprintf(w, "Stats\n") + fmt.Fprintf(w, "\n") + + printBitrate := func(w io.Writer, rate uint64) error { + var err error + if rate != 0 && rate != ^uint64(0) { + _, err = fmt.Fprintf(w, "%v", rate) + } + return err + } + + printTrack := func(w io.Writer, t trackStats) { + fmt.Fprintf(w, "") + fmt.Fprintf(w, "") + printBitrate(w, t.bitrate) + fmt.Fprintf(w, "") + fmt.Fprintf(w, "%d%%\n", + t.loss, + ) + } + + for _, gs := range stats { + fmt.Fprintf(w, "

%v

\n", html.EscapeString(gs.name)) + fmt.Fprintf(w, "") + for _, cs := range gs.clients { + fmt.Fprintf(w, "\n", cs.id) + for _, up := range cs.up { + fmt.Fprintf(w, "\n", + up.id) + for _, t := range up.tracks { + printTrack(w, t) + } + } + for _, up := range cs.down { + fmt.Fprintf(w, "\n", + up.id) + for _, t := range up.tracks { + printTrack(w, t) + } + } + } + fmt.Fprintf(w, "
%v
Up%v
Down %v
\n") + } + fmt.Fprintf(w, "\n") +} + var upgrader websocket.Upgrader func wsHandler(w http.ResponseWriter, r *http.Request) {