capsule AI-native Unix-like composition layer

src/server/internal/mediapeer/segment.go

6,232 bytes · 216 lines · capsule://quake0day/[email protected] raw on github

package mediapeer

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os/exec"
	"time"

	"github.com/pion/webrtc/v4/pkg/media"
	"github.com/pion/webrtc/v4/pkg/media/ivfreader"
	opus "gopkg.in/hraban/opus.v2"
)

// RawAVSegment is an unencoded video+audio segment ready for VP8 encoding.
//
// Field notes (NOTE(review): most are inferred from names and from the
// encoder helpers in this file — confirm against the producer):
//   - TraceLabel, Epoch, SegmentSeq: presumably identify and order the segment
//     for tracing; not interpreted anywhere in this file.
//   - MediaStartMS, DurationMS: presumably the segment's position and length on
//     the media timeline, in milliseconds (per the MS suffix).
//   - MarkerID, MarkerMediaMS, MarkerDurationMS, MarkerFrequencyHz: marker
//     metadata whose meaning is defined outside this file.
//   - RGB: raw video. Presumably packed RGB24 frames totalling
//     Width*Height*3*NumFrames bytes, the layout EncodeRGBChunkToVP8Samples
//     checks for — verify at the call site.
//   - PCM: raw audio. Presumably 16-bit little-endian mono samples at
//     SampleRate Hz, the format EncodePCMToOpusSamples decodes — verify.
//   - UserFinalAt: a timestamp whose meaning is not visible in this file.
//   - Width, Height, FPS, NumFrames: presumably the geometry, frame rate and
//     frame count describing RGB.
type RawAVSegment struct {
	TraceLabel        string
	Epoch             uint64
	SegmentSeq        int64
	MediaStartMS      int64
	DurationMS        int64
	MarkerID          int64
	MarkerMediaMS     int64
	MarkerDurationMS  int64
	MarkerFrequencyHz int
	RGB               []byte
	PCM               []byte
	UserFinalAt       time.Time
	SampleRate        int
	Width             int
	Height            int
	FPS               int
	NumFrames         int
	QueuedAt          time.Time // set by SendAVSegment for pipeline latency tracking

	// Fence is a pipeline drain marker. When set (non-nil) and RGB is empty,
	// the encoder passes it through without encoding. The publisher closes
	// the channel after finishing all preceding segments, signalling the
	// producer that the pipeline has been fully drained.
	Fence chan struct{}
}

// AVSegment is a pre-encoded video+audio segment ready for paced publishing.
//
// It mirrors RawAVSegment field for field, except that the raw RGB video is
// replaced by VP8Samples. NOTE(review): the notes below are inferred from
// names and from the helpers in this file — confirm against the encoder stage
// that builds this value:
//   - VP8Samples: encoded video, presumably one media.Sample per VP8 frame as
//     returned by EncodeRGBChunkToVP8Samples(WithBitrate).
//   - PCM: audio is still raw here (presumably 16-bit little-endian mono at
//     SampleRate Hz); EncodePCMToOpusSamples can turn it into Opus samples.
//   - Width, Height, FPS, NumFrames: presumably describe the encoded video.
//   - Fence: presumably the same pipeline drain marker carried over from
//     RawAVSegment.Fence; closed once all preceding segments are published.
type AVSegment struct {
	TraceLabel        string
	Epoch             uint64
	SegmentSeq        int64
	MediaStartMS      int64
	DurationMS        int64
	MarkerID          int64
	MarkerMediaMS     int64
	MarkerDurationMS  int64
	MarkerFrequencyHz int
	VP8Samples        []media.Sample
	PCM               []byte
	UserFinalAt       time.Time
	SampleRate        int
	Width             int
	Height            int
	FPS               int
	NumFrames         int
	QueuedAt          time.Time // carried from RawAVSegment for end-to-end latency
	Fence             chan struct{}
}

// defaultVP8BitrateKbps is the VP8 target bitrate, in kbit/s, used whenever
// no explicit positive bitrate is requested.
const defaultVP8BitrateKbps = 2000

// EncodeRGBChunkToVP8Samples encodes a contiguous RGB24 buffer holding
// numFrames frames of width x height pixels into VP8 samples, at the default
// bitrate (defaultVP8BitrateKbps).
func EncodeRGBChunkToVP8Samples(rgb []byte, width, height, numFrames, fps int) ([]media.Sample, error) {
	const bitrateKbps = defaultVP8BitrateKbps
	return EncodeRGBChunkToVP8SamplesWithBitrate(rgb, width, height, numFrames, fps, bitrateKbps)
}

// EncodeRGBChunkToVP8SamplesWithBitrate encodes a contiguous RGB24 buffer to
// VP8 samples by piping it through a one-shot ffmpeg (libvpx) process.
//
// rgb must hold at least numFrames packed RGB24 frames of width x height
// pixels (width*height*3*numFrames bytes); any bytes beyond that are ignored.
// Non-positive width, height or numFrames return (nil, nil). A non-positive
// fps falls back to 25 and a non-positive bitrateKbps falls back to
// defaultVP8BitrateKbps. Every returned sample carries Duration = 1s/fps, and
// empty IVF frames are skipped.
//
// An error is returned when rgb is too short, ffmpeg fails (its stderr is
// included in the message), the IVF output is malformed, or no non-empty VP8
// frame was produced.
func EncodeRGBChunkToVP8SamplesWithBitrate(rgb []byte, width, height, numFrames, fps int, bitrateKbps int) ([]media.Sample, error) {
	if numFrames <= 0 || width <= 0 || height <= 0 {
		return nil, nil
	}
	if fps <= 0 {
		fps = 25
	}
	if bitrateKbps <= 0 {
		bitrateKbps = defaultVP8BitrateKbps
	}
	want := width * height * 3 * numFrames
	if len(rgb) < want {
		return nil, fmt.Errorf("rgb buffer too small: got %d want >= %d", len(rgb), want)
	}
	rgb = rgb[:want]

	frameDur := time.Second / time.Duration(fps)
	if frameDur <= 0 {
		frameDur = time.Millisecond * 40
	}

	// IVF VP8 via libvpx; single-process encode per chunk (latency vs simplicity tradeoff).
	bitrateArg := fmt.Sprintf("%dk", bitrateKbps)
	bufSizeArg := fmt.Sprintf("%dk", bitrateKbps*2)
	keyframeInterval := fmt.Sprintf("%d", numFrames)
	cmd := exec.Command(
		"ffmpeg",
		"-loglevel", "error",
		"-f", "rawvideo",
		"-pixel_format", "rgb24",
		"-video_size", fmt.Sprintf("%dx%d", width, height),
		"-framerate", fmt.Sprintf("%d", fps),
		"-i", "pipe:0",
		"-frames:v", fmt.Sprintf("%d", numFrames),
		"-c:v", "libvpx",
		"-deadline", "realtime",
		"-cpu-used", "8",
		"-row-mt", "1", // NOTE(review): row-mt appears to be a libvpx-vp9-only option in ffmpeg; likely a no-op for VP8 — confirm.
		"-b:v", bitrateArg,
		"-maxrate", bitrateArg,
		"-bufsize", bufSizeArg,
		"-g", keyframeInterval, // Each restarted chunk starts with a keyframe; keep later frames as inter frames.
		"-keyint_min", keyframeInterval,
		"-an",
		"-f", "ivf",
		"pipe:1",
	)
	cmd.Stdin = bytes.NewReader(rgb)
	var ivfOut bytes.Buffer
	cmd.Stdout = &ivfOut
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return nil, fmt.Errorf("ffmpeg vp8: %w: %s", err, stderr.String())
	}

	// The IVF header's frame count is ignored: ffmpeg writes to a non-seekable
	// pipe here, so that field may be a placeholder rather than the real count.
	// Read until EOF instead, capped at the numFrames ffmpeg was asked for.
	reader, _, err := ivfreader.NewWith(&ivfOut)
	if err != nil {
		return nil, fmt.Errorf("ivf header: %w", err)
	}

	samples := make([]media.Sample, 0, numFrames)
	for i := 0; i < numFrames; i++ {
		payload, _, err := reader.ParseNextFrame()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			// A malformed/truncated frame must not silently shorten the segment.
			return nil, fmt.Errorf("ivf frame %d: %w", i, err)
		}
		if len(payload) == 0 {
			continue
		}
		// Copy so each sample owns its bytes independently of the reader.
		frame := make([]byte, len(payload))
		copy(frame, payload)
		samples = append(samples, media.Sample{
			Data:     frame,
			Duration: frameDur,
		})
	}
	if len(samples) == 0 {
		return nil, fmt.Errorf("ffmpeg produced zero vp8 frames")
	}
	return samples, nil
}

// EncodePCMToOpusSamples encodes an entire buffer of 16-bit little-endian
// mono PCM into 20ms Opus frames returned as media.Samples. Encoding the full
// buffer with a single encoder avoids the sample-loss caused by splitting PCM
// into small slices and encoding each independently.
//
// An empty pcm or non-positive sampleRate returns (nil, nil). sampleRate is
// passed straight to opus.NewEncoder; a rate it rejects is returned as a
// wrapped error. A trailing partial frame (including a dangling odd byte) is
// zero-padded to a full 20ms frame so the total audio duration covers the
// whole buffer. Every returned sample has Duration 20ms.
func EncodePCMToOpusSamples(pcm []byte, sampleRate int) ([]media.Sample, error) {
	if len(pcm) == 0 || sampleRate <= 0 {
		return nil, nil
	}

	enc, err := opus.NewEncoder(sampleRate, 1, opus.AppVoIP)
	if err != nil {
		return nil, fmt.Errorf("create opus encoder: %w", err)
	}

	// 20ms frame parameters
	samplesPerFrame := sampleRate / 50   // 20ms = 1/50 second
	bytesPerFrame := samplesPerFrame * 2 // 16-bit mono
	if bytesPerFrame <= 0 {
		return nil, fmt.Errorf("invalid opus frame size for sample_rate=%d", sampleRate)
	}

	// Scratch buffers reused for every frame; Encode consumes them synchronously
	// and each emitted packet is copied out below.
	opusBuf := make([]byte, 4000) // max opus frame size
	frameSamples := make([]int16, samplesPerFrame)

	frameCount := (len(pcm) + bytesPerFrame - 1) / bytesPerFrame
	samples := make([]media.Sample, 0, frameCount)

	for offset := 0; offset < len(pcm); offset += bytesPerFrame {
		chunk := pcm[offset:]
		if len(chunk) >= bytesPerFrame {
			chunk = chunk[:bytesPerFrame]
		} else {
			// Pad trailing bytes (often silence from strict segment sizing) so RTP
			// audio duration matches the PCM buffer length and lip-synced video
			// segments. Only this last frame is copied, not the whole buffer.
			padded := make([]byte, bytesPerFrame)
			copy(padded, chunk)
			chunk = padded
		}
		for i := range frameSamples {
			frameSamples[i] = int16(binary.LittleEndian.Uint16(chunk[2*i:]))
		}

		n, err := enc.Encode(frameSamples, opusBuf)
		if err != nil {
			return nil, fmt.Errorf("opus encode: %w", err)
		}
		if n == 0 {
			continue
		}

		frame := make([]byte, n)
		copy(frame, opusBuf[:n])
		samples = append(samples, media.Sample{
			Data:     frame,
			Duration: 20 * time.Millisecond,
		})
	}
	return samples, nil
}