1
Fork 0

Use jiffies in rate estimator.

This avoids locking in estimate.
This commit is contained in:
Juliusz Chroboczek 2020-06-09 14:13:30 +02:00
parent f277b42e26
commit 2454e33df3
2 changed files with 31 additions and 34 deletions

View File

@ -1,49 +1,47 @@
package estimator package estimator
import ( import (
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"sfu/rtptime"
) )
type Estimator struct { type Estimator struct {
interval time.Duration interval uint64
bytes uint32 time uint64
packets uint32 bytes uint32
packets uint32
mu sync.Mutex
totalBytes uint32 totalBytes uint32
totalPackets uint32 totalPackets uint32
rate uint32 rate uint32
packetRate uint32 packetRate uint32
time time.Time
} }
func New(interval time.Duration) *Estimator { func New(interval time.Duration) *Estimator {
return &Estimator{ return &Estimator{
interval: interval, interval: rtptime.FromDuration(interval, rtptime.JiffiesPerSec),
time: time.Now(), time: rtptime.Now(rtptime.JiffiesPerSec),
} }
} }
func (e *Estimator) swap(now time.Time) { func (e *Estimator) swap(now uint64) {
interval := now.Sub(e.time) tm := atomic.LoadUint64(&e.time)
jiffies := now - tm
bytes := atomic.SwapUint32(&e.bytes, 0) bytes := atomic.SwapUint32(&e.bytes, 0)
packets := atomic.SwapUint32(&e.packets, 0) packets := atomic.SwapUint32(&e.packets, 0)
atomic.AddUint32(&e.totalBytes, bytes) atomic.AddUint32(&e.totalBytes, bytes)
atomic.AddUint32(&e.totalPackets, packets) atomic.AddUint32(&e.totalPackets, packets)
if interval < time.Millisecond { var rate, packetRate uint32
e.rate = 0 if jiffies >= rtptime.JiffiesPerSec/1000 {
e.packetRate = 0 rate = uint32(uint64(bytes) * rtptime.JiffiesPerSec / jiffies)
} else { packetRate =
e.rate = uint32(uint64(bytes*1000) / uint32(uint64(packets) * rtptime.JiffiesPerSec / jiffies)
uint64(interval/time.Millisecond))
e.packetRate = uint32(uint64(packets*1000) /
uint64(interval/time.Millisecond))
} }
e.time = now atomic.StoreUint32(&e.rate, rate)
atomic.StoreUint32(&e.packetRate, packetRate)
atomic.StoreUint64(&e.time, now)
} }
func (e *Estimator) Accumulate(count uint32) { func (e *Estimator) Accumulate(count uint32) {
@ -51,20 +49,17 @@ func (e *Estimator) Accumulate(count uint32) {
atomic.AddUint32(&e.packets, 1) atomic.AddUint32(&e.packets, 1)
} }
func (e *Estimator) estimate(now time.Time) (uint32, uint32) { func (e *Estimator) estimate(now uint64) (uint32, uint32) {
if now.Sub(e.time) > e.interval { tm := atomic.LoadUint64(&e.time)
if now < tm || now-tm > e.interval {
e.swap(now) e.swap(now)
} }
return e.rate, e.packetRate return atomic.LoadUint32(&e.rate), atomic.LoadUint32(&e.packetRate)
} }
func (e *Estimator) Estimate() (uint32, uint32) { func (e *Estimator) Estimate() (uint32, uint32) {
now := time.Now() return e.estimate(rtptime.Now(rtptime.JiffiesPerSec))
e.mu.Lock()
defer e.mu.Unlock()
return e.estimate(now)
} }
func (e *Estimator) Totals() (uint32, uint32) { func (e *Estimator) Totals() (uint32, uint32) {

View File

@ -2,18 +2,20 @@ package estimator
import ( import (
"testing" "testing"
"time"
"sfu/rtptime"
) )
func TestEstimator(t *testing.T) { func TestEstimator(t *testing.T) {
now := time.Now() now := rtptime.Jiffies()
e := New(time.Second) e := New(rtptime.JiffiesPerSec)
e.estimate(now) e.estimate(now)
e.Accumulate(42) e.Accumulate(42)
e.Accumulate(128) e.Accumulate(128)
e.estimate(now.Add(time.Second)) e.estimate(now + rtptime.JiffiesPerSec)
rate, packetRate := e.estimate(now.Add(time.Second + time.Millisecond)) rate, packetRate :=
e.estimate(now + (rtptime.JiffiesPerSec * 1001) / 1000)
if rate != 42+128 { if rate != 42+128 {
t.Errorf("Expected %v, got %v", 42+128, rate) t.Errorf("Expected %v, got %v", 42+128, rate)