1
Fork 0

Fix issues with unaligned atomic operations.

This could be solved by simply reordering the fields, but it
is more robust to move the atomics into their own structure,
and the extra indirection should not cost much.
This commit is contained in:
Juliusz Chroboczek 2020-12-27 01:24:52 +01:00
parent e88942c9a9
commit 9e4aede72a
4 changed files with 78 additions and 49 deletions

View File

@ -66,6 +66,14 @@ type iceConnection interface {
flushICECandidates() error flushICECandidates() error
} }
type downTrackAtomics struct {
rtt uint64
sr uint64
srNTP uint64
remoteNTP uint64
remoteRTP uint32
}
type rtpDownTrack struct { type rtpDownTrack struct {
track *webrtc.TrackLocalStaticRTP track *webrtc.TrackLocalStaticRTP
remote conn.UpTrack remote conn.UpTrack
@ -73,12 +81,8 @@ type rtpDownTrack struct {
maxBitrate *bitrate maxBitrate *bitrate
rate *estimator.Estimator rate *estimator.Estimator
stats *receiverStats stats *receiverStats
srTime uint64 atomics *downTrackAtomics
srNTPTime uint64
remoteNTPTime uint64
remoteRTPTime uint32
cname atomic.Value cname atomic.Value
rtt uint64
} }
func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error { func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error {
@ -90,8 +94,33 @@ func (down *rtpDownTrack) Accumulate(bytes uint32) {
} }
func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) { func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) {
atomic.StoreUint64(&down.remoteNTPTime, ntp) atomic.StoreUint64(&down.atomics.remoteNTP, ntp)
atomic.StoreUint32(&down.remoteRTPTime, rtp) atomic.StoreUint32(&down.atomics.remoteRTP, rtp)
}
func (down *rtpDownTrack) getTimeOffset() (uint64, uint32) {
ntp := atomic.LoadUint64(&down.atomics.remoteNTP)
rtp := atomic.LoadUint32(&down.atomics.remoteRTP)
return ntp, rtp
}
func (down *rtpDownTrack) getRTT() uint64 {
return atomic.LoadUint64(&down.atomics.rtt)
}
func (down *rtpDownTrack) setRTT(rtt uint64) {
atomic.StoreUint64(&down.atomics.rtt, rtt)
}
func (down *rtpDownTrack) getSRTime() (uint64, uint64) {
tm := atomic.LoadUint64(&down.atomics.sr)
ntp := atomic.LoadUint64(&down.atomics.srNTP)
return tm, ntp
}
func (down *rtpDownTrack) setSRTime(tm uint64, ntp uint64) {
atomic.StoreUint64(&down.atomics.sr, tm)
atomic.StoreUint64(&down.atomics.srNTP, ntp)
} }
func (down *rtpDownTrack) SetCname(cname string) { func (down *rtpDownTrack) SetCname(cname string) {
@ -177,21 +206,25 @@ func (down *rtpDownConnection) flushICECandidates() error {
return err return err
} }
type upTrackAtomics struct {
lastPLI uint64
lastFIR uint64
firSeqno uint32
}
type rtpUpTrack struct { type rtpUpTrack struct {
track *webrtc.TrackRemote track *webrtc.TrackRemote
label string label string
rate *estimator.Estimator rate *estimator.Estimator
cache *packetcache.Cache cache *packetcache.Cache
jitter *jitter.Estimator jitter *jitter.Estimator
lastPLI uint64 atomics *upTrackAtomics
lastFIR uint64 cname atomic.Value
firSeqno uint32
localCh chan localTrackAction localCh chan localTrackAction
readerDone chan struct{} readerDone chan struct{}
mu sync.Mutex mu sync.Mutex
cname string
srTime uint64 srTime uint64
srNTPTime uint64 srNTPTime uint64
srRTPTime uint32 srRTPTime uint32
@ -477,6 +510,7 @@ func newUpConn(c group.Client, id string, labels map[string]string) (*rtpUpConne
cache: packetcache.New(minPacketCache(remote)), cache: packetcache.New(minPacketCache(remote)),
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate), jitter: jitter.New(remote.Codec().ClockRate),
atomics: &upTrackAtomics{},
localCh: make(chan localTrackAction, 2), localCh: make(chan localTrackAction, 2),
readerDone: make(chan struct{}), readerDone: make(chan struct{}),
} }
@ -505,12 +539,12 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error {
if !track.hasRtcpFb("nack", "pli") { if !track.hasRtcpFb("nack", "pli") {
return ErrUnsupportedFeedback return ErrUnsupportedFeedback
} }
last := atomic.LoadUint64(&track.lastPLI) last := atomic.LoadUint64(&track.atomics.lastPLI)
now := rtptime.Jiffies() now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/2 { if now >= last && now-last < rtptime.JiffiesPerSec/2 {
return ErrRateLimited return ErrRateLimited
} }
atomic.StoreUint64(&track.lastPLI, now) atomic.StoreUint64(&track.atomics.lastPLI, now)
return sendPLI(up.pc, track.track.SSRC()) return sendPLI(up.pc, track.track.SSRC())
} }
@ -525,20 +559,20 @@ func (up *rtpUpConnection) sendFIR(track *rtpUpTrack, increment bool) error {
// to drop the packet due to rate limiting. // to drop the packet due to rate limiting.
var seqno uint8 var seqno uint8
if increment { if increment {
seqno = uint8(atomic.AddUint32(&track.firSeqno, 1) & 0xFF) seqno = uint8(atomic.AddUint32(&track.atomics.firSeqno, 1) & 0xFF)
} else { } else {
seqno = uint8(atomic.LoadUint32(&track.firSeqno) & 0xFF) seqno = uint8(atomic.LoadUint32(&track.atomics.firSeqno) & 0xFF)
} }
if !track.hasRtcpFb("ccm", "fir") { if !track.hasRtcpFb("ccm", "fir") {
return ErrUnsupportedFeedback return ErrUnsupportedFeedback
} }
last := atomic.LoadUint64(&track.lastFIR) last := atomic.LoadUint64(&track.atomics.lastFIR)
now := rtptime.Jiffies() now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/2 { if now >= last && now-last < rtptime.JiffiesPerSec/2 {
return ErrRateLimited return ErrRateLimited
} }
atomic.StoreUint64(&track.lastFIR, now) atomic.StoreUint64(&track.atomics.lastFIR, now)
return sendFIR(up.pc, track.track.SSRC(), seqno) return sendFIR(up.pc, track.track.SSRC(), seqno)
} }
@ -709,9 +743,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
if i.Type != rtcp.SDESCNAME { if i.Type != rtcp.SDESCNAME {
continue continue
} }
track.mu.Lock() track.cname.Store(i.Text)
track.cname = i.Text
track.mu.Unlock()
for _, l := range local { for _, l := range local {
l.SetCname(i.Text) l.SetCname(i.Text)
} }
@ -847,8 +879,7 @@ func sendSR(conn *rtpDownConnection) error {
var nowRTP uint32 var nowRTP uint32
remoteNTP := atomic.LoadUint64(&t.remoteNTPTime) remoteNTP, remoteRTP := t.getTimeOffset()
remoteRTP := atomic.LoadUint32(&t.remoteRTPTime)
if remoteNTP != 0 { if remoteNTP != 0 {
srTime := rtptime.NTPToTime(remoteNTP) srTime := rtptime.NTPToTime(remoteNTP)
d := now.Sub(srTime) d := now.Sub(srTime)
@ -868,12 +899,11 @@ func sendSR(conn *rtpDownConnection) error {
PacketCount: p, PacketCount: p,
OctetCount: b, OctetCount: b,
}) })
atomic.StoreUint64(&t.srTime, jiffies) t.setSRTime(jiffies, nowNTP)
atomic.StoreUint64(&t.srNTPTime, nowNTP)
} }
cname, ok := t.cname.Load().(string) cname, ok := t.cname.Load().(string)
if ok { if ok && cname != "" {
item := rtcp.SourceDescriptionItem{ item := rtcp.SourceDescriptionItem{
Type: rtcp.SDESCNAME, Type: rtcp.SDESCNAME,
Text: cname, Text: cname,
@ -1054,11 +1084,10 @@ func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint
if report.LastSenderReport != 0 { if report.LastSenderReport != 0 {
jiffies := rtptime.Jiffies() jiffies := rtptime.Jiffies()
srTime := atomic.LoadUint64(&track.srTime) srTime, srNTPTime := track.getSRTime()
if jiffies < srTime || jiffies-srTime > 8*rtptime.JiffiesPerSec { if jiffies < srTime || jiffies-srTime > 8*rtptime.JiffiesPerSec {
return return
} }
srNTPTime := atomic.LoadUint64(&track.srNTPTime)
if report.LastSenderReport == uint32(srNTPTime>>16) { if report.LastSenderReport == uint32(srNTPTime>>16) {
delay := uint64(report.Delay) * delay := uint64(report.Delay) *
(rtptime.JiffiesPerSec / 0x10000) (rtptime.JiffiesPerSec / 0x10000)
@ -1066,12 +1095,12 @@ func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint
return return
} }
rtt := (jiffies - srTime) - delay rtt := (jiffies - srTime) - delay
oldrtt := atomic.LoadUint64(&track.rtt) oldrtt := track.getRTT()
newrtt := rtt newrtt := rtt
if oldrtt > 0 { if oldrtt > 0 {
newrtt = (3*oldrtt + rtt) / 4 newrtt = (3*oldrtt + rtt) / 4
} }
atomic.StoreUint64(&track.rtt, newrtt) track.setRTT(newrtt)
} }
} }
} }
@ -1095,7 +1124,7 @@ func updateUpTrack(track *rtpUpTrack) {
_, j := ll.stats.Get(now) _, j := ll.stats.Get(now)
jitter := uint64(j) * jitter := uint64(j) *
(rtptime.JiffiesPerSec / uint64(clockrate)) (rtptime.JiffiesPerSec / uint64(clockrate))
rtt := atomic.LoadUint64(&ll.rtt) rtt := ll.getRTT()
rto := rtt + 4*jitter rto := rtt + 4*jitter
if rto > maxrto { if rto > maxrto {
maxrto = rto maxrto = rto

View File

@ -2,7 +2,6 @@ package rtpconn
import ( import (
"sort" "sort"
"sync/atomic"
"time" "time"
"github.com/jech/galene/rtptime" "github.com/jech/galene/rtptime"
@ -51,7 +50,7 @@ func (c *webClient) GetStats() *stats.Client {
} }
for _, t := range down.tracks { for _, t := range down.tracks {
rate, _ := t.rate.Estimate() rate, _ := t.rate.Estimate()
rtt := rtptime.ToDuration(atomic.LoadUint64(&t.rtt), rtt := rtptime.ToDuration(t.getRTT(),
rtptime.JiffiesPerSec) rtptime.JiffiesPerSec)
loss, jitter := t.stats.Get(jiffies) loss, jitter := t.stats.Get(jiffies)
j := time.Duration(jitter) * time.Second / j := time.Duration(jitter) * time.Second /

View File

@ -266,12 +266,12 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
track.mu.Lock() track.mu.Lock()
ntp := track.srNTPTime ntp := track.srNTPTime
rtp := track.srRTPTime rtp := track.srRTPTime
cname := track.cname
track.mu.Unlock() track.mu.Unlock()
if ntp != 0 { if ntp != 0 {
action.track.SetTimeOffset(ntp, rtp) action.track.SetTimeOffset(ntp, rtp)
} }
if cname != "" { cname, ok := track.cname.Load().(string)
if ok && cname != "" {
action.track.SetCname(cname) action.track.SetCname(cname)
} }

View File

@ -411,6 +411,7 @@ func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrac
maxBitrate: new(bitrate), maxBitrate: new(bitrate),
stats: new(receiverStats), stats: new(receiverStats),
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
atomics: &downTrackAtomics{},
} }
conn.tracks = append(conn.tracks, track) conn.tracks = append(conn.tracks, track)
@ -1159,7 +1160,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
c.write(clientMessage{ c.write(clientMessage{
Type: "close", Type: "close",
Id: m.Id, Id: m.Id,
}); })
case "ice": case "ice":
if m.Candidate == nil { if m.Candidate == nil {
return group.ProtocolError("null candidate") return group.ProtocolError("null candidate")