src/server/internal/orchestrator/orchestrator.go
119,323 bytes · 4,076 lines · capsule://quake0day/[email protected]
raw on github
package orchestrator
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"image"
"image/color"
"image/draw"
"image/png"
"log"
"math"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/cyberverse/server/internal/agenttask"
"github.com/cyberverse/server/internal/character"
"github.com/cyberverse/server/internal/config"
"github.com/cyberverse/server/internal/direct"
"github.com/cyberverse/server/internal/inference"
"github.com/cyberverse/server/internal/kanshan"
"github.com/cyberverse/server/internal/livekit"
"github.com/cyberverse/server/internal/mediapeer"
"github.com/cyberverse/server/internal/pb"
ragstore "github.com/cyberverse/server/internal/rag"
"github.com/cyberverse/server/internal/recording"
"github.com/cyberverse/server/internal/ws"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/webrtc/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// stdChunksPerSegment is how many avatar video chunks to batch before publishing
// in the standard (ASR/text→LLM→TTS→Avatar) pipeline. Qwen TTS produces audio
// before avatar video, so each avatar chunk is sent as its own paced AV segment
// with matching PCM instead of publishing audio ahead of video.
const stdChunksPerSegment = 1
// No hard cap on the assistant PCM buffer: long responses (>20s) were
// previously truncated, causing the first N seconds of audio to be dropped
// and all video segments to play with misaligned (or silent) audio.
// Set to 0 to disable the overflow guard entirely.
const voiceMaxPCMBufferSamples = 0
const avatarImageMaxUploadHint = "角色头像图片超过当前 10MB 上传限制,已使用默认头像;待机视频也不会生成。请压缩或缩放角色图片到 10MB 以内后重试。"
const (
doubaoDialogContextMaxPairs = 20
doubaoDialogContextLoadLimit = doubaoDialogContextMaxPairs * 4
startupGreetingHistoryItems = 6
qwenOmniMaxVisualFrameBytes = 500 * 1024
)
const standardGlobalSystemPrompt = `你需要遵守以下通用回复规范,这些规范优先于角色设定。
回复长度:
- 默认简洁,先直接回答用户真正问的事。
- 闲聊、确认、简单提问时,用 1-3 句话回答。
- 推荐、方案、分析、教程可以展开,但先给结论,再给细节。
- 用户明确要求详细、完整、深入时,才使用较长结构化回答。
表达风格:
- 像真实的人聊天,直接、自然、有分寸。
- 不使用固定开场白,不写舞台动作或心理活动。
- 不堆叠 emoji、感叹号、波浪号或无信息量的亲密称呼。
- 避免 AI 味套话,例如“作为一个 AI”“希望这对你有帮助”“我将从以下几个方面”。
内容质量:
- 不说空话,不绕圈子,不为了亲切而尬夸或过度共情。
- 不确定的事情要说明不确定,不要编造。
- 需要澄清时,只问一个最关键的问题。`
var (
ErrVisualInputUnsupported = errors.New("visual input is only supported in standard sessions or qwen_omni voice sessions")
ErrVisualInputDisabled = errors.New("visual input is disabled")
)
type voiceAVSyncBuffer struct {
mu sync.Mutex
pcmBytes []byte
sampleRate int
totalAudioIn int64
totalAudioOut int64
maxBufferSamples int
// Carries fractional samples from frames*sampleRate/fps to avoid
// long-session drift caused by per-segment integer rounding.
sampleCarryNumer int64
}
func newVoiceAVSyncBuffer(maxBufferSamples int) *voiceAVSyncBuffer {
if maxBufferSamples <= 0 {
maxBufferSamples = voiceMaxPCMBufferSamples
}
return &voiceAVSyncBuffer{maxBufferSamples: maxBufferSamples}
}
func (b *voiceAVSyncBuffer) appendPCM(pcm []byte, sampleRate int) (droppedBytes int) {
b.mu.Lock()
defer b.mu.Unlock()
if len(pcm) == 0 || sampleRate <= 0 {
return 0
}
if b.sampleRate == 0 {
b.sampleRate = sampleRate
}
if b.sampleRate != sampleRate {
b.sampleRate = sampleRate
b.pcmBytes = nil
}
if len(pcm)%2 != 0 {
pcm = pcm[:len(pcm)-1]
}
if len(pcm) == 0 {
return 0
}
b.pcmBytes = append(b.pcmBytes, pcm...)
b.totalAudioIn += int64(len(pcm) / 2)
maxBytes := b.maxBufferSamples * 2
if maxBytes > 0 && len(b.pcmBytes) > maxBytes {
droppedBytes = len(b.pcmBytes) - maxBytes
if droppedBytes%2 != 0 {
droppedBytes++
}
b.pcmBytes = b.pcmBytes[droppedBytes:]
}
return droppedBytes
}
func desiredSamplesForVideo(frames, fps, sampleRate int) int {
if frames <= 0 || fps <= 0 || sampleRate <= 0 {
return 0
}
// Rounded target samples for segment duration = frames / fps seconds.
return (frames*sampleRate + fps/2) / fps
}
func durationMSForVideo(frames, fps int) int64 {
if frames <= 0 || fps <= 0 {
return 0
}
return int64(math.Round(float64(frames) * 1000 / float64(fps)))
}
func (b *voiceAVSyncBuffer) takeSegmentPCM(frames, fps int, isFinal bool) ([]byte, int, int, int) {
b.mu.Lock()
defer b.mu.Unlock()
if frames <= 0 || fps <= 0 || b.sampleRate <= 0 {
return nil, 0, 0, 0
}
// Exact target with carry:
// want = floor((frames*sampleRate + carry)/fps), carry = modulo part.
numer := int64(frames*b.sampleRate) + b.sampleCarryNumer
wantSamples := int(numer / int64(fps))
b.sampleCarryNumer = numer % int64(fps)
if wantSamples <= 0 {
wantSamples = desiredSamplesForVideo(frames, fps, b.sampleRate)
}
if wantSamples <= 0 {
return nil, 0, 0, len(b.pcmBytes) / 2
}
wantBytes := wantSamples * 2
availableBytes := len(b.pcmBytes)
takeBytes := wantBytes
if takeBytes > availableBytes {
takeBytes = availableBytes
}
if takeBytes%2 != 0 {
takeBytes--
}
out := make([]byte, wantBytes) // strict lip-sync: always return exact segment duration
if takeBytes > 0 {
copy(out, b.pcmBytes[:takeBytes])
b.pcmBytes = b.pcmBytes[takeBytes:]
}
outSamples := takeBytes / 2
b.totalAudioOut += int64(outSamples)
if isFinal {
// Final close-loop: strict mode prefers exact A/V alignment over
// carrying remaining tail audio into post-video silence.
b.pcmBytes = nil
}
return out, outSamples, wantSamples, len(b.pcmBytes) / 2
}
func (b *voiceAVSyncBuffer) snapshot() (bufferedSamples int, totalIn int64, totalOut int64, sampleRate int) {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.pcmBytes) / 2, b.totalAudioIn, b.totalAudioOut, b.sampleRate
}
type voicePipelineTurn struct {
seq uint64
key string
questionID string
replyID string
assistantText string
recTurnID string
recAudioBuf []byte
recAudioSR int
historySaved bool
conversationSaved bool
transcriptSaved bool
rawAudioSaved bool
sessionDir string
turnStart time.Time
userFinalAt time.Time
firstAudioAt time.Time
audioFinalAt time.Time
avatarWorkerAt time.Time
firstAvatarAudioAt time.Time
avatarInputClosedAt time.Time
firstVideoAt time.Time
syncBuf *voiceAVSyncBuffer
avatarStarted bool
audioOnlyStarted bool
avatarInputClosed bool
avatarAudioCh chan *pb.AudioChunk
avatarCtx context.Context
avatarCancel context.CancelFunc
doneCh chan voicePipelineTurnResult
aborted bool
}
type voicePipelineTurnResult struct {
turn *voicePipelineTurn
err error
}
func voiceOutputTurnKey(output *pb.VoiceLLMOutput) string {
if output == nil {
return ""
}
if replyID := strings.TrimSpace(output.GetReplyId()); replyID != "" {
return "reply:" + replyID
}
if questionID := strings.TrimSpace(output.GetQuestionId()); questionID != "" {
return "question:" + questionID
}
return ""
}
func voiceOutputHasAssistantContent(output *pb.VoiceLLMOutput) bool {
if output == nil {
return false
}
if output.GetTranscript() != "" {
return true
}
audio := output.GetAudio()
return audio != nil && len(audio.GetData()) > 0
}
func voiceOutputIsFinal(output *pb.VoiceLLMOutput) bool {
if output == nil {
return false
}
if output.GetIsFinal() {
return true
}
audio := output.GetAudio()
return audio != nil && audio.GetIsFinal()
}
type dialogContextMessage struct {
sessionID string
role string
text string
timestamp time.Time
}
func stringValue(v any) string {
if s, ok := v.(string); ok {
return strings.TrimSpace(s)
}
return ""
}
func mapValue(v any) map[string]any {
if m, ok := v.(map[string]any); ok {
return m
}
return nil
}
func intValue(v any, fallback int) int {
switch n := v.(type) {
case int:
return n
case int64:
return int(n)
case float64:
return int(n)
case json.Number:
if parsed, err := n.Int64(); err == nil {
return int(parsed)
}
case string:
if parsed, err := strconv.Atoi(strings.TrimSpace(n)); err == nil {
return parsed
}
}
return fallback
}
func taskStatusValue(v any, fallback agenttask.Status) agenttask.Status {
status := agenttask.Status(stringValue(v))
switch status {
case agenttask.StatusQueued, agenttask.StatusRunning, agenttask.StatusWaitingUser,
agenttask.StatusCompleted, agenttask.StatusFailed, agenttask.StatusCancelled:
return status
default:
return fallback
}
}
func unixTimeFromNumber(n int64) time.Time {
if n <= 0 {
return time.Time{}
}
if n > 1_000_000_000_000 {
return time.UnixMilli(n).UTC()
}
return time.Unix(n, 0).UTC()
}
func parseConversationTimestamp(v any) time.Time {
switch t := v.(type) {
case string:
t = strings.TrimSpace(t)
if t == "" {
return time.Time{}
}
if parsed, err := time.Parse(time.RFC3339Nano, t); err == nil {
return parsed.UTC()
}
if parsed, err := time.Parse(time.RFC3339, t); err == nil {
return parsed.UTC()
}
if n, err := strconv.ParseInt(t, 10, 64); err == nil {
return unixTimeFromNumber(n)
}
case float64:
return unixTimeFromNumber(int64(t))
case int:
return unixTimeFromNumber(int64(t))
case int64:
return unixTimeFromNumber(t)
case json.Number:
if n, err := t.Int64(); err == nil {
return unixTimeFromNumber(n)
}
}
return time.Time{}
}
func buildDoubaoDialogContext(messages []map[string]any, maxPairs int, now time.Time) []DialogContextItem {
if maxPairs <= 0 {
maxPairs = doubaoDialogContextMaxPairs
}
if now.IsZero() {
now = time.Now().UTC()
} else {
now = now.UTC()
}
filtered := make([]dialogContextMessage, 0, len(messages))
for _, msg := range messages {
role := strings.ToLower(stringValue(msg["role"]))
if role != "user" && role != "assistant" {
continue
}
text := stringValue(msg["content"])
if text == "" {
text = stringValue(msg["text"])
}
if text == "" {
continue
}
sessionID := stringValue(msg["session_id"])
if sessionID == "" {
continue
}
filtered = append(filtered, dialogContextMessage{
sessionID: sessionID,
role: role,
text: text,
timestamp: parseConversationTimestamp(msg["timestamp"]),
})
}
paired := make([]dialogContextMessage, 0, len(filtered))
var pendingUsers []dialogContextMessage
pendingSessionID := ""
for _, msg := range filtered {
if len(pendingUsers) > 0 && msg.sessionID != pendingSessionID {
pendingUsers = nil
pendingSessionID = ""
}
if msg.role == "user" {
if len(pendingUsers) == 0 {
pendingSessionID = msg.sessionID
}
pendingUsers = append(pendingUsers, msg)
continue
}
if len(pendingUsers) == 0 || msg.sessionID != pendingSessionID {
continue
}
var merged strings.Builder
for i, userMsg := range pendingUsers {
if i > 0 {
merged.WriteString("\n")
}
merged.WriteString(userMsg.text)
}
paired = append(paired, dialogContextMessage{
sessionID: pendingSessionID,
role: "user",
text: merged.String(),
timestamp: pendingUsers[0].timestamp,
}, msg)
pendingUsers = nil
pendingSessionID = ""
}
maxItems := maxPairs * 2
if len(paired) > maxItems {
paired = paired[len(paired)-maxItems:]
}
if len(paired) == 0 {
return nil
}
items := make([]DialogContextItem, len(paired))
for i, msg := range paired {
items[i] = DialogContextItem{
Role: msg.role,
Text: msg.text,
Timestamp: msg.timestamp.UnixMilli(),
}
}
return normalizeDialogContextTimestamps(items, now)
}
func normalizeDialogContextTimestamps(items []DialogContextItem, now time.Time) []DialogContextItem {
if len(items) == 0 {
return nil
}
if now.IsZero() {
now = time.Now().UTC()
} else {
now = now.UTC()
}
fallbackStart := now.Add(-time.Duration(len(items)) * time.Millisecond)
for i := range items {
if items[i].Timestamp <= 0 || time.UnixMilli(items[i].Timestamp).After(now) {
items[i].Timestamp = fallbackStart.Add(time.Duration(i) * time.Millisecond).UnixMilli()
}
}
var last int64
for i := range items {
if items[i].Timestamp <= last {
items[i].Timestamp = last + 1
}
last = items[i].Timestamp
}
if nowMS := now.UnixMilli(); last > nowMS {
delta := last - nowMS
for i := range items {
items[i].Timestamp -= delta
}
}
return items
}
func buildVoiceDialogContextFromSession(session *Session, excludeTurnSeq uint64, maxPairs int, now time.Time) []DialogContextItem {
if session == nil {
return nil
}
if maxPairs <= 0 {
maxPairs = doubaoDialogContextMaxPairs
}
items := session.DialogContextSnapshot()
for _, msg := range session.HistorySnapshot() {
if excludeTurnSeq > 0 && msg.TurnSeq == excludeTurnSeq {
continue
}
role := strings.ToLower(strings.TrimSpace(msg.Role))
if role != "user" && role != "assistant" {
continue
}
text := strings.TrimSpace(msg.Content)
if text == "" {
continue
}
timestamp := int64(0)
if !msg.Timestamp.IsZero() {
timestamp = msg.Timestamp.UTC().UnixMilli()
}
items = append(items, DialogContextItem{
Role: role,
Text: text,
Timestamp: timestamp,
})
}
maxItems := maxPairs * 2
if len(items) > maxItems {
items = items[len(items)-maxItems:]
}
return normalizeDialogContextTimestamps(items, now)
}
func startupGreetingHistory(items []DialogContextItem, maxItems int) []DialogContextItem {
if maxItems <= 0 {
maxItems = startupGreetingHistoryItems
}
if len(items) > maxItems {
items = items[len(items)-maxItems:]
}
return append([]DialogContextItem(nil), items...)
}
func safeTraceValue(value string) string {
value = strings.TrimSpace(value)
if value == "" {
return "-"
}
replacer := strings.NewReplacer(" ", "_", "\t", "_", "\n", "_", "\r", "_")
return replacer.Replace(value)
}
func voiceTraceLabel(sessionID string, turnSeq uint64, replyID, questionID string, segSeq int64) string {
parts := []string{
"sid=" + safeTraceValue(sessionID),
"turn=" + strconv.FormatUint(turnSeq, 10),
"reply=" + safeTraceValue(replyID),
}
if questionID != "" {
parts = append(parts, "qid="+safeTraceValue(questionID))
}
if segSeq > 0 {
parts = append(parts, "seg="+strconv.FormatInt(segSeq, 10))
}
return strings.Join(parts, " ")
}
func characterSystemPrompt(char *character.Character, includeName bool, includeSpeakingStyle bool) string {
if char == nil {
return ""
}
var parts []string
appendField := func(label, value string) {
value = strings.TrimSpace(value)
if value != "" {
parts = append(parts, label+":"+value)
}
}
appendField("角色提示", char.SystemPrompt)
if includeName {
appendField("角色名称", char.Name)
}
appendField("角色描述", char.Description)
appendField("角色性格", char.Personality)
if includeSpeakingStyle {
appendField("说话风格", char.SpeakingStyle)
}
return strings.Join(parts, "\n")
}
func composeSystemPrompt(globalPrompt string, rolePrompt string) string {
globalPrompt = strings.TrimSpace(globalPrompt)
rolePrompt = strings.TrimSpace(rolePrompt)
switch {
case globalPrompt == "" && rolePrompt == "":
return ""
case globalPrompt == "":
return "【角色设定】\n" + rolePrompt
case rolePrompt == "":
return "【全局输出规范】\n" + globalPrompt
default:
return "【全局输出规范】\n" + globalPrompt + "\n\n【角色设定】\n" + rolePrompt
}
}
func formatSinceUserFinal(start time.Time) string {
if start.IsZero() {
return "-"
}
return strconv.FormatInt(time.Since(start).Milliseconds(), 10)
}
func logVoiceTrace(event, sessionID string, turnSeq uint64, replyID, questionID string, since time.Time, fields ...string) {
parts := []string{
fmt.Sprintf("voice_trace event=%-30s", event),
"sid=" + safeTraceValue(sessionID),
"turn=" + strconv.FormatUint(turnSeq, 10),
"reply=" + safeTraceValue(replyID),
}
parts = append(parts, "qid="+safeTraceValue(questionID))
parts = append(parts, "since_user_final_ms="+formatSinceUserFinal(since))
parts = append(parts, fields...)
log.Print(strings.Join(parts, " "))
}
// Orchestrator manages the inference pipeline for each session,
// coordinating between the gRPC inference client, media peers,
// and WebSocket hub for real-time updates.
type Orchestrator struct {
inference inference.InferenceService
wsHub *ws.Hub
sessionMgr *SessionManager
charStore *character.Store
peers map[string]mediapeer.MediaPeer // sessionID → media peer (Bot or DirectPeer)
directPeers map[string]*direct.DirectPeer // sessionID → DirectPeer (for signaling dispatch)
recorder *recording.VideoRecorder
streamingMode string
pipelineCfg config.PipelineConfig
turnServer *direct.TURNServer
webrtcAPI *webrtc.API
estimatorCh <-chan cc.BandwidthEstimator
taskService *agenttask.Service
avatarMu sync.Mutex
mu sync.RWMutex
}
// New creates a new Orchestrator.
func New(inferenceClient inference.InferenceService, hub *ws.Hub, sessionMgr *SessionManager, recorder *recording.VideoRecorder, charStore *character.Store, pipelineCfg ...config.PipelineConfig) *Orchestrator {
o := &Orchestrator{
inference: inferenceClient,
wsHub: hub,
sessionMgr: sessionMgr,
charStore: charStore,
peers: make(map[string]mediapeer.MediaPeer),
directPeers: make(map[string]*direct.DirectPeer),
recorder: recorder,
}
if len(pipelineCfg) > 0 {
o.pipelineCfg = pipelineCfg[0]
o.streamingMode = pipelineCfg[0].StreamingMode
}
if o.streamingMode == "" {
o.streamingMode = "direct"
}
return o
}
// HandleSignaling dispatches WebRTC signaling messages to the DirectPeer.
func (o *Orchestrator) HandleSignaling(sessionID string, msg ws.WSMessage) {
o.mu.RLock()
dp := o.directPeers[sessionID]
o.mu.RUnlock()
if dp == nil {
return
}
switch msg.Type {
case "av_calibration_config":
dp.SetAVCalibrationEnabled(msg.Enabled)
case "av_sync_feedback":
dp.HandleAVSyncFeedback(msg.TurnSeq, msg.ExcessVideoLagMS, msg.JitterBufferDeltaMS, msg.Likely)
case "webrtc_ready":
o.sendDirectWebRTCConfig(sessionID)
if err := dp.StartNegotiation(); err != nil {
log.Printf("[Orchestrator] session=%s StartNegotiation failed: %v", sessionID, err)
}
case "direct_media_reset_request":
o.sendDirectWebRTCConfig(sessionID)
if err := dp.ResetMediaPath(context.Background()); err != nil {
log.Printf("[Orchestrator] session=%s ResetMediaPath failed: %v", sessionID, err)
}
case "webrtc_answer", "ice_candidate":
var sdpMid *string
if msg.SDPMid != "" {
sdpMid = &msg.SDPMid
}
dp.HandleSignaling(msg.Type, msg.SDP, msg.Candidate, sdpMid, msg.SDPMLine)
}
}
func (o *Orchestrator) sendDirectWebRTCConfig(sessionID string) {
// Send TURN ICE server config before the SDP offer.
if o.turnServer == nil {
return
}
host := o.pipelineCfg.ICEPublicIP
if host == "" {
host = "127.0.0.1"
}
o.broadcastJSON(sessionID, map[string]any{
"type": "webrtc_config",
"ice_servers": []any{o.turnServer.ICEServerConfig(host)},
})
}
// SetTURNServer sets the embedded TURN server for NAT traversal.
func (o *Orchestrator) SetTURNServer(ts *direct.TURNServer) {
o.turnServer = ts
}
// SetWebRTCAPI sets the shared webrtc.API with interceptors (NACK, TWCC, GCC).
func (o *Orchestrator) SetWebRTCAPI(api *webrtc.API, estimatorCh <-chan cc.BandwidthEstimator) {
o.webrtcAPI = api
o.estimatorCh = estimatorCh
}
func (o *Orchestrator) SetTaskService(taskService *agenttask.Service) {
o.taskService = taskService
}
// StreamingMode returns the current streaming mode.
func (o *Orchestrator) StreamingMode() string {
return o.streamingMode
}
func (o *Orchestrator) AvatarEnabled() bool {
if o == nil || o.pipelineCfg.AvatarEnabled == nil {
return true
}
return *o.pipelineCfg.AvatarEnabled
}
func (o *Orchestrator) HealthCheck(ctx context.Context) error {
if o == nil || o.inference == nil {
return errors.New("inference service is not configured")
}
return o.inference.HealthCheck(ctx)
}
func (o *Orchestrator) ragService() (inference.RAGService, bool) {
if o == nil || o.inference == nil {
return nil, false
}
svc, ok := o.inference.(inference.RAGService)
return svc, ok
}
func (o *Orchestrator) ragConfig() config.RAGConfig {
if o == nil {
return config.RAGConfig{}
}
cfg := o.pipelineCfg.RAG
if cfg.TopK == 0 {
cfg.TopK = 5
}
if cfg.MaxContextChars == 0 {
cfg.MaxContextChars = 4500
}
if cfg.MinScore == 0 {
cfg.MinScore = 0.25
}
return cfg
}
func (o *Orchestrator) IndexKnowledgeSource(ctx context.Context, characterID, characterDir string, source *ragstore.Source, sourcePath string) (int, error) {
svc, ok := o.ragService()
if !ok {
return 0, errors.New("RAG service is not configured")
}
if source == nil {
return 0, errors.New("knowledge source is nil")
}
return svc.IndexRAGSource(ctx, inference.RAGIndexSourceRequest{
CharacterID: characterID,
CharacterDir: characterDir,
SourceID: source.ID,
Title: source.Title,
Filename: source.Filename,
MimeType: source.MimeType,
SourcePath: sourcePath,
})
}
func (o *Orchestrator) DeleteKnowledgeSource(ctx context.Context, characterID, characterDir, sourceID string) error {
svc, ok := o.ragService()
if !ok {
return errors.New("RAG service is not configured")
}
return svc.DeleteRAGSource(ctx, characterID, characterDir, sourceID)
}
func (o *Orchestrator) searchKnowledge(ctx context.Context, characterID, query string) ([]inference.RAGSearchResult, error) {
cfg := o.ragConfig()
if !cfg.IsEnabled() || strings.TrimSpace(characterID) == "" || strings.TrimSpace(query) == "" {
return nil, nil
}
svc, ok := o.ragService()
if !ok || o.charStore == nil {
return nil, nil
}
charDir := o.charStore.CharDir(characterID)
if charDir == "" {
return nil, nil
}
return svc.SearchRAG(ctx, inference.RAGSearchRequest{
CharacterID: characterID,
CharacterDir: charDir,
Query: query,
TopK: cfg.TopK,
MaxContextChars: cfg.MaxContextChars,
MinScore: cfg.MinScore,
})
}
func (o *Orchestrator) SearchKnowledge(ctx context.Context, characterID, query string) ([]inference.RAGSearchResult, error) {
return o.searchKnowledge(ctx, characterID, query)
}
func normalizedVisualInputConfig(cfg config.VisualInputConfig) config.VisualInputConfig {
if cfg.FrameIntervalMS == 0 {
cfg.FrameIntervalMS = 1000
}
if cfg.MaxWidth == 0 {
cfg.MaxWidth = 1280
}
if cfg.MaxHeight == 0 {
cfg.MaxHeight = 720
}
if cfg.MaxFrameBytes == 0 {
cfg.MaxFrameBytes = 512 * 1024
}
if cfg.MaxRecentFrames == 0 {
cfg.MaxRecentFrames = 2
}
if cfg.FrameTTLMS == 0 {
cfg.FrameTTLMS = 10000
}
return cfg
}
func (o *Orchestrator) visualInputConfig() config.VisualInputConfig {
if o == nil {
return normalizedVisualInputConfig(config.VisualInputConfig{})
}
return normalizedVisualInputConfig(o.pipelineCfg.VisualInput)
}
func qwenOmniVisualInputConfig(cfg config.VisualInputConfig) config.VisualInputConfig {
cfg = normalizedVisualInputConfig(cfg)
if cfg.FrameIntervalMS < 1000 {
cfg.FrameIntervalMS = 1000
}
if cfg.MaxFrameBytes <= 0 || cfg.MaxFrameBytes > qwenOmniMaxVisualFrameBytes {
cfg.MaxFrameBytes = qwenOmniMaxVisualFrameBytes
}
return cfg
}
func (o *Orchestrator) voiceLLMProviderForSession(session *Session) string {
if o.personaAgentEnabled(session) {
return "persona"
}
return o.characterVoiceLLMProviderForSession(session)
}
func (o *Orchestrator) characterVoiceLLMProviderForSession(session *Session) string {
if session == nil || session.Mode != ModeOmni {
return ""
}
if session.CharacterID == "" || o == nil || o.charStore == nil {
return voiceLLMProviderOrDefault("")
}
char, err := o.charStore.Get(session.CharacterID)
if err != nil {
log.Printf("voiceLLMProviderForSession: could not fetch character %s: %v", session.CharacterID, err)
return voiceLLMProviderOrDefault("")
}
return voiceLLMProviderOrDefault(char.VoiceProvider)
}
func (o *Orchestrator) personaAgentEnabled(session *Session) bool {
return o != nil && session != nil && session.Mode == ModeOmni
}
func (o *Orchestrator) sessionSupportsVisualInput(session *Session) bool {
if session == nil {
return false
}
if session.Mode == ModeStandard {
return true
}
return session.Mode == ModeOmni && o.characterVoiceLLMProviderForSession(session) == "qwen_omni"
}
func (o *Orchestrator) VisualInputConfigForSession(session *Session) (config.VisualInputConfig, bool) {
if !o.sessionSupportsVisualInput(session) {
return config.VisualInputConfig{}, false
}
cfg := o.visualInputConfig()
if session.Mode == ModeOmni {
cfg = qwenOmniVisualInputConfig(cfg)
}
return cfg, true
}
func (o *Orchestrator) globalSystemPrompt() string {
return strings.TrimSpace(standardGlobalSystemPrompt)
}
func validateVisualSource(source string) error {
switch source {
case "camera", "screen":
return nil
default:
return fmt.Errorf("invalid visual source")
}
}
func (o *Orchestrator) visualSession(sessionID string) (*Session, config.VisualInputConfig, error) {
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return nil, config.VisualInputConfig{}, err
}
cfg, supported := o.VisualInputConfigForSession(session)
if !supported {
return nil, config.VisualInputConfig{}, ErrVisualInputUnsupported
}
if !cfg.IsEnabled() {
return nil, cfg, ErrVisualInputDisabled
}
return session, cfg, nil
}
func (o *Orchestrator) HandleVisualInputStart(sessionID string, source string) error {
if err := validateVisualSource(source); err != nil {
return err
}
session, _, err := o.visualSession(sessionID)
if err != nil {
return err
}
session.StartVisualInput(source)
return nil
}
func (o *Orchestrator) HandleVisualInputStop(sessionID string, source string) error {
if source != "" {
if err := validateVisualSource(source); err != nil {
return err
}
}
session, _, err := o.visualSession(sessionID)
if err != nil {
return err
}
session.StopVisualInput(source)
return nil
}
func (o *Orchestrator) HandleVisualFrame(sessionID string, msg ws.WSMessage) error {
if err := validateVisualSource(msg.Source); err != nil {
return err
}
session, cfg, err := o.visualSession(sessionID)
if err != nil {
return err
}
if msg.Mime != "image/jpeg" {
return fmt.Errorf("invalid visual frame mime")
}
if msg.Width <= 0 || msg.Height <= 0 || int(msg.Width) > cfg.MaxWidth || int(msg.Height) > cfg.MaxHeight {
return fmt.Errorf("invalid visual frame dimensions")
}
encoded := strings.TrimSpace(msg.Data)
if encoded == "" {
return fmt.Errorf("visual frame data is required")
}
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return fmt.Errorf("invalid visual frame data")
}
if len(decoded) == 0 || len(decoded) > cfg.MaxFrameBytes {
return fmt.Errorf("visual frame exceeds size limit")
}
if len(decoded) < 3 || decoded[0] != 0xff || decoded[1] != 0xd8 || decoded[2] != 0xff {
return fmt.Errorf("invalid visual frame jpeg data")
}
now := time.Now()
frame := VisualFrame{
Data: decoded,
MimeType: msg.Mime,
Width: msg.Width,
Height: msg.Height,
Source: msg.Source,
TimestampMS: msg.TimestampMS,
FrameSeq: msg.FrameSeq,
}
minInterval := time.Duration(cfg.FrameIntervalMS) * time.Millisecond
session.StoreVisualFrame(frame, cfg.MaxRecentFrames, minInterval, now)
return nil
}
func (o *Orchestrator) CheckVoice(ctx context.Context, provider string, voiceType string) (string, error) {
if o == nil || o.inference == nil {
return "", errors.New("inference service is not configured")
}
return o.inference.CheckVoice(ctx, inference.VoiceLLMSessionConfig{
Provider: voiceLLMProviderOrDefault(provider),
Voice: voiceType,
})
}
func (o *Orchestrator) AvatarInfo(ctx context.Context) (*pb.AvatarInfo, error) {
if o == nil || o.inference == nil {
return nil, errors.New("inference service is not configured")
}
return o.inference.AvatarInfo(ctx)
}
func (o *Orchestrator) idleVideoProfile() string {
return character.DefaultIdleVideoProfile
}
func (o *Orchestrator) idleVideoOutputSize(ctx context.Context) (int, int, error) {
info, err := o.AvatarInfo(ctx)
if err != nil {
return 0, 0, fmt.Errorf("get avatar info for idle video: %w", err)
}
width := int(info.GetOutputWidth())
height := int(info.GetOutputHeight())
if width <= 0 || height <= 0 {
return 0, 0, fmt.Errorf("invalid idle video output size: %dx%d", width, height)
}
return width, height, nil
}
func (o *Orchestrator) activeCharacterImage(characterID string) (*character.Character, string, error) {
if o == nil || o.charStore == nil {
return nil, "", errors.New("character store is not configured")
}
char, err := o.charStore.Get(characterID)
if err != nil {
return nil, "", err
}
if char.ActiveImage == "" {
return char, "", nil
}
return char, char.ActiveImage, nil
}
func normalizeImageFormat(imageFilename string) string {
ext := strings.TrimPrefix(strings.ToLower(filepath.Ext(imageFilename)), ".")
if ext == "" {
return "png"
}
if ext == "jpg" {
return "jpeg"
}
return ext
}
func buildDefaultAvatarPNG(width, height int) ([]byte, error) {
if width <= 0 {
width = 512
}
if height <= 0 {
height = 512
}
img := image.NewRGBA(image.Rect(0, 0, width, height))
draw.Draw(img, img.Bounds(), &image.Uniform{C: color.RGBA{128, 128, 128, 255}}, image.Point{}, draw.Src)
var buf bytes.Buffer
if err := png.Encode(&buf, img); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// AvatarSetupWarning converts avatar setup failures into a concise message
// suitable for browser console diagnostics.
func AvatarSetupWarning(err error) string {
if err == nil {
return ""
}
if warning, ok := AvatarImageTooLargeWarning(err); ok {
return warning
}
return fmt.Sprintf("角色头像设置失败,已使用默认头像:%v", err)
}
// AvatarImageTooLargeWarning reports whether a gRPC message-size failure was
// caused by an oversized avatar image.
func AvatarImageTooLargeWarning(err error) (string, bool) {
if err == nil {
return "", false
}
msg := strings.ToLower(err.Error())
if status.Code(err) == codes.ResourceExhausted && strings.Contains(msg, "message larger than max") {
return avatarImageMaxUploadHint, true
}
if strings.Contains(msg, "trying to send message larger than max") {
return avatarImageMaxUploadHint, true
}
return "", false
}
func (o *Orchestrator) setDefaultAvatarLocked(ctx context.Context, sessionID string) error {
if o == nil || o.inference == nil {
return errors.New("inference service is not configured")
}
width, height := 512, 512
if info, err := o.inference.AvatarInfo(ctx); err == nil && info != nil {
if int(info.GetOutputWidth()) > 0 {
width = int(info.GetOutputWidth())
}
if int(info.GetOutputHeight()) > 0 {
height = int(info.GetOutputHeight())
}
}
imageData, err := buildDefaultAvatarPNG(width, height)
if err != nil {
return fmt.Errorf("build default avatar image: %w", err)
}
if err := o.inference.SetAvatar(ctx, sessionID, imageData, "png"); err != nil {
return fmt.Errorf("set default avatar: %w", err)
}
return nil
}
func (o *Orchestrator) loadCharacterImage(characterID, imageFilename string) ([]byte, string, error) {
if o == nil || o.charStore == nil {
return nil, "", errors.New("character store is not configured")
}
if imageFilename == "" {
return nil, "", errors.New("active image is empty")
}
imgDir := o.charStore.ImagesDir(characterID)
if imgDir == "" {
return nil, "", fmt.Errorf("character images dir not found: %s", characterID)
}
path := filepath.Join(imgDir, filepath.Base(imageFilename))
data, err := os.ReadFile(path)
if err != nil {
return nil, "", fmt.Errorf("read character image %s: %w", path, err)
}
return data, normalizeImageFormat(imageFilename), nil
}
// buildTrailingSilence creates a 1.5-second silent PCM chunk (s16le mono)
// appended after TTS audio so the avatar can close its mouth before the idle switch.
func buildTrailingSilence(sampleRate int) *pb.AudioChunk {
if sampleRate <= 0 {
sampleRate = 16000
}
numSamples := sampleRate * 3 / 2 // 1.5 seconds
return &pb.AudioChunk{
Data: make([]byte, numSamples*2), // s16le: 2 bytes per sample
SampleRate: int32(sampleRate),
Channels: 1,
Format: "pcm_s16le",
IsFinal: true,
}
}
func buildIdleBreathingPCM(duration time.Duration, sampleRate int) []byte {
if sampleRate <= 0 {
sampleRate = 16000
}
totalSamples := int(math.Round(duration.Seconds() * float64(sampleRate)))
if totalSamples <= 0 {
return nil
}
out := make([]byte, totalSamples*2)
rng := rand.New(rand.NewSource(42))
fadeSamples := int(0.25 * float64(sampleRate))
for i := 0; i < totalSamples; i++ {
t := float64(i) / float64(sampleRate)
cyclePos := math.Mod(t, 3.8)
var env float64
switch {
case cyclePos < 1.1:
p := cyclePos / 1.1
env = 0.010 + 0.020*math.Sin(p*math.Pi/2)
case cyclePos < 1.5:
env = 0.028
case cyclePos < 3.0:
p := (cyclePos - 1.5) / 1.5
env = 0.030 + 0.020*math.Cos(p*math.Pi/2)
default:
env = 0.006
}
texture := 0.55*math.Sin(2*math.Pi*170*t) +
0.25*math.Sin(2*math.Pi*310*t+0.7) +
0.20*(rng.Float64()*2-1)
motion := 0.92 + 0.08*math.Sin(2*math.Pi*0.21*t+0.4)
sample := env * texture * motion
if fadeSamples > 0 {
if i < fadeSamples {
sample *= float64(i) / float64(fadeSamples)
} else if remain := totalSamples - i; remain < fadeSamples {
sample *= float64(remain) / float64(fadeSamples)
}
}
if sample > 0.95 {
sample = 0.95
}
if sample < -0.95 {
sample = -0.95
}
pcm := int16(sample * 32767)
binary.LittleEndian.PutUint16(out[i*2:], uint16(pcm))
}
return out
}
func fitPCMToVideoDuration(pcm []byte, sampleRate, frames, fps int) []byte {
if len(pcm) == 0 || sampleRate <= 0 || frames <= 0 || fps <= 0 {
return pcm
}
wantSamples := desiredSamplesForVideo(frames, fps, sampleRate)
if wantSamples <= 0 {
return pcm
}
wantBytes := wantSamples * 2
if len(pcm) == wantBytes {
return pcm
}
if len(pcm) > wantBytes {
return pcm[:wantBytes]
}
out := make([]byte, wantBytes)
copy(out, pcm)
return out
}
func audioChunkToPCM16(chunk *pb.AudioChunk) ([]byte, int) {
if chunk == nil || len(chunk.GetData()) == 0 {
return nil, 0
}
sampleRate := int(chunk.GetSampleRate())
format := strings.ToLower(strings.TrimSpace(chunk.GetFormat()))
data := chunk.GetData()
switch format {
case "float32", "f32", "pcm_f32le":
n := len(data) / 4
if n <= 0 {
return nil, sampleRate
}
out := make([]byte, n*2)
for i := 0; i < n; i++ {
v := math.Float32frombits(binary.LittleEndian.Uint32(data[i*4:]))
if math.IsNaN(float64(v)) || math.IsInf(float64(v), 0) {
v = 0
}
if v > 1 {
v = 1
} else if v < -1 {
v = -1
}
var sample int16
if v >= 0 {
sample = int16(v * 32767)
} else {
sample = int16(v * 32768)
}
binary.LittleEndian.PutUint16(out[i*2:], uint16(sample))
}
return out, sampleRate
default:
if len(data)%2 != 0 {
data = data[:len(data)-1]
}
if len(data) == 0 {
return nil, sampleRate
}
out := make([]byte, len(data))
copy(out, data)
return out, sampleRate
}
}
func (o *Orchestrator) setAvatarFromCharacterImage(ctx context.Context, sessionID, characterID, imageFilename string) error {
if o == nil || o.inference == nil {
return errors.New("inference service is not configured")
}
imageData, format, err := o.loadCharacterImage(characterID, imageFilename)
if err != nil {
return err
}
o.avatarMu.Lock()
defer o.avatarMu.Unlock()
if err := o.inference.SetAvatar(ctx, sessionID, imageData, format); err != nil {
if resetErr := o.setDefaultAvatarLocked(ctx, sessionID); resetErr != nil {
return fmt.Errorf("set avatar from image %q (%d bytes): %w; default avatar reset failed: %v", imageFilename, len(imageData), err, resetErr)
}
log.Printf("SetAvatar failed for image %q (%d bytes); reset inference avatar to default placeholder", imageFilename, len(imageData))
return fmt.Errorf("set avatar from image %q (%d bytes): %w", imageFilename, len(imageData), err)
}
return nil
}
// EnsureIdleVideo generates and caches the idle MP4 for the active image if missing.
func (o *Orchestrator) EnsureIdleVideo(ctx context.Context, characterID string) (string, error) {
if o == nil || o.charStore == nil {
return "", errors.New("character store is not configured")
}
if o.inference == nil {
return "", errors.New("inference service is not configured")
}
if !o.AvatarEnabled() {
return "", errors.New("avatar inference is disabled")
}
_, imageFilename, err := o.activeCharacterImage(characterID)
if err != nil || imageFilename == "" {
return "", err
}
profile := o.idleVideoProfile()
targetWidth, targetHeight, err := o.idleVideoOutputSize(ctx)
if err != nil {
return "", err
}
sizeDir := o.charStore.IdleVideosForSizeDir(characterID, imageFilename, targetWidth, targetHeight)
if sizeDir == "" {
return "", fmt.Errorf("idle video dir unavailable for character %s", characterID)
}
files, err := o.charStore.ListIdleVideos(characterID, imageFilename, targetWidth, targetHeight)
if err == nil && len(files) > 0 {
return filepath.Join(sizeDir, files[0]), nil
}
outPath := o.charStore.IdleVideoPath(characterID, imageFilename, profile, targetWidth, targetHeight)
if outPath == "" {
return "", fmt.Errorf("idle video path unavailable for character %s", characterID)
}
imageData, format, err := o.loadCharacterImage(characterID, imageFilename)
if err != nil {
return "", err
}
const (
idleDuration = 10 * time.Second
idleSampleRate = 16000
idleCRF = 23
)
pcm := buildIdleBreathingPCM(idleDuration, idleSampleRate)
audioChunk := &pb.AudioChunk{
Data: pcm,
SampleRate: idleSampleRate,
Channels: 1,
Format: "pcm_s16le",
IsFinal: true,
}
// Hold the mutex for the entire generation cycle (SetAvatar + GenerateAvatar
// + frame collection) so that a concurrent SetupSession or another
// EnsureIdleVideo call cannot change the inference server's avatar state
// while we are still collecting frames.
o.avatarMu.Lock()
defer o.avatarMu.Unlock()
jobID := fmt.Sprintf("idle-%s-%d", characterID, time.Now().UnixNano())
if err := o.inference.SetAvatar(ctx, jobID, imageData, format); err != nil {
if resetErr := o.setDefaultAvatarLocked(ctx, jobID); resetErr != nil {
log.Printf("EnsureIdleVideo: failed to reset default avatar after SetAvatar failure for character %s image=%s: %v", characterID, imageFilename, resetErr)
}
return "", fmt.Errorf("set avatar for idle video from image %q (%d bytes): %w", imageFilename, len(imageData), err)
}
videoCh, errCh := o.inference.GenerateAvatar(ctx, []*pb.AudioChunk{audioChunk})
rgbChunks := make([][]byte, 0, 8)
width, height, fps, totalFrames := 0, 0, 25, 0
loop:
for {
select {
case chunk, ok := <-videoCh:
if !ok {
break loop
}
if chunk == nil || len(chunk.Data) == 0 {
continue
}
if width == 0 {
width = int(chunk.Width)
height = int(chunk.Height)
if int(chunk.Fps) > 0 {
fps = int(chunk.Fps)
}
}
totalFrames += int(chunk.NumFrames)
rgbCopy := make([]byte, len(chunk.Data))
copy(rgbCopy, chunk.Data)
rgbChunks = append(rgbChunks, rgbCopy)
case genErr := <-errCh:
if genErr != nil {
// Drain videoCh so the gRPC stream can close cleanly.
for range videoCh {
}
return "", fmt.Errorf("generate idle avatar video: %w", genErr)
}
}
}
// Drain errCh after videoCh closes in case an error arrived concurrently.
select {
case genErr := <-errCh:
if genErr != nil {
return "", fmt.Errorf("generate idle avatar video: %w", genErr)
}
default:
}
if len(rgbChunks) == 0 || width <= 0 || height <= 0 || totalFrames <= 0 {
return "", errors.New("idle avatar generation produced no video frames")
}
if width != targetWidth || height != targetHeight {
sizeDir = o.charStore.IdleVideosForSizeDir(characterID, imageFilename, width, height)
if sizeDir == "" {
return "", fmt.Errorf("idle video dir unavailable for character %s", characterID)
}
outPath = o.charStore.IdleVideoPath(characterID, imageFilename, profile, width, height)
if outPath == "" {
return "", fmt.Errorf("idle video path unavailable for character %s", characterID)
}
}
pcm = fitPCMToVideoDuration(pcm, idleSampleRate, totalFrames, fps)
if err := recording.EncodeRGB24ToMP4(outPath, width, height, fps, rgbChunks, pcm, idleSampleRate, idleCRF); err != nil {
return "", fmt.Errorf("encode idle avatar mp4: %w", err)
}
return outPath, nil
}
// SetupSession creates a media peer (DirectPeer or LiveKit Bot) and prepares for streaming.
// When roomMgr is nil (direct mode), a DirectPeer is created instead of a LiveKit Bot.
func (o *Orchestrator) SetupSession(ctx context.Context, session *Session, roomMgr *livekit.RoomManager) (mediapeer.MediaPeer, []string, error) {
warnings := []string{}
// Best-effort: apply the character's active avatar image when realtime
// avatar inference is enabled. Pure voice sessions keep cached idle videos
// but do not touch the avatar model.
if o.AvatarEnabled() && session != nil && session.CharacterID != "" {
_, imageFilename, err := o.activeCharacterImage(session.CharacterID)
if err != nil {
log.Printf("SetupSession: could not resolve active image for character %s: %v", session.CharacterID, err)
} else if imageFilename != "" {
if err := o.setAvatarFromCharacterImage(ctx, session.ID, session.CharacterID, imageFilename); err != nil {
warning := AvatarSetupWarning(err)
warnings = append(warnings, warning)
log.Printf("SetupSession: %s character=%s image=%s details=%v", warning, session.CharacterID, imageFilename, err)
}
}
}
var peer mediapeer.MediaPeer
if o.streamingMode == "livekit" {
// LiveKit SFU mode
roomName := livekit.RoomName(session.ID)
if err := roomMgr.CreateRoom(ctx, roomName); err != nil {
return nil, warnings, err
}
bot := livekit.NewBot(
roomMgr.URL(),
roomMgr.APIKey(),
roomMgr.APISecret(),
roomName,
)
if err := bot.Connect(ctx); err != nil {
return nil, warnings, err
}
peer = bot
} else {
// Direct P2P WebRTC mode
signalingFn := func(sessionID string, msg map[string]any) {
o.broadcastJSON(sessionID, msg)
}
iceServers := make([]webrtc.ICEServer, 0, len(o.pipelineCfg.ICEServers))
for _, s := range o.pipelineCfg.ICEServers {
iceServers = append(iceServers, webrtc.ICEServer{
URLs: s.URLs,
Username: s.Username,
Credential: s.Credential,
})
}
dp := direct.NewDirectPeer(session.ID, signalingFn, iceServers, o.webrtcAPI, o.estimatorCh)
if err := dp.Connect(ctx); err != nil {
return nil, warnings, err
}
peer = dp
o.mu.Lock()
o.directPeers[session.ID] = dp
o.mu.Unlock()
}
// Use a detached context for the AV pipeline so it outlives the HTTP
// request / setup timeout that ctx may be derived from.
peer.StartAVPipeline(context.Background())
o.mu.Lock()
o.peers[session.ID] = peer
o.mu.Unlock()
session.SetState(StateConnected)
return peer, warnings, nil
}
func (o *Orchestrator) stopPipelineAndWait(session *Session, sessionID string, interruptVoice bool) {
if interruptVoice && session.Mode == ModeOmni && o.inference != nil {
if err := o.inference.Interrupt(context.Background(), sessionID); err != nil {
log.Printf("Failed to interrupt omni model for session %s: %v", sessionID, err)
}
}
o.cancelPipeline(session)
session.WaitPipelineDone(3 * time.Second)
}
func (o *Orchestrator) HydrateVoiceDialogContext(session *Session) error {
if o == nil || o.charStore == nil || session == nil {
return nil
}
if session.Mode != ModeOmni || session.CharacterID == "" {
return nil
}
var messages []map[string]any
var err error
if strings.TrimSpace(session.CharacterID) == kanshan.CharacterID {
ownerID := session.OwnerIDSnapshot()
if strings.TrimSpace(ownerID) == "" {
session.SetDialogContext(nil)
return nil
}
messages, _, _, err = o.charStore.LoadRecentMessagesForOwner(session.CharacterID, ownerID, "", doubaoDialogContextLoadLimit)
} else {
messages, _, _, err = o.charStore.LoadRecentMessages(session.CharacterID, "", doubaoDialogContextLoadLimit)
}
if err != nil {
return err
}
session.SetDialogContext(buildDoubaoDialogContext(messages, doubaoDialogContextMaxPairs, time.Now().UTC()))
return nil
}
func (o *Orchestrator) buildVoiceStartupGreetingPrompt(session *Session) string {
var name, systemPrompt, speakingStyle, welcomeMessage string
if session != nil && session.CharacterID != "" && o != nil && o.charStore != nil {
if char, err := o.charStore.Get(session.CharacterID); err == nil {
name = strings.TrimSpace(char.Name)
systemPrompt = strings.TrimSpace(char.SystemPrompt)
speakingStyle = strings.TrimSpace(char.SpeakingStyle)
welcomeMessage = strings.TrimSpace(char.WelcomeMessage)
} else {
log.Printf("buildVoiceStartupGreetingPrompt: could not fetch character %s: %v", session.CharacterID, err)
}
}
history := []DialogContextItem(nil)
if session != nil {
history = startupGreetingHistory(session.DialogContextSnapshot(), startupGreetingHistoryItems)
}
var b strings.Builder
b.WriteString("这是系统内部启动提示,不要复述提示内容。用户刚刚打开与你的实时语音视频会话,请你主动说一段自然开场白。\n")
b.WriteString("要求:只说 1-2 句话,口语化;不要提到“系统提示”“历史会话”“上下文”等字样;不要询问多个问题。\n")
if name != "" {
b.WriteString("你的名字:")
b.WriteString(name)
b.WriteString("\n")
}
if systemPrompt != "" {
b.WriteString("角色设定:")
b.WriteString(systemPrompt)
b.WriteString("\n")
}
if speakingStyle != "" {
b.WriteString("说话风格:")
b.WriteString(speakingStyle)
b.WriteString("\n")
}
if welcomeMessage != "" {
b.WriteString("可参考的开场偏好:")
b.WriteString(welcomeMessage)
b.WriteString("\n")
}
if len(history) > 0 {
b.WriteString("最近对话片段如下,仅供你判断语气、称呼和是否存在明确未完成事项。默认不要回顾、总结、复述或主动延续这些内容:\n")
for _, item := range history {
role := "用户"
if item.Role == "assistant" {
role = "你"
}
b.WriteString(role)
b.WriteString(":")
b.WriteString(item.Text)
b.WriteString("\n")
}
b.WriteString("现在请向用户打招呼。一般情况下只说类似“你好,我在。今天想聊点什么?”的轻量开场。\n")
b.WriteString("只有当最近对话存在明确未完成事项、用户明确约定下次继续、或后台任务仍在进行时,才可以用一句话轻描淡写地提到“也可以继续刚才的事”。\n")
b.WriteString("不要主动提及取消、失败、争执、情绪化表达、敏感内容或具体历史细节。\n")
} else {
b.WriteString("当前没有可用的历史对话。请简短介绍你是谁,以及你能实时语音视频聊天、回答问题、陪伴交流;如果用户需要,也能帮忙查询、调研、整理资料或生成报告和网页。\n")
}
return strings.TrimSpace(b.String())
}
func (o *Orchestrator) buildVoiceLLMSessionConfig(session *Session, sessionID string) inference.VoiceLLMSessionConfig {
return o.buildVoiceLLMSessionConfigExcludingTurn(session, sessionID, 0)
}
func (o *Orchestrator) buildVoiceLLMSessionConfigExcludingTurn(session *Session, sessionID string, excludeTurnSeq uint64) inference.VoiceLLMSessionConfig {
voiceConfig := inference.VoiceLLMSessionConfig{SessionID: sessionID}
if session.CharacterID != "" && o.charStore != nil {
if char, err := o.charStore.Get(session.CharacterID); err == nil {
voiceConfig.CharacterID = session.CharacterID
voiceConfig.CharacterDir = o.charStore.CharDir(session.CharacterID)
voiceConfig.Provider = voiceLLMProviderOrDefault(char.VoiceProvider)
if o.personaAgentEnabled(session) {
voiceConfig.Provider = "persona"
}
voiceConfig.SystemPrompt = char.SystemPrompt
voiceConfig.Voice = char.VoiceType
voiceConfig.BotName = char.Name
voiceConfig.SpeakingStyle = char.SpeakingStyle
} else {
log.Printf("buildVoiceLLMSessionConfig: could not fetch character %s: %v", session.CharacterID, err)
}
}
if o.personaAgentEnabled(session) {
voiceConfig.Provider = "persona"
}
for _, item := range buildVoiceDialogContextFromSession(session, excludeTurnSeq, doubaoDialogContextMaxPairs, time.Now().UTC()) {
voiceConfig.DialogContext = append(voiceConfig.DialogContext, inference.VoiceLLMDialogContextItem{
Role: item.Role,
Text: item.Text,
Timestamp: item.Timestamp,
})
}
return voiceConfig
}
func (o *Orchestrator) buildVoiceStartupGreetingSessionConfig(session *Session, sessionID string) inference.VoiceLLMSessionConfig {
voiceConfig := o.buildVoiceLLMSessionConfig(session, sessionID)
voiceConfig.Provider = o.characterVoiceLLMProviderForSession(session)
voiceConfig.WelcomeMessage = ""
return voiceConfig
}
func voiceLLMProviderOrDefault(provider string) string {
provider = strings.ToLower(strings.TrimSpace(provider))
switch provider {
case "persona":
return "persona"
case "qwen_omni":
return "qwen_omni"
default:
return "doubao"
}
}
func modeStringForLog(mode PipelineMode) string {
if mode == ModeStandard {
return "standard"
}
return "omni"
}
func (o *Orchestrator) standardComponentDefaults() character.Components {
defaults := character.Components{LLM: "qwen", ASR: "qwen", TTS: "qwen"}
if o.pipelineCfg.DefaultLLM != "" {
defaults.LLM = o.pipelineCfg.DefaultLLM
}
if o.pipelineCfg.DefaultASR != "" {
defaults.ASR = o.pipelineCfg.DefaultASR
}
if o.pipelineCfg.DefaultTTS != "" {
defaults.TTS = o.pipelineCfg.DefaultTTS
}
return defaults
}
func (o *Orchestrator) standardCharacterConfig(session *Session) (character.Components, string, string, string) {
components := character.NormalizeComponents(character.Components{}, o.standardComponentDefaults())
voice := ""
speakingStyle := ""
language := ""
if session.CharacterID != "" && o.charStore != nil {
if char, err := o.charStore.Get(session.CharacterID); err == nil {
components = character.NormalizeComponents(char.Components, components)
voice = strings.TrimSpace(char.VoiceType)
speakingStyle = strings.TrimSpace(char.SpeakingStyle)
} else {
log.Printf("standardCharacterConfig: could not fetch character %s: %v", session.CharacterID, err)
}
}
if voice == "" && components.TTS == "qwen" {
voice = "Momo"
}
return components, voice, speakingStyle, language
}
func (o *Orchestrator) standardSystemPrompt(session *Session) string {
if session.CharacterID == "" || o.charStore == nil {
return composeSystemPrompt(o.globalSystemPrompt(), "")
}
char, err := o.charStore.Get(session.CharacterID)
if err != nil {
log.Printf("standardSystemPrompt: could not fetch character %s: %v", session.CharacterID, err)
return composeSystemPrompt(o.globalSystemPrompt(), "")
}
return composeSystemPrompt(o.globalSystemPrompt(), characterSystemPrompt(char, true, true))
}
func appendRAGContext(rolePrompt string, ragContext string) string {
ragContext = strings.TrimSpace(ragContext)
if ragContext == "" {
return rolePrompt
}
rolePrompt = strings.TrimSpace(rolePrompt)
if rolePrompt == "" {
return ragContext
}
return rolePrompt + "\n\n" + ragContext
}
func formatRAGContext(results []inference.RAGSearchResult) string {
if len(results) == 0 {
return ""
}
var b strings.Builder
b.WriteString("【角色素材检索结果】\n")
b.WriteString("以下内容来自该角色导入的知识、文档或人物生平素材。只在与用户问题相关时使用;不要编造素材中没有的事实。\n")
for i, item := range results {
content := strings.TrimSpace(item.Content)
if content == "" {
continue
}
title := strings.TrimSpace(item.Title)
if title == "" {
title = strings.TrimSpace(item.Filename)
}
if title == "" {
title = "未命名素材"
}
b.WriteString(fmt.Sprintf("[%d] %s\n%s\n", i+1, title, content))
}
return strings.TrimSpace(b.String())
}
func (o *Orchestrator) standardSystemPromptWithRAG(session *Session, ragContext string) string {
if session.CharacterID == "" || o.charStore == nil {
return composeSystemPrompt(o.globalSystemPrompt(), appendRAGContext("", ragContext))
}
char, err := o.charStore.Get(session.CharacterID)
if err != nil {
log.Printf("standardSystemPrompt: could not fetch character %s: %v", session.CharacterID, err)
return composeSystemPrompt(o.globalSystemPrompt(), appendRAGContext("", ragContext))
}
return composeSystemPrompt(o.globalSystemPrompt(), appendRAGContext(characterSystemPrompt(char, true, true), ragContext))
}
func latestUserText(history []ChatMessage) string {
for i := len(history) - 1; i >= 0; i-- {
if history[i].Role == "user" {
return strings.TrimSpace(history[i].Content)
}
}
return ""
}
func wrapVoiceAudioInput(ctx context.Context, audioCh <-chan []byte) <-chan inference.VoiceLLMInputEvent {
inputCh := make(chan inference.VoiceLLMInputEvent, 64)
go func() {
defer close(inputCh)
for {
select {
case <-ctx.Done():
return
case data, ok := <-audioCh:
if !ok {
return
}
if len(data) == 0 {
continue
}
select {
case inputCh <- inference.VoiceLLMInputEvent{Audio: data}:
case <-ctx.Done():
return
}
}
}
}()
return inputCh
}
func wrapVoiceMultimodalInput(
ctx context.Context,
audioCh <-chan []byte,
frameCh <-chan VisualFrame,
unsubscribe func(),
initialFrames []VisualFrame,
) <-chan inference.VoiceLLMInputEvent {
inputCh := make(chan inference.VoiceLLMInputEvent, 64)
go func() {
defer close(inputCh)
if unsubscribe != nil {
defer unsubscribe()
}
for _, frame := range initialFrames {
select {
case inputCh <- inference.VoiceLLMInputEvent{
Image: &inference.ImageFrame{
Data: frame.Data,
MimeType: frame.MimeType,
Width: frame.Width,
Height: frame.Height,
Source: frame.Source,
TimestampMS: frame.TimestampMS,
FrameSeq: frame.FrameSeq,
},
}:
case <-ctx.Done():
return
}
}
for {
select {
case <-ctx.Done():
return
case data, ok := <-audioCh:
if !ok {
return
}
if len(data) == 0 {
continue
}
select {
case inputCh <- inference.VoiceLLMInputEvent{Audio: data}:
case <-ctx.Done():
return
}
case frame, ok := <-frameCh:
if !ok {
frameCh = nil
continue
}
select {
case inputCh <- inference.VoiceLLMInputEvent{
Image: &inference.ImageFrame{
Data: frame.Data,
MimeType: frame.MimeType,
Width: frame.Width,
Height: frame.Height,
Source: frame.Source,
TimestampMS: frame.TimestampMS,
FrameSeq: frame.FrameSeq,
},
}:
case <-ctx.Done():
return
}
}
}
}()
return inputCh
}
func singleVoiceTextInput(text string) <-chan inference.VoiceLLMInputEvent {
inputCh := make(chan inference.VoiceLLMInputEvent, 1)
inputCh <- inference.VoiceLLMInputEvent{Text: text}
close(inputCh)
return inputCh
}
func drainUserAudio(audioCh <-chan []byte, maxDrain int) {
for i := 0; i < maxDrain; i++ {
select {
case <-audioCh:
default:
return
}
}
}
func (o *Orchestrator) resumeVoiceAudioStream(sessionID string) error {
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
if session.Mode != ModeOmni {
return nil
}
o.mu.RLock()
peer := o.peers[sessionID]
o.mu.RUnlock()
if peer == nil {
return errors.New("media peer not found")
}
audioCh := peer.SubscribeUserAudio()
drainUserAudio(audioCh, 256)
return o.HandleAudioStream(context.Background(), sessionID, audioCh)
}
func (o *Orchestrator) handleStandardTextInput(ctx context.Context, session *Session, sessionID string, text string) error {
o.stopPipelineAndWait(session, sessionID, false)
pipeCtx, cancel := context.WithCancel(ctx)
session.mu.Lock()
session.PipelineCancel = cancel
session.mu.Unlock()
turnSeq := session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, turnSeq)
session.AddMessage(ChatMessage{Role: "user", Content: text, TurnSeq: turnSeq})
pipelineSeq := session.MarkPipelineRunning()
go o.runStandardPipeline(pipeCtx, session, sessionID, pipelineSeq, turnSeq)
return nil
}
func (o *Orchestrator) handleVoiceLLMTextInput(ctx context.Context, session *Session, sessionID string, text string) error {
turnSeq := session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, turnSeq)
if o.inference != nil {
if err := o.inference.Interrupt(context.Background(), sessionID); err != nil {
log.Printf("Failed to interrupt omni model for session %s: %v", sessionID, err)
}
}
o.cancelPipeline(session)
pipeCtx, cancel := context.WithCancel(ctx)
session.mu.Lock()
session.PipelineCancel = cancel
session.mu.Unlock()
session.AddMessage(ChatMessage{Role: "user", Content: text, TurnSeq: turnSeq})
session.SetState(StateProcessing)
o.broadcastStatusTurn(sessionID, "processing", turnSeq)
pipelineSeq := session.MarkPipelineRunning()
inputCh := singleVoiceTextInput(text)
go func(seq uint64) {
o.runVoiceLLMPipeline(pipeCtx, session, sessionID, inputCh, seq, turnSeq)
if pipeCtx.Err() != nil || !session.IsCurrentPipeline(seq) {
return
}
if err := o.resumeVoiceAudioStream(sessionID); err != nil {
log.Printf("Failed to resume omni audio stream for session %s: %v", sessionID, err)
}
}(pipelineSeq)
return nil
}
func (o *Orchestrator) HandleTaskEvent(task *agenttask.Task, event *agenttask.Event) {
if o == nil || task == nil || event == nil {
return
}
if event.Status == agenttask.StatusCompleted {
msg := strings.TrimSpace(event.Message)
if msg == "" {
msg = "任务已经完成,我把整理好的资料放在聊天里了。"
} else {
msg = "任务已经完成。" + msg
}
_ = o.SpeakAssistantText(context.Background(), task.SessionID, msg, true)
return
}
if event.Status == agenttask.StatusFailed {
msg := strings.TrimSpace(event.Message)
if msg == "" {
msg = "后台任务失败了,我暂时拿不到结果。"
} else {
msg = "后台任务失败了:" + msg
}
_ = o.SpeakAssistantText(context.Background(), task.SessionID, msg, true)
}
}
func taskEventBroadcastPayload(task *agenttask.Task, event *agenttask.Event) map[string]any {
payload := map[string]any{
"type": "task_event",
"task_id": task.ID,
"session_id": task.SessionID,
"seq": event.Seq,
"event_type": event.EventType,
"status": event.Status,
"message": event.Message,
"progress": event.Progress,
"created_at": event.CreatedAt,
"task": task,
}
if strings.TrimSpace(string(event.Payload)) != "" {
var decoded any
if err := json.Unmarshal(event.Payload, &decoded); err == nil && decoded != nil {
payload["payload"] = decoded
}
}
return payload
}
func fallbackPersonaTaskEventPayload(sessionID string, payload map[string]any) map[string]any {
if payload == nil {
payload = map[string]any{}
}
payload["type"] = "task_event"
if stringValue(payload["session_id"]) == "" {
payload["session_id"] = sessionID
}
return payload
}
func sanitizePersonaArtifactPayload(payload map[string]any) map[string]any {
sanitized := make(map[string]any, len(payload))
for key, value := range payload {
if key == "content" {
continue
}
sanitized[key] = value
}
return sanitized
}
func (o *Orchestrator) persistPersonaArtifactEvent(ctx context.Context, store *agenttask.Store, taskID string, eventPayload map[string]any) map[string]any {
if eventPayload == nil {
return nil
}
sanitized := sanitizePersonaArtifactPayload(eventPayload)
content := stringValue(eventPayload["content"])
if content == "" {
return sanitized
}
artifactID := stringValue(eventPayload["artifact_id"])
if artifactID == "" {
artifactID = stringValue(eventPayload["id"])
}
if artifactID != "" {
if artifact, _, err := store.GetArtifact(ctx, taskID, artifactID); err == nil {
sanitized["artifact_id"] = artifact.ID
sanitized["title"] = artifact.Title
sanitized["type"] = artifact.Type
sanitized["mime_type"] = artifact.MimeType
return sanitized
} else if !errors.Is(err, agenttask.ErrNotFound) {
log.Printf("persona task artifact lookup failed task=%s artifact=%s: %v", taskID, artifactID, err)
}
}
artifactType := stringValue(eventPayload["type"])
if artifactType == "" {
artifactType = "html"
}
mimeType := stringValue(eventPayload["mime_type"])
if mimeType == "" && strings.Contains(strings.ToLower(artifactType), "html") {
mimeType = "text/html; charset=utf-8"
}
title := stringValue(eventPayload["title"])
if title == "" {
title = "任务产物"
}
metadata, _ := json.Marshal(mapValue(eventPayload["metadata"]))
artifact, err := store.CreateArtifact(ctx, taskID, agenttask.CreateArtifactInput{
ID: artifactID,
Type: artifactType,
Title: title,
MimeType: mimeType,
Content: content,
Metadata: metadata,
})
if err != nil {
log.Printf("persona task artifact persist failed task=%s artifact=%s: %v", taskID, artifactID, err)
return sanitized
}
sanitized["artifact_id"] = artifact.ID
sanitized["title"] = artifact.Title
sanitized["type"] = artifact.Type
sanitized["mime_type"] = artifact.MimeType
return sanitized
}
func (o *Orchestrator) persistPersonaTaskEvent(ctx context.Context, session *Session, sessionID string, payload map[string]any) map[string]any {
fallback := fallbackPersonaTaskEventPayload(sessionID, payload)
if o == nil || o.taskService == nil || o.taskService.Store() == nil {
return fallback
}
taskPayload := mapValue(payload["task"])
taskID := stringValue(payload["task_id"])
if taskID == "" {
taskID = stringValue(taskPayload["id"])
}
if taskID == "" {
return fallback
}
store := o.taskService.Store()
ownerID := ""
if session != nil && strings.TrimSpace(session.CharacterID) == kanshan.CharacterID {
ownerID = session.OwnerIDSnapshot()
}
task, err := store.GetTask(ctx, taskID)
if errors.Is(err, agenttask.ErrNotFound) {
taskSessionID := stringValue(payload["session_id"])
if taskSessionID == "" {
taskSessionID = stringValue(taskPayload["session_id"])
}
if taskSessionID == "" {
taskSessionID = sessionID
}
characterID := stringValue(taskPayload["character_id"])
if characterID == "" && session != nil {
characterID = session.CharacterID
}
title := stringValue(taskPayload["title"])
userRequest := stringValue(taskPayload["user_request"])
if userRequest == "" {
userRequest = title
}
if userRequest == "" {
userRequest = stringValue(payload["message"])
}
if userRequest == "" {
userRequest = "后台任务"
}
kind := stringValue(taskPayload["kind"])
if kind == "" {
kind = "research"
}
task, err = store.CreateTask(ctx, agenttask.CreateTaskInput{
ID: taskID,
SessionID: taskSessionID,
CharacterID: characterID,
OwnerID: ownerID,
Kind: kind,
Title: title,
UserRequest: userRequest,
})
}
if err != nil {
log.Printf("persona task persist failed task=%s: %v", taskID, err)
return fallback
}
if ownerID != "" && task.OwnerID != "" && task.OwnerID != ownerID {
log.Printf("persona task owner mismatch task=%s session=%s", taskID, sessionID)
return fallback
}
eventType := stringValue(payload["event_type"])
eventPayload := mapValue(payload["payload"])
if eventType == "artifact.created" {
eventPayload = o.persistPersonaArtifactEvent(ctx, store, task.ID, eventPayload)
}
var rawPayload json.RawMessage
if len(eventPayload) > 0 {
if raw, err := json.Marshal(eventPayload); err == nil {
rawPayload = raw
}
}
event, updated, err := store.AppendEvent(ctx, task.ID, agenttask.AppendEventInput{
EventType: eventType,
Status: taskStatusValue(payload["status"], task.Status),
Message: stringValue(payload["message"]),
Progress: intValue(payload["progress"], intValue(taskPayload["progress"], task.Progress)),
Payload: rawPayload,
})
if err != nil {
log.Printf("persona task event persist failed task=%s event=%s: %v", task.ID, eventType, err)
return fallback
}
return taskEventBroadcastPayload(updated, event)
}
// HandleTextInput processes a text message through either the standard
// LLM→TTS→Avatar pipeline or the omni text-query path.
func (o *Orchestrator) HandleTextInput(ctx context.Context, sessionID string, text string) error {
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
text = strings.TrimSpace(text)
if text == "" {
return nil
}
if session.Mode == ModeOmni {
return o.handleVoiceLLMTextInput(ctx, session, sessionID, text)
}
return o.handleStandardTextInput(ctx, session, sessionID, text)
}
// runStandardPipeline executes: LLM → sentence detection → TTS → Avatar.
func (o *Orchestrator) runStandardPipeline(ctx context.Context, session *Session, sessionID string, pipelineSeq uint64, turnSeq uint64) {
var fullResponseCh chan string // set below; read in defer to store assistant message
recSessionDir := ""
recTurnID := "turn" + strconv.FormatUint(turnSeq, 10)
var recMu sync.Mutex
var recAudioBuf []byte
var recAudioSR int
var turnRec *recording.TurnRecording
var recFinished bool
if o.recorder != nil {
recSessionDir = o.sessionRecordingDir(session)
}
appendRecAudio := func(pcm []byte, sampleRate int) {
if o.recorder == nil || len(pcm) == 0 || sampleRate <= 0 {
return
}
pcmCopy := append([]byte(nil), pcm...)
recMu.Lock()
if recAudioSR == 0 {
recAudioSR = sampleRate
}
recAudioBuf = append(recAudioBuf, pcmCopy...)
activeRec := turnRec
if activeRec != nil && !recFinished {
activeRec.WriteAudioChunk(pcmCopy, sampleRate)
}
recMu.Unlock()
}
beginRec := func(width, height, fps int) {
if o.recorder == nil || width <= 0 || height <= 0 || fps <= 0 {
return
}
recMu.Lock()
if turnRec != nil || recFinished {
recMu.Unlock()
return
}
activeRec := o.recorder.BeginTurn(recSessionDir, recTurnID, width, height, fps)
turnRec = activeRec
audioCopy := append([]byte(nil), recAudioBuf...)
audioSR := recAudioSR
if activeRec != nil && len(audioCopy) > 0 && audioSR > 0 {
activeRec.WriteAudioChunk(audioCopy, audioSR)
}
recMu.Unlock()
}
writeRecVideo := func(rgb []byte) {
if o.recorder == nil || len(rgb) == 0 {
return
}
recMu.Lock()
activeRec := turnRec
if activeRec != nil && !recFinished {
activeRec.WriteVideoChunk(rgb)
}
recMu.Unlock()
}
finishRec := func() {
if o.recorder == nil {
return
}
recMu.Lock()
if recFinished {
recMu.Unlock()
return
}
activeRec := turnRec
turnRec = nil
recFinished = true
recMu.Unlock()
if activeRec != nil {
_ = activeRec.Finish()
}
}
defer func() {
// Store assistant message in session history
assistantResp := ""
if fullResponseCh != nil {
if resp, ok := <-fullResponseCh; ok && resp != "" {
assistantResp = resp
session.AddMessage(ChatMessage{Role: "assistant", Content: resp, TurnSeq: turnSeq})
if _, err := o.persistSessionConversation(session); err != nil {
log.Printf("conversation: SaveConversation error session=%s: %v", sessionID, err)
}
}
}
if o.recorder != nil {
recMu.Lock()
audioCopy := append([]byte(nil), recAudioBuf...)
audioSR := recAudioSR
recMu.Unlock()
if len(audioCopy) > 0 && audioSR > 0 {
if err := o.recorder.SaveRawAudio(recSessionDir, recTurnID, audioCopy, audioSR); err != nil {
log.Printf("recording: SaveRawAudio error session=%s turn=%s: %v", sessionID, recTurnID, err)
}
}
if strings.TrimSpace(assistantResp) != "" {
if err := o.recorder.SaveTranscript(recSessionDir, recTurnID, assistantResp); err != nil {
log.Printf("recording: SaveTranscript error session=%s turn=%s: %v", sessionID, recTurnID, err)
}
}
}
session.MarkPipelineFinished(pipelineSeq)
session.SetState(StateListening)
o.broadcastStatus(sessionID, "idle")
}()
session.SetState(StateProcessing)
o.broadcastStatus(sessionID, "processing")
pipelineStart := time.Now()
components, voice, speakingStyle, language := o.standardCharacterConfig(session)
// Prepare LLM messages
history := session.HistorySnapshot()
messages := make([]inference.ChatMessage, 0, len(history)+1)
ragContext := ""
if query := latestUserText(history); query != "" {
results, err := o.searchKnowledge(ctx, session.CharacterID, query)
if err != nil {
log.Printf("RAG search failed for session %s character %s: %v", sessionID, session.CharacterID, err)
} else {
ragContext = formatRAGContext(results)
}
}
if systemPrompt := o.standardSystemPromptWithRAG(session, ragContext); systemPrompt != "" {
messages = append(messages, inference.ChatMessage{Role: "system", Content: systemPrompt})
}
for _, m := range history {
messages = append(messages, inference.ChatMessage{Role: m.Role, Content: m.Content})
}
visualCfg := o.visualInputConfig()
visualFrames := session.LatestVisualFrames(time.Now(), time.Duration(visualCfg.FrameTTLMS)*time.Millisecond)
if len(visualFrames) > 0 {
images := make([]inference.ImageFrame, 0, len(visualFrames))
for _, frame := range visualFrames {
images = append(images, inference.ImageFrame{
Data: frame.Data,
MimeType: frame.MimeType,
Width: frame.Width,
Height: frame.Height,
Source: frame.Source,
TimestampMS: frame.TimestampMS,
FrameSeq: frame.FrameSeq,
})
}
for i := len(messages) - 1; i >= 0; i-- {
if messages[i].Role == "user" {
messages[i].Images = images
break
}
}
}
// 1. Start LLM stream
llmCh, llmErrCh := o.inference.GenerateLLMStream(ctx, sessionID, messages, inference.LLMConfig{
Temperature: 0.7,
Provider: components.LLM,
})
// 2. Collect the full LLM response, then feed it to TTS once.
textCh := make(chan string, 1)
fullResponseCh = make(chan string, 1) // captures full LLM response for history
go func() {
defer close(textCh)
var tokenBuffer strings.Builder
var fullResponse string
finalText := func() string {
if text := strings.TrimSpace(fullResponse); text != "" {
return text
}
return strings.TrimSpace(tokenBuffer.String())
}
finish := func(sendTTS bool) {
text := finalText()
if sendTTS && text != "" {
select {
case textCh <- text:
case <-ctx.Done():
}
}
if text != "" {
fullResponseCh <- text
}
close(fullResponseCh)
}
for {
select {
case <-ctx.Done():
finish(false)
return
case chunk, ok := <-llmCh:
if !ok {
finish(true)
return
}
// Broadcast LLM token to WebSocket
o.broadcastJSON(sessionID, map[string]any{
"type": "llm_token",
"token": chunk.Token,
"accumulated": chunk.AccumulatedText,
"is_final": chunk.IsFinal,
"turn_seq": turnSeq,
})
tokenBuffer.WriteString(chunk.Token)
// Track the latest accumulated text. A normally closed errCh
// can race with the final chunk, so keep this current even
// before the explicit final marker arrives.
if chunk.AccumulatedText != "" {
fullResponse = chunk.AccumulatedText
}
case err, ok := <-llmErrCh:
if !ok {
llmErrCh = nil
continue
}
if err != nil {
log.Printf("LLM stream error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "LLM generation failed")
}
if err != nil {
finish(true)
return
}
}
}
}()
// 3. Start TTS stream
ttsAudioCh, ttsErrCh := o.inference.SynthesizeSpeechStream(ctx, textCh, inference.TTSConfig{
Provider: components.TTS,
Voice: voice,
SpeakingStyle: speakingStyle,
Language: language,
SessionID: sessionID,
})
lookupStdPeer := func() mediapeer.MediaPeer {
o.mu.RLock()
defer o.mu.RUnlock()
return o.peers[sessionID]
}
if !o.AvatarEnabled() {
speakingBroadcasted := false
for ttsAudioCh != nil || ttsErrCh != nil {
select {
case <-ctx.Done():
return
case chunk, ok := <-ttsAudioCh:
if !ok {
ttsAudioCh = nil
continue
}
pcm, sampleRate := audioChunkToPCM16(chunk)
if len(pcm) == 0 {
continue
}
appendRecAudio(pcm, sampleRate)
if !speakingBroadcasted {
speakingBroadcasted = true
session.SetState(StateSpeaking)
o.broadcastStatus(sessionID, "speaking")
}
if peer := lookupStdPeer(); peer != nil {
if err := peer.PublishAudioFrame(pcm, sampleRate); err != nil {
log.Printf("std audio-only publish failed session=%s: %v", sessionID, err)
}
}
case err, ok := <-ttsErrCh:
if !ok {
ttsErrCh = nil
continue
}
if err == nil {
continue
}
log.Printf("TTS stream error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Speech synthesis failed")
return
}
}
return
}
// 4. Start Avatar stream
stdSyncBuf := newVoiceAVSyncBuffer(voiceMaxPCMBufferSamples)
avatarAudioCh := make(chan *pb.AudioChunk, 8)
go func() {
defer close(avatarAudioCh)
for {
select {
case <-ctx.Done():
return
case chunk, ok := <-ttsAudioCh:
if !ok {
return
}
// Avatar decodes AudioChunk.Format itself; browser audio and
// recordings need signed 16-bit PCM.
pcm, pcmSampleRate := audioChunkToPCM16(chunk)
if len(pcm) > 0 {
appendRecAudio(pcm, pcmSampleRate)
if dropped := stdSyncBuf.appendPCM(pcm, pcmSampleRate); dropped > 0 {
bufferedSamples, _, _, _ := stdSyncBuf.snapshot()
log.Printf("std sync buffer overflow for session %s: dropped=%d bytes, buffered_samples=%d", sessionID, dropped, bufferedSamples)
}
}
// Forward original audio to avatar. Browser audio is published
// later as part of the same paced AV segment as the video.
select {
case avatarAudioCh <- chunk:
case <-ctx.Done():
return
}
case err, ok := <-ttsErrCh:
if !ok {
ttsErrCh = nil
continue
}
if err == nil {
continue
}
log.Printf("TTS stream error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Speech synthesis failed")
return
}
}
}()
// Delay speaking status until first video frame arrives (avoids frozen-frame stall on frontend).
speakingBroadcasted := false
// Serialize with concurrent avatar operations (see runVoiceLLMPipeline comment).
o.avatarMu.Lock()
videoCh, videoErrCh := o.inference.GenerateAvatarStream(ctx, avatarAudioCh)
// 5. Publish paced AV segments. The standard/Qwen chain receives audio
// before video, so PCM is buffered and sliced to match each video segment.
var (
segVideo []byte
segFrames int
segWidth int
segHeight int
segFPS int
segCount int
segSeq int64
segMediaStartMS int64
firstFrameSent bool
)
flushStdSeg := func(isFinalSeg bool) {
if segCount == 0 {
return
}
segSeq++
segDurationMS := durationMSForVideo(segFrames, segFPS)
if peer := lookupStdPeer(); peer != nil {
segPCM, outSamples, wantSamples, bufferedSamplesAfterTake := stdSyncBuf.takeSegmentPCM(segFrames, segFPS, isFinalSeg)
_, _, _, sampleRate := stdSyncBuf.snapshot()
if sampleRate <= 0 {
sampleRate = 16000
}
if segSeq == 1 && outSamples < wantSamples {
bufferedSamples, _, _, _ := stdSyncBuf.snapshot()
log.Printf("std av drift for session %s: out_samples=%d want_samples=%d frames=%d fps=%d buffered_samples=%d",
sessionID, outSamples, wantSamples, segFrames, segFPS, bufferedSamples)
}
if isFinalSeg && bufferedSamplesAfterTake > 0 {
log.Printf("std av strict trim tail session=%s: trimmed_samples=%d", sessionID, bufferedSamplesAfterTake)
}
raw := &mediapeer.RawAVSegment{
TraceLabel: voiceTraceLabel(sessionID, turnSeq, "standard", "", segSeq),
Epoch: turnSeq,
SegmentSeq: segSeq,
MediaStartMS: segMediaStartMS,
DurationMS: segDurationMS,
RGB: segVideo,
PCM: segPCM,
SampleRate: sampleRate,
Width: segWidth,
Height: segHeight,
FPS: segFPS,
NumFrames: segFrames,
}
if err := peer.SendAVSegment(raw); err != nil {
log.Printf("std av SendAVSegment failed session=%s: %v", sessionID, err)
}
}
segMediaStartMS += segDurationMS
segVideo = nil
segFrames = 0
segCount = 0
}
for {
select {
case <-ctx.Done():
flushStdSeg(false)
finishRec()
o.avatarMu.Unlock()
return
case chunk, ok := <-videoCh:
if !ok {
flushStdSeg(true)
finishRec()
if err := <-videoErrCh; err != nil {
log.Printf("Avatar stream error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Avatar generation failed")
}
if ctx.Err() == nil {
if peer := lookupStdPeer(); peer != nil {
peer.WaitAVDrain(10 * time.Second)
}
}
o.avatarMu.Unlock()
return
}
nf := int(chunk.GetNumFrames())
if nf <= 0 && int(chunk.GetWidth())*int(chunk.GetHeight())*3 > 0 {
nf = len(chunk.GetData()) / (int(chunk.GetWidth()) * int(chunk.GetHeight()) * 3)
}
fps := int(chunk.GetFps())
if fps <= 0 {
fps = 25
}
if !firstFrameSent {
firstFrameSent = true
log.Printf("TTFF std pipeline session=%s first_video_chunk=%.3fs", sessionID, time.Since(pipelineStart).Seconds())
if !speakingBroadcasted {
speakingBroadcasted = true
session.SetState(StateSpeaking)
o.broadcastStatus(sessionID, "speaking")
}
}
beginRec(int(chunk.GetWidth()), int(chunk.GetHeight()), fps)
writeRecVideo(chunk.GetData())
segVideo = append(segVideo, chunk.GetData()...)
segFrames += nf
segWidth = int(chunk.GetWidth())
segHeight = int(chunk.GetHeight())
segFPS = fps
segCount++
if segCount >= stdChunksPerSegment || chunk.GetIsFinal() {
flushStdSeg(chunk.GetIsFinal())
}
}
}
}
// HandleAudioStream processes incoming user audio through the session's pipeline.
func (o *Orchestrator) HandleAudioStream(ctx context.Context, sessionID string, audioCh <-chan []byte) error {
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
if session.Mode == ModeStandard {
go o.runStandardASRLoop(ctx, session, sessionID, audioCh)
return nil
}
pipeCtx, cancel := context.WithCancel(ctx)
session.mu.Lock()
session.PipelineCancel = cancel
session.mu.Unlock()
pipelineSeq := session.MarkPipelineRunning()
var inputCh <-chan inference.VoiceLLMInputEvent
if visualCfg, ok := o.VisualInputConfigForSession(session); ok {
frameCh, unsubscribe := session.SubscribeVisualFrames(2)
initialFrames := session.LatestVisualFrames(time.Now(), time.Duration(visualCfg.FrameTTLMS)*time.Millisecond)
inputCh = wrapVoiceMultimodalInput(pipeCtx, audioCh, frameCh, unsubscribe, initialFrames)
} else {
inputCh = wrapVoiceAudioInput(pipeCtx, audioCh)
}
go o.runVoiceLLMPipeline(pipeCtx, session, sessionID, inputCh, pipelineSeq, 0)
return nil
}
func (o *Orchestrator) runStandardASRLoop(ctx context.Context, session *Session, sessionID string, audioCh <-chan []byte) {
components, _, _, language := o.standardCharacterConfig(session)
transcriptCh, errCh := o.inference.TranscribeStream(ctx, audioCh, inference.ASRConfig{
Provider: components.ASR,
Language: language,
SessionID: sessionID,
})
session.SetState(StateListening)
o.broadcastStatus(sessionID, "idle")
for {
select {
case <-ctx.Done():
return
case event, ok := <-transcriptCh:
if !ok {
select {
case err, ok := <-errCh:
if ok && err != nil && ctx.Err() == nil {
log.Printf("ASR stream error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Speech recognition failed")
}
default:
}
return
}
text := strings.TrimSpace(event.GetText())
if text == "" {
continue
}
if !event.GetIsFinal() {
o.broadcastJSON(sessionID, map[string]any{
"type": "transcript",
"text": text,
"is_final": false,
"speaker": "user",
})
continue
}
o.stopPipelineAndWait(session, sessionID, false)
turnSeq := session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, turnSeq)
session.AddMessage(ChatMessage{Role: "user", Content: text, TurnSeq: turnSeq})
o.broadcastJSON(sessionID, map[string]any{
"type": "transcript",
"text": text,
"is_final": true,
"speaker": "user",
"turn_seq": turnSeq,
})
pipeCtx, cancel := context.WithCancel(context.Background())
session.mu.Lock()
session.PipelineCancel = cancel
session.mu.Unlock()
pipelineSeq := session.MarkPipelineRunning()
go o.runStandardPipeline(pipeCtx, session, sessionID, pipelineSeq, turnSeq)
case err, ok := <-errCh:
if !ok {
errCh = nil
continue
}
if err == nil {
continue
}
if ctx.Err() == nil {
log.Printf("ASR stream error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Speech recognition failed")
}
return
}
}
}
// runVoiceLLMPipeline executes an omni turn source -> omni model -> Avatar (video).
func (o *Orchestrator) runVoiceLLMPipeline(ctx context.Context, session *Session, sessionID string, inputCh <-chan inference.VoiceLLMInputEvent, pipelineSeq uint64, initialTurnSeq uint64) {
o.runVoiceLLMPipelineWithConfig(
ctx,
session,
sessionID,
inputCh,
pipelineSeq,
initialTurnSeq,
o.buildVoiceLLMSessionConfigExcludingTurn(session, sessionID, initialTurnSeq),
false,
)
}
func (o *Orchestrator) runVoiceLLMPipelineWithConfig(
ctx context.Context,
session *Session,
sessionID string,
inputCh <-chan inference.VoiceLLMInputEvent,
pipelineSeq uint64,
initialTurnSeq uint64,
voiceConfig inference.VoiceLLMSessionConfig,
suppressUserTranscript bool,
) {
sessionDir := ""
if o.recorder != nil {
sessionDir = o.sessionRecordingDir(session)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
outputCh, errCh := o.inference.ConverseStream(ctx, inputCh, voiceConfig)
pendingTurnSeq := initialTurnSeq
pendingTurnAssistantReady := initialTurnSeq > 0
ignoredTurns := make(map[string]*voicePipelineTurn)
var currentTurn *voicePipelineTurn
var currentTurnDone <-chan voicePipelineTurnResult
var streamErr error
var pendingQuestionID string
var pendingReplyID string
var pendingUserFinalAt time.Time
var pendingAssistantMustMatchKey bool
lookupPeer := func() mediapeer.MediaPeer {
o.mu.RLock()
defer o.mu.RUnlock()
return o.peers[sessionID]
}
reservePendingTurn := func() uint64 {
if pendingTurnSeq == 0 {
pendingTurnSeq = session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, pendingTurnSeq)
}
return pendingTurnSeq
}
broadcastProcessing := func(turnSeq uint64) {
if turnSeq == 0 || !session.IsCurrentPipeline(pipelineSeq) {
return
}
session.SetState(StateProcessing)
o.broadcastStatusTurn(sessionID, "processing", turnSeq)
}
abortTurn := func(turn *voicePipelineTurn, ignoreKey bool) {
if turn == nil {
return
}
turn.aborted = true
if ignoreKey && turn.key != "" {
ignoredTurns[turn.key] = turn
}
if turn.avatarCancel != nil {
turn.avatarCancel()
}
}
saveAssistantMessage := func(turn *voicePipelineTurn) {
if turn == nil || turn.historySaved || strings.TrimSpace(turn.assistantText) == "" {
return
}
session.AddMessage(ChatMessage{Role: "assistant", Content: turn.assistantText, TurnSeq: turn.seq})
turn.historySaved = true
}
saveTurnTranscript := func(turn *voicePipelineTurn) {
if turn == nil || turn.transcriptSaved || strings.TrimSpace(turn.assistantText) == "" {
return
}
if o.recorder == nil || turn.recTurnID == "" {
return
}
if err := o.recorder.SaveTranscript(turn.sessionDir, turn.recTurnID, turn.assistantText); err != nil {
log.Printf("recording: SaveTranscript error session=%s turn=%s: %v", sessionID, turn.recTurnID, err)
return
}
turn.transcriptSaved = true
}
saveTurnRawAudio := func(turn *voicePipelineTurn) {
if turn == nil || turn.rawAudioSaved || len(turn.recAudioBuf) == 0 {
return
}
if o.recorder == nil || turn.recTurnID == "" {
return
}
if err := o.recorder.SaveRawAudio(turn.sessionDir, turn.recTurnID, turn.recAudioBuf, turn.recAudioSR); err != nil {
log.Printf("recording: SaveRawAudio error session=%s turn=%s: %v", sessionID, turn.recTurnID, err)
return
}
turn.rawAudioSaved = true
}
saveTurnConversation := func(turn *voicePipelineTurn) {
if turn == nil || turn.conversationSaved || strings.TrimSpace(turn.assistantText) == "" {
return
}
saved, err := o.persistSessionConversation(session)
if err != nil {
log.Printf("conversation: SaveConversation error session=%s turn=%d: %v", sessionID, turn.seq, err)
return
}
if saved {
turn.conversationSaved = true
}
}
saveCompletedTurn := func(turn *voicePipelineTurn) {
if turn == nil || turn.aborted {
return
}
saveAssistantMessage(turn)
saveTurnRawAudio(turn)
saveTurnTranscript(turn)
saveTurnConversation(turn)
}
recordIgnoredTurnOutput := func(turn *voicePipelineTurn, output *pb.VoiceLLMOutput) bool {
if turn == nil || output == nil {
return false
}
isFinal := voiceOutputIsFinal(output)
if transcript := output.GetTranscript(); transcript != "" {
if isFinal {
turn.assistantText = transcript
} else {
turn.assistantText += transcript
}
}
if audio := output.GetAudio(); audio != nil && len(audio.GetData()) > 0 {
turn.recAudioBuf = append(turn.recAudioBuf, audio.GetData()...)
if int(audio.GetSampleRate()) > 0 {
turn.recAudioSR = int(audio.GetSampleRate())
}
}
if isFinal {
shouldBroadcast := !turn.historySaved && strings.TrimSpace(turn.assistantText) != ""
saveAssistantMessage(turn)
saveTurnRawAudio(turn)
saveTurnTranscript(turn)
saveTurnConversation(turn)
if shouldBroadcast {
o.broadcastJSON(sessionID, map[string]any{
"type": "transcript",
"text": turn.assistantText,
"is_final": true,
"speaker": "assistant",
"turn_seq": turn.seq,
})
}
}
return isFinal
}
setIdleIfCurrent := func(turnSeq uint64) {
if turnSeq == 0 || !session.IsCurrentPipeline(pipelineSeq) || !session.IsCurrentTurn(turnSeq) {
return
}
session.SetState(StateListening)
o.broadcastStatusTurn(sessionID, "idle", turnSeq)
}
startTurn := func(key string) *voicePipelineTurn {
seq := pendingTurnSeq
if seq == 0 {
seq = session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, seq)
}
if key != "" {
delete(ignoredTurns, key)
}
turn := &voicePipelineTurn{
seq: seq,
key: key,
questionID: pendingQuestionID,
replyID: pendingReplyID,
recTurnID: fmt.Sprintf("turn%d", seq),
recAudioSR: 16000,
sessionDir: sessionDir,
turnStart: time.Now(),
userFinalAt: pendingUserFinalAt,
syncBuf: newVoiceAVSyncBuffer(voiceMaxPCMBufferSamples),
}
pendingTurnSeq = 0
pendingTurnAssistantReady = false
pendingQuestionID = ""
pendingReplyID = ""
pendingUserFinalAt = time.Time{}
pendingAssistantMustMatchKey = false
logVoiceTrace("go_turn_started", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt)
broadcastProcessing(seq)
return turn
}
pendingTurnKey := func() string {
if pendingReplyID != "" {
return "reply:" + pendingReplyID
}
if pendingQuestionID != "" {
return "question:" + pendingQuestionID
}
return ""
}
startAvatarWorker := func(turn *voicePipelineTurn) {
if turn == nil || turn.aborted || turn.avatarStarted {
return
}
turn.avatarStarted = true
turn.doneCh = make(chan voicePipelineTurnResult, 1)
turn.avatarAudioCh = make(chan *pb.AudioChunk, 64)
turn.avatarCtx, turn.avatarCancel = context.WithCancel(ctx)
turn.avatarWorkerAt = time.Now()
logVoiceTrace("go_avatar_worker_started", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt)
currentTurnDone = turn.doneCh
go func(turn *voicePipelineTurn) {
result := voicePipelineTurnResult{turn: turn}
defer func() {
turn.doneCh <- result
}()
o.avatarMu.Lock()
defer o.avatarMu.Unlock()
if turn.avatarCtx.Err() != nil {
return
}
avatarCtx := inference.WithTraceContext(turn.avatarCtx, inference.TraceContext{
SessionID: sessionID,
QuestionID: turn.questionID,
ReplyID: turn.replyID,
TurnSeq: turn.seq,
UserFinalAt: turn.userFinalAt,
})
videoCh, avatarErrCh := o.inference.GenerateAvatarStream(avatarCtx, turn.avatarAudioCh)
logVoiceTrace("go_avatar_grpc_stream_opened", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt)
var turnRec *recording.TurnRecording
var (
segVideo []byte
segFrames int
segWidth int
segHeight int
segFPS int
segCount int
segSeq int64
segMediaStartMS int64
lastKnownSampleRate int
firstFrameSent bool
speakingBroadcasted bool
)
flushVoiceSeg := func(isFinalSeg bool) {
if segCount == 0 {
return
}
segSeq++
segDurationMS := durationMSForVideo(segFrames, segFPS)
peer := lookupPeer()
if peer != nil {
traceLabel := ""
if segSeq == 1 {
traceLabel = voiceTraceLabel(sessionID, turn.seq, turn.replyID, turn.questionID, segSeq)
}
segPCM, outSamples, wantSamples, bufferedSamplesAfterTake := turn.syncBuf.takeSegmentPCM(segFrames, segFPS, isFinalSeg)
bufferedSamples, _, _, sampleRate := turn.syncBuf.snapshot()
if sampleRate > 0 {
lastKnownSampleRate = sampleRate
} else {
sampleRate = lastKnownSampleRate
}
if sampleRate <= 0 {
sampleRate = 16000
}
if segSeq == 1 && outSamples < wantSamples {
log.Printf("voice av drift for session %s: out_samples=%d want_samples=%d frames=%d fps=%d buffered_samples=%d",
sessionID, outSamples, wantSamples, segFrames, segFPS, bufferedSamples)
}
if segSeq == 1 && isFinalSeg && bufferedSamplesAfterTake > 0 {
log.Printf("voice av strict trim tail session=%s: trimmed_samples=%d", sessionID, bufferedSamplesAfterTake)
}
raw := &mediapeer.RawAVSegment{
TraceLabel: traceLabel,
Epoch: turn.seq,
SegmentSeq: segSeq,
MediaStartMS: segMediaStartMS,
DurationMS: segDurationMS,
RGB: segVideo,
PCM: segPCM,
UserFinalAt: turn.userFinalAt,
SampleRate: sampleRate,
Width: segWidth,
Height: segHeight,
FPS: segFPS,
NumFrames: segFrames,
}
if err := peer.SendAVSegment(raw); err != nil {
log.Printf("voice av SendAVSegment failed session=%s seg=%d: %v", sessionID, segSeq, err)
}
if turnRec != nil {
turnRec.WriteVideoChunk(segVideo)
turnRec.WriteAudioChunk(segPCM, sampleRate)
}
}
segMediaStartMS += segDurationMS
segVideo = nil
segFrames = 0
segCount = 0
}
for {
select {
case <-turn.avatarCtx.Done():
flushVoiceSeg(false)
if turnRec != nil {
_ = turnRec.Finish()
}
return
case chunk, ok := <-videoCh:
if !ok {
flushVoiceSeg(false)
if turnRec != nil {
_ = turnRec.Finish()
turnRec = nil
}
if remain, totalIn, totalOut, _ := turn.syncBuf.snapshot(); remain > 0 {
log.Printf("voice sync tail flush for session %s: dropping_unaligned_samples=%d total_in=%d total_out=%d", sessionID, remain, totalIn, totalOut)
}
if err := <-avatarErrCh; err != nil && turn.avatarCtx.Err() == nil && !errors.Is(err, context.Canceled) {
result.err = err
}
if turn.avatarCtx.Err() == nil {
if peer := lookupPeer(); peer != nil {
peer.WaitAVDrain(10 * time.Second)
}
}
return
}
nf := int(chunk.GetNumFrames())
if nf <= 0 && int(chunk.GetWidth())*int(chunk.GetHeight())*3 > 0 {
nf = len(chunk.GetData()) / (int(chunk.GetWidth()) * int(chunk.GetHeight()) * 3)
}
fps := int(chunk.GetFps())
if fps <= 0 {
fps = 20
}
if !firstFrameSent {
firstFrameSent = true
turn.firstVideoAt = time.Now()
logVoiceTrace(
"go_avatar_first_video_received",
sessionID,
turn.seq,
turn.replyID,
turn.questionID,
turn.userFinalAt,
"avatar_ms="+strconv.FormatInt(time.Since(turn.avatarWorkerAt).Milliseconds(), 10),
)
log.Printf("TTFF voice pipeline session=%s turn=%d first_video_chunk=%.3fs", sessionID, turn.seq, time.Since(turn.turnStart).Seconds())
if !speakingBroadcasted && session.IsCurrentPipeline(pipelineSeq) && session.IsCurrentTurn(turn.seq) {
speakingBroadcasted = true
session.SetState(StateSpeaking)
o.broadcastStatusTurn(sessionID, "speaking", turn.seq)
}
}
if turnRec == nil && o.recorder != nil && turn.recTurnID != "" && nf > 0 {
turnRec = o.recorder.BeginTurn(turn.sessionDir, turn.recTurnID, int(chunk.GetWidth()), int(chunk.GetHeight()), fps)
}
segVideo = append(segVideo, chunk.GetData()...)
segFrames += nf
segWidth = int(chunk.GetWidth())
segHeight = int(chunk.GetHeight())
segFPS = fps
segCount++
flushVoiceSeg(chunk.GetIsFinal())
if chunk.GetIsFinal() && turnRec != nil {
_ = turnRec.Finish()
turnRec = nil
}
}
}
}(turn)
}
closeTurnInput := func(turn *voicePipelineTurn) {
if turn == nil || !turn.avatarStarted || turn.avatarInputClosed {
return
}
sampleRate := turn.recAudioSR
if sampleRate <= 0 {
if _, _, _, bufferedSR := turn.syncBuf.snapshot(); bufferedSR > 0 {
sampleRate = bufferedSR
}
}
silence := buildTrailingSilence(sampleRate)
if dropped := turn.syncBuf.appendPCM(silence.GetData(), int(silence.GetSampleRate())); dropped > 0 {
bufferedSamples, _, _, _ := turn.syncBuf.snapshot()
log.Printf("voice sync buffer overflow for session %s: dropped=%d bytes, buffered_samples=%d", sessionID, dropped, bufferedSamples)
}
select {
case turn.avatarAudioCh <- silence:
case <-turn.avatarCtx.Done():
}
close(turn.avatarAudioCh)
turn.avatarInputClosed = true
turn.avatarInputClosedAt = time.Now()
if turn.firstVideoAt.IsZero() {
logVoiceTrace("go_avatar_input_closed", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt)
}
}
currentStatusTurnSeq := func() uint64 {
if currentTurn != nil {
return currentTurn.seq
}
if pendingTurnSeq != 0 {
return pendingTurnSeq
}
return session.CurrentTurnSeq()
}
defer func() {
if currentTurn != nil {
abortTurn(currentTurn, false)
}
session.MarkPipelineFinished(pipelineSeq)
if session.IsCurrentPipeline(pipelineSeq) {
session.SetState(StateListening)
o.broadcastStatusTurn(sessionID, "idle", currentStatusTurnSeq())
}
}()
if initialTurnSeq == 0 && session.IsCurrentPipeline(pipelineSeq) {
session.SetState(StateListening)
}
for outputCh != nil || currentTurn != nil || errCh != nil {
select {
case <-ctx.Done():
return
case result := <-currentTurnDone:
if currentTurn == nil || result.turn != currentTurn {
continue
}
turn := currentTurn
currentTurn = nil
currentTurnDone = nil
if result.err != nil && !turn.aborted {
log.Printf("Avatar stream error for session %s (omni): %v", sessionID, result.err)
if session.IsCurrentPipeline(pipelineSeq) {
o.broadcastError(sessionID, "Avatar generation failed")
}
}
if turn.aborted {
continue
}
saveCompletedTurn(turn)
setIdleIfCurrent(turn.seq)
case output, ok := <-outputCh:
if !ok {
outputCh = nil
if currentTurn != nil && !o.AvatarEnabled() && !currentTurn.avatarStarted {
turn := currentTurn
currentTurn = nil
currentTurnDone = nil
saveCompletedTurn(turn)
setIdleIfCurrent(turn.seq)
}
continue
}
if rawTaskEvent := strings.TrimSpace(output.GetTaskEventJson()); rawTaskEvent != "" {
var payload map[string]any
if err := json.Unmarshal([]byte(rawTaskEvent), &payload); err != nil {
log.Printf("invalid persona task event json session=%s: %v", sessionID, err)
} else {
o.broadcastJSON(sessionID, o.persistPersonaTaskEvent(ctx, session, sessionID, payload))
}
if !voiceOutputHasAssistantContent(output) && !voiceOutputIsFinal(output) && strings.TrimSpace(output.GetUserTranscript()) == "" && !output.GetBargeIn() {
continue
}
}
outputQuestionID := output.GetQuestionId()
outputReplyID := output.GetReplyId()
if output.GetBargeIn() {
if currentTurn != nil || pendingTurnSeq == 0 {
seq := reservePendingTurn()
if outputQuestionID != "" {
pendingQuestionID = outputQuestionID
}
if outputReplyID != "" {
pendingReplyID = outputReplyID
}
pendingTurnAssistantReady = false
logVoiceTrace("go_barge_in_received", sessionID, seq, pendingReplyID, pendingQuestionID, time.Time{})
if currentTurn != nil {
abortTurn(currentTurn, true)
currentTurn = nil
currentTurnDone = nil
pendingAssistantMustMatchKey = true
broadcastProcessing(seq)
}
}
continue
}
if currentTurn != nil {
if outputQuestionID != "" {
currentTurn.questionID = outputQuestionID
}
if outputReplyID != "" {
currentTurn.replyID = outputReplyID
}
} else {
if outputQuestionID != "" {
pendingQuestionID = outputQuestionID
}
if outputReplyID != "" {
pendingReplyID = outputReplyID
}
}
if userText := strings.TrimSpace(output.GetUserTranscript()); userText != "" {
if suppressUserTranscript {
if pendingUserFinalAt.IsZero() {
pendingUserFinalAt = time.Now()
}
pendingTurnAssistantReady = true
if outputQuestionID != "" {
pendingQuestionID = outputQuestionID
}
if outputReplyID != "" {
pendingReplyID = outputReplyID
}
if !voiceOutputHasAssistantContent(output) && !voiceOutputIsFinal(output) {
continue
}
} else {
if currentTurn != nil {
abortTurn(currentTurn, true)
currentTurn = nil
currentTurnDone = nil
pendingAssistantMustMatchKey = true
}
if outputQuestionID != "" {
pendingQuestionID = outputQuestionID
}
if outputReplyID != "" {
pendingReplyID = outputReplyID
}
seq := reservePendingTurn()
pendingTurnAssistantReady = true
pendingUserFinalAt = time.Now()
logVoiceTrace(
"go_user_transcript_received",
sessionID,
seq,
pendingReplyID,
pendingQuestionID,
pendingUserFinalAt,
)
session.AddMessage(ChatMessage{Role: "user", Content: userText, TurnSeq: seq})
o.broadcastJSON(sessionID, map[string]any{
"type": "transcript",
"text": userText,
"is_final": true,
"speaker": "user",
"turn_seq": seq,
})
broadcastProcessing(seq)
}
}
turnKey := voiceOutputTurnKey(output)
if turnKey != "" {
if ignoredTurn, ignored := ignoredTurns[turnKey]; ignored {
if recordIgnoredTurnOutput(ignoredTurn, output) {
delete(ignoredTurns, turnKey)
}
continue
}
}
if pendingTurnSeq != 0 && !pendingTurnAssistantReady && currentTurn == nil && voiceOutputHasAssistantContent(output) {
continue
}
if pendingTurnSeq != 0 && pendingTurnAssistantReady && pendingAssistantMustMatchKey && currentTurn == nil && voiceOutputHasAssistantContent(output) {
if expectedKey := pendingTurnKey(); expectedKey != "" && turnKey != expectedKey {
continue
}
}
if !voiceOutputHasAssistantContent(output) && !voiceOutputIsFinal(output) {
continue
}
if currentTurn == nil && !voiceOutputHasAssistantContent(output) {
continue
}
if currentTurn != nil {
if currentTurn.key == "" && turnKey != "" {
currentTurn.key = turnKey
} else if turnKey != "" && currentTurn.key != "" && turnKey != currentTurn.key {
abortTurn(currentTurn, true)
currentTurn = nil
currentTurnDone = nil
}
}
if currentTurn == nil {
if pendingTurnSeq != 0 && !pendingTurnAssistantReady {
continue
}
currentTurn = startTurn(turnKey)
}
if currentTurn.key == "" && turnKey != "" {
currentTurn.key = turnKey
}
isFinal := voiceOutputIsFinal(output)
if transcript := output.GetTranscript(); transcript != "" {
o.broadcastJSON(sessionID, map[string]any{
"type": "transcript",
"text": transcript,
"is_final": isFinal,
"speaker": "assistant",
"turn_seq": currentTurn.seq,
})
if isFinal {
currentTurn.assistantText = transcript
} else {
currentTurn.assistantText += transcript
}
}
audio := output.GetAudio()
if audio != nil && len(audio.GetData()) > 0 {
pcm, pcmSampleRate := audioChunkToPCM16(audio)
if currentTurn.firstAudioAt.IsZero() {
currentTurn.firstAudioAt = time.Now()
logVoiceTrace(
"go_first_voice_audio_received",
sessionID,
currentTurn.seq,
currentTurn.replyID,
currentTurn.questionID,
currentTurn.userFinalAt,
)
}
if len(pcm) > 0 {
currentTurn.recAudioBuf = append(currentTurn.recAudioBuf, pcm...)
if pcmSampleRate > 0 {
currentTurn.recAudioSR = pcmSampleRate
}
}
if !o.AvatarEnabled() {
if len(pcm) > 0 {
if !currentTurn.audioOnlyStarted && session.IsCurrentPipeline(pipelineSeq) && session.IsCurrentTurn(currentTurn.seq) {
currentTurn.audioOnlyStarted = true
session.SetState(StateSpeaking)
o.broadcastStatusTurn(sessionID, "speaking", currentTurn.seq)
}
if peer := lookupPeer(); peer != nil {
if err := peer.PublishAudioFrame(pcm, pcmSampleRate); err != nil {
log.Printf("voice audio-only publish failed session=%s turn=%d: %v", sessionID, currentTurn.seq, err)
}
}
}
} else {
if !currentTurn.avatarStarted {
startAvatarWorker(currentTurn)
}
if len(pcm) > 0 {
if dropped := currentTurn.syncBuf.appendPCM(pcm, pcmSampleRate); dropped > 0 {
bufferedSamples, _, _, _ := currentTurn.syncBuf.snapshot()
log.Printf("voice sync buffer overflow for session %s: dropped=%d bytes, buffered_samples=%d", sessionID, dropped, bufferedSamples)
}
}
audioClone := proto.Clone(audio).(*pb.AudioChunk)
if currentTurn.firstAvatarAudioAt.IsZero() {
currentTurn.firstAvatarAudioAt = time.Now()
logVoiceTrace(
"go_avatar_first_audio_enqueued",
sessionID,
currentTurn.seq,
currentTurn.replyID,
currentTurn.questionID,
currentTurn.userFinalAt,
)
}
select {
case currentTurn.avatarAudioCh <- audioClone:
case <-currentTurn.avatarCtx.Done():
}
}
}
if !isFinal {
continue
}
if currentTurn.audioFinalAt.IsZero() {
currentTurn.audioFinalAt = time.Now()
if currentTurn.firstVideoAt.IsZero() {
logVoiceTrace("go_voice_audio_final_received", sessionID, currentTurn.seq, currentTurn.replyID, currentTurn.questionID, currentTurn.userFinalAt)
}
}
saveAssistantMessage(currentTurn)
saveTurnRawAudio(currentTurn)
saveTurnTranscript(currentTurn)
saveTurnConversation(currentTurn)
if currentTurn.avatarStarted {
closeTurnInput(currentTurn)
continue
}
turn := currentTurn
currentTurn = nil
currentTurnDone = nil
saveCompletedTurn(turn)
setIdleIfCurrent(turn.seq)
case err, ok := <-errCh:
if !ok {
errCh = nil
continue
}
if err != nil && !errors.Is(err, context.Canceled) {
streamErr = err
}
errCh = nil
}
}
if streamErr != nil {
log.Printf("Omni stream error for session %s: %v", sessionID, streamErr)
if session.IsCurrentPipeline(pipelineSeq) {
o.broadcastError(sessionID, "Voice conversation failed")
}
}
}
// HandleClientMediaReady starts the one-time proactive greeting for omni sessions
// after the browser confirms that realtime media is connected.
func (o *Orchestrator) HandleClientMediaReady(ctx context.Context, sessionID string) error {
if o == nil || o.sessionMgr == nil {
return errors.New("orchestrator is not configured")
}
if o.inference == nil {
return errors.New("inference service is not configured")
}
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
if session.Mode != ModeOmni {
log.Printf("startup greeting ignored for non-omni session %s mode=%s", sessionID, modeStringForLog(session.Mode))
return nil
}
if !session.TryStartVoiceStartupGreeting() {
log.Printf("startup greeting already started for session %s", sessionID)
return nil
}
if ctx == nil {
ctx = context.Background()
}
prompt := o.buildVoiceStartupGreetingPrompt(session)
voiceConfig := o.buildVoiceStartupGreetingSessionConfig(session, sessionID)
log.Printf("startup greeting starting for session %s provider=%s history_items=%d", sessionID, voiceConfig.Provider, len(session.DialogContextSnapshot()))
o.stopPipelineAndWait(session, sessionID, true)
pipeCtx, cancel := context.WithCancel(ctx)
session.mu.Lock()
session.PipelineCancel = cancel
session.mu.Unlock()
turnSeq := session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, turnSeq)
session.SetState(StateProcessing)
o.broadcastStatusTurn(sessionID, "processing", turnSeq)
pipelineSeq := session.MarkPipelineRunning()
inputCh := singleVoiceTextInput(prompt)
go func(seq uint64) {
o.runVoiceLLMPipelineWithConfig(
pipeCtx,
session,
sessionID,
inputCh,
seq,
turnSeq,
voiceConfig,
true,
)
if pipeCtx.Err() != nil || !session.IsCurrentPipeline(seq) {
return
}
if err := o.resumeVoiceAudioStream(sessionID); err != nil {
log.Printf("Failed to resume omni audio stream after startup greeting for session %s: %v", sessionID, err)
}
}(pipelineSeq)
return nil
}
// Interrupt cancels the current pipeline for a session.
func (o *Orchestrator) Interrupt(sessionID string) error {
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
turnSeq := session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, turnSeq)
o.cancelPipeline(session)
// Also interrupt the omni model on the inference side
if session.Mode == ModeOmni {
_ = o.inference.Interrupt(context.Background(), sessionID)
}
session.SetState(StateListening)
o.broadcastStatusTurn(sessionID, "idle", turnSeq)
return nil
}
// TeardownSession cleans up all resources for a session.
func (o *Orchestrator) TeardownSession(sessionID string) error {
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
o.cancelPipeline(session)
// Wait for pipeline goroutine to finish storing messages (up to 3s)
session.WaitPipelineDone(3 * time.Second)
// Disconnect media peer
o.mu.Lock()
peer, ok := o.peers[sessionID]
if ok {
delete(o.peers, sessionID)
}
delete(o.directPeers, sessionID)
o.mu.Unlock()
if peer != nil {
peer.StopAVPipeline()
_ = peer.Disconnect()
}
// Close WebSocket connections
o.wsHub.CloseSession(sessionID)
session.SetState(StateClosed)
return nil
}
// TeardownAll cleans up all sessions. Called during server shutdown.
func (o *Orchestrator) TeardownAll() {
o.mu.Lock()
peers := make(map[string]mediapeer.MediaPeer, len(o.peers))
for k, v := range o.peers {
peers[k] = v
}
o.peers = make(map[string]mediapeer.MediaPeer)
o.directPeers = make(map[string]*direct.DirectPeer)
o.mu.Unlock()
for _, peer := range peers {
peer.StopAVPipeline()
_ = peer.Disconnect()
}
}
// cancelPipeline cancels the active pipeline for a session if one exists.
func (o *Orchestrator) cancelPipeline(session *Session) {
session.mu.Lock()
cancel := session.PipelineCancel
session.PipelineCancel = nil
session.mu.Unlock()
if cancel != nil {
cancel()
}
}
func (o *Orchestrator) advancePlaybackEpoch(sessionID string, turnSeq uint64) {
if turnSeq == 0 {
return
}
o.mu.RLock()
peer := o.peers[sessionID]
o.mu.RUnlock()
if peer != nil {
peer.AdvancePlaybackEpoch(turnSeq)
}
}
// broadcastStatus sends an avatar_status message to all WebSocket clients.
func (o *Orchestrator) broadcastStatus(sessionID, status string) {
o.broadcastJSON(sessionID, map[string]string{
"type": "avatar_status",
"status": status,
})
}
func (o *Orchestrator) broadcastStatusTurn(sessionID, status string, turnSeq uint64) {
payload := map[string]any{
"type": "avatar_status",
"status": status,
}
if turnSeq > 0 {
payload["turn_seq"] = turnSeq
}
o.broadcastJSON(sessionID, payload)
}
// broadcastError sends an error message to all WebSocket clients.
func (o *Orchestrator) broadcastError(sessionID, message string) {
o.broadcastJSON(sessionID, map[string]string{
"type": "error",
"message": message,
})
}
func (o *Orchestrator) SpeakAssistantText(ctx context.Context, sessionID, text string, persist bool) error {
if o == nil {
return errors.New("orchestrator is nil")
}
session, err := o.sessionMgr.Get(sessionID)
if err != nil {
return err
}
text = strings.TrimSpace(text)
if text == "" {
return nil
}
o.stopPipelineAndWait(session, sessionID, session.Mode == ModeOmni)
turnSeq := session.MarkTurnStarted()
o.advancePlaybackEpoch(sessionID, turnSeq)
o.broadcastJSON(sessionID, map[string]any{
"type": "assistant_message",
"text": text,
"turn_seq": turnSeq,
})
if persist {
session.AddMessage(ChatMessage{Role: "assistant", Content: text, TurnSeq: turnSeq})
if _, err := o.persistSessionConversation(session); err != nil {
log.Printf("conversation: SaveConversation assistant speech error session=%s: %v", sessionID, err)
}
}
if o.inference == nil {
return nil
}
pipeCtx, cancel := context.WithCancel(ctx)
session.mu.Lock()
session.PipelineCancel = cancel
session.mu.Unlock()
pipelineSeq := session.MarkPipelineRunning()
resumeOmni := session.Mode == ModeOmni
go func() {
o.runAssistantSpeechPipeline(pipeCtx, session, sessionID, text, pipelineSeq, turnSeq)
if resumeOmni && pipeCtx.Err() == nil && session.IsCurrentPipeline(pipelineSeq) {
if err := o.resumeVoiceAudioStream(sessionID); err != nil {
log.Printf("Failed to resume omni audio stream after assistant speech for session %s: %v", sessionID, err)
}
}
}()
return nil
}
func (o *Orchestrator) runAssistantSpeechPipeline(ctx context.Context, session *Session, sessionID, text string, pipelineSeq uint64, turnSeq uint64) {
defer func() {
session.MarkPipelineFinished(pipelineSeq)
if session.IsCurrentPipeline(pipelineSeq) {
session.SetState(StateListening)
o.broadcastStatusTurn(sessionID, "idle", turnSeq)
}
}()
session.SetState(StateProcessing)
o.broadcastStatusTurn(sessionID, "processing", turnSeq)
components, voice, speakingStyle, language := o.standardCharacterConfig(session)
textCh := make(chan string, 1)
textCh <- text
close(textCh)
ttsAudioCh, ttsErrCh := o.inference.SynthesizeSpeechStream(ctx, textCh, inference.TTSConfig{
Provider: components.TTS,
Voice: voice,
SpeakingStyle: speakingStyle,
Language: language,
SessionID: sessionID,
})
lookupPeer := func() mediapeer.MediaPeer {
o.mu.RLock()
defer o.mu.RUnlock()
return o.peers[sessionID]
}
if !o.AvatarEnabled() {
speakingBroadcasted := false
for ttsAudioCh != nil || ttsErrCh != nil {
select {
case <-ctx.Done():
return
case chunk, ok := <-ttsAudioCh:
if !ok {
ttsAudioCh = nil
continue
}
pcm, sampleRate := audioChunkToPCM16(chunk)
if len(pcm) == 0 {
continue
}
if !speakingBroadcasted {
speakingBroadcasted = true
session.SetState(StateSpeaking)
o.broadcastStatusTurn(sessionID, "speaking", turnSeq)
}
if peer := lookupPeer(); peer != nil {
if err := peer.PublishAudioFrame(pcm, sampleRate); err != nil {
log.Printf("assistant speech audio-only publish failed session=%s: %v", sessionID, err)
}
}
case err, ok := <-ttsErrCh:
if !ok {
ttsErrCh = nil
continue
}
if err != nil {
log.Printf("assistant speech TTS error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Speech synthesis failed")
return
}
}
}
return
}
syncBuf := newVoiceAVSyncBuffer(voiceMaxPCMBufferSamples)
avatarAudioCh := make(chan *pb.AudioChunk, 8)
go func() {
defer close(avatarAudioCh)
for {
select {
case <-ctx.Done():
return
case chunk, ok := <-ttsAudioCh:
if !ok {
return
}
pcm, sampleRate := audioChunkToPCM16(chunk)
if len(pcm) > 0 {
_ = syncBuf.appendPCM(pcm, sampleRate)
}
select {
case avatarAudioCh <- chunk:
case <-ctx.Done():
return
}
case err, ok := <-ttsErrCh:
if !ok {
ttsErrCh = nil
continue
}
if err != nil {
log.Printf("assistant speech TTS error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Speech synthesis failed")
return
}
}
}
}()
o.avatarMu.Lock()
defer o.avatarMu.Unlock()
videoCh, videoErrCh := o.inference.GenerateAvatarStream(ctx, avatarAudioCh)
var (
segVideo []byte
segFrames int
segWidth int
segHeight int
segFPS int
segCount int
segSeq int64
segMediaStartMS int64
firstFrameSent bool
)
flushSeg := func(isFinalSeg bool) {
if segCount == 0 {
return
}
segSeq++
segDurationMS := durationMSForVideo(segFrames, segFPS)
if peer := lookupPeer(); peer != nil {
segPCM, _, _, _ := syncBuf.takeSegmentPCM(segFrames, segFPS, isFinalSeg)
_, _, _, sampleRate := syncBuf.snapshot()
if sampleRate <= 0 {
sampleRate = 16000
}
raw := &mediapeer.RawAVSegment{
TraceLabel: voiceTraceLabel(sessionID, turnSeq, "task", "", segSeq),
Epoch: turnSeq,
SegmentSeq: segSeq,
MediaStartMS: segMediaStartMS,
DurationMS: segDurationMS,
RGB: segVideo,
PCM: segPCM,
SampleRate: sampleRate,
Width: segWidth,
Height: segHeight,
FPS: segFPS,
NumFrames: segFrames,
}
if err := peer.SendAVSegment(raw); err != nil {
log.Printf("assistant speech SendAVSegment failed session=%s: %v", sessionID, err)
}
}
segMediaStartMS += segDurationMS
segVideo = nil
segFrames = 0
segCount = 0
}
for {
select {
case <-ctx.Done():
flushSeg(false)
return
case chunk, ok := <-videoCh:
if !ok {
flushSeg(true)
if err := <-videoErrCh; err != nil {
log.Printf("assistant speech avatar error for session %s: %v", sessionID, err)
o.broadcastError(sessionID, "Avatar generation failed")
}
if ctx.Err() == nil {
if peer := lookupPeer(); peer != nil {
peer.WaitAVDrain(10 * time.Second)
}
}
return
}
nf := int(chunk.GetNumFrames())
if nf <= 0 && int(chunk.GetWidth())*int(chunk.GetHeight())*3 > 0 {
nf = len(chunk.GetData()) / (int(chunk.GetWidth()) * int(chunk.GetHeight()) * 3)
}
fps := int(chunk.GetFps())
if fps <= 0 {
fps = 25
}
if !firstFrameSent {
firstFrameSent = true
session.SetState(StateSpeaking)
o.broadcastStatusTurn(sessionID, "speaking", turnSeq)
}
segVideo = append(segVideo, chunk.GetData()...)
segFrames += nf
segWidth = int(chunk.GetWidth())
segHeight = int(chunk.GetHeight())
segFPS = fps
segCount++
if segCount >= stdChunksPerSegment || chunk.GetIsFinal() {
flushSeg(chunk.GetIsFinal())
}
}
}
}
// PersistSessionConversation writes the current session history to session.json.
func (o *Orchestrator) PersistSessionConversation(session *Session) (bool, error) {
return o.persistSessionConversation(session)
}
func (o *Orchestrator) persistSessionConversation(session *Session) (bool, error) {
if o == nil || o.charStore == nil || session == nil {
return false, nil
}
sessionID, characterID, startedAt, endedAt, history := session.ConversationSnapshot()
if characterID == "" || len(history) == 0 {
return false, nil
}
messages := make([]map[string]any, len(history))
for i, m := range history {
ts := m.Timestamp
if ts.IsZero() {
ts = startedAt
}
messages[i] = map[string]any{
"role": m.Role,
"content": m.Content,
"timestamp": ts.UTC().Format(time.RFC3339Nano),
}
if m.TurnSeq > 0 {
messages[i]["turn_seq"] = m.TurnSeq
}
}
if strings.TrimSpace(characterID) == kanshan.CharacterID {
ownerID := session.OwnerIDSnapshot()
if strings.TrimSpace(ownerID) == "" {
return false, nil
}
if err := o.charStore.SaveConversationForOwner(characterID, ownerID, sessionID, startedAt, endedAt, messages); err != nil {
return false, err
}
} else {
if err := o.charStore.SaveConversation(characterID, sessionID, startedAt, endedAt, messages); err != nil {
return false, err
}
}
return true, nil
}
// sessionRecordingDir returns the directory for recording output.
// If the session has a character, records go into the character's sessions/ dir.
// Otherwise falls back to a timestamp-based dir (used by the recorder's OutputDir).
func (o *Orchestrator) sessionRecordingDir(session *Session) string {
if session.CharacterID != "" && o.charStore != nil {
var dir string
if strings.TrimSpace(session.CharacterID) == kanshan.CharacterID {
ownerID := session.OwnerIDSnapshot()
if strings.TrimSpace(ownerID) != "" {
dir = o.charStore.SessionRecordingDirForOwner(session.CharacterID, ownerID, session.ID, session.CreatedAt)
}
} else {
dir = o.charStore.SessionRecordingDir(session.CharacterID, session.ID, session.CreatedAt)
}
if dir != "" {
session.mu.Lock()
session.RecordingDir = dir
session.mu.Unlock()
return dir
}
}
// Fallback: legacy timestamp-based dir
return time.Now().Format("20060102-150405")
}
// broadcastJSON marshals and broadcasts a JSON message.
func (o *Orchestrator) broadcastJSON(sessionID string, v any) {
if o.wsHub == nil {
return
}
data, err := json.Marshal(v)
if err != nil {
log.Printf("Failed to marshal broadcast: %v", err)
return
}
o.wsHub.Broadcast(sessionID, data)
}