1
Fork 0
mirror of https://github.com/jech/galene.git synced 2024-11-10 02:35:58 +01:00
galene/diskwriter/diskwriter.go
Juliusz Chroboczek 24187430e8 Rename client status to data, add group data.
We now distinguish between status, which is maintained by the server,
and data, which is provided by the client.  In addition to client data,
we now support group data.
2022-01-29 23:28:08 +01:00

664 lines
14 KiB
Go

package diskwriter
import (
crand "crypto/rand"
"encoding/hex"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/at-wat/ebml-go/mkvcore"
"github.com/at-wat/ebml-go/webm"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/jech/samplebuilder"
gcodecs "github.com/jech/galene/codecs"
"github.com/jech/galene/conn"
"github.com/jech/galene/group"
)
var Directory string
type Client struct {
group *group.Group
id string
mu sync.Mutex
down map[string]*diskConn
closed bool
}
func newId() string {
b := make([]byte, 16)
crand.Read(b)
return hex.EncodeToString(b)
}
func New(g *group.Group) *Client {
return &Client{group: g, id: newId()}
}
func (client *Client) Group() *group.Group {
return client.group
}
func (client *Client) Id() string {
return client.id
}
func (client *Client) Username() string {
return "RECORDING"
}
func (client *Client) SetPermissions(perms group.ClientPermissions) {
return
}
func (client *Client) Permissions() group.ClientPermissions {
return group.ClientPermissions{
System: true,
}
}
func (client *Client) Data() map[string]interface{} {
return nil
}
func (client *Client) PushClient(group, kind, id, username string, permissions group.ClientPermissions, data map[string]interface{}) error {
return nil
}
func (client *Client) RequestConns(target group.Client, g *group.Group, id string) error {
return nil
}
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
for _, down := range client.down {
down.Close()
}
client.down = nil
client.closed = true
return nil
}
func (client *Client) Kick(id, user, message string) error {
err := client.Close()
group.DelClient(client)
return err
}
func (client *Client) Joined(group, kind string) error {
return nil
}
func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, replace string) error {
if client.group != g {
return nil
}
client.mu.Lock()
defer client.mu.Unlock()
if client.closed {
return errors.New("disk client is closed")
}
if replace != "" {
rp := client.down[replace]
if rp != nil {
rp.Close()
delete(client.down, replace)
} else {
log.Printf("Disk writer: replacing unknown connection")
}
}
old := client.down[id]
if old != nil {
old.Close()
delete(client.down, id)
}
if up == nil {
return nil
}
directory := filepath.Join(Directory, client.group.Name())
err := os.MkdirAll(directory, 0700)
if err != nil {
g.WallOps("Write to disk: " + err.Error())
return err
}
if client.down == nil {
client.down = make(map[string]*diskConn)
}
down, err := newDiskConn(client, directory, up, tracks)
if err != nil {
g.WallOps("Write to disk: " + err.Error())
return err
}
client.down[up.Id()] = down
return nil
}
type diskConn struct {
client *Client
directory string
username string
hasVideo bool
mu sync.Mutex
file *os.File
remote conn.Up
tracks []*diskTrack
width, height uint32
lastWarning time.Time
}
// called locked
func (conn *diskConn) warn(message string) {
now := time.Now()
if now.Sub(conn.lastWarning) < 10*time.Second {
return
}
log.Println(message)
conn.client.group.WallOps(message)
conn.lastWarning = now
}
// called locked
func (conn *diskConn) reopen(extension string) error {
for _, t := range conn.tracks {
t.writeBuffered(true)
if t.writer != nil {
t.writer.Close()
t.writer = nil
}
}
conn.file = nil
file, err := openDiskFile(conn.directory, conn.username, extension)
if err != nil {
return err
}
conn.file = file
return nil
}
func (conn *diskConn) Close() error {
conn.remote.DelLocal(conn)
conn.mu.Lock()
tracks := make([]*diskTrack, 0, len(conn.tracks))
for _, t := range conn.tracks {
t.writeBuffered(true)
if t.writer != nil {
t.writer.Close()
t.writer = nil
}
tracks = append(tracks, t)
}
conn.mu.Unlock()
for _, t := range tracks {
t.remote.DelLocal(t)
}
return nil
}
func openDiskFile(directory, username, extension string) (*os.File, error) {
filenameFormat := "2006-01-02T15:04:05.000"
if runtime.GOOS == "windows" {
filenameFormat = "2006-01-02T15-04-05-000"
}
filename := time.Now().Format(filenameFormat)
if username != "" {
filename = filename + "-" + username
}
for counter := 0; counter < 100; counter++ {
var fn string
if counter == 0 {
fn = fmt.Sprintf("%v.%v", filename, extension)
} else {
fn = fmt.Sprintf("%v-%02d.%v",
filename, counter, extension,
)
}
fn = filepath.Join(directory, fn)
f, err := os.OpenFile(
fn, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600,
)
if err == nil {
return f, nil
} else if !os.IsExist(err) {
return nil, err
}
}
return nil, errors.New("couldn't create file")
}
type maybeUint32 uint64
const none maybeUint32 = 0
func some(value uint32) maybeUint32 {
return maybeUint32(uint64(1<<32) | uint64(value))
}
func valid(m maybeUint32) bool {
return (m & (1 << 32)) != 0
}
func value(m maybeUint32) uint32 {
return uint32(m)
}
type diskTrack struct {
remote conn.UpTrack
conn *diskConn
writer mkvcore.BlockWriteCloser
builder *samplebuilder.SampleBuilder
lastSeqno maybeUint32
origin maybeUint32
kfRequested time.Time
lastKf time.Time
savedKf *rtp.Packet
}
func newDiskConn(client *Client, directory string, up conn.Up, remoteTracks []conn.UpTrack) (*diskConn, error) {
var audio, video conn.UpTrack
for _, remote := range remoteTracks {
codec := remote.Codec().MimeType
if strings.EqualFold(codec, "audio/opus") {
if audio == nil {
audio = remote
} else {
client.group.WallOps("Multiple audio tracks, recording just one")
}
} else if strings.EqualFold(codec, "video/vp8") ||
strings.EqualFold(codec, "video/vp9") ||
strings.EqualFold(codec, "video/h264") {
if video == nil || video.Label() == "l" {
video = remote
} else if remote.Label() != "l" {
client.group.WallOps("Multiple video tracks, recording just one")
}
} else {
client.group.WallOps("Unknown codec, " + codec + ", not recording")
}
}
if video == nil && audio == nil {
return nil, errors.New("no usable tracks found")
}
tracks := make([]conn.UpTrack, 0, 2)
if audio != nil {
tracks = append(tracks, audio)
}
if video != nil {
tracks = append(tracks, video)
}
_, username := up.User()
conn := diskConn{
client: client,
directory: directory,
username: username,
tracks: make([]*diskTrack, 0, len(tracks)),
remote: up,
}
for _, remote := range tracks {
var builder *samplebuilder.SampleBuilder
codec := remote.Codec()
if strings.EqualFold(codec.MimeType, "audio/opus") {
builder = samplebuilder.New(
32, &codecs.OpusPacket{}, codec.ClockRate,
)
} else if strings.EqualFold(codec.MimeType, "video/vp8") {
builder = samplebuilder.New(
1024, &codecs.VP8Packet{}, codec.ClockRate,
)
conn.hasVideo = true
} else if strings.EqualFold(codec.MimeType, "video/vp9") {
builder = samplebuilder.New(
1024, &codecs.VP9Packet{}, codec.ClockRate,
)
conn.hasVideo = true
} else if strings.EqualFold(codec.MimeType, "video/h264") {
builder = samplebuilder.New(
1024, &codecs.H264Packet{}, codec.ClockRate,
)
conn.hasVideo = true
} else {
// this shouldn't happen
return nil, errors.New(
"cannot record codec " + codec.MimeType,
)
}
track := &diskTrack{
remote: remote,
builder: builder,
conn: &conn,
}
conn.tracks = append(conn.tracks, track)
}
// Only do this after all tracks have been added to conn, to avoid
// racing on hasVideo.
for _, t := range conn.tracks {
err := t.remote.AddLocal(t)
if err != nil {
log.Printf("Couldn't add disk track: %v", err)
conn.warn("Couldn't add disk track: " + err.Error())
}
}
err := up.AddLocal(&conn)
if err != nil {
return nil, err
}
return &conn, nil
}
func (t *diskTrack) SetTimeOffset(ntp uint64, rtp uint32) {
}
func (t *diskTrack) SetCname(string) {
}
func (t *diskTrack) Write(buf []byte) (int, error) {
t.conn.mu.Lock()
defer t.conn.mu.Unlock()
if t.builder == nil {
return 0, nil
}
// samplebuilder retains packets
data := make([]byte, len(buf))
copy(data, buf)
p := new(rtp.Packet)
err := p.Unmarshal(data)
if err != nil {
log.Printf("Diskwriter: %v", err)
return 0, nil
}
if valid(t.lastSeqno) {
lastSeqno := uint16(value(t.lastSeqno))
if ((p.SequenceNumber - lastSeqno) & 0x8000) == 0 {
// jump forward
count := p.SequenceNumber - lastSeqno
if count < 256 {
for i := uint16(1); i < count; i++ {
fetch(t, lastSeqno+i)
}
} else {
requestKeyframe(t)
}
t.lastSeqno = some(uint32(p.SequenceNumber))
} else {
// jump backward
count := lastSeqno - p.SequenceNumber
if count >= 512 {
t.lastSeqno = none
requestKeyframe(t)
}
}
} else {
t.lastSeqno = some(uint32(p.SequenceNumber))
}
err = t.writeRTP(p)
if err != nil {
return 0, err
}
return len(buf), nil
}
func fetch(t *diskTrack, seqno uint16) {
// since the samplebuilder retains packets, use a fresh buffer
buf := make([]byte, 1504)
n := t.remote.GetPacket(seqno, buf, false)
if n == 0 {
return
}
p := new(rtp.Packet)
err := p.Unmarshal(buf)
if err != nil {
return
}
t.writeRTP(p)
}
func requestKeyframe(t *diskTrack) {
now := time.Now()
if now.Sub(t.kfRequested) > 500*time.Millisecond {
t.remote.RequestKeyframe()
t.kfRequested = now
}
}
// writeRTP writes the packet without fetching lost packets
// Called locked.
func (t *diskTrack) writeRTP(p *rtp.Packet) error {
codec := t.remote.Codec().MimeType
if len(codec) > 6 && strings.EqualFold(codec[:6], "video/") {
kf, _ := gcodecs.Keyframe(codec, p)
if kf {
t.savedKf = p
t.lastKf = time.Now()
} else if time.Since(t.lastKf) > 4*time.Second {
requestKeyframe(t)
}
}
t.builder.Push(p)
return t.writeBuffered(false)
}
// writeBuffered writes any buffered samples to disk. If force is true,
// then samples will be flushed even if they are preceded by incomplete
// samples.
func (t *diskTrack) writeBuffered(force bool) error {
codec := t.remote.Codec().MimeType
for {
var sample *media.Sample
var ts uint32
if !force {
sample, ts = t.builder.PopWithTimestamp()
} else {
sample, ts = t.builder.ForcePopWithTimestamp()
}
if sample == nil {
return nil
}
var keyframe bool
if len(codec) > 6 && strings.EqualFold(codec[:6], "video/") {
if t.savedKf == nil {
keyframe = false
} else {
keyframe = (ts == t.savedKf.Timestamp)
}
if keyframe {
err := t.conn.initWriter(
gcodecs.KeyframeDimensions(
codec, t.savedKf,
),
)
if err != nil {
t.conn.warn(
"Write to disk " + err.Error(),
)
return err
}
}
} else {
if t.writer == nil {
if !t.conn.hasVideo {
err := t.conn.initWriter(0, 0)
if err != nil {
t.conn.warn(
"Write to disk " +
err.Error(),
)
return err
}
}
}
}
if t.writer == nil {
continue
}
if !valid(t.origin) {
t.origin = some(ts)
}
ts -= value(t.origin)
tm := ts / (t.remote.Codec().ClockRate / 1000)
_, err := t.writer.Write(keyframe, int64(tm), sample.Data)
if err != nil {
return err
}
}
}
// called locked
func (conn *diskConn) initWriter(width, height uint32) error {
if conn.file != nil && width == conn.width && height == conn.height {
return nil
}
isWebm := true
var desc []mkvcore.TrackDescription
for i, t := range conn.tracks {
var entry webm.TrackEntry
codec := t.remote.Codec()
if strings.EqualFold(codec.MimeType, "audio/opus") {
entry = webm.TrackEntry{
Name: "Audio",
TrackNumber: uint64(i + 1),
CodecID: "A_OPUS",
TrackType: 2,
Audio: &webm.Audio{
SamplingFrequency: float64(codec.ClockRate),
Channels: uint64(codec.Channels),
},
}
} else if strings.EqualFold(codec.MimeType, "video/vp8") {
entry = webm.TrackEntry{
Name: "Video",
TrackNumber: uint64(i + 1),
CodecID: "V_VP8",
TrackType: 1,
Video: &webm.Video{
PixelWidth: uint64(width),
PixelHeight: uint64(height),
},
}
} else if strings.EqualFold(codec.MimeType, "video/vp9") {
entry = webm.TrackEntry{
Name: "Video",
TrackNumber: uint64(i + 1),
CodecID: "V_VP9",
TrackType: 1,
Video: &webm.Video{
PixelWidth: uint64(width),
PixelHeight: uint64(height),
},
}
} else if strings.EqualFold(codec.MimeType, "video/h264") {
entry = webm.TrackEntry{
Name: "Video",
TrackNumber: uint64(i + 1),
CodecID: "V_MPEG4/ISO/AVC",
TrackType: 1,
Video: &webm.Video{
PixelWidth: uint64(width),
PixelHeight: uint64(height),
},
}
isWebm = false
} else {
return errors.New("unknown track type")
}
desc = append(desc,
mkvcore.TrackDescription{
TrackNumber: uint64(i + 1),
TrackEntry: entry,
},
)
}
extension := "webm"
header := webm.DefaultEBMLHeader
if !isWebm {
extension = "mkv"
h := *header
h.DocType = "matroska"
header = &h
}
err := conn.reopen(extension)
if err != nil {
return err
}
ws, err := mkvcore.NewSimpleBlockWriter(
conn.file, desc,
mkvcore.WithEBMLHeader(header),
mkvcore.WithSegmentInfo(webm.DefaultSegmentInfo),
mkvcore.WithBlockInterceptor(webm.DefaultBlockInterceptor),
)
if err != nil {
conn.file.Close()
conn.file = nil
return err
}
if len(ws) != len(conn.tracks) {
conn.file.Close()
conn.file = nil
return errors.New("unexpected number of writers")
}
conn.width = width
conn.height = height
for i, t := range conn.tracks {
t.writer = ws[i]
}
return nil
}
func (t *diskTrack) GetMaxBitrate() (uint64, int, int) {
return ^uint64(0), -1, -1
}