package rtpconn

import (
	"errors"
	"io"
	"log"
	"math/bits"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pion/rtcp"
	"github.com/pion/rtp"
	"github.com/pion/webrtc/v3"

	"github.com/jech/galene/conn"
	"github.com/jech/galene/estimator"
	"github.com/jech/galene/group"
	"github.com/jech/galene/jitter"
	"github.com/jech/galene/packetcache"
	"github.com/jech/galene/rtptime"
)

// bitrate stores a bitrate in bits per second together with the time,
// in jiffies, at which it was last updated.  It is accessed atomically.
type bitrate struct {
	bitrate uint64
	jiffies uint64
}

func (br *bitrate) Set(bitrate uint64, now uint64) {
	atomic.StoreUint64(&br.bitrate, bitrate)
	atomic.StoreUint64(&br.jiffies, now)
}

// Get returns the stored bitrate, or ^uint64(0) if the stored value is
// stale, i.e. older than receiverReportTimeout.
func (br *bitrate) Get(now uint64) uint64 {
	ts := atomic.LoadUint64(&br.jiffies)
	if now < ts || now-ts > receiverReportTimeout {
		return ^uint64(0)
	}
	return atomic.LoadUint64(&br.bitrate)
}

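// A small usage sketch (illustrative, not part of the original source):
//
//	var br bitrate
//	now := rtptime.Jiffies()
//	br.Set(512*1024, now)
//	_ = br.Get(now)                             // 512 * 1024
//	_ = br.Get(now + receiverReportTimeout + 1) // stale: ^uint64(0)
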
// receiverStats holds the loss and jitter last reported by a receiver,
// together with the time, in jiffies, at which they were updated.  It
// is accessed atomically.
type receiverStats struct {
	loss    uint32
	jitter  uint32
	jiffies uint64
}

func (s *receiverStats) Set(loss uint8, jitter uint32, now uint64) {
	atomic.StoreUint32(&s.loss, uint32(loss))
	atomic.StoreUint32(&s.jitter, jitter)
	atomic.StoreUint64(&s.jiffies, now)
}

// Get returns the stored loss and jitter, or zeroes if the stored
// values are older than receiverReportTimeout.
func (s *receiverStats) Get(now uint64) (uint8, uint32) {
	ts := atomic.LoadUint64(&s.jiffies)
	if now < ts || now > ts+receiverReportTimeout {
		return 0, 0
	}
	return uint8(atomic.LoadUint32(&s.loss)), atomic.LoadUint32(&s.jitter)
}

// receiverReportTimeout is the time after which data from receiver
// reports is considered stale.
const receiverReportTimeout = 8 * rtptime.JiffiesPerSec

// iceConnection is the interface implemented by both up and down
// connections for handling trickled ICE candidates.
type iceConnection interface {
	addICECandidate(candidate *webrtc.ICECandidateInit) error
	flushICECandidates() error
}

// downTrackAtomics holds the fields of rtpDownTrack that are accessed
// atomically.  Putting them in a separate structure guarantees the
// 64-bit alignment that sync/atomic requires on 32-bit platforms; this
// could also be achieved by carefully ordering the parent's fields, but
// a dedicated structure is more robust, and the extra indirection
// should not cost much.
type downTrackAtomics struct {
	rtt       uint64
	sr        uint64
	srNTP     uint64
	remoteNTP uint64
	remoteRTP uint32
}

// rtpDownTrack is a track in the server-to-client direction.
type rtpDownTrack struct {
	track      *webrtc.TrackLocalStaticRTP
	remote     conn.UpTrack
	ssrc       webrtc.SSRC
	maxBitrate *bitrate
	rate       *estimator.Estimator
	stats      *receiverStats
	atomics    *downTrackAtomics
	cname      atomic.Value
}

func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error {
	return down.track.WriteRTP(packet)
}

func (down *rtpDownTrack) Accumulate(bytes uint32) {
	down.rate.Accumulate(bytes)
}

// SetTimeOffset records the NTP/RTP timestamp pair carried by the
// remote's most recent sender report.
func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) {
	atomic.StoreUint64(&down.atomics.remoteNTP, ntp)
	atomic.StoreUint32(&down.atomics.remoteRTP, rtp)
}

func (down *rtpDownTrack) getTimeOffset() (uint64, uint32) {
	ntp := atomic.LoadUint64(&down.atomics.remoteNTP)
	rtp := atomic.LoadUint32(&down.atomics.remoteRTP)
	return ntp, rtp
}

func (down *rtpDownTrack) getRTT() uint64 {
	return atomic.LoadUint64(&down.atomics.rtt)
}

func (down *rtpDownTrack) setRTT(rtt uint64) {
	atomic.StoreUint64(&down.atomics.rtt, rtt)
}

// getSRTime returns the time, in jiffies, and the NTP timestamp of the
// last sender report that we sent on this track.
func (down *rtpDownTrack) getSRTime() (uint64, uint64) {
	tm := atomic.LoadUint64(&down.atomics.sr)
	ntp := atomic.LoadUint64(&down.atomics.srNTP)
	return tm, ntp
}

func (down *rtpDownTrack) setSRTime(tm uint64, ntp uint64) {
	atomic.StoreUint64(&down.atomics.sr, tm)
	atomic.StoreUint64(&down.atomics.srNTP, ntp)
}

func (down *rtpDownTrack) SetCname(cname string) {
	down.cname.Store(cname)
}

// rtpDownConnection is a connection in the server-to-client direction.
type rtpDownConnection struct {
	id             string
	pc             *webrtc.PeerConnection
	remote         conn.Up
	tracks         []*rtpDownTrack
	maxREMBBitrate *bitrate
	iceCandidates  []*webrtc.ICECandidateInit
}

// newDownConn creates a new down connection for the given up connection.
func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) {
	api := group.APIFromCodecs(remote.Codecs())
	pc, err := api.NewPeerConnection(group.IceConfiguration())
	if err != nil {
		return nil, err
	}

	pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
		log.Printf("Got track on downstream connection")
	})

	conn := &rtpDownConnection{
		id:             id,
		pc:             pc,
		remote:         remote,
		maxREMBBitrate: new(bitrate),
	}

	return conn, nil
}

// GetMaxBitrate returns the maximum bitrate, in bits per second, that
// may be sent on this connection: the minimum of the REMB estimate and
// the sum of the per-track limits, with conservative defaults for
// tracks that haven't reported a limit recently.
func (down *rtpDownConnection) GetMaxBitrate(now uint64) uint64 {
	rate := down.maxREMBBitrate.Get(now)
	var trackRate uint64
	for _, t := range down.tracks {
		r := t.maxBitrate.Get(now)
		if r == ^uint64(0) {
			if t.track.Kind() == webrtc.RTPCodecTypeAudio {
				r = 128 * 1024
			} else {
				r = 512 * 1024
			}
		}
		trackRate += r
	}
	if trackRate < rate {
		return trackRate
	}
	return rate
}

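// Worked example (illustrative): with a REMB estimate of 1000000 and two
// tracks, an audio track with no recent feedback (defaulting to
// 128*1024 = 131072) and a video track capped at 600000, the per-track
// sum is 131072 + 600000 = 731072 < 1000000, so GetMaxBitrate returns
// 731072.
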
func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
	if down.pc.RemoteDescription() != nil {
		return down.pc.AddICECandidate(*candidate)
	}
	down.iceCandidates = append(down.iceCandidates, candidate)
	return nil
}

// flushICECandidates adds buffered ICE candidates to a peer connection.
// It attempts all candidates and returns the first error encountered.
func flushICECandidates(pc *webrtc.PeerConnection, candidates []*webrtc.ICECandidateInit) error {
	if pc.RemoteDescription() == nil {
		return errors.New("flushICECandidates called in bad state")
	}

	var err error
	for _, candidate := range candidates {
		err2 := pc.AddICECandidate(*candidate)
		if err == nil {
			err = err2
		}
	}
	return err
}

func (down *rtpDownConnection) flushICECandidates() error {
	err := flushICECandidates(down.pc, down.iceCandidates)
	down.iceCandidates = nil
	return err
}

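// Trickle ICE sketch (illustrative): candidates that arrive before the
// remote description is set are buffered and applied later:
//
//	down.addICECandidate(c)              // no remote description yet: buffered
//	down.pc.SetRemoteDescription(answer) // negotiation completes
//	down.flushICECandidates()            // applies c, clears the buffer
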
// upTrackAtomics holds the fields of rtpUpTrack that are accessed
// atomically; see the comment on downTrackAtomics.
type upTrackAtomics struct {
	lastPLI  uint64
	lastFIR  uint64
	firSeqno uint32
}

// rtpUpTrack is a track in the client-to-server direction.
type rtpUpTrack struct {
	track   *webrtc.TrackRemote
	label   string
	rate    *estimator.Estimator
	cache   *packetcache.Cache
	jitter  *jitter.Estimator
	atomics *upTrackAtomics
	cname   atomic.Value

	localCh    chan localTrackAction
	readerDone chan struct{}

	mu            sync.Mutex // protects the fields below
	srTime        uint64
	srNTPTime     uint64
	srRTPTime     uint32
	local         []conn.DownTrack
	bufferedNACKs []uint16
}

// localTrackAction is an action on the set of local tracks: add the
// track if add is true, remove it otherwise.
type localTrackAction struct {
	add   bool
	track conn.DownTrack
}

// notifyLocal informs the track's reader goroutine that a local track
// has been added or removed.  It returns without sending if the reader
// is already done.
func (up *rtpUpTrack) notifyLocal(add bool, track conn.DownTrack) {
	select {
	case up.localCh <- localTrackAction{add, track}:
	case <-up.readerDone:
	}
}

func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error {
	up.mu.Lock()
	defer up.mu.Unlock()

	for _, t := range up.local {
		if t == local {
			return nil
		}
	}
	up.local = append(up.local, local)

	// do this asynchronously, to avoid deadlocks when multiple
	// clients call this simultaneously.
	go up.notifyLocal(true, local)
	return nil
}

func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool {
	up.mu.Lock()
	defer up.mu.Unlock()
	for i, l := range up.local {
		if l == local {
			up.local = append(up.local[:i], up.local[i+1:]...)
			// do this asynchronously, to avoid deadlocking when
			// multiple clients call this simultaneously.
			go up.notifyLocal(false, l)
			return true
		}
	}
	return false
}

func (up *rtpUpTrack) getLocal() []conn.DownTrack {
	up.mu.Lock()
	defer up.mu.Unlock()
	local := make([]conn.DownTrack, len(up.local))
	copy(local, up.local)
	return local
}

func (up *rtpUpTrack) GetRTP(seqno uint16, result []byte) uint16 {
	return up.cache.Get(seqno, result)
}

func (up *rtpUpTrack) Label() string {
	return up.label
}

func (up *rtpUpTrack) Codec() webrtc.RTPCodecCapability {
	return up.track.Codec().RTPCodecCapability
}

// hasRtcpFb returns true if the track's codec was negotiated with the
// given RTCP feedback type and parameter.
func (up *rtpUpTrack) hasRtcpFb(tpe, parameter string) bool {
	for _, fb := range up.track.Codec().RTCPFeedback {
		if fb.Type == tpe && fb.Parameter == parameter {
			return true
		}
	}
	return false
}

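// The feedback pairs queried through hasRtcpFb in this file are
// ("nack", "") for generic NACKs, ("nack", "pli") for picture loss
// indications, ("ccm", "fir") for full intra requests, and
// ("goog-remb", "") for REMB congestion feedback.
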
// rtpUpConnection is a connection in the client-to-server direction.
type rtpUpConnection struct {
	id            string
	label         string
	pc            *webrtc.PeerConnection
	labels        map[string]string
	iceCandidates []*webrtc.ICECandidateInit

	mu     sync.Mutex // protects the fields below
	pushed bool
	tracks []*rtpUpTrack
	local  []conn.Down
}

func (up *rtpUpConnection) getTracks() []*rtpUpTrack {
	up.mu.Lock()
	defer up.mu.Unlock()
	tracks := make([]*rtpUpTrack, len(up.tracks))
	copy(tracks, up.tracks)
	return tracks
}

func (up *rtpUpConnection) Id() string {
	return up.id
}

func (up *rtpUpConnection) Label() string {
	return up.label
}

func (up *rtpUpConnection) Codecs() []webrtc.RTPCodecCapability {
	up.mu.Lock()
	defer up.mu.Unlock()

	codecs := make([]webrtc.RTPCodecCapability, len(up.tracks))
	for i := range up.tracks {
		codecs[i] = up.tracks[i].Codec()
	}
	return codecs
}

func (up *rtpUpConnection) AddLocal(local conn.Down) error {
	up.mu.Lock()
	defer up.mu.Unlock()
	for _, t := range up.local {
		if t == local {
			return nil
		}
	}
	up.local = append(up.local, local)
	return nil
}

func (up *rtpUpConnection) DelLocal(local conn.Down) bool {
	up.mu.Lock()
	defer up.mu.Unlock()
	for i, l := range up.local {
		if l == local {
			up.local = append(up.local[:i], up.local[i+1:]...)
			return true
		}
	}
	return false
}

func (up *rtpUpConnection) getLocal() []conn.Down {
	up.mu.Lock()
	defer up.mu.Unlock()
	local := make([]conn.Down, len(up.local))
	copy(local, up.local)
	return local
}

func (up *rtpUpConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
	if up.pc.RemoteDescription() != nil {
		return up.pc.AddICECandidate(*candidate)
	}
	up.iceCandidates = append(up.iceCandidates, candidate)
	return nil
}

func (up *rtpUpConnection) flushICECandidates() error {
	err := flushICECandidates(up.pc, up.iceCandidates)
	up.iceCandidates = nil
	return err
}

// getTrackMid returns the mid of the transceiver that carries a given
// track, or the empty string if the track was not found.
func getTrackMid(pc *webrtc.PeerConnection, track *webrtc.TrackRemote) string {
	for _, t := range pc.GetTransceivers() {
		if t.Receiver() != nil && t.Receiver().Track() == track {
			return t.Mid()
		}
	}
	return ""
}

// complete returns true when all tracks declared in up.labels have been
// received.  Called with up.mu held.
func (up *rtpUpConnection) complete() bool {
	for mid := range up.labels {
		found := false
		for _, t := range up.tracks {
			m := getTrackMid(up.pc, t.track)
			if m == mid {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// pushConnNow pushes a connection to all of the clients in a group
func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) {
	up.mu.Lock()
	up.pushed = true
	tracks := make([]conn.UpTrack, len(up.tracks))
	for i, t := range up.tracks {
		tracks[i] = t
	}
	up.mu.Unlock()

	for _, c := range cs {
		c.PushConn(g, up.id, up, tracks, up.label)
	}
}

// pushConn pushes a connection to all of the clients in a group.  If
// the connection is not complete yet, it schedules a delayed call to
// pushConnNow instead of pushing immediately.
func pushConn(up *rtpUpConnection, g *group.Group, cs []group.Client) {
	if up.complete() {
		pushConnNow(up, g, cs)
		return
	}

	up.mu.Lock()
	up.pushed = false
	up.mu.Unlock()

	go func(g *group.Group, cs []group.Client) {
		time.Sleep(300 * time.Millisecond)
		up.mu.Lock()
		pushed := up.pushed
		up.pushed = true
		up.mu.Unlock()
		if !pushed {
			pushConnNow(up, g, cs)
		}
	}(g, cs)
}

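// Summary of the interplay above (explanatory, not original commentary):
// a complete connection is pushed immediately; an incomplete one clears
// up.pushed and waits 300ms.  If anything else pushes the connection in
// the meantime, up.pushed is already true when the goroutine wakes up,
// so clients see at most one push.
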
// newUpConn creates a new up connection with one audio and one video
// receive-only transceiver.
func newUpConn(c group.Client, id string, labels map[string]string) (*rtpUpConnection, error) {
	pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
	if err != nil {
		return nil, err
	}

	_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
		webrtc.RtpTransceiverInit{
			Direction: webrtc.RTPTransceiverDirectionRecvonly,
		},
	)
	if err != nil {
		pc.Close()
		return nil, err
	}

	_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
		webrtc.RtpTransceiverInit{
			Direction: webrtc.RTPTransceiverDirectionRecvonly,
		},
	)
	if err != nil {
		pc.Close()
		return nil, err
	}

	up := &rtpUpConnection{id: id, pc: pc, labels: labels}

	pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
		up.mu.Lock()

		mid := getTrackMid(pc, remote)
		if mid == "" {
			log.Printf("Couldn't get track's mid")
			// don't hold the mutex when bailing out
			up.mu.Unlock()
			return
		}

		label, ok := up.labels[mid]
		if !ok {
			log.Printf("Couldn't get track's label")
			isvideo := remote.Kind() == webrtc.RTPCodecTypeVideo
			if isvideo {
				label = "video"
			} else {
				label = "audio"
			}
		}

		track := &rtpUpTrack{
			track:      remote,
			label:      label,
			cache:      packetcache.New(minPacketCache(remote)),
			rate:       estimator.New(time.Second),
			jitter:     jitter.New(remote.Codec().ClockRate),
			atomics:    &upTrackAtomics{},
			localCh:    make(chan localTrackAction, 2),
			readerDone: make(chan struct{}),
		}

		up.tracks = append(up.tracks, track)

		go readLoop(up, track)

		go rtcpUpListener(up, track, receiver)

		up.mu.Unlock()

		pushConn(up, c.Group(), c.Group().GetClients(c))
	})

	pushConn(up, c.Group(), c.Group().GetClients(c))
	go rtcpUpSender(up)

	return up, nil
}

var ErrUnsupportedFeedback = errors.New("unsupported feedback type")
var ErrRateLimited = errors.New("rate limited")

// sendPLI sends a PLI for the given track upstream, rate-limited to one
// PLI per half second.
func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error {
	if !track.hasRtcpFb("nack", "pli") {
		return ErrUnsupportedFeedback
	}
	last := atomic.LoadUint64(&track.atomics.lastPLI)
	now := rtptime.Jiffies()
	if now >= last && now-last < rtptime.JiffiesPerSec/2 {
		return ErrRateLimited
	}
	atomic.StoreUint64(&track.atomics.lastPLI, now)
	return sendPLI(up.pc, track.track.SSRC())
}

func sendPLI(pc *webrtc.PeerConnection, ssrc webrtc.SSRC) error {
	return pc.WriteRTCP([]rtcp.Packet{
		&rtcp.PictureLossIndication{MediaSSRC: uint32(ssrc)},
	})
}

func (up *rtpUpConnection) sendFIR(track *rtpUpTrack, increment bool) error {
	// we need to reliably increment the seqno, even if we are going
	// to drop the packet due to rate limiting.
	var seqno uint8
	if increment {
		seqno = uint8(atomic.AddUint32(&track.atomics.firSeqno, 1) & 0xFF)
	} else {
		seqno = uint8(atomic.LoadUint32(&track.atomics.firSeqno) & 0xFF)
	}

	if !track.hasRtcpFb("ccm", "fir") {
		return ErrUnsupportedFeedback
	}
	last := atomic.LoadUint64(&track.atomics.lastFIR)
	now := rtptime.Jiffies()
	if now >= last && now-last < rtptime.JiffiesPerSec/2 {
		return ErrRateLimited
	}
	atomic.StoreUint64(&track.atomics.lastFIR, now)
	return sendFIR(up.pc, track.track.SSRC(), seqno)
}

func sendFIR(pc *webrtc.PeerConnection, ssrc webrtc.SSRC, seqno uint8) error {
	return pc.WriteRTCP([]rtcp.Packet{
		&rtcp.FullIntraRequest{
			FIR: []rtcp.FIREntry{
				{
					SSRC:           uint32(ssrc),
					SequenceNumber: seqno,
				},
			},
		},
	})
}

func (up *rtpUpConnection) sendNACK(track *rtpUpTrack, first uint16, bitmap uint16) error {
	if !track.hasRtcpFb("nack", "") {
		return ErrUnsupportedFeedback
	}

	err := sendNACKs(up.pc, track.track.SSRC(),
		[]rtcp.NackPair{{first, rtcp.PacketBitmap(bitmap)}},
	)
	if err == nil {
		track.cache.Expect(1 + bits.OnesCount16(bitmap))
	}
	return err
}

func (up *rtpUpConnection) sendNACKs(track *rtpUpTrack, seqnos []uint16) error {
	count := len(seqnos)
	if count == 0 {
		return nil
	}

	if !track.hasRtcpFb("nack", "") {
		return ErrUnsupportedFeedback
	}

	var nacks []rtcp.NackPair

	for len(seqnos) > 0 {
		if len(nacks) >= 240 {
			log.Printf("NACK: packet overflow")
			break
		}
		var f, b uint16
		f, b, seqnos = packetcache.ToBitmap(seqnos)
		nacks = append(nacks, rtcp.NackPair{f, rtcp.PacketBitmap(b)})
	}
	err := sendNACKs(up.pc, track.track.SSRC(), nacks)
	if err == nil {
		track.cache.Expect(count)
	}
	return err
}

func sendNACKs(pc *webrtc.PeerConnection, ssrc webrtc.SSRC, nacks []rtcp.NackPair) error {
	packet := rtcp.Packet(
		&rtcp.TransportLayerNack{
			MediaSSRC: uint32(ssrc),
			Nacks:     nacks,
		},
	)
	return pc.WriteRTCP([]rtcp.Packet{packet})
}

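// NACK packing example (assuming packetcache.ToBitmap follows the
// RFC 4585 convention, where bit i of the bitmap refers to sequence
// number first+i+1): the sequence numbers {100, 101, 103} pack into the
// single pair
//
//	rtcp.NackPair{PacketID: 100, LostPackets: rtcp.PacketBitmap(0b101)}
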
// gotNACK handles a NACK received from a client: packets that are still
// in the cache are resent immediately, and the remaining sequence
// numbers are forwarded towards the sender.
func gotNACK(conn *rtpDownConnection, track *rtpDownTrack, p *rtcp.TransportLayerNack) {
	var unhandled []uint16
	var packet rtp.Packet
	buf := make([]byte, packetcache.BufSize)
	for _, nack := range p.Nacks {
		nack.Range(func(seqno uint16) bool {
			l := track.remote.GetRTP(seqno, buf)
			if l == 0 {
				unhandled = append(unhandled, seqno)
				return true
			}
			err := packet.Unmarshal(buf[:l])
			if err != nil {
				return true
			}
			err = track.track.WriteRTP(&packet)
			if err != nil {
				log.Printf("WriteRTP: %v", err)
				return false
			}
			track.rate.Accumulate(uint32(l))
			return true
		})
	}
	if len(unhandled) == 0 {
		return
	}

	track.remote.Nack(conn.remote, unhandled)
}

func (track *rtpUpTrack) Nack(conn conn.Up, nacks []uint16) error {
	track.mu.Lock()
	defer track.mu.Unlock()

	doit := len(track.bufferedNACKs) == 0

outer:
	for _, nack := range nacks {
		for _, seqno := range track.bufferedNACKs {
			if seqno == nack {
				continue outer
			}
		}
		track.bufferedNACKs = append(track.bufferedNACKs, nack)
	}

	if doit {
		up, ok := conn.(*rtpUpConnection)
		if !ok {
			log.Printf("Nack: unexpected type %T", conn)
			return errors.New("unexpected connection type")
		}
		go nackWriter(up, track)
	}
	return nil
}

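// Note: when the buffer transitions from empty to non-empty, Nack
// starts a single nackWriter goroutine (defined elsewhere in this
// package), which is responsible for eventually sending the buffered
// NACKs; subsequent calls merely append to track.bufferedNACKs.
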
// rtcpUpListener reads and processes RTCP packets received on an up
// track: sender reports update the track's time offsets, and SDES
// packets update its CNAME.
func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPReceiver) {
	buf := make([]byte, 1500)

	for {
		firstSR := false
		n, _, err := r.Read(buf)
		if err != nil {
			if err != io.EOF {
				log.Printf("Read RTCP: %v", err)
			}
			return
		}
		ps, err := rtcp.Unmarshal(buf[:n])
		if err != nil {
			log.Printf("Unmarshal RTCP: %v", err)
			continue
		}

		jiffies := rtptime.Jiffies()

		for _, p := range ps {
			local := track.getLocal()
			switch p := p.(type) {
			case *rtcp.SenderReport:
				track.mu.Lock()
				if track.srTime == 0 {
					firstSR = true
				}
				track.srTime = jiffies
				track.srNTPTime = p.NTPTime
				track.srRTPTime = p.RTPTime
				track.mu.Unlock()
				for _, l := range local {
					l.SetTimeOffset(p.NTPTime, p.RTPTime)
				}
			case *rtcp.SourceDescription:
				for _, c := range p.Chunks {
					if c.Source != uint32(track.track.SSRC()) {
						continue
					}
					for _, i := range c.Items {
						if i.Type != rtcp.SDESCNAME {
							continue
						}
						track.cname.Store(i.Text)
						for _, l := range local {
							l.SetCname(i.Text)
						}
					}
				}
			}
		}

		if firstSR {
			// this is the first SR we got for at least one track,
			// quickly propagate the time offsets downstream
			local := conn.getLocal()
			for _, l := range local {
				l, ok := l.(*rtpDownConnection)
				if ok {
					err := sendSR(l)
					if err != nil {
						log.Printf("sendSR: %v", err)
					}
				}
			}
		}
	}
}

// sendUpRTCP sends receiver reports, and a REMB packet if applicable,
// upstream.
func sendUpRTCP(conn *rtpUpConnection) error {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	if len(conn.tracks) == 0 {
		state := conn.pc.ConnectionState()
		if state == webrtc.PeerConnectionStateClosed {
			return io.ErrClosedPipe
		}
		return nil
	}

	now := rtptime.Jiffies()

	reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
	for _, t := range conn.tracks {
		updateUpTrack(t)
		expected, lost, totalLost, eseqno := t.cache.GetStats(true)
		if expected == 0 {
			expected = 1
		}
		if lost >= expected {
			lost = expected - 1
		}

		t.mu.Lock()
		srTime := t.srTime
		srNTPTime := t.srNTPTime
		t.mu.Unlock()

		// DLSR is expressed in units of 1/65536 seconds
		var delay uint64
		if srTime != 0 {
			delay = (now - srTime) /
				(rtptime.JiffiesPerSec / 0x10000)
		}

		reports = append(reports, rtcp.ReceptionReport{
			SSRC:               uint32(t.track.SSRC()),
			FractionLost:       uint8((lost * 256) / expected),
			TotalLost:          totalLost,
			LastSequenceNumber: eseqno,
			Jitter:             t.jitter.Jitter(),
			LastSenderReport:   uint32(srNTPTime >> 16),
			Delay:              uint32(delay),
		})
	}

	packets := []rtcp.Packet{
		&rtcp.ReceiverReport{
			Reports: reports,
		},
	}

	rate := ^uint64(0)
	for _, l := range conn.local {
		r := l.GetMaxBitrate(now)
		if r < rate {
			rate = r
		}
	}
	if rate < group.MinBitrate {
		rate = group.MinBitrate
	}

	// only include the tracks that negotiated REMB
	var ssrcs []uint32
	for _, t := range conn.tracks {
		if !t.hasRtcpFb("goog-remb", "") {
			continue
		}
		ssrcs = append(ssrcs, uint32(t.track.SSRC()))
	}

	if len(ssrcs) > 0 {
		packets = append(packets,
			&rtcp.ReceiverEstimatedMaximumBitrate{
				Bitrate: rate,
				SSRCs:   ssrcs,
			},
		)
	}
	return conn.pc.WriteRTCP(packets)
}

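// Unit check for the delay computation above: RFC 3550 expresses DLSR
// in units of 1/65536 seconds, hence the division by
// rtptime.JiffiesPerSec / 0x10000.  If JiffiesPerSec were 65536, one
// jiffy would map to exactly one DLSR unit; the code only relies on the
// ratio, not on any particular jiffy frequency.
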
func rtcpUpSender(conn *rtpUpConnection) {
	for {
		time.Sleep(time.Second)
		err := sendUpRTCP(conn)
		if err != nil {
			if err == io.EOF || err == io.ErrClosedPipe {
				return
			}
			log.Printf("sendRR: %v", err)
		}
	}
}

// sendSR sends sender reports on all of a down connection's tracks.
func sendSR(conn *rtpDownConnection) error {
	// since this is only called after all tracks have been created,
	// there is no need for locking.
	packets := make([]rtcp.Packet, 0, len(conn.tracks))

	now := time.Now()
	nowNTP := rtptime.TimeToNTP(now)
	jiffies := rtptime.TimeToJiffies(now)

	for _, t := range conn.tracks {
		clockrate := t.track.Codec().ClockRate

		var nowRTP uint32

		remoteNTP, remoteRTP := t.getTimeOffset()
		if remoteNTP != 0 {
			srTime := rtptime.NTPToTime(remoteNTP)
			d := now.Sub(srTime)
			if d > 0 && d < time.Hour {
				delay := rtptime.FromDuration(
					d, clockrate,
				)
				nowRTP = remoteRTP + uint32(delay)
			}

			p, b := t.rate.Totals()
			packets = append(packets,
				&rtcp.SenderReport{
					SSRC:        uint32(t.ssrc),
					NTPTime:     nowNTP,
					RTPTime:     nowRTP,
					PacketCount: p,
					OctetCount:  b,
				})
			t.setSRTime(jiffies, nowNTP)
		}

		cname, ok := t.cname.Load().(string)
		if ok && cname != "" {
			item := rtcp.SourceDescriptionItem{
				Type: rtcp.SDESCNAME,
				Text: cname,
			}
			packets = append(packets,
				&rtcp.SourceDescription{
					Chunks: []rtcp.SourceDescriptionChunk{
						{
							Source: uint32(t.ssrc),
							Items:  []rtcp.SourceDescriptionItem{item},
						},
					},
				},
			)
		}
	}

	if len(packets) == 0 {
		state := conn.pc.ConnectionState()
		if state == webrtc.PeerConnectionStateClosed {
			return io.ErrClosedPipe
		}
		return nil
	}

	return conn.pc.WriteRTCP(packets)
}

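// The RTP timestamp above is extrapolated from the remote's last sender
// report: nowRTP = remoteRTP + elapsed time converted to units of the
// track's clock rate.  For example (illustrative), with a 90000Hz video
// clock a report sent 0.5s after the remote's report carries
// remoteRTP + 45000.
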
func rtcpDownSender(conn *rtpDownConnection) {
	for {
		time.Sleep(time.Second)
		err := sendSR(conn)
		if err != nil {
			if err == io.EOF || err == io.ErrClosedPipe {
				return
			}
			log.Printf("sendSR: %v", err)
		}
	}
}

// Bounds for the loss-based bitrate estimator, in bits per second.
const (
	minLossRate  = 9600
	initLossRate = 512 * 1000
	maxLossRate  = 1 << 30
)

// updateRate updates the track's maximum bitrate from the loss rate
// reported by the receiver.  The loss value is the fraction of lost
// packets scaled by 256.
func (track *rtpDownTrack) updateRate(loss uint8, now uint64) {
	rate := track.maxBitrate.Get(now)
	if rate < minLossRate || rate > maxLossRate {
		// no recent feedback, reset
		rate = initLossRate
	}
	if loss < 5 {
		// if our actual rate is low, then we're not probing the
		// bottleneck
		r, _ := track.rate.Estimate()
		actual := 8 * uint64(r)
		if actual >= (rate*7)/8 {
			// loss < 0.02, multiply by 1.05
			rate = rate * 269 / 256
			if rate > maxLossRate {
				rate = maxLossRate
			}
		}
	} else if loss > 25 {
		// loss > 0.1, multiply by (1 - loss/2)
		rate = rate * (512 - uint64(loss)) / 512
		if rate < minLossRate {
			rate = minLossRate
		}
	}

	// update unconditionally, to set the timestamp
	track.maxBitrate.Set(rate, now)
}

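// Arithmetic behind the thresholds above: FractionLost is the loss
// fraction scaled by 256, so loss < 5 is under roughly 2% loss and
// loss > 25 is over roughly 10%.  The increase factor is
// 269/256, about 1.05, and (512-loss)/512 = 1 - (loss/256)/2, i.e. the
// rate is multiplied by 1 - p/2 for a loss fraction p.
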
// rtcpDownListener reads and processes RTCP packets received on a down
// track, forwarding keyframe requests and NACKs towards the sender.
func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RTPSender) {
	var gotFir bool
	lastFirSeqno := uint8(0)

	buf := make([]byte, 1500)

	for {
		n, _, err := s.Read(buf)
		if err != nil {
			if err != io.EOF {
				log.Printf("Read RTCP: %v", err)
			}
			return
		}
		ps, err := rtcp.Unmarshal(buf[:n])
		if err != nil {
			log.Printf("Unmarshal RTCP: %v", err)
			continue
		}

		jiffies := rtptime.Jiffies()

		for _, p := range ps {
			switch p := p.(type) {
			case *rtcp.PictureLossIndication:
				remote, ok := conn.remote.(*rtpUpConnection)
				if !ok {
					continue
				}
				rt, ok := track.remote.(*rtpUpTrack)
				if !ok {
					continue
				}
				err := remote.sendPLI(rt)
				if err != nil && err != ErrRateLimited {
					log.Printf("sendPLI: %v", err)
				}
			case *rtcp.FullIntraRequest:
				found := false
				var seqno uint8
				for _, entry := range p.FIR {
					if entry.SSRC == uint32(track.ssrc) {
						found = true
						seqno = entry.SequenceNumber
						break
					}
				}
				if !found {
					log.Printf("Misdirected FIR")
					continue
				}

				// only increment the FIR seqno for new
				// requests, not for retransmissions of the
				// same request
				increment := true
				if gotFir {
					increment = seqno != lastFirSeqno
				}
				gotFir = true
				lastFirSeqno = seqno

				remote, ok := conn.remote.(*rtpUpConnection)
				if !ok {
					continue
				}
				rt, ok := track.remote.(*rtpUpTrack)
				if !ok {
					continue
				}
				err := remote.sendFIR(rt, increment)
				if err == ErrUnsupportedFeedback {
					// fall back to a PLI
					err := remote.sendPLI(rt)
					if err != nil && err != ErrRateLimited {
						log.Printf("sendPLI: %v", err)
					}
				} else if err != nil && err != ErrRateLimited {
					log.Printf("sendFIR: %v", err)
				}
			case *rtcp.ReceiverEstimatedMaximumBitrate:
				conn.maxREMBBitrate.Set(p.Bitrate, jiffies)
			case *rtcp.ReceiverReport:
				for _, r := range p.Reports {
					if r.SSRC == uint32(track.ssrc) {
						handleReport(track, r, jiffies)
					}
				}
			case *rtcp.SenderReport:
				for _, r := range p.Reports {
					if r.SSRC == uint32(track.ssrc) {
						handleReport(track, r, jiffies)
					}
				}
			case *rtcp.TransportLayerNack:
				gotNACK(conn, track, p)
			}
		}
	}
}

// handleReport updates a track's statistics and RTT estimate from a
// reception report sent by the receiver.
func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint64) {
	track.stats.Set(report.FractionLost, report.Jitter, jiffies)
	track.updateRate(report.FractionLost, jiffies)

	if report.LastSenderReport != 0 {
		jiffies := rtptime.Jiffies()
		srTime, srNTPTime := track.getSRTime()
		if jiffies < srTime || jiffies-srTime > 8*rtptime.JiffiesPerSec {
			return
		}
		if report.LastSenderReport == uint32(srNTPTime>>16) {
			delay := uint64(report.Delay) *
				(rtptime.JiffiesPerSec / 0x10000)
			if delay > jiffies-srTime {
				return
			}
			rtt := (jiffies - srTime) - delay
			oldrtt := track.getRTT()
			newrtt := rtt
			if oldrtt > 0 {
				newrtt = (3*oldrtt + rtt) / 4
			}
			track.setRTT(newrtt)
		}
	}
}

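// The RTT computation above follows RFC 3550: if A is the arrival time
// of the report, LSR the middle 32 bits of the echoed NTP timestamp and
// DLSR the receiver's processing delay, then RTT = A - LSR - DLSR, here
// computed in jiffies.  The estimate is then smoothed with an EWMA,
// newrtt = (3*oldrtt + rtt) / 4.
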
// minPacketCache returns the minimum size, in packets, of a track's
// packet cache; video gets a larger cache than audio, since it is the
// main target of NACK-based recovery.
func minPacketCache(track *webrtc.TrackRemote) int {
	if track.Kind() == webrtc.RTPCodecTypeVideo {
		return 128
	}
	return 24
}

// updateUpTrack resizes the track's packet cache so that it can hold
// roughly four retransmission timeouts' worth of packets at the current
// rate, clamped between minPacketCache and 1024 packets.
func updateUpTrack(track *rtpUpTrack) {
	now := rtptime.Jiffies()

	clockrate := track.track.Codec().ClockRate
	local := track.getLocal()
	var maxrto uint64
	for _, l := range local {
		ll, ok := l.(*rtpDownTrack)
		if ok {
			_, j := ll.stats.Get(now)
			jitter := uint64(j) *
				(rtptime.JiffiesPerSec / uint64(clockrate))
			rtt := ll.getRTT()
			rto := rtt + 4*jitter
			if rto > maxrto {
				maxrto = rto
			}
		}
	}
	_, r := track.rate.Estimate()
	packets := int((uint64(r) * maxrto * 4) / rtptime.JiffiesPerSec)
	min := minPacketCache(track.track)
	if packets < min {
		packets = min
	}
	if packets > 1024 {
		packets = 1024
	}
	track.cache.ResizeCond(packets)
}