diff --git a/rtpconn/rtpconn.go b/rtpconn/rtpconn.go index e1d165d..9212f82 100644 --- a/rtpconn/rtpconn.go +++ b/rtpconn/rtpconn.go @@ -66,19 +66,23 @@ type iceConnection interface { flushICECandidates() error } +type downTrackAtomics struct { + rtt uint64 + sr uint64 + srNTP uint64 + remoteNTP uint64 + remoteRTP uint32 +} + type rtpDownTrack struct { - track *webrtc.TrackLocalStaticRTP - remote conn.UpTrack - ssrc webrtc.SSRC - maxBitrate *bitrate - rate *estimator.Estimator - stats *receiverStats - srTime uint64 - srNTPTime uint64 - remoteNTPTime uint64 - remoteRTPTime uint32 - cname atomic.Value - rtt uint64 + track *webrtc.TrackLocalStaticRTP + remote conn.UpTrack + ssrc webrtc.SSRC + maxBitrate *bitrate + rate *estimator.Estimator + stats *receiverStats + atomics *downTrackAtomics + cname atomic.Value } func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error { @@ -90,8 +94,33 @@ func (down *rtpDownTrack) Accumulate(bytes uint32) { } func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) { - atomic.StoreUint64(&down.remoteNTPTime, ntp) - atomic.StoreUint32(&down.remoteRTPTime, rtp) + atomic.StoreUint64(&down.atomics.remoteNTP, ntp) + atomic.StoreUint32(&down.atomics.remoteRTP, rtp) +} + +func (down *rtpDownTrack) getTimeOffset() (uint64, uint32) { + ntp := atomic.LoadUint64(&down.atomics.remoteNTP) + rtp := atomic.LoadUint32(&down.atomics.remoteRTP) + return ntp, rtp +} + +func (down *rtpDownTrack) getRTT() uint64 { + return atomic.LoadUint64(&down.atomics.rtt) +} + +func (down *rtpDownTrack) setRTT(rtt uint64) { + atomic.StoreUint64(&down.atomics.rtt, rtt) +} + +func (down *rtpDownTrack) getSRTime() (uint64, uint64) { + tm := atomic.LoadUint64(&down.atomics.sr) + ntp := atomic.LoadUint64(&down.atomics.srNTP) + return tm, ntp +} + +func (down *rtpDownTrack) setSRTime(tm uint64, ntp uint64) { + atomic.StoreUint64(&down.atomics.sr, tm) + atomic.StoreUint64(&down.atomics.srNTP, ntp) } func (down *rtpDownTrack) SetCname(cname string) { @@ -177,21 +206,25 @@ func (down *rtpDownConnection) flushICECandidates() error { return err } -type rtpUpTrack struct { - track *webrtc.TrackRemote - label string - rate *estimator.Estimator - cache *packetcache.Cache - jitter *jitter.Estimator +type upTrackAtomics struct { lastPLI uint64 lastFIR uint64 firSeqno uint32 +} + +type rtpUpTrack struct { + track *webrtc.TrackRemote + label string + rate *estimator.Estimator + cache *packetcache.Cache + jitter *jitter.Estimator + atomics *upTrackAtomics + cname atomic.Value localCh chan localTrackAction readerDone chan struct{} mu sync.Mutex - cname string srTime uint64 srNTPTime uint64 srRTPTime uint32 @@ -477,6 +510,7 @@ func newUpConn(c group.Client, id string, labels map[string]string) (*rtpUpConne cache: packetcache.New(minPacketCache(remote)), rate: estimator.New(time.Second), jitter: jitter.New(remote.Codec().ClockRate), + atomics: &upTrackAtomics{}, localCh: make(chan localTrackAction, 2), readerDone: make(chan struct{}), } @@ -505,12 +539,12 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error { if !track.hasRtcpFb("nack", "pli") { return ErrUnsupportedFeedback } - last := atomic.LoadUint64(&track.lastPLI) + last := atomic.LoadUint64(&track.atomics.lastPLI) now := rtptime.Jiffies() if now >= last && now-last < rtptime.JiffiesPerSec/2 { return ErrRateLimited } - atomic.StoreUint64(&track.lastPLI, now) + atomic.StoreUint64(&track.atomics.lastPLI, now) return sendPLI(up.pc, track.track.SSRC()) } @@ -525,20 +559,20 @@ func (up *rtpUpConnection) sendFIR(track *rtpUpTrack, increment bool) error { // to drop the packet due to rate limiting. var seqno uint8 if increment { - seqno = uint8(atomic.AddUint32(&track.firSeqno, 1) & 0xFF) + seqno = uint8(atomic.AddUint32(&track.atomics.firSeqno, 1) & 0xFF) } else { - seqno = uint8(atomic.LoadUint32(&track.firSeqno) & 0xFF) + seqno = uint8(atomic.LoadUint32(&track.atomics.firSeqno) & 0xFF) } if !track.hasRtcpFb("ccm", "fir") { return ErrUnsupportedFeedback } - last := atomic.LoadUint64(&track.lastFIR) + last := atomic.LoadUint64(&track.atomics.lastFIR) now := rtptime.Jiffies() if now >= last && now-last < rtptime.JiffiesPerSec/2 { return ErrRateLimited } - atomic.StoreUint64(&track.lastFIR, now) + atomic.StoreUint64(&track.atomics.lastFIR, now) return sendFIR(up.pc, track.track.SSRC(), seqno) } @@ -709,9 +743,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei if i.Type != rtcp.SDESCNAME { continue } - track.mu.Lock() - track.cname = i.Text - track.mu.Unlock() + track.cname.Store(i.Text) for _, l := range local { l.SetCname(i.Text) } @@ -847,8 +879,7 @@ func sendSR(conn *rtpDownConnection) error { var nowRTP uint32 - remoteNTP := atomic.LoadUint64(&t.remoteNTPTime) - remoteRTP := atomic.LoadUint32(&t.remoteRTPTime) + remoteNTP, remoteRTP := t.getTimeOffset() if remoteNTP != 0 { srTime := rtptime.NTPToTime(remoteNTP) d := now.Sub(srTime) @@ -868,12 +899,11 @@ func sendSR(conn *rtpDownConnection) error { PacketCount: p, OctetCount: b, }) - atomic.StoreUint64(&t.srTime, jiffies) - atomic.StoreUint64(&t.srNTPTime, nowNTP) + t.setSRTime(jiffies, nowNTP) } cname, ok := t.cname.Load().(string) - if ok { + if ok && cname != "" { item := rtcp.SourceDescriptionItem{ Type: rtcp.SDESCNAME, Text: cname, @@ -1054,11 +1084,10 @@ func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint if report.LastSenderReport != 0 { jiffies := rtptime.Jiffies() - srTime := atomic.LoadUint64(&track.srTime) + srTime, srNTPTime := track.getSRTime() if jiffies < srTime || jiffies-srTime > 8*rtptime.JiffiesPerSec { return } - srNTPTime := atomic.LoadUint64(&track.srNTPTime) if report.LastSenderReport == uint32(srNTPTime>>16) { delay := uint64(report.Delay) * (rtptime.JiffiesPerSec / 0x10000) @@ -1066,12 +1095,12 @@ func handleReport(track *rtpDownTrack, report rtcp.ReceptionReport, jiffies uint return } rtt := (jiffies - srTime) - delay - oldrtt := atomic.LoadUint64(&track.rtt) + oldrtt := track.getRTT() newrtt := rtt if oldrtt > 0 { newrtt = (3*oldrtt + rtt) / 4 } - atomic.StoreUint64(&track.rtt, newrtt) + track.setRTT(newrtt) } } } @@ -1095,7 +1124,7 @@ func updateUpTrack(track *rtpUpTrack) { _, j := ll.stats.Get(now) jitter := uint64(j) * (rtptime.JiffiesPerSec / uint64(clockrate)) - rtt := atomic.LoadUint64(&ll.rtt) + rtt := ll.getRTT() rto := rtt + 4*jitter if rto > maxrto { maxrto = rto diff --git a/rtpconn/rtpstats.go b/rtpconn/rtpstats.go index 9397d55..3e9f57e 100644 --- a/rtpconn/rtpstats.go +++ b/rtpconn/rtpstats.go @@ -2,7 +2,6 @@ package rtpconn import ( "sort" - "sync/atomic" "time" "github.com/jech/galene/rtptime" @@ -51,7 +50,7 @@ func (c *webClient) GetStats() *stats.Client { } for _, t := range down.tracks { rate, _ := t.rate.Estimate() - rtt := rtptime.ToDuration(atomic.LoadUint64(&t.rtt), + rtt := rtptime.ToDuration(t.getRTT(), rtptime.JiffiesPerSec) loss, jitter := t.stats.Get(jiffies) j := time.Duration(jitter) * time.Second / diff --git a/rtpconn/rtpwriter.go b/rtpconn/rtpwriter.go index ad01df1..7008cc2 100644 --- a/rtpconn/rtpwriter.go +++ b/rtpconn/rtpwriter.go @@ -266,12 +266,12 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { track.mu.Lock() ntp := track.srNTPTime rtp := track.srRTPTime - cname := track.cname track.mu.Unlock() if ntp != 0 { action.track.SetTimeOffset(ntp, rtp) } - if cname != "" { + cname, ok := track.cname.Load().(string) + if ok && cname != "" { action.track.SetCname(cname) } diff --git a/rtpconn/webclient.go b/rtpconn/webclient.go index 6b3f1f3..20b3da5 100644 --- a/rtpconn/webclient.go +++ b/rtpconn/webclient.go @@ -411,6 +411,7 @@ func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrac maxBitrate: new(bitrate), stats: new(receiverStats), rate: estimator.New(time.Second), + atomics: &downTrackAtomics{}, } conn.tracks = append(conn.tracks, track) @@ -1159,7 +1160,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { c.write(clientMessage{ Type: "close", Id: m.Id, - }); + }) case "ice": if m.Candidate == nil { return group.ProtocolError("null candidate") @@ -1415,11 +1416,11 @@ func (c *webClient) Warn(oponly bool, message string) error { } return c.write(clientMessage{ - Type: "usermessage", - Kind: "warning", - Dest: c.id, + Type: "usermessage", + Kind: "warning", + Dest: c.id, Privileged: true, - Value: &message, + Value: &message, }) }