From 7ce61a1100915f644b25bb395889782ead6a2748 Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Fri, 18 Sep 2020 10:14:57 +0200 Subject: [PATCH] Move stats code into its own module. Move RTP-specific code into its own file. --- rtpstats.go | 74 ++++++++++++++++++++++++++++ stats.go | 131 ------------------------------------------------- stats/stats.go | 73 +++++++++++++++++++++++++++ webserver.go | 47 +++++++++--------- 4 files changed, 171 insertions(+), 154 deletions(-) create mode 100644 rtpstats.go delete mode 100644 stats.go create mode 100644 stats/stats.go diff --git a/rtpstats.go b/rtpstats.go new file mode 100644 index 0000000..4e66784 --- /dev/null +++ b/rtpstats.go @@ -0,0 +1,74 @@ +package main + +import ( + "sort" + "sync/atomic" + "time" + + "sfu/rtptime" + "sfu/stats" +) + +func (c *webClient) GetStats() *stats.Client { + c.mu.Lock() + defer c.mu.Unlock() + + cs := stats.Client{ + Id: c.id, + } + + for _, up := range c.up { + conns := stats.Conn{ + Id: up.id, + } + tracks := up.getTracks() + for _, t := range tracks { + expected, lost, _, _ := t.cache.GetStats(false) + if expected == 0 { + expected = 1 + } + loss := uint8(lost * 100 / expected) + jitter := time.Duration(t.jitter.Jitter()) * + (time.Second / time.Duration(t.jitter.HZ())) + rate, _ := t.rate.Estimate() + conns.Tracks = append(conns.Tracks, stats.Track{ + Bitrate: uint64(rate) * 8, + Loss: loss, + Jitter: jitter, + }) + } + cs.Up = append(cs.Up, conns) + } + sort.Slice(cs.Up, func(i, j int) bool { + return cs.Up[i].Id < cs.Up[j].Id + }) + + jiffies := rtptime.Jiffies() + for _, down := range c.down { + conns := stats.Conn{ + Id: down.id, + MaxBitrate: down.GetMaxBitrate(jiffies), + } + for _, t := range down.tracks { + rate, _ := t.rate.Estimate() + rtt := rtptime.ToDuration(atomic.LoadUint64(&t.rtt), + rtptime.JiffiesPerSec) + loss, jitter := t.stats.Get(jiffies) + j := time.Duration(jitter) * time.Second / + time.Duration(t.track.Codec().ClockRate) + conns.Tracks = append(conns.Tracks, stats.Track{ + Bitrate: uint64(rate) * 8, + MaxBitrate: t.maxBitrate.Get(jiffies), + Loss: uint8(uint32(loss) * 100 / 256), + Rtt: rtt, + Jitter: j, + }) + } + cs.Down = append(cs.Down, conns) + } + sort.Slice(cs.Down, func(i, j int) bool { + return cs.Down[i].Id < cs.Down[j].Id + }) + + return &cs +} diff --git a/stats.go b/stats.go deleted file mode 100644 index aca779a..0000000 --- a/stats.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "sort" - "sync/atomic" - "time" - - "sfu/group" - "sfu/rtptime" -) - -type groupStats struct { - name string - clients []clientStats -} - -type clientStats struct { - id string - up, down []connStats -} - -type connStats struct { - id string - maxBitrate uint64 - tracks []trackStats -} - -type trackStats struct { - bitrate uint64 - maxBitrate uint64 - loss uint8 - rtt time.Duration - jitter time.Duration -} - -func getGroupStats() []groupStats { - names := group.GetNames() - - gs := make([]groupStats, 0, len(names)) - for _, name := range names { - g := group.Get(name) - if g == nil { - continue - } - clients := g.GetClients(nil) - stats := groupStats{ - name: name, - clients: make([]clientStats, 0, len(clients)), - } - for _, c := range clients { - c, ok := c.(*webClient) - if ok { - cs := getClientStats(c) - stats.clients = append(stats.clients, cs) - } - } - sort.Slice(stats.clients, func(i, j int) bool { - return stats.clients[i].id < stats.clients[j].id - }) - gs = append(gs, stats) - } - sort.Slice(gs, func(i, j int) bool { - return gs[i].name < gs[j].name - }) - - return gs -} - -func getClientStats(c *webClient) clientStats { - c.mu.Lock() - defer c.mu.Unlock() - - cs := clientStats{ - id: c.id, - } - - for _, up := range c.up { - conns := connStats{ - id: up.id, - } - tracks := up.getTracks() - for _, t := range tracks { - expected, lost, _, _ := t.cache.GetStats(false) - if expected == 0 { - expected = 1 - } - loss := uint8(lost * 100 / expected) - jitter := time.Duration(t.jitter.Jitter()) * - (time.Second / time.Duration(t.jitter.HZ())) - rate, _ := t.rate.Estimate() - conns.tracks = append(conns.tracks, trackStats{ - bitrate: uint64(rate) * 8, - loss: loss, - jitter: jitter, - }) - } - cs.up = append(cs.up, conns) - } - sort.Slice(cs.up, func(i, j int) bool { - return cs.up[i].id < cs.up[j].id - }) - - jiffies := rtptime.Jiffies() - for _, down := range c.down { - conns := connStats{ - id: down.id, - maxBitrate: down.GetMaxBitrate(jiffies), - } - for _, t := range down.tracks { - rate, _ := t.rate.Estimate() - rtt := rtptime.ToDuration(atomic.LoadUint64(&t.rtt), - rtptime.JiffiesPerSec) - loss, jitter := t.stats.Get(jiffies) - j := time.Duration(jitter) * time.Second / - time.Duration(t.track.Codec().ClockRate) - conns.tracks = append(conns.tracks, trackStats{ - bitrate: uint64(rate) * 8, - maxBitrate: t.maxBitrate.Get(jiffies), - loss: uint8(uint32(loss) * 100 / 256), - rtt: rtt, - jitter: j, - }) - } - cs.down = append(cs.down, conns) - } - sort.Slice(cs.down, func(i, j int) bool { - return cs.down[i].id < cs.down[j].id - }) - - return cs -} diff --git a/stats/stats.go b/stats/stats.go new file mode 100644 index 0000000..e65e74f --- /dev/null +++ b/stats/stats.go @@ -0,0 +1,73 @@ +package stats + +import ( + "sort" + "time" + + "sfu/group" +) + +type GroupStats struct { + Name string + Clients []*Client +} + +type Client struct { + Id string + Up, Down []Conn +} + +type Statable interface { + GetStats() *Client +} + +type Conn struct { + Id string + MaxBitrate uint64 + Tracks []Track +} + +type Track struct { + Bitrate uint64 + MaxBitrate uint64 + Loss uint8 + Rtt time.Duration + Jitter time.Duration +} + +func GetGroups() []GroupStats { + names := group.GetNames() + + gs := make([]GroupStats, 0, len(names)) + for _, name := range names { + g := group.Get(name) + if g == nil { + continue + } + clients := g.GetClients(nil) + stats := GroupStats{ + Name: name, + Clients: make([]*Client, 0, len(clients)), + } + for _, c := range clients { + s, ok := c.(Statable) + if ok { + cs := s.GetStats() + stats.Clients = append(stats.Clients, cs) + } else { + stats.Clients = append(stats.Clients, + &Client{Id: c.Id()}, + ) + } + } + sort.Slice(stats.Clients, func(i, j int) bool { + return stats.Clients[i].Id < stats.Clients[j].Id + }) + gs = append(gs, stats) + } + sort.Slice(gs, func(i, j int) bool { + return gs[i].Name < gs[j].Name + }) + + return gs +} diff --git a/webserver.go b/webserver.go index f811ef1..a1f98bc 100644 --- a/webserver.go +++ b/webserver.go @@ -21,6 +21,7 @@ import ( "sfu/disk" "sfu/group" + "sfu/stats" ) var server *http.Server @@ -50,7 +51,7 @@ func webserver() { IdleTimeout: 120 * time.Second, } server.RegisterOnShutdown(func() { - group.Range(func (g *group.Group) bool { + group.Range(func(g *group.Group) bool { go g.Shutdown("server is shutting down") return true }) @@ -225,7 +226,7 @@ func statsHandler(w http.ResponseWriter, r *http.Request) { return } - stats := getGroupStats() + ss := stats.GetGroups() fmt.Fprintf(w, "\n\n") fmt.Fprintf(w, "Stats\n") @@ -242,51 +243,51 @@ func statsHandler(w http.ResponseWriter, r *http.Request) { return err } - printTrack := func(w io.Writer, t trackStats) { + printTrack := func(w io.Writer, t stats.Track) { fmt.Fprintf(w, "") fmt.Fprintf(w, "") - printBitrate(w, t.bitrate, t.maxBitrate) + printBitrate(w, t.Bitrate, t.MaxBitrate) fmt.Fprintf(w, "") fmt.Fprintf(w, "%d%%", - t.loss, + t.Loss, ) fmt.Fprintf(w, "") - if t.rtt > 0 { - fmt.Fprintf(w, "%v", t.rtt) + if t.Rtt > 0 { + fmt.Fprintf(w, "%v", t.Rtt) } - if t.jitter > 0 { - fmt.Fprintf(w, "±%v", t.jitter) + if t.Jitter > 0 { + fmt.Fprintf(w, "±%v", t.Jitter) } fmt.Fprintf(w, "") fmt.Fprintf(w, "") } - for _, gs := range stats { - fmt.Fprintf(w, "

%v

\n", html.EscapeString(gs.name)) + for _, gs := range ss { + fmt.Fprintf(w, "

%v

\n", html.EscapeString(gs.Name)) fmt.Fprintf(w, "") - for _, cs := range gs.clients { - fmt.Fprintf(w, "\n", cs.id) - for _, up := range cs.up { + for _, cs := range gs.Clients { + fmt.Fprintf(w, "\n", cs.Id) + for _, up := range cs.Up { fmt.Fprintf(w, "", - up.id) - if up.maxBitrate > 0 { + up.Id) + if up.MaxBitrate > 0 { fmt.Fprintf(w, "", - up.maxBitrate) + up.MaxBitrate) } fmt.Fprintf(w, "\n") - for _, t := range up.tracks { + for _, t := range up.Tracks { printTrack(w, t) } } - for _, down := range cs.down { + for _, down := range cs.Down { fmt.Fprintf(w, "", - down.id) - if down.maxBitrate > 0 { + down.Id) + if down.MaxBitrate > 0 { fmt.Fprintf(w, "", - down.maxBitrate) + down.MaxBitrate) } fmt.Fprintf(w, "\n") - for _, t := range down.tracks { + for _, t := range down.Tracks { printTrack(w, t) } }
%v
%v
Up%v%v
Down %v%v