1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-26 10:35:59 +01:00
galene/diskwriter/diskwriter.go
Juliusz Chroboczek 14a4303664 Rework connection replacement.
We used to signal connection replacement by reusing the same connection
id.  This turned out to be racy, as we couldn't reliably discard old
answers after a connection id was refused.

We now use a new id for every new connection, and explicitly signal
stream replacement in the offer message.  This requires maintaining a
local id on the client side.
2021-01-31 23:59:17 +01:00

595 lines
12 KiB
Go

package diskwriter
import (
crand "crypto/rand"
"encoding/hex"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/at-wat/ebml-go/webm"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3/pkg/media/samplebuilder"
"github.com/jech/galene/conn"
"github.com/jech/galene/group"
)
var Directory string
type Client struct {
group *group.Group
id string
mu sync.Mutex
down map[string]*diskConn
closed bool
}
func newId() string {
b := make([]byte, 16)
crand.Read(b)
return hex.EncodeToString(b)
}
func New(g *group.Group) *Client {
return &Client{group: g, id: newId()}
}
func (client *Client) Group() *group.Group {
return client.group
}
func (client *Client) Id() string {
return client.id
}
func (client *Client) Username() string {
return "RECORDING"
}
func (client *Client) Challenge(group string, cred group.ClientCredentials) bool {
return true
}
func (client *Client) OverridePermissions(g *group.Group) bool {
return true
}
func (client *Client) SetPermissions(perms group.ClientPermissions) {
return
}
func (client *Client) Permissions() group.ClientPermissions {
return group.ClientPermissions{}
}
func (client *Client) PushClient(id, username string, add bool) error {
return nil
}
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
for _, down := range client.down {
down.Close()
}
client.down = nil
client.closed = true
return nil
}
func (client *Client) Kick(id, user, message string) error {
err := client.Close()
group.DelClient(client)
return err
}
func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, replace string) error {
if client.group != g {
return nil
}
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
return errors.New("disk client is closed")
}
rp := client.down[replace]
if rp != nil {
rp.Close()
delete(client.down, replace)
} else {
log.Printf("Replacing unknown connection")
}
old := client.down[id]
if old != nil {
old.Close()
delete(client.down, id)
}
if up == nil {
return nil
}
directory := filepath.Join(Directory, client.group.Name())
err := os.MkdirAll(directory, 0700)
if err != nil {
g.WallOps("Write to disk: " + err.Error())
return err
}
if client.down == nil {
client.down = make(map[string]*diskConn)
}
down, err := newDiskConn(client, directory, up, tracks)
if err != nil {
g.WallOps("Write to disk: " + err.Error())
return err
}
client.down[up.Id()] = down
return nil
}
type diskConn struct {
client *Client
directory string
username string
hasVideo bool
mu sync.Mutex
file *os.File
remote conn.Up
tracks []*diskTrack
width, height uint32
lastWarning time.Time
}
// called locked
func (conn *diskConn) warn(message string) {
now := time.Now()
if now.Sub(conn.lastWarning) < 10*time.Second {
return
}
log.Println(message)
conn.client.group.WallOps(message)
conn.lastWarning = now
}
// called locked
func (conn *diskConn) reopen() error {
for _, t := range conn.tracks {
if t.writer != nil {
t.writer.Close()
t.writer = nil
}
}
conn.file = nil
file, err := openDiskFile(conn.directory, conn.username)
if err != nil {
return err
}
conn.file = file
return nil
}
func (conn *diskConn) Close() error {
conn.remote.DelLocal(conn)
conn.mu.Lock()
tracks := make([]*diskTrack, 0, len(conn.tracks))
for _, t := range conn.tracks {
if t.writer != nil {
t.writer.Close()
t.writer = nil
}
tracks = append(tracks, t)
}
conn.mu.Unlock()
for _, t := range tracks {
t.remote.DelLocal(t)
}
return nil
}
func openDiskFile(directory, username string) (*os.File, error) {
filenameFormat := "2006-01-02T15:04:05.000"
if runtime.GOOS == "windows" {
filenameFormat = "2006-01-02T15-04-05-000"
}
filename := time.Now().Format(filenameFormat)
if username != "" {
filename = filename + "-" + username
}
for counter := 0; counter < 100; counter++ {
var fn string
if counter == 0 {
fn = fmt.Sprintf("%v.webm", filename)
} else {
fn = fmt.Sprintf("%v-%02d.webm", filename, counter)
}
fn = filepath.Join(directory, fn)
f, err := os.OpenFile(
fn, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600,
)
if err == nil {
return f, nil
} else if !os.IsExist(err) {
return nil, err
}
}
return nil, errors.New("couldn't create file")
}
type diskTrack struct {
remote conn.UpTrack
conn *diskConn
writer webm.BlockWriteCloser
builder *samplebuilder.SampleBuilder
// bit 32 is a boolean indicating that the origin is valid
origin uint64
lastKf uint32
savedKf *rtp.Packet
}
func newDiskConn(client *Client, directory string, up conn.Up, remoteTracks []conn.UpTrack) (*diskConn, error) {
_, username := up.User()
conn := diskConn{
client: client,
directory: directory,
username: username,
tracks: make([]*diskTrack, 0, len(remoteTracks)),
remote: up,
}
for _, remote := range remoteTracks {
var builder *samplebuilder.SampleBuilder
codec := remote.Codec()
switch strings.ToLower(codec.MimeType) {
case "audio/opus":
builder = samplebuilder.New(
16, &codecs.OpusPacket{}, codec.ClockRate,
samplebuilder.WithPartitionHeadChecker(
&codecs.OpusPartitionHeadChecker{},
),
)
case "video/vp8":
if conn.hasVideo {
return nil, errors.New("multiple video tracks not supported")
}
builder = samplebuilder.New(
128, &codecs.VP8Packet{}, codec.ClockRate,
samplebuilder.WithPartitionHeadChecker(
&codecs.VP8PartitionHeadChecker{},
),
)
conn.hasVideo = true
case "video/vp9":
if conn.hasVideo {
return nil, errors.New("multiple video tracks not supported")
}
builder = samplebuilder.New(
128, &codecs.VP9Packet{}, codec.ClockRate,
samplebuilder.WithPartitionHeadChecker(
&codecs.VP9PartitionHeadChecker{},
),
)
conn.hasVideo = true
default:
client.group.WallOps(
"Cannot record codec " + codec.MimeType,
)
continue
}
track := &diskTrack{
remote: remote,
builder: builder,
conn: &conn,
}
conn.tracks = append(conn.tracks, track)
remote.AddLocal(track)
}
err := up.AddLocal(&conn)
if err != nil {
return nil, err
}
return &conn, nil
}
func (t *diskTrack) SetTimeOffset(ntp uint64, rtp uint32) {
}
func (t *diskTrack) SetCname(string) {
}
func clonePacket(packet *rtp.Packet) *rtp.Packet {
buf, err := packet.Marshal()
if err != nil {
return nil
}
var p rtp.Packet
err = p.Unmarshal(buf)
if err != nil {
return nil
}
return &p
}
func isKeyframe(codec string, data []byte) bool {
switch strings.ToLower(codec) {
case "video/vp8":
if len(data) < 1 {
return false
}
return (data[0] & 0x1) == 0
case "video/vp9":
if len(data) < 1 {
return false
}
if data[0]&0xC0 != 0x80 {
return false
}
profile := (data[0] >> 4) & 0x3
if profile != 3 {
return (data[0] & 0xC) == 0
}
return (data[0] & 0x6) == 0
default:
panic("Eek!")
}
}
func keyframeDimensions(codec string, data []byte, packet *rtp.Packet) (uint32, uint32) {
switch strings.ToLower(codec) {
case "video/vp8":
if len(data) < 10 {
return 0, 0
}
raw := uint32(data[6]) | uint32(data[7])<<8 |
uint32(data[8])<<16 | uint32(data[9])<<24
width := raw & 0x3FFF
height := (raw >> 16) & 0x3FFF
return width, height
case "video/vp9":
if packet == nil {
return 0, 0
}
var vp9 codecs.VP9Packet
_, err := vp9.Unmarshal(packet.Payload)
if err != nil {
return 0, 0
}
if(!vp9.V) {
return 0, 0
}
w := uint32(0)
h := uint32(0)
for i := range vp9.Width {
if i >= len(vp9.Height) {
break
}
if w < uint32(vp9.Width[i]) {
w = uint32(vp9.Width[i])
}
if h < uint32(vp9.Height[i]) {
h = uint32(vp9.Height[i])
}
}
return w, h
default:
return 0, 0
}
}
func (t *diskTrack) WriteRTP(packet *rtp.Packet) error {
// since we call initWriter, we take the connection lock for simplicity.
t.conn.mu.Lock()
defer t.conn.mu.Unlock()
if t.builder == nil {
return nil
}
codec := t.remote.Codec()
p := clonePacket(packet)
if p == nil {
return nil
}
if strings.ToLower(codec.MimeType) == "video/vp9" {
var vp9 codecs.VP9Packet
_, err := vp9.Unmarshal(p.Payload)
if err == nil && vp9.B && len(vp9.Payload) >= 1 {
profile := (vp9.Payload[0] >> 4) & 0x3
kf := false
if profile != 3 {
kf = (vp9.Payload[0] & 0xC) == 0
} else {
kf = (vp9.Payload[0] & 0x6) == 0
}
if kf {
t.savedKf = p
}
}
}
kfNeeded := false
t.builder.Push(p)
for {
sample, ts := t.builder.PopWithTimestamp()
if sample == nil {
if kfNeeded {
return conn.ErrKeyframeNeeded
}
return nil
}
keyframe := true
switch strings.ToLower(codec.MimeType) {
case "video/vp8", "video/vp9":
keyframe = isKeyframe(codec.MimeType, sample.Data)
if keyframe {
err := t.conn.initWriter(
keyframeDimensions(
codec.MimeType, sample.Data,
t.savedKf,
),
)
if err != nil {
t.conn.warn(
"Write to disk " + err.Error(),
)
return err
}
t.lastKf = ts
} else if t.writer != nil {
// Request a keyframe every 4s
delta := ts - t.lastKf
if (delta&0x80000000) != 0 || delta > 4*90000 {
kfNeeded = true
}
}
default:
if t.writer == nil {
if !t.conn.hasVideo {
err := t.conn.initWriter(0, 0)
if err != nil {
t.conn.warn(
"Write to disk " +
err.Error(),
)
return err
}
}
}
}
if t.writer == nil {
if !keyframe {
return conn.ErrKeyframeNeeded
}
return nil
}
if t.origin == 0 {
t.origin = uint64(ts) | (1 << 32)
}
ts -= uint32(t.origin)
tm := ts / (t.remote.Codec().ClockRate / 1000)
_, err := t.writer.Write(keyframe, int64(tm), sample.Data)
if err != nil {
return err
}
}
}
// called locked
func (conn *diskConn) initWriter(width, height uint32) error {
if conn.file != nil && width == conn.width && height == conn.height {
return nil
}
var entries []webm.TrackEntry
for i, t := range conn.tracks {
var entry webm.TrackEntry
codec := t.remote.Codec()
switch strings.ToLower(codec.MimeType) {
case "audio/opus":
entry = webm.TrackEntry{
Name: "Audio",
TrackNumber: uint64(i + 1),
CodecID: "A_OPUS",
TrackType: 2,
Audio: &webm.Audio{
SamplingFrequency: float64(codec.ClockRate),
Channels: uint64(codec.Channels),
},
}
case "video/vp8":
entry = webm.TrackEntry{
Name: "Video",
TrackNumber: uint64(i + 1),
CodecID: "V_VP8",
TrackType: 1,
Video: &webm.Video{
PixelWidth: uint64(width),
PixelHeight: uint64(height),
},
}
case "video/vp9":
entry = webm.TrackEntry{
Name: "Video",
TrackNumber: uint64(i + 1),
CodecID: "V_VP9",
TrackType: 1,
Video: &webm.Video{
PixelWidth: uint64(width),
PixelHeight: uint64(height),
},
}
default:
return errors.New("unknown track type")
}
entries = append(entries, entry)
}
err := conn.reopen()
if err != nil {
return err
}
writers, err := webm.NewSimpleBlockWriter(conn.file, entries)
if err != nil {
conn.file.Close()
conn.file = nil
return err
}
if len(writers) != len(conn.tracks) {
conn.file.Close()
conn.file = nil
return errors.New("unexpected number of writers")
}
conn.width = width
conn.height = height
for i, t := range conn.tracks {
t.writer = writers[i]
}
return nil
}
func (down *diskConn) GetMaxBitrate(now uint64) uint64 {
return ^uint64(0)
}
func (t *diskTrack) Accumulate(bytes uint32) {
return
}