src/server/internal/recording/recorder.go
9,751 bytes · 365 lines · capsule://quake0day/[email protected]
raw on github
package recording
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"sync"
"github.com/cyberverse/server/internal/config"
)
// VideoRecorder is owned by the Orchestrator and produces the per-turn
// recording artifacts (video, raw audio and transcript files).
type VideoRecorder struct {
	// cfg carries the recording settings consulted by the methods below
	// (Enabled, OutputDir, CRF).
	cfg config.RecordingConfig
}
// NewVideoRecorder returns a VideoRecorder that uses cfg for every recording
// it creates.
func NewVideoRecorder(cfg config.RecordingConfig) *VideoRecorder {
	rec := new(VideoRecorder)
	rec.cfg = cfg
	return rec
}
// resolveOutputDir maps sessionDir to the directory recordings are written to:
// a relative name is placed under the configured OutputDir, while an absolute
// path is used verbatim.
func (r *VideoRecorder) resolveOutputDir(sessionDir string) string {
	if !filepath.IsAbs(sessionDir) {
		return filepath.Join(r.cfg.OutputDir, sessionDir)
	}
	return sessionDir
}
// EncodeRGB24ToMP4 writes raw RGB24 frames plus optional mono PCM16 audio into an MP4.
//
// rgbChunks are streamed, in order, to ffmpeg's stdin as rawvideo of
// width x height at fps frames per second; empty chunks are skipped. When pcm
// is non-empty it is staged in a temporary s16le mono file at sampleRate Hz,
// encoded as AAC at 96k, and the output is cut to the shorter stream
// (-shortest); otherwise the MP4 is written without an audio track (-an).
// crf <= 0 selects 23 and sampleRate <= 0 selects 16000. The parent directory
// of outPath is created if needed and an existing file is overwritten.
//
// An error is returned for invalid width/height/fps, when rgbChunks is empty,
// when ffmpeg is not on PATH, or when any step of the encode fails. If ffmpeg
// itself exits with an error, that error (including ffmpeg's stderr) is
// reported in preference to the broken-pipe stdin write error it causes.
func EncodeRGB24ToMP4(outPath string, width, height, fps int, rgbChunks [][]byte, pcm []byte, sampleRate int, crf int) error {
	if width <= 0 || height <= 0 || fps <= 0 {
		return fmt.Errorf("recording: invalid video params width=%d height=%d fps=%d", width, height, fps)
	}
	if len(rgbChunks) == 0 {
		return fmt.Errorf("recording: no rgb chunks to encode")
	}
	if crf <= 0 {
		crf = 23
	}
	if sampleRate <= 0 {
		sampleRate = 16000
	}
	if _, err := exec.LookPath("ffmpeg"); err != nil {
		return fmt.Errorf("recording: ffmpeg not found in PATH: %w", err)
	}
	if err := os.MkdirAll(filepath.Dir(outPath), 0755); err != nil {
		return fmt.Errorf("recording: create output dir: %w", err)
	}

	videoSize := fmt.Sprintf("%dx%d", width, height)
	args := []string{
		"-hide_banner",
		"-loglevel", "error",
		"-y",
		"-f", "rawvideo",
		"-pixel_format", "rgb24",
		"-video_size", videoSize,
		"-framerate", fmt.Sprintf("%d", fps),
		"-i", "pipe:0",
	}

	if len(pcm) > 0 {
		// Use os.CreateTemp to avoid naming conflicts when multiple goroutines
		// encode idle videos concurrently.
		tmpFile, err := os.CreateTemp("", "cyberverse-idle-audio-*.pcm")
		if err != nil {
			return fmt.Errorf("recording: create temp pcm file: %w", err)
		}
		audioTmpPath := tmpFile.Name()
		// Registered at function scope: removed on every return path below.
		defer os.Remove(audioTmpPath)
		_, werr := tmpFile.Write(pcm)
		// Close can surface a deferred write failure, so its error counts too.
		if cerr := tmpFile.Close(); werr == nil {
			werr = cerr
		}
		if werr != nil {
			return fmt.Errorf("recording: write temp pcm: %w", werr)
		}
		args = append(args,
			"-f", "s16le",
			"-ac", "1",
			"-ar", fmt.Sprintf("%d", sampleRate),
			"-i", audioTmpPath,
		)
	}

	args = append(args,
		"-c:v", "libx264",
		"-preset", "fast",
		"-crf", fmt.Sprintf("%d", crf),
		"-pix_fmt", "yuv420p",
	)
	if len(pcm) > 0 {
		args = append(args,
			"-c:a", "aac",
			"-b:a", "96k",
			"-shortest",
		)
	} else {
		args = append(args, "-an")
	}
	args = append(args, outPath)

	cmd := exec.Command("ffmpeg", args...)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return fmt.Errorf("recording: ffmpeg stdin pipe: %w", err)
	}
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("recording: ffmpeg start: %w", err)
	}

	writeErr := func() error {
		for _, chunk := range rgbChunks {
			if len(chunk) == 0 {
				continue
			}
			if _, err := stdin.Write(chunk); err != nil {
				return err
			}
		}
		return nil
	}()
	// Always close stdin (EOF for ffmpeg) and reap the process, even after a
	// failed write, so no child process or pipe is leaked.
	closeErr := stdin.Close()
	waitErr := cmd.Wait()

	// If ffmpeg failed, a write error is just the resulting broken pipe; the
	// exit status plus stderr is the actionable diagnosis, so report it first.
	if waitErr != nil {
		return fmt.Errorf("recording: ffmpeg encode failed: %w: %s", waitErr, stderr.String())
	}
	if writeErr != nil {
		return fmt.Errorf("recording: ffmpeg stdin write: %w", writeErr)
	}
	if closeErr != nil {
		return fmt.Errorf("recording: ffmpeg stdin close: %w", closeErr)
	}
	return nil
}
// TurnRecording captures a single conversation turn. Video frames are piped
// into a running ffmpeg encoder as they arrive, while the silence-padded
// audio (segPCM from flushVoiceSeg) is buffered in memory; Finish() combines
// the two into one MP4.
type TurnRecording struct {
	// Video side: the ffmpeg process, its stdin, the channel that receives
	// the process's Wait result, and the intermediate video file it writes.
	videoCmd     *exec.Cmd
	videoWriter  io.WriteCloser
	videoDone    chan error
	videoTmpPath string

	// Audio side: mu guards audioBuf and sampleRate, which are written by
	// WriteAudioChunk and read by Finish.
	mu         sync.Mutex
	audioBuf   bytes.Buffer
	sampleRate int

	// outPath is the final muxed MP4; cfg is the recorder's configuration.
	outPath string
	cfg     config.RecordingConfig
}
// BeginTurn starts a new turn recording. It returns nil if recording is
// disabled or the encoder could not be set up (the reason is logged).
//
// sessionDir can be an absolute path (used when recording into a character's data space)
// or a relative name (legacy behavior, joined with cfg.OutputDir).
//
// The returned TurnRecording owns a running ffmpeg process that encodes the
// RGB24 frames passed to WriteVideoChunk into "<turnID>-video.mp4"; Finish
// must be called to close the pipe, reap the process and produce
// "<turnID>.mp4". A cfg.CRF <= 0 selects 23, matching EncodeRGB24ToMP4.
func (r *VideoRecorder) BeginTurn(sessionDir, turnID string, width, height, fps int) *TurnRecording {
	if !r.cfg.Enabled {
		return nil
	}
	if width <= 0 || height <= 0 || fps <= 0 {
		log.Printf("recording: invalid video params width=%d height=%d fps=%d", width, height, fps)
		return nil
	}
	crf := r.cfg.CRF
	if crf <= 0 {
		// An unset CRF must not become "-crf 0" (lossless, very large files).
		crf = 23
	}
	outDir := r.resolveOutputDir(sessionDir)
	if err := os.MkdirAll(outDir, 0755); err != nil {
		log.Printf("recording: failed to create dir %s: %v", outDir, err)
		return nil
	}
	outPath := filepath.Join(outDir, turnID+".mp4")
	videoTmpPath := filepath.Join(outDir, turnID+"-video.mp4")
	videoSize := fmt.Sprintf("%dx%d", width, height)
	cmd := exec.Command("ffmpeg",
		"-hide_banner",
		"-loglevel", "error",
		"-nostats",
		"-y",
		"-f", "rawvideo",
		"-pixel_format", "rgb24",
		"-video_size", videoSize,
		"-framerate", fmt.Sprintf("%d", fps),
		"-i", "pipe:0",
		"-c:v", "libx264",
		"-preset", "fast",
		"-crf", fmt.Sprintf("%d", crf),
		"-pix_fmt", "yuv420p",
		videoTmpPath,
	)
	// Keep ffmpeg's diagnostics so a failed encode is explainable in Finish.
	// cmd.Wait finishes copying stderr before returning, so reading the
	// buffer after Wait in the goroutine below is race-free.
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Printf("recording: stdin pipe error: %v", err)
		return nil
	}
	if err := cmd.Start(); err != nil {
		log.Printf("recording: ffmpeg start error: %v", err)
		return nil
	}
	t := &TurnRecording{
		videoCmd:     cmd,
		videoWriter:  stdin,
		videoDone:    make(chan error, 1),
		videoTmpPath: videoTmpPath,
		outPath:      outPath,
		cfg:          r.cfg,
	}
	// Buffered channel: the goroutine can deliver the result and exit even if
	// Finish has not been called yet.
	go func() {
		err := cmd.Wait()
		if err != nil && stderr.Len() > 0 {
			err = fmt.Errorf("%w: %s", err, stderr.String())
		}
		t.videoDone <- err
	}()
	return t
}
// WriteVideoChunk forwards raw RGB24 bytes to the turn's ffmpeg encoder.
// Calls on a nil recording or with an empty chunk do nothing; a failed write
// is logged, not returned.
func (t *TurnRecording) WriteVideoChunk(rgbBytes []byte) {
	if t != nil && len(rgbBytes) > 0 {
		if _, werr := t.videoWriter.Write(rgbBytes); werr != nil {
			log.Printf("recording: video write error: %v", werr)
		}
	}
}
// WriteAudioChunk appends PCM that is already silence-padded and aligned to
// the video segment duration (segPCM from flushVoiceSeg). The sample rate of
// the first non-empty chunk is remembered; later sampleRate values are not
// used. Nil recordings and empty chunks are ignored.
func (t *TurnRecording) WriteAudioChunk(pcmBytes []byte, sampleRate int) {
	if t == nil {
		return
	}
	if len(pcmBytes) == 0 {
		return
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	firstChunk := t.sampleRate == 0
	if firstChunk {
		t.sampleRate = sampleRate
	}
	t.audioBuf.Write(pcmBytes)
}
// Finish closes the video pipe and waits for the encoder. It then writes the
// buffered PCM as a WAV temp file and muxes video (stream copy) and audio
// (AAC) into the final MP4 with +faststart.
//
// Both temporary files are removed on every return path, including failures.
// A nil recording is a no-op. Finish must be called at most once: a second
// call would block forever waiting on videoDone.
func (t *TurnRecording) Finish() error {
	if t == nil {
		return nil
	}

	// Step 1: finalize video encoding. Closing stdin delivers EOF to ffmpeg;
	// its close error is ignored because ffmpeg's exit status, received
	// from videoDone, is the authoritative result.
	t.videoWriter.Close()
	defer os.Remove(t.videoTmpPath)
	if err := <-t.videoDone; err != nil {
		log.Printf("recording: video ffmpeg error: %v", err)
		return fmt.Errorf("recording: finalize video failed: %w", err)
	}

	// Step 2: snapshot the audio and write it as a WAV temp file.
	t.mu.Lock()
	pcmData := t.audioBuf.Bytes()
	sr := t.sampleRate
	t.mu.Unlock()

	// BeginTurn names the video temp "<turnID>-video.mp4"; the audio temp is
	// its "<turnID>-audio.wav" sibling.
	audioTmpPath := t.videoTmpPath[:len(t.videoTmpPath)-len("-video.mp4")] + "-audio.wav"
	defer os.Remove(audioTmpPath)

	// Build the header in memory so the file gets exactly two checked writes.
	var hdr bytes.Buffer
	writeWAVHeader(&hdr, len(pcmData), sr)
	f, err := os.Create(audioTmpPath)
	if err != nil {
		return fmt.Errorf("recording: create audio tmp: %w", err)
	}
	_, werr := f.Write(hdr.Bytes())
	if werr == nil {
		_, werr = f.Write(pcmData)
	}
	if cerr := f.Close(); werr == nil {
		werr = cerr
	}
	if werr != nil {
		return fmt.Errorf("recording: write audio tmp: %w", werr)
	}

	// Step 3: mux video + audio.
	muxCmd := exec.Command("ffmpeg",
		"-hide_banner",
		"-loglevel", "error",
		"-y",
		"-i", t.videoTmpPath,
		"-i", audioTmpPath,
		"-c:v", "copy",
		"-c:a", "aac",
		"-movflags", "+faststart",
		t.outPath,
	)
	if out, err := muxCmd.CombinedOutput(); err != nil {
		log.Printf("recording: mux error: %v\n%s", err, string(out))
		return fmt.Errorf("recording: mux failed: %w", err)
	}

	// Step 4: temp-file cleanup happens in the deferred removals above.
	log.Printf("recording saved: %s", t.outPath)
	return nil
}
// SaveRawAudio saves Doubao's raw PCM output for a turn as a WAV file named
// "<turnID>-raw.wav".
//
// sessionDir can be an absolute path or a relative name (joined with
// cfg.OutputDir). It does nothing when recording is disabled or pcm is empty.
// Any create, write or close failure is returned, and the partially written
// file is removed. Success is logged.
func (r *VideoRecorder) SaveRawAudio(sessionDir, turnID string, pcm []byte, sampleRate int) error {
	if !r.cfg.Enabled || len(pcm) == 0 {
		return nil
	}
	outDir := r.resolveOutputDir(sessionDir)
	if err := os.MkdirAll(outDir, 0755); err != nil {
		return fmt.Errorf("recording: create raw audio dir: %w", err)
	}
	path := filepath.Join(outDir, turnID+"-raw.wav")
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("recording: create raw audio: %w", err)
	}

	// Build the header in memory so the file gets exactly two checked writes.
	var hdr bytes.Buffer
	writeWAVHeader(&hdr, len(pcm), sampleRate)
	_, werr := f.Write(hdr.Bytes())
	if werr == nil {
		_, werr = f.Write(pcm)
	}
	// Close may report a delayed write failure, so it is checked too.
	if cerr := f.Close(); werr == nil {
		werr = cerr
	}
	if werr != nil {
		os.Remove(path) // don't leave a truncated WAV behind
		return fmt.Errorf("recording: write raw audio: %w", werr)
	}
	log.Printf("recording raw audio saved: %s", path)
	return nil
}
// SaveTranscript writes a turn's assistant reply to "<turnID>.txt" (mode 0644).
// sessionDir can be an absolute path or a relative name (joined with cfg.OutputDir).
// It is a no-op when recording is disabled, turnID is empty, or the transcript
// is empty.
func (r *VideoRecorder) SaveTranscript(sessionDir, turnID, transcript string) error {
	switch {
	case !r.cfg.Enabled, turnID == "", transcript == "":
		return nil
	}

	dir := r.resolveOutputDir(sessionDir)
	if mkErr := os.MkdirAll(dir, 0755); mkErr != nil {
		return fmt.Errorf("recording: create transcript dir: %w", mkErr)
	}

	txtPath := filepath.Join(dir, turnID+".txt")
	if wrErr := os.WriteFile(txtPath, []byte(transcript), 0644); wrErr != nil {
		return fmt.Errorf("recording: write transcript: %w", wrErr)
	}

	log.Printf("recording transcript saved: %s", txtPath)
	return nil
}
// writeWAVHeader writes a standard 44-byte RIFF/WAVE header for mono, 16-bit
// little-endian PCM to w.
//
// dataLen is the byte length of the PCM payload that will follow the header.
// A sampleRate <= 0 falls back to 24000 Hz. The header is assembled in memory
// and emitted with a single Write, and that write's error is returned;
// call sites that ignore the result keep compiling unchanged.
func writeWAVHeader(w io.Writer, dataLen int, sampleRate int) error {
	if sampleRate <= 0 {
		sampleRate = 24000
	}
	const (
		channels      = 1
		bitsPerSample = 16
		headerLen     = 44
	)
	byteRate := sampleRate * channels * bitsPerSample / 8
	blockAlign := channels * bitsPerSample / 8

	var hdr [headerLen]byte
	le := binary.LittleEndian
	copy(hdr[0:4], "RIFF")
	le.PutUint32(hdr[4:8], uint32(36+dataLen)) // RIFF chunk size: bytes after this field
	copy(hdr[8:12], "WAVE")
	copy(hdr[12:16], "fmt ")
	le.PutUint32(hdr[16:20], 16) // fmt chunk size for plain PCM
	le.PutUint16(hdr[20:22], 1)  // audio format 1 = PCM
	le.PutUint16(hdr[22:24], channels)
	le.PutUint32(hdr[24:28], uint32(sampleRate))
	le.PutUint32(hdr[28:32], uint32(byteRate))
	le.PutUint16(hdr[32:34], uint16(blockAlign))
	le.PutUint16(hdr[34:36], bitsPerSample)
	copy(hdr[36:40], "data")
	le.PutUint32(hdr[40:44], uint32(dataLen))

	_, err := w.Write(hdr[:])
	return err
}