1
Fork 0

Split out connection-related code into conn.go.

This commit is contained in:
Juliusz Chroboczek 2020-05-22 22:29:31 +02:00
parent 2a35f4e5d9
commit 6d6cb6caf2
2 changed files with 191 additions and 175 deletions

191
conn.go Normal file
View File

@ -0,0 +1,191 @@
// Copyright (c) 2020 by Juliusz Chroboczek.
// This is not open source software. Copy it, and I'll break into your
// house and tell your three year-old that Santa doesn't exist.
package main
import (
"sync"
"sync/atomic"
"sfu/estimator"
"sfu/jitter"
"sfu/packetcache"
"github.com/pion/webrtc/v2"
)
type upTrack struct {
track *webrtc.Track
label string
rate *estimator.Estimator
cache *packetcache.Cache
jitter *jitter.Estimator
maxBitrate uint64
lastPLI uint64
lastSenderReport uint32
lastSenderReportTime uint32
localCh chan struct{} // signals that local has changed
writerDone chan struct{} // closed when the loop dies
mu sync.Mutex
local []*downTrack
}
func (up *upTrack) notifyLocal() {
var s struct{}
select {
case up.localCh <- s:
case <-up.writerDone:
}
}
func (up *upTrack) addLocal(local *downTrack) {
up.mu.Lock()
for _, t := range up.local {
if t == local {
up.mu.Unlock()
return
}
}
up.local = append(up.local, local)
up.mu.Unlock()
up.notifyLocal()
}
func (up *upTrack) delLocal(local *downTrack) bool {
up.mu.Lock()
for i, l := range up.local {
if l == local {
up.local = append(up.local[:i], up.local[i+1:]...)
up.mu.Unlock()
up.notifyLocal()
return true
}
}
up.mu.Unlock()
return false
}
func (up *upTrack) getLocal() []*downTrack {
up.mu.Lock()
defer up.mu.Unlock()
local := make([]*downTrack, len(up.local))
copy(local, up.local)
return local
}
func (up *upTrack) hasRtcpFb(tpe, parameter string) bool {
for _, fb := range up.track.Codec().RTCPFeedback {
if fb.Type == tpe && fb.Parameter == parameter {
return true
}
}
return false
}
type upConnection struct {
id string
label string
pc *webrtc.PeerConnection
tracks []*upTrack
labels map[string]string
iceCandidates []*webrtc.ICECandidateInit
}
func getUpMid(pc *webrtc.PeerConnection, track *webrtc.Track) string {
for _, t := range pc.GetTransceivers() {
if t.Receiver() != nil && t.Receiver().Track() == track {
return t.Mid()
}
}
return ""
}
func (up *upConnection) complete() bool {
for mid, _ := range up.labels {
found := false
for _, t := range up.tracks {
m := getUpMid(up.pc, t.track)
if m == mid {
found = true
break
}
}
if !found {
return false
}
}
return true
}
type bitrate struct {
bitrate uint64
microseconds uint64
}
const receiverReportTimeout = 8000000
func (br *bitrate) Set(bitrate uint64, now uint64) {
// this is racy -- a reader might read the
// data between the two writes. This shouldn't
// matter, we'll recover at the next sample.
atomic.StoreUint64(&br.bitrate, bitrate)
atomic.StoreUint64(&br.microseconds, now)
}
func (br *bitrate) Get(now uint64) uint64 {
ts := atomic.LoadUint64(&br.microseconds)
if now < ts || now > ts+receiverReportTimeout {
return ^uint64(0)
}
return atomic.LoadUint64(&br.bitrate)
}
type receiverStats struct {
loss uint32
jitter uint32
microseconds uint64
}
func (s *receiverStats) Set(loss uint8, jitter uint32, now uint64) {
atomic.StoreUint32(&s.loss, uint32(loss))
atomic.StoreUint32(&s.jitter, jitter)
atomic.StoreUint64(&s.microseconds, now)
}
func (s *receiverStats) Get(now uint64) (uint8, uint32) {
ts := atomic.LoadUint64(&s.microseconds)
if now < ts || now > ts+receiverReportTimeout {
return 0, 0
}
return uint8(atomic.LoadUint32(&s.loss)), atomic.LoadUint32(&s.jitter)
}
type downTrack struct {
track *webrtc.Track
remote *upTrack
maxLossBitrate *bitrate
maxREMBBitrate *bitrate
rate *estimator.Estimator
stats *receiverStats
}
func (down *downTrack) GetMaxBitrate(now uint64) uint64 {
br1 := down.maxLossBitrate.Get(now)
br2 := down.maxREMBBitrate.Get(now)
if br1 < br2 {
return br1
}
return br2
}
type downConnection struct {
id string
pc *webrtc.PeerConnection
remote *upConnection
tracks []*downTrack
iceCandidates []*webrtc.ICECandidateInit
}

175
group.go
View File

@ -16,186 +16,11 @@ import (
"sync/atomic"
"time"
"sfu/estimator"
"sfu/jitter"
"sfu/mono"
"sfu/packetcache"
"github.com/pion/webrtc/v2"
)
type upTrack struct {
track *webrtc.Track
label string
rate *estimator.Estimator
cache *packetcache.Cache
jitter *jitter.Estimator
maxBitrate uint64
lastPLI uint64
lastSenderReport uint32
lastSenderReportTime uint32
localCh chan struct{} // signals that local has changed
writerDone chan struct{} // closed when the loop dies
mu sync.Mutex
local []*downTrack
}
func (up *upTrack) notifyLocal() {
var s struct{}
select {
case up.localCh <- s:
case <-up.writerDone:
}
}
func (up *upTrack) addLocal(local *downTrack) {
up.mu.Lock()
for _, t := range up.local {
if t == local {
up.mu.Unlock()
return
}
}
up.local = append(up.local, local)
up.mu.Unlock()
up.notifyLocal()
}
func (up *upTrack) delLocal(local *downTrack) bool {
up.mu.Lock()
for i, l := range up.local {
if l == local {
up.local = append(up.local[:i], up.local[i+1:]...)
up.mu.Unlock()
up.notifyLocal()
return true
}
}
up.mu.Unlock()
return false
}
func (up *upTrack) getLocal() []*downTrack {
up.mu.Lock()
defer up.mu.Unlock()
local := make([]*downTrack, len(up.local))
copy(local, up.local)
return local
}
func (up *upTrack) hasRtcpFb(tpe, parameter string) bool {
for _, fb := range up.track.Codec().RTCPFeedback {
if fb.Type == tpe && fb.Parameter == parameter {
return true
}
}
return false
}
type upConnection struct {
id string
label string
pc *webrtc.PeerConnection
tracks []*upTrack
labels map[string]string
}
func getUpMid(pc *webrtc.PeerConnection, track *webrtc.Track) string {
for _, t := range pc.GetTransceivers() {
if t.Receiver() != nil && t.Receiver().Track() == track {
return t.Mid()
}
}
return ""
}
func (up *upConnection) complete() bool {
for mid, _ := range up.labels {
found := false
for _, t := range up.tracks {
m := getUpMid(up.pc, t.track)
if m == mid {
found = true
break
}
}
if !found {
return false
}
}
return true
}
type bitrate struct {
bitrate uint64
microseconds uint64
}
const receiverReportTimeout = 8000000
func (br *bitrate) Set(bitrate uint64, now uint64) {
// this is racy -- a reader might read the
// data between the two writes. This shouldn't
// matter, we'll recover at the next sample.
atomic.StoreUint64(&br.bitrate, bitrate)
atomic.StoreUint64(&br.microseconds, now)
}
func (br *bitrate) Get(now uint64) uint64 {
ts := atomic.LoadUint64(&br.microseconds)
if now < ts || now > ts+receiverReportTimeout {
return ^uint64(0)
}
return atomic.LoadUint64(&br.bitrate)
}
type receiverStats struct {
loss uint32
jitter uint32
microseconds uint64
}
func (s *receiverStats) Set(loss uint8, jitter uint32, now uint64) {
atomic.StoreUint32(&s.loss, uint32(loss))
atomic.StoreUint32(&s.jitter, jitter)
atomic.StoreUint64(&s.microseconds, now)
}
func (s *receiverStats) Get(now uint64) (uint8, uint32) {
ts := atomic.LoadUint64(&s.microseconds)
if now < ts || now > ts+receiverReportTimeout {
return 0, 0
}
return uint8(atomic.LoadUint32(&s.loss)), atomic.LoadUint32(&s.jitter)
}
type downTrack struct {
track *webrtc.Track
remote *upTrack
maxLossBitrate *bitrate
maxREMBBitrate *bitrate
rate *estimator.Estimator
stats *receiverStats
}
func (down *downTrack) GetMaxBitrate(now uint64) uint64 {
br1 := down.maxLossBitrate.Get(now)
br2 := down.maxREMBBitrate.Get(now)
if br1 < br2 {
return br1
}
return br2
}
type downConnection struct {
id string
pc *webrtc.PeerConnection
remote *upConnection
tracks []*downTrack
}
type client struct {
group *group
id string