src/server/cmd/cyberverse-server/main.go
6,371 bytes · 203 lines · capsule://quake0day/[email protected]
raw on github
package main
import (
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"github.com/cyberverse/server/internal/agenttask"
"github.com/cyberverse/server/internal/api"
"github.com/cyberverse/server/internal/character"
"github.com/cyberverse/server/internal/config"
"github.com/cyberverse/server/internal/direct"
"github.com/cyberverse/server/internal/inference"
"github.com/cyberverse/server/internal/livekit"
"github.com/cyberverse/server/internal/orchestrator"
"github.com/cyberverse/server/internal/recording"
"github.com/cyberverse/server/internal/ws"
)
func main() {
configPath := flag.String("config", "../../cyberverse_config.yaml", "path to config file")
flag.Parse()
// Load .env before config so ${VAR} placeholders in YAML expand correctly.
envPath := filepath.Join(filepath.Dir(*configPath), ".env")
if err := config.LoadDotenv(envPath); err != nil {
log.Printf("Warning: failed to load .env: %v", err)
}
if _, err := os.Stat(*configPath); os.IsNotExist(err) {
log.Fatalf("Config file %s not found. Copy infra/cyberverse_config.example.yaml to cyberverse_config.yaml first.", *configPath)
}
cfg, err := config.Load(*configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// Create session manager
sessionMgr := orchestrator.NewSessionManager(cfg.Session.MaxConcurrent)
// Create WebSocket hub
wsHub := ws.NewHub()
// Create inference gRPC client
inferenceClient, err := inference.NewClient(cfg.Inference.Addr)
if err != nil {
log.Printf("Warning: failed to connect to inference server at %s: %v", cfg.Inference.Addr, err)
log.Printf("Server will start but inference features will be unavailable")
}
// Create LiveKit room manager
roomMgr := livekit.NewRoomManager(cfg.LiveKit.URL, cfg.LiveKit.APIKey, cfg.LiveKit.APISecret)
// Create character store (directory-based, one dir per character)
dataDir := filepath.Join(filepath.Dir(*configPath), "data")
os.MkdirAll(dataDir, 0755)
charStore, err := character.NewStore(filepath.Join(dataDir, "characters"))
if err != nil {
log.Fatalf("Failed to init character store: %v", err)
}
taskDBPath := filepath.Join(dataDir, "tasks", "tasks.db")
artifactDir := filepath.Join(dataDir, "tasks", "artifacts")
taskStore, err := agenttask.OpenStore(taskDBPath, artifactDir)
if err != nil {
log.Fatalf("Failed to init agent task store: %v", err)
}
taskSvc := agenttask.NewService(taskStore, wsHub)
log.Printf("Agent task projection store initialized: db=%s", taskDBPath)
// Create orchestrator (needs charStore for recording paths)
recorder := recording.NewVideoRecorder(cfg.Recording)
orch := orchestrator.New(inferenceClient, wsHub, sessionMgr, recorder, charStore, cfg.Pipeline)
if taskSvc != nil {
orch.SetTaskService(taskSvc)
taskSvc.SetEventHandler(orch.HandleTaskEvent)
}
// Embedded TURN-over-TCP server for NAT traversal (AutoDL, SSH tunnel, etc.)
var turnServer *direct.TURNServer
if cfg.Pipeline.TURNEnabled && cfg.Pipeline.TURNPort > 0 {
publicIP := cfg.Pipeline.ICEPublicIP
// Resolve hostname to IP if needed
if publicIP != "" && net.ParseIP(publicIP) == nil {
addrs, err := net.LookupHost(publicIP)
if err != nil || len(addrs) == 0 {
log.Fatalf("Cannot resolve ice_public_ip %q: %v", publicIP, err)
}
publicIP = addrs[0]
log.Printf("Resolved ice_public_ip %q -> %s", cfg.Pipeline.ICEPublicIP, publicIP)
}
if publicIP == "" {
publicIP = "127.0.0.1"
}
ts, err := direct.NewTURNServer(
cfg.Pipeline.TURNPort, publicIP,
cfg.Pipeline.TURNRealm,
cfg.Pipeline.TURNUsername,
cfg.Pipeline.TURNPassword,
)
if err != nil {
log.Fatalf("TURN server setup failed: %v", err)
}
turnServer = ts
orch.SetTURNServer(ts)
log.Printf("TURN server enabled on TCP port %d (relay IP: %s)", cfg.Pipeline.TURNPort, publicIP)
}
// WebRTC API with interceptors (NACK, TWCC, GCC pacer) for direct streaming mode
if cfg.Pipeline.StreamingMode == "direct" {
api, estimatorCh, err := direct.NewWebRTCAPI(direct.WebRTCAPIConfig{
InitialBitrate: 2_500_000,
MinBitrate: 800_000,
MaxBitrate: 4_000_000,
})
if err != nil {
log.Fatalf("WebRTC API setup failed: %v", err)
}
orch.SetWebRTCAPI(api, estimatorCh)
log.Println("WebRTC API initialized with interceptors (NACK, TWCC, GCC)")
}
// Register session end callback to persist conversation history
sessionMgr.OnSessionEnd = func(s *orchestrator.Session) {
sessionID, characterID, _, _, history := s.ConversationSnapshot()
log.Printf("OnSessionEnd: session=%s character=%s historyLen=%d", sessionID, characterID, len(history))
saved, err := orch.PersistSessionConversation(s)
if err != nil {
log.Printf("Failed to save conversation for session %s: %v", sessionID, err)
return
}
if !saved {
log.Printf("OnSessionEnd: skipping save — characterID=%q historyLen=%d", characterID, len(history))
return
}
log.Printf("Conversation saved for session %s (character %s)", sessionID, characterID)
}
// Create router with all dependencies
router := api.NewRouter(sessionMgr, orch, wsHub, roomMgr, cfg, charStore, envPath, *configPath, taskSvc)
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.HTTPPort)
srv := &http.Server{
Addr: addr,
Handler: router.Handler(),
}
// Graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down server...")
// Teardown all orchestrator sessions
orch.TeardownAll()
// Close inference client
if inferenceClient != nil {
inferenceClient.Close()
}
// Close TURN server
if turnServer != nil {
turnServer.Close()
}
// Stop session manager cleanup
sessionMgr.Stop()
if taskStore != nil {
taskStore.Close()
}
srv.Close()
}()
log.Printf("CyberVerse Server starting on %s", addr)
log.Printf("Inference server: %s", cfg.Inference.Addr)
log.Printf("LiveKit URL: %s", cfg.LiveKit.URL)
log.Printf("Streaming mode: %s", cfg.Pipeline.StreamingMode)
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}
func resolveConfigPath(configDir, value, fallback string) string {
if value == "" {
return fallback
}
if filepath.IsAbs(value) {
return value
}
return filepath.Join(configDir, value)
}