mirror of
https://github.com/jech/galene.git
synced 2024-11-22 16:45:58 +01:00
9b8d868647
We could call addLocal after the remote was closed, which would cause the local connection to remain forever. Thanks to Ludovic Rateau.
1220 lines
26 KiB
Go
1220 lines
26 KiB
Go
package rtpconn
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"math/bits"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/sdp/v3"
|
|
"github.com/pion/webrtc/v3"
|
|
|
|
"github.com/jech/galene/conn"
|
|
"github.com/jech/galene/estimator"
|
|
"github.com/jech/galene/group"
|
|
"github.com/jech/galene/ice"
|
|
"github.com/jech/galene/jitter"
|
|
"github.com/jech/galene/packetcache"
|
|
"github.com/jech/galene/packetmap"
|
|
"github.com/jech/galene/rtptime"
|
|
)
|
|
|
|
type bitrate struct {
|
|
bitrate uint64
|
|
jiffies uint64
|
|
}
|
|
|
|
func (br *bitrate) Set(bitrate uint64, now uint64) {
|
|
atomic.StoreUint64(&br.bitrate, bitrate)
|
|
atomic.StoreUint64(&br.jiffies, now)
|
|
}
|
|
|
|
func (br *bitrate) Get(now uint64) uint64 {
|
|
ts := atomic.LoadUint64(&br.jiffies)
|
|
if now < ts || now-ts > receiverReportTimeout {
|
|
return ^uint64(0)
|
|
}
|
|
return atomic.LoadUint64(&br.bitrate)
|
|
}
|
|
|
|
type receiverStats struct {
|
|
loss uint32
|
|
jitter uint32
|
|
jiffies uint64
|
|
}
|
|
|
|
func (s *receiverStats) Set(loss uint8, jitter uint32, now uint64) {
|
|
atomic.StoreUint32(&s.loss, uint32(loss))
|
|
atomic.StoreUint32(&s.jitter, jitter)
|
|
atomic.StoreUint64(&s.jiffies, now)
|
|
}
|
|
|
|
const receiverReportTimeout = 30 * rtptime.JiffiesPerSec
|
|
|
|
func (s *receiverStats) Get(now uint64) (uint8, uint32) {
|
|
ts := atomic.LoadUint64(&s.jiffies)
|
|
if now < ts || now > ts+receiverReportTimeout {
|
|
return 0, 0
|
|
}
|
|
return uint8(atomic.LoadUint32(&s.loss)), atomic.LoadUint32(&s.jitter)
|
|
}
|
|
|
|
type iceConnection interface {
|
|
addICECandidate(candidate *webrtc.ICECandidateInit) error
|
|
flushICECandidates() error
|
|
}
|
|
|
|
type downTrackAtomics struct {
|
|
rtt uint64
|
|
sr uint64
|
|
srNTP uint64
|
|
remoteNTP uint64
|
|
remoteRTP uint32
|
|
layerInfo uint32
|
|
}
|
|
|
|
type rtpDownTrack struct {
|
|
track *webrtc.TrackLocalStaticRTP
|
|
sender *webrtc.RTPSender
|
|
conn *rtpDownConnection
|
|
remote conn.UpTrack
|
|
ssrc webrtc.SSRC
|
|
packetmap packetmap.Map
|
|
maxBitrate *bitrate
|
|
maxREMBBitrate *bitrate
|
|
rate *estimator.Estimator
|
|
stats *receiverStats
|
|
atomics *downTrackAtomics
|
|
cname atomic.Value
|
|
}
|
|
|
|
func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) {
|
|
atomic.StoreUint64(&down.atomics.remoteNTP, ntp)
|
|
atomic.StoreUint32(&down.atomics.remoteRTP, rtp)
|
|
}
|
|
|
|
func (down *rtpDownTrack) getTimeOffset() (uint64, uint32) {
|
|
ntp := atomic.LoadUint64(&down.atomics.remoteNTP)
|
|
rtp := atomic.LoadUint32(&down.atomics.remoteRTP)
|
|
return ntp, rtp
|
|
}
|
|
|
|
func (down *rtpDownTrack) getRTT() uint64 {
|
|
return atomic.LoadUint64(&down.atomics.rtt)
|
|
}
|
|
|
|
func (down *rtpDownTrack) setRTT(rtt uint64) {
|
|
atomic.StoreUint64(&down.atomics.rtt, rtt)
|
|
}
|
|
|
|
func (down *rtpDownTrack) getSRTime() (uint64, uint64) {
|
|
tm := atomic.LoadUint64(&down.atomics.sr)
|
|
ntp := atomic.LoadUint64(&down.atomics.srNTP)
|
|
return tm, ntp
|
|
}
|
|
|
|
func (down *rtpDownTrack) setSRTime(tm uint64, ntp uint64) {
|
|
atomic.StoreUint64(&down.atomics.sr, tm)
|
|
atomic.StoreUint64(&down.atomics.srNTP, ntp)
|
|
}
|
|
|
|
func (down *rtpDownTrack) SetCname(cname string) {
|
|
down.cname.Store(cname)
|
|
}
|
|
|
|
type layerInfo struct {
|
|
sid, wantedSid, maxSid uint8
|
|
tid, wantedTid, maxTid uint8
|
|
}
|
|
|
|
func (down *rtpDownTrack) getLayerInfo() layerInfo {
|
|
info := atomic.LoadUint32(&down.atomics.layerInfo)
|
|
return layerInfo{
|
|
sid: uint8((info & 0xF)),
|
|
wantedSid: uint8((info >> 4) & 0xF),
|
|
maxSid: uint8((info >> 8) & 0xF),
|
|
tid: uint8((info >> 16) & 0xF),
|
|
wantedTid: uint8((info >> 20) & 0xF),
|
|
maxTid: uint8((info >> 24) & 0xF),
|
|
}
|
|
}
|
|
|
|
func (down *rtpDownTrack) setLayerInfo(info layerInfo) {
|
|
atomic.StoreUint32(&down.atomics.layerInfo,
|
|
uint32(info.sid&0xF)|
|
|
uint32(info.wantedSid&0xF)<<4|
|
|
uint32(info.maxSid&0xF)<<8|
|
|
uint32(info.tid&0xF)<<16|
|
|
uint32(info.wantedTid&0xF)<<20|
|
|
uint32(info.maxTid&0xF)<<24,
|
|
)
|
|
}
|
|
|
|
const (
|
|
negotiationUnneeded = iota
|
|
negotiationNeeded
|
|
negotiationRestartIce
|
|
)
|
|
|
|
type rtpDownConnection struct {
|
|
id string
|
|
pc *webrtc.PeerConnection
|
|
remote conn.Up
|
|
iceCandidates []*webrtc.ICECandidateInit
|
|
negotiationNeeded int
|
|
requested []string
|
|
|
|
mu sync.Mutex
|
|
tracks []*rtpDownTrack
|
|
}
|
|
|
|
func (down *rtpDownConnection) getTracks() []*rtpDownTrack {
|
|
down.mu.Lock()
|
|
defer down.mu.Unlock()
|
|
tracks := make([]*rtpDownTrack, len(down.tracks))
|
|
copy(tracks, down.tracks)
|
|
return tracks
|
|
}
|
|
|
|
func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) {
|
|
api, err := c.Group().API()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pc, err := api.NewPeerConnection(*ice.ICEConfiguration())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
|
log.Printf("Got track on downstream connection")
|
|
})
|
|
|
|
conn := &rtpDownConnection{
|
|
id: id,
|
|
pc: pc,
|
|
remote: remote,
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
var packetBufPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return make([]byte, packetcache.BufSize)
|
|
},
|
|
}
|
|
|
|
func (down *rtpDownTrack) Write(buf []byte) (int, error) {
|
|
codec := down.remote.Codec().MimeType
|
|
|
|
flags, err := getPacketFlags(codec, buf)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
layer := down.getLayerInfo()
|
|
|
|
if flags.tid > layer.maxTid || flags.sid > layer.maxSid {
|
|
if flags.tid > layer.maxTid {
|
|
if layer.tid == layer.maxTid {
|
|
layer.wantedTid = flags.tid
|
|
layer.tid = flags.tid
|
|
}
|
|
layer.maxTid = flags.tid
|
|
}
|
|
if flags.sid > layer.maxSid {
|
|
if layer.sid == layer.maxSid {
|
|
layer.wantedSid = flags.sid
|
|
layer.sid = flags.sid
|
|
}
|
|
layer.maxSid = flags.sid
|
|
}
|
|
down.setLayerInfo(layer)
|
|
down.adjustLayer()
|
|
}
|
|
if flags.start && (layer.tid != layer.wantedTid) {
|
|
if layer.wantedTid < layer.tid || flags.tidupsync {
|
|
layer.tid = layer.wantedTid
|
|
down.setLayerInfo(layer)
|
|
}
|
|
}
|
|
|
|
if flags.start && (layer.sid != layer.wantedSid) {
|
|
if flags.sidsync {
|
|
layer.sid = layer.wantedTid
|
|
down.setLayerInfo(layer)
|
|
}
|
|
}
|
|
|
|
if flags.tid > layer.tid || flags.sid > layer.sid ||
|
|
(flags.sid < layer.sid && flags.sidnonreference) {
|
|
ok := down.packetmap.Drop(flags.seqno, flags.pid)
|
|
if ok {
|
|
return 0, nil
|
|
}
|
|
}
|
|
|
|
ok, newseqno, piddelta := down.packetmap.Map(flags.seqno, flags.pid)
|
|
if !ok {
|
|
return 0, nil
|
|
}
|
|
|
|
if newseqno == flags.seqno && piddelta == 0 {
|
|
return down.write(buf)
|
|
}
|
|
|
|
ibuf2 := packetBufPool.Get()
|
|
defer packetBufPool.Put(ibuf2)
|
|
buf2 := ibuf2.([]byte)
|
|
|
|
n := copy(buf2, buf)
|
|
err = rewritePacket(codec, buf2[:n], newseqno, piddelta)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return down.write(buf2[:n])
|
|
}
|
|
|
|
func (down *rtpDownTrack) write(buf []byte) (int, error) {
|
|
n, err := down.track.Write(buf)
|
|
if err == nil {
|
|
down.rate.Accumulate(uint32(n))
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (t *rtpDownTrack) GetMaxBitrate() (uint64, int, int) {
|
|
now := rtptime.Jiffies()
|
|
layer := t.getLayerInfo()
|
|
r := t.maxBitrate.Get(now)
|
|
if r == ^uint64(0) {
|
|
r = 512 * 1024
|
|
}
|
|
rr := t.maxREMBBitrate.Get(now)
|
|
if rr != 0 && rr < r {
|
|
r = rr
|
|
}
|
|
return r, int(layer.sid), int(layer.tid)
|
|
}
|
|
|
|
func (t *rtpDownTrack) adjustLayer() {
|
|
max, _, _ := t.GetMaxBitrate()
|
|
r, _ := t.rate.Estimate()
|
|
rate := uint64(r) * 8
|
|
if rate < max*7/8 {
|
|
layer := t.getLayerInfo()
|
|
if layer.tid < layer.maxTid {
|
|
layer.wantedTid = layer.tid + 1
|
|
t.setLayerInfo(layer)
|
|
}
|
|
} else if rate > max*3/2 {
|
|
layer := t.getLayerInfo()
|
|
if layer.tid > 0 {
|
|
layer.wantedTid = layer.tid - 1
|
|
t.setLayerInfo(layer)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
|
|
if down.pc.RemoteDescription() != nil {
|
|
return down.pc.AddICECandidate(*candidate)
|
|
}
|
|
down.iceCandidates = append(down.iceCandidates, candidate)
|
|
return nil
|
|
}
|
|
|
|
func flushICECandidates(pc *webrtc.PeerConnection, candidates []*webrtc.ICECandidateInit) error {
|
|
if pc.RemoteDescription() == nil {
|
|
return errors.New("flushICECandidates called in bad state")
|
|
}
|
|
|
|
var err error
|
|
for _, candidate := range candidates {
|
|
err2 := pc.AddICECandidate(*candidate)
|
|
if err == nil {
|
|
err = err2
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (down *rtpDownConnection) flushICECandidates() error {
|
|
err := flushICECandidates(down.pc, down.iceCandidates)
|
|
down.iceCandidates = nil
|
|
return err
|
|
}
|
|
|
|
type rtpUpTrack struct {
|
|
track *webrtc.TrackRemote
|
|
conn *rtpUpConnection
|
|
rate *estimator.Estimator
|
|
cache *packetcache.Cache
|
|
jitter *jitter.Estimator
|
|
cname atomic.Value
|
|
|
|
localCh chan trackAction
|
|
readerDone chan struct{}
|
|
|
|
mu sync.Mutex
|
|
srTime uint64
|
|
srNTPTime uint64
|
|
srRTPTime uint32
|
|
maxLayer uint8
|
|
local []conn.DownTrack
|
|
bufferedNACKs []uint16
|
|
}
|
|
|
|
const (
|
|
trackActionAdd = iota
|
|
trackActionDel
|
|
trackActionKeyframe
|
|
)
|
|
|
|
type trackAction struct {
|
|
action int
|
|
track conn.DownTrack
|
|
}
|
|
|
|
func (up *rtpUpTrack) action(action int, track conn.DownTrack) {
|
|
select {
|
|
case up.localCh <- trackAction{action, track}:
|
|
case <-up.readerDone:
|
|
}
|
|
}
|
|
|
|
func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
|
|
for _, t := range up.local {
|
|
if t == local {
|
|
return nil
|
|
}
|
|
}
|
|
up.local = append(up.local, local)
|
|
up.action(trackActionAdd, local)
|
|
return nil
|
|
}
|
|
|
|
func (up *rtpUpTrack) RequestKeyframe() error {
|
|
up.action(trackActionKeyframe, nil)
|
|
return nil
|
|
}
|
|
|
|
func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
for i, l := range up.local {
|
|
if l == local {
|
|
up.local = append(up.local[:i], up.local[i+1:]...)
|
|
// do this asynchronously, to avoid deadlocking when
|
|
// multiple clients call this simultaneously.
|
|
go up.action(trackActionDel, l)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (up *rtpUpTrack) getLocal() []conn.DownTrack {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
local := make([]conn.DownTrack, len(up.local))
|
|
copy(local, up.local)
|
|
return local
|
|
}
|
|
|
|
func (up *rtpUpTrack) Label() string {
|
|
return up.track.RID()
|
|
}
|
|
|
|
func (up *rtpUpTrack) Kind() webrtc.RTPCodecType {
|
|
return up.track.Kind()
|
|
}
|
|
|
|
func (up *rtpUpTrack) Codec() webrtc.RTPCodecCapability {
|
|
return up.track.Codec().RTPCodecCapability
|
|
}
|
|
|
|
func (up *rtpUpTrack) hasRtcpFb(tpe, parameter string) bool {
|
|
for _, fb := range up.track.Codec().RTCPFeedback {
|
|
if fb.Type == tpe && fb.Parameter == parameter {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
type rtpUpConnection struct {
|
|
id string
|
|
client group.Client
|
|
label string
|
|
userId string
|
|
username string
|
|
pc *webrtc.PeerConnection
|
|
iceCandidates []*webrtc.ICECandidateInit
|
|
|
|
mu sync.Mutex
|
|
closed bool
|
|
pushed bool
|
|
replace string
|
|
tracks []*rtpUpTrack
|
|
local []conn.Down
|
|
}
|
|
|
|
func (up *rtpUpConnection) getTracks() []*rtpUpTrack {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
tracks := make([]*rtpUpTrack, len(up.tracks))
|
|
copy(tracks, up.tracks)
|
|
return tracks
|
|
}
|
|
|
|
func (up *rtpUpConnection) getReplace(reset bool) string {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
replace := up.replace
|
|
if reset {
|
|
up.replace = ""
|
|
}
|
|
return replace
|
|
}
|
|
|
|
func (up *rtpUpConnection) Id() string {
|
|
return up.id
|
|
}
|
|
|
|
func (up *rtpUpConnection) Label() string {
|
|
return up.label
|
|
}
|
|
|
|
func (up *rtpUpConnection) User() (string, string) {
|
|
return up.userId, up.username
|
|
}
|
|
|
|
func (up *rtpUpConnection) AddLocal(local conn.Down) error {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
// the connection may have been closed in the meantime, in which
|
|
// case we'd never get rid of the down connection
|
|
if up.closed {
|
|
return os.ErrClosed
|
|
}
|
|
for _, t := range up.local {
|
|
if t == local {
|
|
return nil
|
|
}
|
|
}
|
|
up.local = append(up.local, local)
|
|
return nil
|
|
}
|
|
|
|
func (up *rtpUpConnection) DelLocal(local conn.Down) bool {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
for i, l := range up.local {
|
|
if l == local {
|
|
up.local = append(up.local[:i], up.local[i+1:]...)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (up *rtpUpConnection) getLocal() []conn.Down {
|
|
up.mu.Lock()
|
|
defer up.mu.Unlock()
|
|
local := make([]conn.Down, len(up.local))
|
|
copy(local, up.local)
|
|
return local
|
|
}
|
|
|
|
func (up *rtpUpConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
|
|
if up.pc.RemoteDescription() != nil {
|
|
return up.pc.AddICECandidate(*candidate)
|
|
}
|
|
up.iceCandidates = append(up.iceCandidates, candidate)
|
|
return nil
|
|
}
|
|
|
|
func (up *rtpUpConnection) flushICECandidates() error {
|
|
err := flushICECandidates(up.pc, up.iceCandidates)
|
|
up.iceCandidates = nil
|
|
return err
|
|
}
|
|
|
|
// pushConnNow pushes a connection to all of the clients in a group
|
|
func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) {
|
|
up.mu.Lock()
|
|
up.pushed = true
|
|
replace := up.replace
|
|
up.replace = ""
|
|
tracks := make([]conn.UpTrack, len(up.tracks))
|
|
for i, t := range up.tracks {
|
|
tracks[i] = t
|
|
}
|
|
up.mu.Unlock()
|
|
|
|
for _, c := range cs {
|
|
c.PushConn(g, up.id, up, tracks, replace)
|
|
}
|
|
}
|
|
|
|
// pushConn schedules a call to pushConnNow
|
|
func pushConn(up *rtpUpConnection, g *group.Group, cs []group.Client) {
|
|
up.mu.Lock()
|
|
up.pushed = false
|
|
up.mu.Unlock()
|
|
|
|
go func(g *group.Group, cs []group.Client) {
|
|
time.Sleep(200 * time.Millisecond)
|
|
up.mu.Lock()
|
|
pushed := up.pushed
|
|
up.pushed = true
|
|
up.mu.Unlock()
|
|
if !pushed {
|
|
pushConnNow(up, g, cs)
|
|
}
|
|
}(g, cs)
|
|
}
|
|
|
|
func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpConnection, error) {
|
|
var o sdp.SessionDescription
|
|
err := o.Unmarshal([]byte(offer))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
api, err := c.Group().API()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pc, err := api.NewPeerConnection(*ice.ICEConfiguration())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, m := range o.MediaDescriptions {
|
|
_, err = pc.AddTransceiverFromKind(
|
|
webrtc.NewRTPCodecType(m.MediaName.Media),
|
|
webrtc.RtpTransceiverInit{
|
|
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
|
},
|
|
)
|
|
if err != nil {
|
|
pc.Close()
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
up := &rtpUpConnection{id: id, client: c, label: label, pc: pc}
|
|
|
|
pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
|
up.mu.Lock()
|
|
|
|
track := &rtpUpTrack{
|
|
track: remote,
|
|
conn: up,
|
|
cache: packetcache.New(minPacketCache(remote)),
|
|
rate: estimator.New(time.Second),
|
|
jitter: jitter.New(remote.Codec().ClockRate),
|
|
localCh: make(chan trackAction, 2),
|
|
readerDone: make(chan struct{}),
|
|
}
|
|
|
|
up.tracks = append(up.tracks, track)
|
|
|
|
go readLoop(track)
|
|
|
|
go rtcpUpListener(up, track, receiver)
|
|
|
|
up.mu.Unlock()
|
|
|
|
pushConn(up, c.Group(), c.Group().GetClients(c))
|
|
})
|
|
|
|
pushConn(up, c.Group(), c.Group().GetClients(c))
|
|
go rtcpUpSender(up)
|
|
|
|
return up, nil
|
|
}
|
|
|
|
var ErrUnsupportedFeedback = errors.New("unsupported feedback type")
|
|
var ErrRateLimited = errors.New("rate limited")
|
|
|
|
func (track *rtpUpTrack) sendPLI() error {
|
|
if !track.hasRtcpFb("nack", "pli") {
|
|
return ErrUnsupportedFeedback
|
|
}
|
|
return sendPLI(track.conn.pc, track.track.SSRC())
|
|
}
|
|
|
|
func sendPLI(pc *webrtc.PeerConnection, ssrc webrtc.SSRC) error {
|
|
return pc.WriteRTCP([]rtcp.Packet{
|
|
&rtcp.PictureLossIndication{MediaSSRC: uint32(ssrc)},
|
|
})
|
|
}
|
|
|
|
func (track *rtpUpTrack) sendNACK(first uint16, bitmap uint16) error {
|
|
if !track.hasRtcpFb("nack", "") {
|
|
return ErrUnsupportedFeedback
|
|
}
|
|
|
|
err := sendNACKs(track.conn.pc, track.track.SSRC(),
|
|
[]rtcp.NackPair{{first, rtcp.PacketBitmap(bitmap)}},
|
|
)
|
|
if err == nil {
|
|
track.cache.Expect(1 + bits.OnesCount16(bitmap))
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (track *rtpUpTrack) sendNACKs(seqnos []uint16) error {
|
|
count := len(seqnos)
|
|
if count == 0 {
|
|
return nil
|
|
}
|
|
|
|
if !track.hasRtcpFb("nack", "") {
|
|
return ErrUnsupportedFeedback
|
|
}
|
|
|
|
var nacks []rtcp.NackPair
|
|
|
|
for len(seqnos) > 0 {
|
|
if len(nacks) >= 240 {
|
|
log.Printf("NACK: packet overflow")
|
|
break
|
|
}
|
|
var f, b uint16
|
|
f, b, seqnos = packetcache.ToBitmap(seqnos)
|
|
nacks = append(nacks, rtcp.NackPair{f, rtcp.PacketBitmap(b)})
|
|
}
|
|
err := sendNACKs(track.conn.pc, track.track.SSRC(), nacks)
|
|
if err == nil {
|
|
track.cache.Expect(count)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func sendNACKs(pc *webrtc.PeerConnection, ssrc webrtc.SSRC, nacks []rtcp.NackPair) error {
|
|
packet := rtcp.Packet(
|
|
&rtcp.TransportLayerNack{
|
|
MediaSSRC: uint32(ssrc),
|
|
Nacks: nacks,
|
|
},
|
|
)
|
|
return pc.WriteRTCP([]rtcp.Packet{packet})
|
|
}
|
|
|
|
func gotNACK(track *rtpDownTrack, p *rtcp.TransportLayerNack) {
|
|
buf := make([]byte, packetcache.BufSize)
|
|
for _, nack := range p.Nacks {
|
|
nack.Range(func(s uint16) bool {
|
|
ok, seqno, _ := track.packetmap.Reverse(s)
|
|
if !ok {
|
|
return true
|
|
}
|
|
l := track.remote.GetPacket(seqno, buf, true)
|
|
if l == 0 {
|
|
return true
|
|
}
|
|
_, err := track.Write(buf[:l])
|
|
if err != nil {
|
|
log.Printf("Write: %v", err)
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
func (track *rtpUpTrack) GetPacket(seqno uint16, result []byte, nack bool) uint16 {
|
|
n := track.cache.Get(seqno, result)
|
|
if n > 0 || !nack {
|
|
return n
|
|
}
|
|
|
|
track.mu.Lock()
|
|
defer track.mu.Unlock()
|
|
|
|
doit := len(track.bufferedNACKs) == 0
|
|
|
|
for _, s := range track.bufferedNACKs {
|
|
if s == seqno {
|
|
return 0
|
|
}
|
|
}
|
|
track.bufferedNACKs = append(track.bufferedNACKs, seqno)
|
|
|
|
if doit {
|
|
go nackWriter(track)
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPReceiver) {
|
|
buf := make([]byte, 1500)
|
|
|
|
for {
|
|
firstSR := false
|
|
n, _, err := r.ReadSimulcast(buf, track.track.RID())
|
|
if err != nil {
|
|
if err != io.EOF && err != io.ErrClosedPipe {
|
|
log.Printf("Read RTCP: %v", err)
|
|
}
|
|
return
|
|
}
|
|
ps, err := rtcp.Unmarshal(buf[:n])
|
|
if err != nil {
|
|
log.Printf("Unmarshal RTCP: %v", err)
|
|
continue
|
|
}
|
|
|
|
jiffies := rtptime.Jiffies()
|
|
|
|
for _, p := range ps {
|
|
local := track.getLocal()
|
|
switch p := p.(type) {
|
|
case *rtcp.SenderReport:
|
|
track.mu.Lock()
|
|
if track.srTime == 0 {
|
|
firstSR = true
|
|
}
|
|
track.srTime = jiffies
|
|
track.srNTPTime = p.NTPTime
|
|
track.srRTPTime = p.RTPTime
|
|
track.mu.Unlock()
|
|
for _, l := range local {
|
|
l.SetTimeOffset(p.NTPTime, p.RTPTime)
|
|
}
|
|
case *rtcp.SourceDescription:
|
|
for _, c := range p.Chunks {
|
|
if c.Source != uint32(track.track.SSRC()) {
|
|
continue
|
|
}
|
|
for _, i := range c.Items {
|
|
if i.Type != rtcp.SDESCNAME {
|
|
continue
|
|
}
|
|
track.cname.Store(i.Text)
|
|
for _, l := range local {
|
|
l.SetCname(i.Text)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if firstSR {
|
|
// this is the first SR we got for at least one track,
|
|
// quickly propagate the time offsets downstream
|
|
local := conn.getLocal()
|
|
for _, l := range local {
|
|
l, ok := l.(*rtpDownConnection)
|
|
if ok {
|
|
err := sendSR(l)
|
|
if err != nil {
|
|
log.Printf("sendSR: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func sendUpRTCP(up *rtpUpConnection) error {
|
|
tracks := up.getTracks()
|
|
|
|
if len(up.tracks) == 0 {
|
|
state := up.pc.ConnectionState()
|
|
if state == webrtc.PeerConnectionStateClosed {
|
|
return io.ErrClosedPipe
|
|
}
|
|
return nil
|
|
}
|
|
|
|
now := rtptime.Jiffies()
|
|
|
|
reports := make([]rtcp.ReceptionReport, 0, len(up.tracks))
|
|
for _, t := range tracks {
|
|
updateUpTrack(t)
|
|
stats := t.cache.GetStats(true)
|
|
var totalLost uint32
|
|
if stats.TotalExpected > stats.TotalReceived {
|
|
totalLost = stats.TotalExpected - stats.TotalReceived
|
|
}
|
|
var fractionLost uint32
|
|
if stats.Expected > stats.Received {
|
|
lost := stats.Expected - stats.Received
|
|
fractionLost = lost * 256 / stats.Expected
|
|
if fractionLost >= 255 {
|
|
fractionLost = 255
|
|
}
|
|
}
|
|
|
|
t.mu.Lock()
|
|
srTime := t.srTime
|
|
srNTPTime := t.srNTPTime
|
|
t.mu.Unlock()
|
|
|
|
var delay uint64
|
|
if srTime != 0 {
|
|
delay = (now - srTime) /
|
|
(rtptime.JiffiesPerSec / 0x10000)
|
|
}
|
|
|
|
reports = append(reports, rtcp.ReceptionReport{
|
|
SSRC: uint32(t.track.SSRC()),
|
|
FractionLost: uint8(fractionLost),
|
|
TotalLost: totalLost,
|
|
LastSequenceNumber: stats.ESeqno,
|
|
Jitter: t.jitter.Jitter(),
|
|
LastSenderReport: uint32(srNTPTime >> 16),
|
|
Delay: uint32(delay),
|
|
})
|
|
}
|
|
|
|
packets := []rtcp.Packet{
|
|
&rtcp.ReceiverReport{
|
|
Reports: reports,
|
|
},
|
|
}
|
|
|
|
var ssrcs []uint32
|
|
var rate uint64
|
|
for _, t := range tracks {
|
|
if !t.hasRtcpFb("goog-remb", "") {
|
|
continue
|
|
}
|
|
ssrcs = append(ssrcs, uint32(t.track.SSRC()))
|
|
if t.Kind() == webrtc.RTPCodecTypeAudio {
|
|
rate += 100 * 1024
|
|
} else if t.Label() == "l" {
|
|
rate += group.LowBitrate
|
|
} else {
|
|
minrate := ^uint64(0)
|
|
maxrate := uint64(group.MinBitrate)
|
|
maxsid := 0
|
|
maxtid := 0
|
|
local := t.getLocal()
|
|
for _, down := range local {
|
|
r, sid, tid := down.GetMaxBitrate()
|
|
if maxsid < sid {
|
|
maxsid = sid
|
|
}
|
|
if maxtid < tid {
|
|
maxtid = tid
|
|
}
|
|
if r < group.MinBitrate {
|
|
r = group.MinBitrate
|
|
}
|
|
if minrate > r {
|
|
minrate = r
|
|
}
|
|
if maxrate < r {
|
|
maxrate = r
|
|
}
|
|
}
|
|
// assume that lower spatial layers take up 1/5 of
|
|
// the throughput
|
|
if maxsid > 0 {
|
|
maxrate = maxrate * 5 / 4
|
|
}
|
|
// assume that each layer takes two times less
|
|
// throughput than the higher one. Then we've
|
|
// got enough slack for a factor of 2^(layers-1).
|
|
for i := 0; i < maxtid; i++ {
|
|
if minrate < ^uint64(0)/2 {
|
|
minrate *= 2
|
|
}
|
|
}
|
|
if minrate < maxrate {
|
|
rate += minrate
|
|
} else {
|
|
rate += maxrate
|
|
}
|
|
}
|
|
}
|
|
|
|
if rate < ^uint64(0) && len(ssrcs) > 0 {
|
|
packets = append(packets,
|
|
&rtcp.ReceiverEstimatedMaximumBitrate{
|
|
Bitrate: rate,
|
|
SSRCs: ssrcs,
|
|
},
|
|
)
|
|
}
|
|
return up.pc.WriteRTCP(packets)
|
|
}
|
|
|
|
func rtcpUpSender(conn *rtpUpConnection) {
|
|
for {
|
|
time.Sleep(time.Second)
|
|
err := sendUpRTCP(conn)
|
|
if err != nil {
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
return
|
|
}
|
|
log.Printf("sendUpRTCP: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func sendSR(conn *rtpDownConnection) error {
|
|
tracks := conn.getTracks()
|
|
|
|
packets := make([]rtcp.Packet, 0, len(tracks))
|
|
|
|
now := time.Now()
|
|
nowNTP := rtptime.TimeToNTP(now)
|
|
jiffies := rtptime.TimeToJiffies(now)
|
|
|
|
for _, t := range tracks {
|
|
clockrate := t.track.Codec().ClockRate
|
|
|
|
var nowRTP uint32
|
|
|
|
remoteNTP, remoteRTP := t.getTimeOffset()
|
|
if remoteNTP != 0 {
|
|
srTime := rtptime.NTPToTime(remoteNTP)
|
|
d := now.Sub(srTime)
|
|
if d > 0 && d < time.Hour {
|
|
delay := rtptime.FromDuration(
|
|
d, clockrate,
|
|
)
|
|
nowRTP = remoteRTP + uint32(delay)
|
|
}
|
|
|
|
p, b := t.rate.Totals()
|
|
packets = append(packets,
|
|
&rtcp.SenderReport{
|
|
SSRC: uint32(t.ssrc),
|
|
NTPTime: nowNTP,
|
|
RTPTime: nowRTP,
|
|
PacketCount: p,
|
|
OctetCount: b,
|
|
})
|
|
t.setSRTime(jiffies, nowNTP)
|
|
}
|
|
|
|
cname, ok := t.cname.Load().(string)
|
|
if ok && cname != "" {
|
|
item := rtcp.SourceDescriptionItem{
|
|
Type: rtcp.SDESCNAME,
|
|
Text: cname,
|
|
}
|
|
packets = append(packets,
|
|
&rtcp.SourceDescription{
|
|
Chunks: []rtcp.SourceDescriptionChunk{
|
|
{
|
|
Source: uint32(t.ssrc),
|
|
Items: []rtcp.SourceDescriptionItem{item},
|
|
},
|
|
},
|
|
},
|
|
)
|
|
}
|
|
}
|
|
|
|
if len(packets) == 0 {
|
|
state := conn.pc.ConnectionState()
|
|
if state == webrtc.PeerConnectionStateClosed {
|
|
return io.ErrClosedPipe
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return conn.pc.WriteRTCP(packets)
|
|
}
|
|
|
|
func rtcpDownSender(conn *rtpDownConnection) {
|
|
for {
|
|
time.Sleep(time.Second / 2)
|
|
err := sendSR(conn)
|
|
if err != nil {
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
return
|
|
}
|
|
log.Printf("sendSR: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
const (
|
|
minLossRate = 9600
|
|
initLossRate = 512 * 1000
|
|
maxLossRate = 1 << 30
|
|
)
|
|
|
|
func (track *rtpDownTrack) updateRate(loss uint8, now uint64) {
|
|
rate := track.maxBitrate.Get(now)
|
|
if rate < minLossRate || rate > maxLossRate {
|
|
// no recent feedback, reset
|
|
rate = initLossRate
|
|
}
|
|
if loss < 5 {
|
|
// if our actual rate is low, then we're not probing the
|
|
// bottleneck
|
|
r, _ := track.rate.Estimate()
|
|
actual := 8 * uint64(r)
|
|
if actual >= (rate*3)/4 {
|
|
// loss < 0.02, multiply by 1.05
|
|
rate = rate * 269 / 256
|
|
if rate > maxLossRate {
|
|
rate = maxLossRate
|
|
}
|
|
}
|
|
} else if loss > 25 {
|
|
// loss > 0.1, multiply by (1 - loss/2)
|
|
rate = rate * (512 - uint64(loss)) / 512
|
|
if rate < minLossRate {
|
|
rate = minLossRate
|
|
}
|
|
}
|
|
|
|
// update unconditionally, to set the timestamp
|
|
track.maxBitrate.Set(rate, now)
|
|
}
|
|
|
|
func rtcpDownListener(track *rtpDownTrack, s *webrtc.RTPSender) {
|
|
lastFirSeqno := uint8(0)
|
|
|
|
buf := make([]byte, 1500)
|
|
|
|
for {
|
|
n, _, err := s.Read(buf)
|
|
if err != nil {
|
|
if err != io.EOF && err != io.ErrClosedPipe {
|
|
log.Printf("Read RTCP: %v", err)
|
|
}
|
|
return
|
|
}
|
|
ps, err := rtcp.Unmarshal(buf[:n])
|
|
if err != nil {
|
|
log.Printf("Unmarshal RTCP: %v", err)
|
|
continue
|
|
}
|
|
|
|
adjust := false
|
|
jiffies := rtptime.Jiffies()
|
|
|
|
for _, p := range ps {
|
|
switch p := p.(type) {
|
|
case *rtcp.PictureLossIndication:
|
|
track.remote.RequestKeyframe()
|
|
case *rtcp.FullIntraRequest:
|
|
found := false
|
|
var seqno uint8
|
|
for _, entry := range p.FIR {
|
|
if entry.SSRC == uint32(track.ssrc) {
|
|
found = true
|
|
seqno = entry.SequenceNumber
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
log.Printf("Misdirected FIR")
|
|
continue
|
|
}
|
|
|
|
if seqno != lastFirSeqno {
|
|
track.remote.RequestKeyframe()
|
|
}
|
|
case *rtcp.ReceiverEstimatedMaximumBitrate:
|
|
track.maxREMBBitrate.Set(p.Bitrate, jiffies)
|
|
adjust = true
|
|
case *rtcp.ReceiverReport:
|
|
for _, r := range p.Reports {
|
|
if r.SSRC == uint32(track.ssrc) {
|
|
handleReport(track, r, jiffies)
|
|
adjust = true
|
|
}
|
|
}
|
|
case *rtcp.SenderReport:
|
|
for _, r := range p.Reports {
|
|
if r.SSRC == uint32(track.ssrc) {
|
|
handleReport(track, r, jiffies)
|
|
}
|
|
}
|
|
case *rtcp.TransportLayerNack:
|
|
gotNACK(track, p)
|
|
}
|
|
}
|
|
if adjust {
|
|
track.adjustLayer()
|
|
}
|
|
}
|
|
}
|
|
|
|
func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint64) {
|
|
track.stats.Set(report.FractionLost, report.Jitter, jiffies)
|
|
track.updateRate(report.FractionLost, jiffies)
|
|
|
|
if report.LastSenderReport != 0 {
|
|
jiffies := rtptime.Jiffies()
|
|
srTime, srNTPTime := track.getSRTime()
|
|
if jiffies < srTime || jiffies-srTime > 8*rtptime.JiffiesPerSec {
|
|
return
|
|
}
|
|
if report.LastSenderReport == uint32(srNTPTime>>16) {
|
|
delay := uint64(report.Delay) *
|
|
(rtptime.JiffiesPerSec / 0x10000)
|
|
if delay > jiffies-srTime {
|
|
return
|
|
}
|
|
rtt := (jiffies - srTime) - delay
|
|
oldrtt := track.getRTT()
|
|
newrtt := rtt
|
|
if oldrtt > 0 {
|
|
newrtt = (3*oldrtt + rtt) / 4
|
|
}
|
|
track.setRTT(newrtt)
|
|
}
|
|
}
|
|
}
|
|
|
|
func minPacketCache(track *webrtc.TrackRemote) int {
|
|
if track.Kind() == webrtc.RTPCodecTypeVideo {
|
|
return 128
|
|
}
|
|
return 24
|
|
}
|
|
|
|
func updateUpTrack(track *rtpUpTrack) {
|
|
now := rtptime.Jiffies()
|
|
|
|
clockrate := track.track.Codec().ClockRate
|
|
local := track.getLocal()
|
|
var maxrto uint64
|
|
for _, l := range local {
|
|
ll, ok := l.(*rtpDownTrack)
|
|
if ok {
|
|
_, j := ll.stats.Get(now)
|
|
jitter := uint64(j) *
|
|
(rtptime.JiffiesPerSec / uint64(clockrate))
|
|
rtt := ll.getRTT()
|
|
rto := rtt + 4*jitter
|
|
if rto > maxrto {
|
|
maxrto = rto
|
|
}
|
|
}
|
|
}
|
|
_, r := track.rate.Estimate()
|
|
packets := int((uint64(r) * maxrto * 4) / rtptime.JiffiesPerSec)
|
|
min := minPacketCache(track.track)
|
|
if packets < min {
|
|
packets = min
|
|
}
|
|
if packets > 1024 {
|
|
packets = 1024
|
|
}
|
|
track.cache.ResizeCond(packets)
|
|
}
|