1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-22 16:45:58 +01:00
galene/rtpconn/rtpconn.go

1076 lines
23 KiB
Go
Raw Normal View History

package rtpconn
2020-06-08 22:14:28 +02:00
import (
"errors"
"io"
"log"
"math/bits"
"sync"
"sync/atomic"
"time"
2020-09-13 11:04:16 +02:00
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"sfu/conn"
2020-06-08 22:14:28 +02:00
"sfu/estimator"
"sfu/group"
2020-06-08 22:14:28 +02:00
"sfu/jitter"
"sfu/packetcache"
"sfu/rtptime"
)
// bitrate holds a bitrate value together with the time, in jiffies, at
// which it was last set.  Both fields are read and written with
// sync/atomic by Set and Get.
type bitrate struct {
bitrate uint64
jiffies uint64
}
// Set records bitrate and the time now (in jiffies) at which it was
// obtained.  Each field is stored atomically, but the pair is not updated
// as a single atomic unit.
func (br *bitrate) Set(bitrate uint64, now uint64) {
atomic.StoreUint64(&br.bitrate, bitrate)
atomic.StoreUint64(&br.jiffies, now)
}
// Get returns the stored bitrate.  If the value was set more than
// receiverReportTimeout jiffies before now, or if now precedes the time at
// which it was set, Get returns ^uint64(0) to signal that no recent value
// is available.
func (br *bitrate) Get(now uint64) uint64 {
	stamp := atomic.LoadUint64(&br.jiffies)
	if fresh := now >= stamp && now-stamp <= receiverReportTimeout; !fresh {
		return ^uint64(0)
	}
	return atomic.LoadUint64(&br.bitrate)
}
// receiverStats holds the loss and jitter values taken from a reception
// report, together with the time, in jiffies, at which they were recorded.
// All fields are read and written with sync/atomic by Set and Get.
type receiverStats struct {
loss uint32
jitter uint32
jiffies uint64
}
// Set records loss and jitter along with the time now (in jiffies).  Each
// field is stored atomically, but the three stores are not performed as a
// single atomic unit.
func (s *receiverStats) Set(loss uint8, jitter uint32, now uint64) {
atomic.StoreUint32(&s.loss, uint32(loss))
atomic.StoreUint32(&s.jitter, jitter)
atomic.StoreUint64(&s.jiffies, now)
}
// Get returns the recorded loss and jitter.  It returns 0, 0 when the
// values were recorded more than receiverReportTimeout jiffies before now,
// or when now precedes the time at which they were recorded.
func (s *receiverStats) Get(now uint64) (uint8, uint32) {
	stamp := atomic.LoadUint64(&s.jiffies)
	if now >= stamp && now <= stamp+receiverReportTimeout {
		loss := atomic.LoadUint32(&s.loss)
		jitter := atomic.LoadUint32(&s.jitter)
		return uint8(loss), jitter
	}
	return 0, 0
}
// receiverReportTimeout is the age, in jiffies (eight seconds' worth),
// beyond which bitrate.Get and receiverStats.Get treat their stored values
// as stale.
const receiverReportTimeout = 8 * rtptime.JiffiesPerSec
// iceConnection is the set of ICE candidate operations shared by the
// connection types in this file; both rtpDownConnection and
// rtpUpConnection provide these methods.
type iceConnection interface {
// addICECandidate hands a remote ICE candidate to the connection.
addICECandidate(candidate *webrtc.ICECandidateInit) error
// flushICECandidates applies the candidates that were queued earlier.
flushICECandidates() error
}
type rtpDownTrack struct {
track *webrtc.Track
2020-09-13 11:04:16 +02:00
remote conn.UpTrack
maxBitrate *bitrate
rate *estimator.Estimator
stats *receiverStats
srTime uint64
srNTPTime uint64
remoteNTPTime uint64
remoteRTPTime uint32
2020-09-03 15:29:18 +02:00
cname atomic.Value
rtt uint64
2020-06-08 22:14:28 +02:00
}
// WriteRTP writes packet to the underlying WebRTC track and returns the
// error reported by that write, if any.
func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error {
return down.track.WriteRTP(packet)
}
// Accumulate adds bytes to the track's rate estimator.
func (down *rtpDownTrack) Accumulate(bytes uint32) {
down.rate.Accumulate(bytes)
}
2020-09-13 11:04:16 +02:00
// SetTimeOffset records an NTP timestamp and its matching RTP timestamp;
// sendSR later uses the pair to derive the RTP time of outgoing sender
// reports.  Each value is stored atomically, but the two stores are
// separate.
func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) {
atomic.StoreUint64(&down.remoteNTPTime, ntp)
atomic.StoreUint32(&down.remoteRTPTime, rtp)
}
2020-09-13 11:04:16 +02:00
func (down *rtpDownTrack) SetCname(cname string) {
2020-09-03 15:29:18 +02:00
down.cname.Store(cname)
}
2020-06-08 22:14:28 +02:00
type rtpDownConnection struct {
id string
pc *webrtc.PeerConnection
2020-09-13 11:04:16 +02:00
remote conn.Up
tracks []*rtpDownTrack
maxREMBBitrate *bitrate
iceCandidates []*webrtc.ICECandidateInit
2020-06-08 22:14:28 +02:00
}
func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) {
pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
2020-06-08 22:14:28 +02:00
if err != nil {
return nil, err
}
pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) {
log.Printf("Got track on downstream connection")
})
conn := &rtpDownConnection{
id: id,
pc: pc,
remote: remote,
maxREMBBitrate: new(bitrate),
2020-06-08 22:14:28 +02:00
}
return conn, nil
}
// GetMaxBitrate returns the smaller of the latest REMB value received on
// this connection and the sum of the per-track rate estimates.  A track
// with no recent estimate contributes a default of 128*1024 for audio and
// 512*1024 otherwise.
func (down *rtpDownConnection) GetMaxBitrate(now uint64) uint64 {
	remb := down.maxREMBBitrate.Get(now)

	var total uint64
	for _, t := range down.tracks {
		r := t.maxBitrate.Get(now)
		if r == ^uint64(0) {
			// no recent estimate for this track, use a default
			switch t.track.Kind() {
			case webrtc.RTPCodecTypeAudio:
				r = 128 * 1024
			default:
				r = 512 * 1024
			}
		}
		total += r
	}

	if remb <= total {
		return remb
	}
	return total
}
2020-06-08 22:14:28 +02:00
// addICECandidate adds candidate to the peer connection if the remote
// description has already been set; otherwise it queues the candidate for
// a later flushICECandidates.
func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
	if down.pc.RemoteDescription() == nil {
		down.iceCandidates = append(down.iceCandidates, candidate)
		return nil
	}
	return down.pc.AddICECandidate(*candidate)
}
// flushICECandidates adds every candidate to pc.  It fails immediately if
// pc has no remote description.  All candidates are attempted even when
// some fail; the first error encountered is returned.
func flushICECandidates(pc *webrtc.PeerConnection, candidates []*webrtc.ICECandidateInit) error {
	if pc.RemoteDescription() == nil {
		return errors.New("flushICECandidates called in bad state")
	}

	var first error
	for i := range candidates {
		if e := pc.AddICECandidate(*candidates[i]); e != nil && first == nil {
			first = e
		}
	}
	return first
}
// flushICECandidates applies the queued ICE candidates to the peer
// connection and then empties the queue, whether or not an error occurred.
func (down *rtpDownConnection) flushICECandidates() error {
	defer func() { down.iceCandidates = nil }()
	return flushICECandidates(down.pc, down.iceCandidates)
}
type rtpUpTrack struct {
track *webrtc.Track
label string
rate *estimator.Estimator
cache *packetcache.Cache
jitter *jitter.Estimator
lastPLI uint64
lastFIR uint64
firSeqno uint32
2020-06-08 22:14:28 +02:00
localCh chan localTrackAction
readerDone chan struct{}
2020-06-08 22:14:28 +02:00
mu sync.Mutex
cname string
srTime uint64
srNTPTime uint64
srRTPTime uint32
local []conn.DownTrack
bufferedNACKs []uint16
2020-06-08 22:14:28 +02:00
}
type localTrackAction struct {
add bool
2020-09-13 11:04:16 +02:00
track conn.DownTrack
2020-06-08 22:14:28 +02:00
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpTrack) notifyLocal(add bool, track conn.DownTrack) {
2020-06-08 22:14:28 +02:00
select {
case up.localCh <- localTrackAction{add, track}:
case <-up.readerDone:
2020-06-08 22:14:28 +02:00
}
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error {
2020-06-08 22:14:28 +02:00
up.mu.Lock()
2020-10-27 18:20:09 +01:00
defer up.mu.Unlock()
2020-06-08 22:14:28 +02:00
for _, t := range up.local {
if t == local {
return nil
}
}
up.local = append(up.local, local)
2020-10-27 18:20:09 +01:00
// do this asynchronously, to avoid deadlocks when multiple
// clients call this simultaneously.
go up.notifyLocal(true, local)
2020-06-08 22:14:28 +02:00
return nil
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool {
2020-06-08 22:14:28 +02:00
up.mu.Lock()
2020-10-27 18:20:09 +01:00
defer up.mu.Unlock()
2020-06-08 22:14:28 +02:00
for i, l := range up.local {
if l == local {
up.local = append(up.local[:i], up.local[i+1:]...)
2020-10-27 18:20:09 +01:00
// do this asynchronously, to avoid deadlocking when
// multiple clients call this simultaneously.
go up.notifyLocal(false, l)
2020-06-08 22:14:28 +02:00
return true
}
}
return false
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpTrack) getLocal() []conn.DownTrack {
2020-06-08 22:14:28 +02:00
up.mu.Lock()
defer up.mu.Unlock()
2020-09-13 11:04:16 +02:00
local := make([]conn.DownTrack, len(up.local))
2020-06-08 22:14:28 +02:00
copy(local, up.local)
return local
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpTrack) GetRTP(seqno uint16, result []byte) uint16 {
2020-06-08 22:14:28 +02:00
return up.cache.Get(seqno, result)
}
// Label returns the label assigned to this track when it was created.
func (up *rtpUpTrack) Label() string {
return up.label
}
// Codec returns the codec of the underlying WebRTC track.
func (up *rtpUpTrack) Codec() *webrtc.RTPCodec {
return up.track.Codec()
}
// hasRtcpFb reports whether the track's codec lists an RTCP feedback entry
// whose type is tpe and whose parameter is parameter.
func (up *rtpUpTrack) hasRtcpFb(tpe, parameter string) bool {
	feedback := up.track.Codec().RTCPFeedback
	for i := range feedback {
		if feedback[i].Type != tpe {
			continue
		}
		if feedback[i].Parameter == parameter {
			return true
		}
	}
	return false
}
type rtpUpConnection struct {
id string
label string
pc *webrtc.PeerConnection
labels map[string]string
iceCandidates []*webrtc.ICECandidateInit
mu sync.Mutex
tracks []*rtpUpTrack
2020-09-13 11:04:16 +02:00
local []conn.Down
2020-06-08 22:14:28 +02:00
}
// getTracks returns a copy of the connection's track list, taken under
// the connection's lock.
func (up *rtpUpConnection) getTracks() []*rtpUpTrack {
	up.mu.Lock()
	defer up.mu.Unlock()

	snapshot := make([]*rtpUpTrack, 0, len(up.tracks))
	return append(snapshot, up.tracks...)
}
// Id returns the identifier given to the connection at creation.
func (up *rtpUpConnection) Id() string {
return up.id
}
// Label returns the connection's label.
func (up *rtpUpConnection) Label() string {
return up.label
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpConnection) AddLocal(local conn.Down) error {
2020-06-08 22:14:28 +02:00
up.mu.Lock()
defer up.mu.Unlock()
for _, t := range up.local {
if t == local {
return nil
}
}
up.local = append(up.local, local)
return nil
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpConnection) DelLocal(local conn.Down) bool {
2020-06-08 22:14:28 +02:00
up.mu.Lock()
defer up.mu.Unlock()
for i, l := range up.local {
if l == local {
up.local = append(up.local[:i], up.local[i+1:]...)
return true
}
}
return false
}
2020-09-13 11:04:16 +02:00
func (up *rtpUpConnection) getLocal() []conn.Down {
2020-06-08 22:14:28 +02:00
up.mu.Lock()
defer up.mu.Unlock()
2020-09-13 11:04:16 +02:00
local := make([]conn.Down, len(up.local))
2020-06-08 22:14:28 +02:00
copy(local, up.local)
return local
}
// addICECandidate adds candidate to the peer connection if the remote
// description has already been set; otherwise it queues the candidate for
// a later flushICECandidates.
func (up *rtpUpConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
	if up.pc.RemoteDescription() == nil {
		up.iceCandidates = append(up.iceCandidates, candidate)
		return nil
	}
	return up.pc.AddICECandidate(*candidate)
}
// flushICECandidates applies the queued ICE candidates to the peer
// connection and then empties the queue, whether or not an error occurred.
func (up *rtpUpConnection) flushICECandidates() error {
	defer func() { up.iceCandidates = nil }()
	return flushICECandidates(up.pc, up.iceCandidates)
}
// getTrackMid returns the mid of the transceiver of pc whose receiver
// carries track, or the empty string if there is no such transceiver.
func getTrackMid(pc *webrtc.PeerConnection, track *webrtc.Track) string {
	for _, tr := range pc.GetTransceivers() {
		receiver := tr.Receiver()
		if receiver == nil || receiver.Track() != track {
			continue
		}
		return tr.Mid()
	}
	return ""
}
// called locked
func (up *rtpUpConnection) complete() bool {
2020-09-03 15:29:18 +02:00
for mid := range up.labels {
2020-06-08 22:14:28 +02:00
found := false
for _, t := range up.tracks {
m := getTrackMid(up.pc, t.track)
if m == mid {
found = true
break
}
}
if !found {
return false
}
}
return true
}
func newUpConn(c group.Client, id string) (*rtpUpConnection, error) {
pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
2020-06-08 22:14:28 +02:00
if err != nil {
return nil, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
},
)
if err != nil {
pc.Close()
return nil, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
},
)
if err != nil {
pc.Close()
return nil, err
}
2020-09-13 11:04:16 +02:00
up := &rtpUpConnection{id: id, pc: pc}
2020-06-08 22:14:28 +02:00
pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) {
2020-09-13 11:04:16 +02:00
up.mu.Lock()
2020-06-08 22:14:28 +02:00
mid := getTrackMid(pc, remote)
if mid == "" {
log.Printf("Couldn't get track's mid")
return
}
2020-09-13 11:04:16 +02:00
label, ok := up.labels[mid]
2020-06-08 22:14:28 +02:00
if !ok {
log.Printf("Couldn't get track's label")
isvideo := remote.Kind() == webrtc.RTPCodecTypeVideo
if isvideo {
label = "video"
} else {
label = "audio"
}
}
track := &rtpUpTrack{
track: remote,
label: label,
cache: packetcache.New(minPacketCache(remote)),
2020-06-08 22:14:28 +02:00
rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate),
localCh: make(chan localTrackAction, 2),
readerDone: make(chan struct{}),
2020-06-08 22:14:28 +02:00
}
2020-09-13 11:04:16 +02:00
up.tracks = append(up.tracks, track)
2020-06-08 22:14:28 +02:00
2020-09-13 11:04:16 +02:00
go readLoop(up, track)
2020-06-08 22:14:28 +02:00
2020-09-13 11:04:16 +02:00
go rtcpUpListener(up, track, receiver)
2020-06-08 22:14:28 +02:00
2020-09-13 11:04:16 +02:00
complete := up.complete()
var tracks []conn.UpTrack
if complete {
tracks = make([]conn.UpTrack, len(up.tracks))
for i, t := range up.tracks {
2020-06-08 22:14:28 +02:00
tracks[i] = t
}
}
// pushConn might need to take the lock
2020-09-13 11:04:16 +02:00
up.mu.Unlock()
if complete {
clients := c.Group().GetClients(c)
2020-06-08 22:14:28 +02:00
for _, cc := range clients {
cc.PushConn(up.id, up, tracks, up.label)
2020-06-08 22:14:28 +02:00
}
2020-09-13 11:04:16 +02:00
go rtcpUpSender(up)
2020-06-08 22:14:28 +02:00
}
})
2020-09-13 11:04:16 +02:00
return up, nil
2020-06-08 22:14:28 +02:00
}
// ErrUnsupportedFeedback is returned when the track's codec does not list
// the RTCP feedback type needed for the request.
var ErrUnsupportedFeedback = errors.New("unsupported feedback type")
// ErrRateLimited is returned when a PLI or FIR is suppressed because one
// was already sent for the same track less than half a second earlier.
var ErrRateLimited = errors.New("rate limited")
func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error {
if !track.hasRtcpFb("nack", "pli") {
return ErrUnsupportedFeedback
}
last := atomic.LoadUint64(&track.lastPLI)
now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/2 {
2020-06-08 22:14:28 +02:00
return ErrRateLimited
}
atomic.StoreUint64(&track.lastPLI, now)
return sendPLI(up.pc, track.track.SSRC())
}
// sendPLI writes a single Picture Loss Indication for ssrc to pc.
func sendPLI(pc *webrtc.PeerConnection, ssrc uint32) error {
	pli := &rtcp.PictureLossIndication{MediaSSRC: ssrc}
	return pc.WriteRTCP([]rtcp.Packet{pli})
}
func (up *rtpUpConnection) sendFIR(track *rtpUpTrack, increment bool) error {
// we need to reliably increment the seqno, even if we are going
// to drop the packet due to rate limiting.
var seqno uint8
if increment {
seqno = uint8(atomic.AddUint32(&track.firSeqno, 1) & 0xFF)
} else {
seqno = uint8(atomic.LoadUint32(&track.firSeqno) & 0xFF)
}
if !track.hasRtcpFb("ccm", "fir") {
return ErrUnsupportedFeedback
}
last := atomic.LoadUint64(&track.lastFIR)
now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/2 {
2020-06-08 22:14:28 +02:00
return ErrRateLimited
}
atomic.StoreUint64(&track.lastFIR, now)
return sendFIR(up.pc, track.track.SSRC(), seqno)
}
func sendFIR(pc *webrtc.PeerConnection, ssrc uint32, seqno uint8) error {
return pc.WriteRTCP([]rtcp.Packet{
&rtcp.FullIntraRequest{
FIR: []rtcp.FIREntry{
2020-09-03 15:29:18 +02:00
{
2020-06-08 22:14:28 +02:00
SSRC: ssrc,
SequenceNumber: seqno,
},
},
},
})
}
func (up *rtpUpConnection) sendNACK(track *rtpUpTrack, first uint16, bitmap uint16) error {
if !track.hasRtcpFb("nack", "") {
return ErrUnsupportedFeedback
2020-06-08 22:14:28 +02:00
}
err := sendNACKs(up.pc, track.track.SSRC(),
[]rtcp.NackPair{{first, rtcp.PacketBitmap(bitmap)}},
)
2020-06-08 22:14:28 +02:00
if err == nil {
track.cache.Expect(1 + bits.OnesCount16(bitmap))
}
return err
}
// sendNACKs sends a single NACK packet requesting retransmission of the
// packets listed in seqnos.  At most 240 NACK pairs are sent; sequence
// numbers that do not fit are dropped with a log message.  On success the
// packet cache is told to expect only the retransmissions that were
// actually requested.
//
// It returns nil without sending anything when seqnos is empty, and
// ErrUnsupportedFeedback if the track's codec did not negotiate "nack".
func (up *rtpUpConnection) sendNACKs(track *rtpUpTrack, seqnos []uint16) error {
	total := len(seqnos)
	if total == 0 {
		return nil
	}

	if !track.hasRtcpFb("nack", "") {
		return ErrUnsupportedFeedback
	}

	var nacks []rtcp.NackPair
	for len(seqnos) > 0 {
		if len(nacks) >= 240 {
			log.Printf("NACK: packet overflow")
			break
		}
		// ToBitmap consumes a prefix of seqnos and returns the rest
		var f, b uint16
		f, b, seqnos = packetcache.ToBitmap(seqnos)
		nacks = append(nacks, rtcp.NackPair{
			PacketID:    f,
			LostPackets: rtcp.PacketBitmap(b),
		})
	}

	err := sendNACKs(up.pc, track.track.SSRC(), nacks)
	if err == nil {
		// Whatever is left in seqnos was cut off by the overflow
		// check above and never requested, so don't expect it.
		track.cache.Expect(total - len(seqnos))
	}
	return err
}
func sendNACKs(pc *webrtc.PeerConnection, ssrc uint32, nacks []rtcp.NackPair) error {
2020-06-08 22:14:28 +02:00
packet := rtcp.Packet(
&rtcp.TransportLayerNack{
MediaSSRC: ssrc,
Nacks: nacks,
2020-06-08 22:14:28 +02:00
},
)
return pc.WriteRTCP([]rtcp.Packet{packet})
}
func gotNACK(conn *rtpDownConnection, track *rtpDownTrack, p *rtcp.TransportLayerNack) {
var unhandled []uint16
2020-06-08 22:14:28 +02:00
var packet rtp.Packet
buf := make([]byte, packetcache.BufSize)
for _, nack := range p.Nacks {
for _, seqno := range nack.PacketList() {
2020-09-13 11:04:16 +02:00
l := track.remote.GetRTP(seqno, buf)
2020-06-08 22:14:28 +02:00
if l == 0 {
unhandled = append(unhandled, seqno)
2020-06-08 22:14:28 +02:00
continue
}
err := packet.Unmarshal(buf[:l])
if err != nil {
continue
}
err = track.track.WriteRTP(&packet)
if err != nil {
log.Printf("WriteRTP: %v", err)
continue
}
track.rate.Accumulate(uint32(l))
}
}
if len(unhandled) == 0 {
return
}
track.remote.Nack(conn.remote, unhandled)
}
func (track *rtpUpTrack) Nack(conn conn.Up, nacks []uint16) error {
track.mu.Lock()
defer track.mu.Unlock()
doit := len(track.bufferedNACKs) == 0
outer:
for _, nack := range nacks {
for _, seqno := range track.bufferedNACKs {
if seqno == nack {
continue outer
}
}
track.bufferedNACKs = append(track.bufferedNACKs, nack)
}
if doit {
up, ok := conn.(*rtpUpConnection)
if !ok {
log.Printf("Nack: unexpected type %T", conn)
return errors.New("unexpected connection type")
}
go nackWriter(up, track)
}
return nil
2020-06-08 22:14:28 +02:00
}
func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPReceiver) {
2020-10-01 22:21:59 +02:00
buf := make([]byte, 1500)
2020-06-08 22:14:28 +02:00
for {
firstSR := false
2020-10-01 22:21:59 +02:00
n, err := r.Read(buf)
2020-06-08 22:14:28 +02:00
if err != nil {
if err != io.EOF {
2020-10-01 22:21:59 +02:00
log.Printf("Read RTCP: %v", err)
2020-06-08 22:14:28 +02:00
}
return
}
2020-10-01 22:21:59 +02:00
ps, err := rtcp.Unmarshal(buf[:n])
if err != nil {
log.Printf("Unmarshal RTCP: %v", err)
continue
}
2020-06-08 22:14:28 +02:00
2020-10-01 22:21:59 +02:00
jiffies := rtptime.Jiffies()
2020-06-08 22:14:28 +02:00
for _, p := range ps {
2020-09-03 15:29:18 +02:00
local := track.getLocal()
2020-06-08 22:14:28 +02:00
switch p := p.(type) {
case *rtcp.SenderReport:
track.mu.Lock()
if track.srTime == 0 {
firstSR = true
}
2020-10-01 22:21:59 +02:00
track.srTime = jiffies
2020-06-08 22:14:28 +02:00
track.srNTPTime = p.NTPTime
track.srRTPTime = p.RTPTime
track.mu.Unlock()
for _, l := range local {
2020-09-13 11:04:16 +02:00
l.SetTimeOffset(p.NTPTime, p.RTPTime)
}
2020-06-08 22:14:28 +02:00
case *rtcp.SourceDescription:
2020-09-03 15:29:18 +02:00
for _, c := range p.Chunks {
if c.Source != track.track.SSRC() {
continue
}
for _, i := range c.Items {
if i.Type != rtcp.SDESCNAME {
continue
}
track.mu.Lock()
track.cname = i.Text
track.mu.Unlock()
for _, l := range local {
2020-09-13 11:04:16 +02:00
l.SetCname(i.Text)
2020-09-03 15:29:18 +02:00
}
}
}
2020-06-08 22:14:28 +02:00
}
}
if firstSR {
// this is the first SR we got for at least one track,
// quickly propagate the time offsets downstream
local := conn.getLocal()
for _, l := range local {
l, ok := l.(*rtpDownConnection)
if ok {
err := sendSR(l)
if err != nil {
log.Printf("sendSR: %v", err)
}
}
}
}
}
}
func sendUpRTCP(conn *rtpUpConnection) error {
2020-06-08 22:14:28 +02:00
conn.mu.Lock()
defer conn.mu.Unlock()
if len(conn.tracks) == 0 {
state := conn.pc.ConnectionState()
if state == webrtc.PeerConnectionStateClosed {
return io.ErrClosedPipe
}
2020-06-08 22:14:28 +02:00
return nil
}
now := rtptime.Jiffies()
reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
for _, t := range conn.tracks {
updateUpTrack(t)
2020-06-08 22:14:28 +02:00
expected, lost, totalLost, eseqno := t.cache.GetStats(true)
if expected == 0 {
expected = 1
}
if lost >= expected {
lost = expected - 1
}
t.mu.Lock()
srTime := t.srTime
srNTPTime := t.srNTPTime
t.mu.Unlock()
var delay uint64
if srTime != 0 {
delay = (now - srTime) /
(rtptime.JiffiesPerSec / 0x10000)
}
reports = append(reports, rtcp.ReceptionReport{
SSRC: t.track.SSRC(),
FractionLost: uint8((lost * 256) / expected),
TotalLost: totalLost,
LastSequenceNumber: eseqno,
Jitter: t.jitter.Jitter(),
LastSenderReport: uint32(srNTPTime >> 16),
Delay: uint32(delay),
})
}
packets := []rtcp.Packet{
2020-06-08 22:14:28 +02:00
&rtcp.ReceiverReport{
Reports: reports,
},
}
rate := ^uint64(0)
for _, l := range conn.local {
r := l.GetMaxBitrate(now)
if r < rate {
rate = r
}
}
if rate < group.MinBitrate {
rate = group.MinBitrate
}
var ssrcs []uint32
for _, t := range conn.tracks {
if t.hasRtcpFb("goog-remb", "") {
continue
}
ssrcs = append(ssrcs, t.track.SSRC())
}
if len(ssrcs) > 0 {
packets = append(packets,
&rtcp.ReceiverEstimatedMaximumBitrate{
Bitrate: rate,
SSRCs: ssrcs,
},
)
}
return conn.pc.WriteRTCP(packets)
2020-06-08 22:14:28 +02:00
}
func rtcpUpSender(conn *rtpUpConnection) {
for {
time.Sleep(time.Second)
err := sendUpRTCP(conn)
2020-06-08 22:14:28 +02:00
if err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
log.Printf("sendRR: %v", err)
}
}
}
func sendSR(conn *rtpDownConnection) error {
// since this is only called after all tracks have been created,
// there is no need for locking.
packets := make([]rtcp.Packet, 0, len(conn.tracks))
now := time.Now()
nowNTP := rtptime.TimeToNTP(now)
jiffies := rtptime.TimeToJiffies(now)
for _, t := range conn.tracks {
clockrate := t.track.Codec().ClockRate
var nowRTP uint32
remoteNTP := atomic.LoadUint64(&t.remoteNTPTime)
remoteRTP := atomic.LoadUint32(&t.remoteRTPTime)
2020-09-03 15:29:18 +02:00
if remoteNTP != 0 {
srTime := rtptime.NTPToTime(remoteNTP)
d := now.Sub(srTime)
if d > 0 && d < time.Hour {
delay := rtptime.FromDuration(
d, clockrate,
)
nowRTP = remoteRTP + uint32(delay)
}
p, b := t.rate.Totals()
packets = append(packets,
&rtcp.SenderReport{
SSRC: t.track.SSRC(),
NTPTime: nowNTP,
RTPTime: nowRTP,
PacketCount: p,
OctetCount: b,
})
atomic.StoreUint64(&t.srTime, jiffies)
atomic.StoreUint64(&t.srNTPTime, nowNTP)
}
cname, ok := t.cname.Load().(string)
if ok {
item := rtcp.SourceDescriptionItem{
Type: rtcp.SDESCNAME,
Text: cname,
}
packets = append(packets,
&rtcp.SourceDescription{
Chunks: []rtcp.SourceDescriptionChunk{
{
Source: t.track.SSRC(),
Items: []rtcp.SourceDescriptionItem{item},
},
},
},
)
2020-06-08 22:14:28 +02:00
}
}
if len(packets) == 0 {
state := conn.pc.ConnectionState()
if state == webrtc.PeerConnectionStateClosed {
return io.ErrClosedPipe
}
2020-06-08 22:14:28 +02:00
return nil
}
return conn.pc.WriteRTCP(packets)
}
// rtcpDownSender calls sendSR once per second.  It returns when sendSR
// fails with io.EOF or io.ErrClosedPipe; any other error is logged and the
// loop continues.
func rtcpDownSender(conn *rtpDownConnection) {
	for {
		time.Sleep(time.Second)

		switch err := sendSR(conn); err {
		case nil:
			// nothing to report
		case io.EOF, io.ErrClosedPipe:
			return
		default:
			log.Printf("sendSR: %v", err)
		}
	}
}
// Bounds and initial value of the loss-based rate estimate maintained by
// updateRate.  NOTE(review): presumably in bits per second, since
// updateRate compares the rate against eight times the estimator's
// value -- confirm against the estimator's units.
const (
// minLossRate is the lowest value updateRate decreases the rate to.
minLossRate = 9600
// initLossRate is the value used when there is no usable recent estimate.
initLossRate = 512 * 1000
// maxLossRate is the highest value updateRate increases the rate to.
maxLossRate = 1 << 30
)
func (track *rtpDownTrack) updateRate(loss uint8, now uint64) {
rate := track.maxBitrate.Get(now)
2020-06-08 22:14:28 +02:00
if rate < minLossRate || rate > maxLossRate {
// no recent feedback, reset
rate = initLossRate
}
if loss < 5 {
// if our actual rate is low, then we're not probing the
// bottleneck
r, _ := track.rate.Estimate()
actual := 8 * uint64(r)
if actual >= (rate*7)/8 {
// loss < 0.02, multiply by 1.05
rate = rate * 269 / 256
if rate > maxLossRate {
rate = maxLossRate
}
}
} else if loss > 25 {
// loss > 0.1, multiply by (1 - loss/2)
rate = rate * (512 - uint64(loss)) / 512
if rate < minLossRate {
rate = minLossRate
}
}
// update unconditionally, to set the timestamp
track.maxBitrate.Set(rate, now)
2020-06-08 22:14:28 +02:00
}
func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RTPSender) {
var gotFir bool
lastFirSeqno := uint8(0)
2020-10-01 22:21:59 +02:00
buf := make([]byte, 1500)
2020-06-08 22:14:28 +02:00
for {
2020-10-01 22:21:59 +02:00
n, err := s.Read(buf)
2020-06-08 22:14:28 +02:00
if err != nil {
if err != io.EOF {
2020-10-01 22:21:59 +02:00
log.Printf("Read RTCP: %v", err)
2020-06-08 22:14:28 +02:00
}
return
}
2020-10-01 22:21:59 +02:00
ps, err := rtcp.Unmarshal(buf[:n])
if err != nil {
log.Printf("Unmarshal RTCP: %v", err)
continue
}
2020-06-08 22:14:28 +02:00
jiffies := rtptime.Jiffies()
for _, p := range ps {
switch p := p.(type) {
case *rtcp.PictureLossIndication:
remote, ok := conn.remote.(*rtpUpConnection)
if !ok {
continue
}
rt, ok := track.remote.(*rtpUpTrack)
if !ok {
continue
}
err := remote.sendPLI(rt)
2020-06-09 14:57:37 +02:00
if err != nil && err != ErrRateLimited {
2020-06-08 22:14:28 +02:00
log.Printf("sendPLI: %v", err)
}
case *rtcp.FullIntraRequest:
found := false
var seqno uint8
for _, entry := range p.FIR {
if entry.SSRC == track.track.SSRC() {
found = true
seqno = entry.SequenceNumber
break
}
}
if !found {
log.Printf("Misdirected FIR")
continue
}
increment := true
if gotFir {
increment = seqno != lastFirSeqno
}
gotFir = true
lastFirSeqno = seqno
remote, ok := conn.remote.(*rtpUpConnection)
if !ok {
continue
}
rt, ok := track.remote.(*rtpUpTrack)
if !ok {
continue
}
err := remote.sendFIR(rt, increment)
if err == ErrUnsupportedFeedback {
err := remote.sendPLI(rt)
2020-06-09 14:57:37 +02:00
if err != nil && err != ErrRateLimited {
2020-06-08 22:14:28 +02:00
log.Printf("sendPLI: %v", err)
}
2020-10-30 01:32:08 +01:00
} else if err != nil && err != ErrRateLimited {
2020-06-08 22:14:28 +02:00
log.Printf("sendFIR: %v", err)
}
case *rtcp.ReceiverEstimatedMaximumBitrate:
conn.maxREMBBitrate.Set(p.Bitrate, jiffies)
2020-06-08 22:14:28 +02:00
case *rtcp.ReceiverReport:
for _, r := range p.Reports {
if r.SSRC == track.track.SSRC() {
handleReport(track, r, jiffies)
}
}
case *rtcp.SenderReport:
for _, r := range p.Reports {
if r.SSRC == track.track.SSRC() {
handleReport(track, r, jiffies)
}
}
case *rtcp.TransportLayerNack:
gotNACK(conn, track, p)
2020-06-08 22:14:28 +02:00
}
}
}
}
// handleReport processes a reception report about a down track.  It
// records the reported loss and jitter, updates the loss-based rate
// estimate and, when the report refers to the last sender report we sent
// (and that report is at most eight seconds old), updates the track's
// smoothed round-trip time.
func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint64) {
	track.stats.Set(report.FractionLost, report.Jitter, jiffies)
	track.updateRate(report.FractionLost, jiffies)

	if report.LastSenderReport == 0 {
		return
	}

	now := rtptime.Jiffies()
	srTime := atomic.LoadUint64(&track.srTime)
	if now < srTime || now-srTime > 8*rtptime.JiffiesPerSec {
		return
	}

	srNTPTime := atomic.LoadUint64(&track.srNTPTime)
	if report.LastSenderReport != uint32(srNTPTime>>16) {
		// the report refers to some other sender report
		return
	}

	// report.Delay is in units of 1/65536 s
	elapsed := now - srTime
	delay := uint64(report.Delay) * (rtptime.JiffiesPerSec / 0x10000)
	if delay > elapsed {
		return
	}

	sample := elapsed - delay
	smoothed := sample
	if old := atomic.LoadUint64(&track.rtt); old > 0 {
		smoothed = (3*old + sample) / 4
	}
	atomic.StoreUint64(&track.rtt, smoothed)
}
// minPacketCache returns the smallest packet cache size, in packets, used
// for track: 128 for video tracks and 24 for everything else.
func minPacketCache(track *webrtc.Track) int {
	switch track.Kind() {
	case webrtc.RTPCodecTypeVideo:
		return 128
	default:
		return 24
	}
}
func updateUpTrack(track *rtpUpTrack) {
2020-06-08 22:14:28 +02:00
now := rtptime.Jiffies()
clockrate := track.track.Codec().ClockRate
local := track.getLocal()
var maxrto uint64
for _, l := range local {
ll, ok := l.(*rtpDownTrack)
if ok {
_, j := ll.stats.Get(now)
jitter := uint64(j) *
(rtptime.JiffiesPerSec / uint64(clockrate))
2020-06-08 22:14:28 +02:00
rtt := atomic.LoadUint64(&ll.rtt)
rto := rtt + 4*jitter
if rto > maxrto {
maxrto = rto
}
}
}
_, r := track.rate.Estimate()
packets := int((uint64(r) * maxrto * 4) / rtptime.JiffiesPerSec)
min := minPacketCache(track.track)
if packets < min {
packets = min
2020-06-08 22:14:28 +02:00
}
if packets > 1024 {
packets = 1024
2020-06-08 22:14:28 +02:00
}
track.cache.ResizeCond(packets)
}