capsule AI-native Unix-like composition layer

src/server/cmd/cyberverse-server/main.go

6,371 bytes · 203 lines · capsule://quake0day/[email protected] raw on github

package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"

	"github.com/cyberverse/server/internal/agenttask"
	"github.com/cyberverse/server/internal/api"
	"github.com/cyberverse/server/internal/character"
	"github.com/cyberverse/server/internal/config"
	"github.com/cyberverse/server/internal/direct"
	"github.com/cyberverse/server/internal/inference"
	"github.com/cyberverse/server/internal/livekit"
	"github.com/cyberverse/server/internal/orchestrator"
	"github.com/cyberverse/server/internal/recording"
	"github.com/cyberverse/server/internal/ws"
)

func main() {
	configPath := flag.String("config", "../../cyberverse_config.yaml", "path to config file")
	flag.Parse()

	// Load .env before config so ${VAR} placeholders in YAML expand correctly.
	envPath := filepath.Join(filepath.Dir(*configPath), ".env")
	if err := config.LoadDotenv(envPath); err != nil {
		log.Printf("Warning: failed to load .env: %v", err)
	}

	if _, err := os.Stat(*configPath); os.IsNotExist(err) {
		log.Fatalf("Config file %s not found. Copy infra/cyberverse_config.example.yaml to cyberverse_config.yaml first.", *configPath)
	}

	cfg, err := config.Load(*configPath)
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// Create session manager
	sessionMgr := orchestrator.NewSessionManager(cfg.Session.MaxConcurrent)

	// Create WebSocket hub
	wsHub := ws.NewHub()

	// Create inference gRPC client
	inferenceClient, err := inference.NewClient(cfg.Inference.Addr)
	if err != nil {
		log.Printf("Warning: failed to connect to inference server at %s: %v", cfg.Inference.Addr, err)
		log.Printf("Server will start but inference features will be unavailable")
	}

	// Create LiveKit room manager
	roomMgr := livekit.NewRoomManager(cfg.LiveKit.URL, cfg.LiveKit.APIKey, cfg.LiveKit.APISecret)

	// Create character store (directory-based, one dir per character)
	dataDir := filepath.Join(filepath.Dir(*configPath), "data")
	os.MkdirAll(dataDir, 0755)
	charStore, err := character.NewStore(filepath.Join(dataDir, "characters"))
	if err != nil {
		log.Fatalf("Failed to init character store: %v", err)
	}

	taskDBPath := filepath.Join(dataDir, "tasks", "tasks.db")
	artifactDir := filepath.Join(dataDir, "tasks", "artifacts")
	taskStore, err := agenttask.OpenStore(taskDBPath, artifactDir)
	if err != nil {
		log.Fatalf("Failed to init agent task store: %v", err)
	}
	taskSvc := agenttask.NewService(taskStore, wsHub)
	log.Printf("Agent task projection store initialized: db=%s", taskDBPath)

	// Create orchestrator (needs charStore for recording paths)
	recorder := recording.NewVideoRecorder(cfg.Recording)
	orch := orchestrator.New(inferenceClient, wsHub, sessionMgr, recorder, charStore, cfg.Pipeline)
	if taskSvc != nil {
		orch.SetTaskService(taskSvc)
		taskSvc.SetEventHandler(orch.HandleTaskEvent)
	}

	// Embedded TURN-over-TCP server for NAT traversal (AutoDL, SSH tunnel, etc.)
	var turnServer *direct.TURNServer
	if cfg.Pipeline.TURNEnabled && cfg.Pipeline.TURNPort > 0 {
		publicIP := cfg.Pipeline.ICEPublicIP
		// Resolve hostname to IP if needed
		if publicIP != "" && net.ParseIP(publicIP) == nil {
			addrs, err := net.LookupHost(publicIP)
			if err != nil || len(addrs) == 0 {
				log.Fatalf("Cannot resolve ice_public_ip %q: %v", publicIP, err)
			}
			publicIP = addrs[0]
			log.Printf("Resolved ice_public_ip %q -> %s", cfg.Pipeline.ICEPublicIP, publicIP)
		}
		if publicIP == "" {
			publicIP = "127.0.0.1"
		}
		ts, err := direct.NewTURNServer(
			cfg.Pipeline.TURNPort, publicIP,
			cfg.Pipeline.TURNRealm,
			cfg.Pipeline.TURNUsername,
			cfg.Pipeline.TURNPassword,
		)
		if err != nil {
			log.Fatalf("TURN server setup failed: %v", err)
		}
		turnServer = ts
		orch.SetTURNServer(ts)
		log.Printf("TURN server enabled on TCP port %d (relay IP: %s)", cfg.Pipeline.TURNPort, publicIP)
	}

	// WebRTC API with interceptors (NACK, TWCC, GCC pacer) for direct streaming mode
	if cfg.Pipeline.StreamingMode == "direct" {
		api, estimatorCh, err := direct.NewWebRTCAPI(direct.WebRTCAPIConfig{
			InitialBitrate: 2_500_000,
			MinBitrate:     800_000,
			MaxBitrate:     4_000_000,
		})
		if err != nil {
			log.Fatalf("WebRTC API setup failed: %v", err)
		}
		orch.SetWebRTCAPI(api, estimatorCh)
		log.Println("WebRTC API initialized with interceptors (NACK, TWCC, GCC)")
	}

	// Register session end callback to persist conversation history
	sessionMgr.OnSessionEnd = func(s *orchestrator.Session) {
		sessionID, characterID, _, _, history := s.ConversationSnapshot()
		log.Printf("OnSessionEnd: session=%s character=%s historyLen=%d", sessionID, characterID, len(history))
		saved, err := orch.PersistSessionConversation(s)
		if err != nil {
			log.Printf("Failed to save conversation for session %s: %v", sessionID, err)
			return
		}
		if !saved {
			log.Printf("OnSessionEnd: skipping save — characterID=%q historyLen=%d", characterID, len(history))
			return
		}
		log.Printf("Conversation saved for session %s (character %s)", sessionID, characterID)
	}

	// Create router with all dependencies
	router := api.NewRouter(sessionMgr, orch, wsHub, roomMgr, cfg, charStore, envPath, *configPath, taskSvc)

	addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.HTTPPort)
	srv := &http.Server{
		Addr:    addr,
		Handler: router.Handler(),
	}

	// Graceful shutdown
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		<-sigCh
		log.Println("Shutting down server...")

		// Teardown all orchestrator sessions
		orch.TeardownAll()

		// Close inference client
		if inferenceClient != nil {
			inferenceClient.Close()
		}

		// Close TURN server
		if turnServer != nil {
			turnServer.Close()
		}

		// Stop session manager cleanup
		sessionMgr.Stop()

		if taskStore != nil {
			taskStore.Close()
		}

		srv.Close()
	}()

	log.Printf("CyberVerse Server starting on %s", addr)
	log.Printf("Inference server: %s", cfg.Inference.Addr)
	log.Printf("LiveKit URL: %s", cfg.LiveKit.URL)
	log.Printf("Streaming mode: %s", cfg.Pipeline.StreamingMode)
	if err := srv.ListenAndServe(); err != http.ErrServerClosed {
		log.Fatalf("Server error: %v", err)
	}
}

func resolveConfigPath(configDir, value, fallback string) string {
	if value == "" {
		return fallback
	}
	if filepath.IsAbs(value) {
		return value
	}
	return filepath.Join(configDir, value)
}