1
Fork 0

Merge packet list and window into cache.

This commit is contained in:
Juliusz Chroboczek 2020-04-29 11:06:39 +02:00
parent a813cc9ce4
commit e2d89c7c17
8 changed files with 271 additions and 295 deletions

View File

@ -16,8 +16,7 @@ import (
"sync/atomic"
"time"
"sfu/packetlist"
"sfu/packetwindow"
"sfu/packetcache"
"github.com/gorilla/websocket"
"github.com/pion/rtcp"
@ -288,7 +287,7 @@ func addUpConn(c *client, id string) (*upConnection, error) {
}
track := &upTrack{
track: remote,
list: packetlist.New(32),
cache: packetcache.New(32),
maxBitrate: ^uint64(0),
}
u.tracks = append(u.tracks, track)
@ -310,11 +309,10 @@ func addUpConn(c *client, id string) (*upConnection, error) {
}
func upLoop(conn *upConnection, track *upTrack) {
buf := make([]byte, packetlist.BufSize)
buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet
var local []*downTrack
var localTime time.Time
window := packetwindow.New()
for {
now := time.Now()
if now.Sub(localTime) > time.Second/2 {
@ -336,9 +334,9 @@ func upLoop(conn *upConnection, track *upTrack) {
continue
}
window.Set(packet.SequenceNumber)
if packet.SequenceNumber-window.First() > 24 {
first, bitmap := window.Get17()
first := track.cache.Store(packet.SequenceNumber, buf[:i])
if packet.SequenceNumber-first > 24 {
first, bitmap := track.cache.BitmapGet()
if bitmap != ^uint16(0) {
err := conn.sendNACK(track, first, ^bitmap)
if err != nil {
@ -347,8 +345,6 @@ func upLoop(conn *upConnection, track *upTrack) {
}
}
track.list.Store(packet.SequenceNumber, buf[:i])
for _, l := range local {
if l.muted() {
continue
@ -648,7 +644,7 @@ func sendRecovery(p *rtcp.TransportLayerNack, track *downTrack) {
var packet rtp.Packet
for _, nack := range p.Nacks {
for _, seqno := range nack.PacketList() {
raw := track.remote.list.Get(seqno)
raw := track.remote.cache.Get(seqno)
if raw != nil {
err := packet.Unmarshal(raw)
if err != nil {

View File

@ -15,14 +15,14 @@ import (
"sync/atomic"
"time"
"sfu/packetlist"
"sfu/packetcache"
"github.com/pion/webrtc/v2"
)
type upTrack struct {
track *webrtc.Track
list *packetlist.List
cache *packetcache.Cache
maxBitrate uint64
lastPLI uint64

104
packetcache/packetcache.go Normal file
View File

@ -0,0 +1,104 @@
package packetcache
import (
"sync"
)
const BufSize = 1500
type entry struct {
seqno uint16
length int
buf [BufSize]byte
}
type Cache struct {
mu sync.Mutex
first uint16 // the first seqno
bitmap uint32
tail int // the next entry to be rewritten
entries []entry
}
func New(capacity int) *Cache {
return &Cache{
entries: make([]entry, capacity),
}
}
// Set a bit in the bitmap, shifting first if necessary.
func (cache *Cache) set(seqno uint16) {
if cache.bitmap == 0 {
cache.first = seqno
cache.bitmap = 1
return
}
if ((seqno - cache.first) & 0x8000) != 0 {
return
}
if seqno == cache.first {
cache.bitmap >>= 1
cache.first += 1
for (cache.bitmap & 1) == 1 {
cache.bitmap >>= 1
cache.first += 1
}
return
}
if seqno-cache.first < 32 {
cache.bitmap |= (1 << uint16(seqno-cache.first))
return
}
shift := seqno - cache.first - 31
cache.bitmap >>= shift
cache.first += shift
cache.bitmap |= (1 << uint16(seqno-cache.first))
return
}
// Store a packet, setting bitmap at the same time
func (cache *Cache) Store(seqno uint16, buf []byte) uint16 {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.set(seqno)
cache.entries[cache.tail].seqno = seqno
copy(cache.entries[cache.tail].buf[:], buf)
cache.entries[cache.tail].length = len(buf)
cache.tail = (cache.tail + 1) % len(cache.entries)
return cache.first
}
func (cache *Cache) Get(seqno uint16) []byte {
cache.mu.Lock()
defer cache.mu.Unlock()
for i := range cache.entries {
if cache.entries[i].length == 0 ||
cache.entries[i].seqno != seqno {
continue
}
buf := make([]byte, cache.entries[i].length)
copy(buf, cache.entries[i].buf[:])
return buf
}
return nil
}
// Shift 17 bits out of the bitmap, return first index and remaining 16.
func (cache *Cache) BitmapGet() (uint16, uint16) {
cache.mu.Lock()
defer cache.mu.Unlock()
first := cache.first
bitmap := uint16((cache.bitmap >> 1) & 0xFFFF)
cache.bitmap >>= 17
cache.first += 17
return first, bitmap
}

View File

@ -0,0 +1,158 @@
package packetcache
import (
"bytes"
"math/rand"
"testing"
"github.com/pion/rtcp"
)
func randomBuf() []byte {
length := rand.Int31n(BufSize-1) + 1
buf := make([]byte, length)
rand.Read(buf)
return buf
}
func TestCache(t *testing.T) {
buf1 := randomBuf()
buf2 := randomBuf()
cache := New(16)
cache.Store(13, buf1)
cache.Store(17, buf2)
if bytes.Compare(cache.Get(13), buf1) != 0 {
t.Errorf("Couldn't get 13")
}
if bytes.Compare(cache.Get(17), buf2) != 0 {
t.Errorf("Couldn't get 17")
}
if cache.Get(42) != nil {
t.Errorf("Creation ex nihilo")
}
}
func TestCacheOverflow(t *testing.T) {
cache := New(16)
for i := 0; i < 32; i++ {
cache.Store(uint16(i), []byte{uint8(i)})
}
for i := 0; i < 32; i++ {
buf := cache.Get(uint16(i))
if i < 16 {
if buf != nil {
t.Errorf("Creation ex nihilo: %v", i)
}
} else {
if len(buf) != 1 || buf[0] != uint8(i) {
t.Errorf("Expected [%v], got %v", i, buf)
}
}
}
}
func TestBitmap(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
packet := make([]byte, 1)
cache := New(16)
var first uint16
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
first = cache.Store(uint16(42 + i), packet)
}
}
value >>= uint16(first - 42)
if uint32(value) != cache.bitmap {
t.Errorf("Got %b, expected %b", cache.bitmap, value)
}
}
func TestBitmapWrap(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
packet := make([]byte, 1)
cache := New(16)
cache.Store(0x7000, packet)
cache.Store(0xA000, packet)
var first uint16
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
first = cache.Store(uint16(42 + i), packet)
}
}
value >>= uint16(first - 42)
if uint32(value) != cache.bitmap {
t.Errorf("Got %b, expected %b", cache.bitmap, value)
}
}
func TestBitmapGet(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
packet := make([]byte, 1)
cache := New(16)
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
cache.Store(uint16(42 + i), packet)
}
}
pos := uint16(42)
for cache.bitmap != 0 {
first, bitmap := cache.BitmapGet()
if first < pos || first >= pos+64 {
t.Errorf("First is %v, pos is %v", first, pos)
}
value >>= (first - pos)
pos = first
if (value & 1) != 0 {
t.Errorf("Value is odd")
}
value >>= 1
pos += 1
if bitmap != uint16(value&0xFFFF) {
t.Errorf("Got %b, expected %b", bitmap, (value & 0xFFFF))
}
value >>= 16
pos += 16
}
if value != 0 {
t.Errorf("Value is %v", value)
}
}
func TestBitmapPacket(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
packet := make([]byte, 1)
cache := New(16)
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
cache.Store(uint16(42 + i), packet)
}
}
first, bitmap := cache.BitmapGet()
p := rtcp.NackPair{first, rtcp.PacketBitmap(^bitmap)}
pl := p.PacketList()
for _, s := range pl {
if s < 42 || s >= 42+64 {
if (value & (1 << (s - 42))) != 0 {
t.Errorf("Bit %v unexpectedly set", s-42)
}
}
}
}

View File

@ -1,51 +0,0 @@
package packetlist
import (
"sync"
)
const BufSize = 1500
type entry struct {
seqno uint16
length int
buf [BufSize]byte
}
type List struct {
mu sync.Mutex
tail int
entries []entry
}
func New(capacity int) *List {
return &List{
entries: make([]entry, capacity),
}
}
func (list *List) Store(seqno uint16, buf []byte) {
list.mu.Lock()
defer list.mu.Unlock()
list.entries[list.tail].seqno = seqno
copy(list.entries[list.tail].buf[:], buf)
list.entries[list.tail].length = len(buf)
list.tail = (list.tail + 1) % len(list.entries)
}
func (list *List) Get(seqno uint16) []byte {
list.mu.Lock()
defer list.mu.Unlock()
for i := range list.entries {
if list.entries[i].length == 0 ||
list.entries[i].seqno != seqno {
continue
}
buf := make([]byte, list.entries[i].length)
copy(buf, list.entries[i].buf[:])
return buf
}
return nil
}

View File

@ -1,53 +0,0 @@
package packetlist
import (
"bytes"
"math/rand"
"testing"
)
func randomBuf() []byte {
length := rand.Int31n(BufSize-1) + 1
buf := make([]byte, length)
rand.Read(buf)
return buf
}
func TestList(t *testing.T) {
buf1 := randomBuf()
buf2 := randomBuf()
list := New(16)
list.Store(13, buf1)
list.Store(17, buf2)
if bytes.Compare(list.Get(13), buf1) != 0 {
t.Errorf("Couldn't get 13")
}
if bytes.Compare(list.Get(17), buf2) != 0 {
t.Errorf("Couldn't get 17")
}
if list.Get(42) != nil {
t.Errorf("Creation ex nihilo")
}
}
func TestOverflow(t *testing.T) {
list := New(16)
for i := 0; i < 32; i++ {
list.Store(uint16(i), []byte{uint8(i)})
}
for i := 0; i < 32; i++ {
buf := list.Get(uint16(i))
if i < 16 {
if buf != nil {
t.Errorf("Creation ex nihilo: %v", i)
}
} else {
if len(buf) != 1 || buf[0] != uint8(i) {
t.Errorf("Expected [%v], got %v", i, buf)
}
}
}
}

View File

@ -1,76 +0,0 @@
package packetwindow
import (
"fmt"
)
type Window struct {
first uint16
bitmap uint32
}
func New() *Window {
return &Window{}
}
func (w *Window) String() string {
buf := make([]byte, 32)
for i := 0; i < 32; i++ {
if (w.bitmap & (1 << i)) != 0 {
buf[i] = '1'
} else {
buf[i] = '0'
}
}
return fmt.Sprintf("[%04x %s]", w.first, buf)
}
func (w *Window) First() uint16 {
return w.first
}
func (w *Window) Reset() {
w.bitmap = 0
}
func (w *Window) Set(seqno uint16) {
if w.bitmap == 0 {
w.first = seqno
w.bitmap = 1
return
}
if ((seqno - w.first) & 0x8000) != 0 {
return
}
if seqno == w.first {
w.bitmap >>= 1
w.first += 1
for (w.bitmap & 1) == 1 {
w.bitmap >>= 1
w.first += 1
}
return
}
if seqno - w.first < 32 {
w.bitmap |= (1 << uint16(seqno - w.first))
return
}
shift := seqno - w.first - 31
w.bitmap >>= shift
w.first += shift
w.bitmap |= (1 << uint16(seqno - w.first))
return
}
func (w *Window) Get17() (uint16, uint16) {
first := w.first
bitmap := uint16((w.bitmap >> 1) & 0xFFFF)
w.bitmap >>= 17
w.first += 17
return first, bitmap
}

View File

@ -1,102 +0,0 @@
package packetwindow
import (
"testing"
"github.com/pion/rtcp"
)
func TestWindow(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
value >>= uint16(w.first - 42)
if uint32(value) != w.bitmap {
t.Errorf("Got %b, expected %b", w.bitmap, value)
}
}
func TestWindowWrap(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
w.Set(0x7000)
w.Set(0xA000)
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
value >>= uint16(w.first - 42)
if uint32(value) != w.bitmap {
t.Errorf("Got %b, expected %b", w.bitmap, value)
}
}
func TestWindowGet(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
pos := uint16(42)
for w.bitmap != 0 {
first, bitmap := w.Get17()
if first < pos || first >= pos+64 {
t.Errorf("First is %v, pos is %v", first, pos)
}
value >>= (first - pos)
pos = first
if (value & 1) != 0 {
t.Errorf("Value is odd")
}
value >>= 1
pos += 1
if bitmap != uint16(value&0xFFFF) {
t.Errorf("Got %b, expected %b", bitmap, (value & 0xFFFF))
}
value >>= 16
pos += 16
}
if value != 0 {
t.Errorf("Value is %v", value)
}
}
func TestWindowPacket(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
first, bitmap := w.Get17()
p := rtcp.NackPair{first, rtcp.PacketBitmap(^bitmap)}
list := p.PacketList()
for _, s := range list {
if s < 42 || s >= 42 + 64 {
if (value & (1 << (s - 42))) != 0 {
t.Errorf("Bit %v unexpectedly set", s - 42)
}
}
}
}