capsule AI-native Unix-like composition layer

src/server/internal/api/knowledge.go

10,076 bytes · 327 lines · capsule://quake0day/[email protected] raw on github

package api

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"
	"mime/multipart"
	"net/http"
	"os"
	"strings"
	"time"

	ragstore "github.com/cyberverse/server/internal/rag"
)

type internalKnowledgeSearchRequest struct {
	Query string `json:"query"`
}

type skippedKnowledgeFile struct {
	Filename string `json:"filename"`
	Reason   string `json:"reason"`
}

type uploadKnowledgeFilesResponse struct {
	Sources []ragstore.Source      `json:"sources"`
	Skipped []skippedKnowledgeFile `json:"skipped,omitempty"`
}

type uploadedKnowledgeFile struct {
	Filename string
	MimeType string
	TempPath string
}

func (r *Router) handleListKnowledgeSources(w http.ResponseWriter, req *http.Request) {
	if r.ragStore == nil {
		writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "knowledge store is disabled"})
		return
	}
	sources, err := r.ragStore.List(req.PathValue("id"))
	if err != nil {
		writeJSON(w, http.StatusNotFound, ErrorResponse{Error: err.Error()})
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{"sources": sources})
}

func (r *Router) handleUploadKnowledgeFiles(w http.ResponseWriter, req *http.Request) {
	if r.ragStore == nil {
		writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "knowledge store is disabled"})
		return
	}
	reader, err := req.MultipartReader()
	if err != nil {
		writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "invalid multipart form: " + err.Error()})
		return
	}

	characterID := req.PathValue("id")
	files, relativePaths, skipped, err := readKnowledgeMultipart(reader)
	defer cleanupUploadedKnowledgeFiles(files)
	if err != nil {
		writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "invalid multipart form: " + err.Error()})
		return
	}
	if len(files) == 0 {
		writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "file is required"})
		return
	}

	sources := make([]ragstore.Source, 0, len(files))
	for i, upload := range files {
		file, err := os.Open(upload.TempPath)
		if err != nil {
			skipped = append(skipped, skippedKnowledgeFile{Filename: upload.Filename, Reason: err.Error()})
			continue
		}
		relativePath := upload.Filename
		if i < len(relativePaths) && strings.TrimSpace(relativePaths[i]) != "" {
			relativePath = relativePaths[i]
		}
		result, createErr := r.ragStore.SaveFile(characterID, relativePath, upload.MimeType, file)
		closeErr := file.Close()
		if createErr != nil {
			skipped = append(skipped, skippedKnowledgeFile{Filename: upload.Filename, Reason: createErr.Error()})
			continue
		}
		if closeErr != nil {
			skipped = append(skipped, skippedKnowledgeFile{Filename: upload.Filename, Reason: closeErr.Error()})
			continue
		}
		if result.Source.Indexable {
			r.scheduleKnowledgeIndex(characterID, result.Source.ID)
		} else if result.PreviousIndexable && r.orch != nil {
			go func(sourceID string) {
				ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
				defer cancel()
				if err := r.orch.DeleteKnowledgeSource(ctx, characterID, r.charStore.CharDir(characterID), sourceID); err != nil {
					log.Printf("knowledge delete stale index failed character=%s source=%s: %v", characterID, sourceID, err)
				}
			}(result.Source.ID)
		}
		sources = append(sources, *result.Source)
	}

	resp := uploadKnowledgeFilesResponse{Sources: sources, Skipped: skipped}
	if len(sources) == 0 {
		writeJSON(w, http.StatusBadRequest, map[string]any{
			"error":   "no files uploaded",
			"skipped": skipped,
		})
		return
	}
	writeJSON(w, http.StatusCreated, resp)
}

func readKnowledgeMultipart(reader *multipart.Reader) ([]uploadedKnowledgeFile, []string, []skippedKnowledgeFile, error) {
	files := make([]uploadedKnowledgeFile, 0)
	relativePaths := make([]string, 0)
	skipped := make([]skippedKnowledgeFile, 0)

	for {
		part, err := reader.NextPart()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return files, relativePaths, skipped, err
		}

		switch part.FormName() {
		case "file", "files":
			filename := part.FileName()
			if strings.TrimSpace(filename) == "" {
				skipped = append(skipped, skippedKnowledgeFile{Filename: "", Reason: "filename is required"})
				_ = part.Close()
				continue
			}
			tmp, err := os.CreateTemp("", "cyberverse-knowledge-upload-*")
			if err != nil {
				_ = part.Close()
				return files, relativePaths, skipped, err
			}
			tempPath := tmp.Name()
			_, copyErr := io.Copy(tmp, part)
			closeErr := tmp.Close()
			_ = part.Close()
			if copyErr != nil {
				_ = os.Remove(tempPath)
				skipped = append(skipped, skippedKnowledgeFile{Filename: filename, Reason: copyErr.Error()})
				continue
			}
			if closeErr != nil {
				_ = os.Remove(tempPath)
				skipped = append(skipped, skippedKnowledgeFile{Filename: filename, Reason: closeErr.Error()})
				continue
			}
			files = append(files, uploadedKnowledgeFile{
				Filename: filename,
				MimeType: part.Header.Get("Content-Type"),
				TempPath: tempPath,
			})
		case "relative_paths":
			data, err := io.ReadAll(io.LimitReader(part, 64*1024))
			_ = part.Close()
			if err != nil {
				return files, relativePaths, skipped, err
			}
			relativePaths = append(relativePaths, strings.TrimSpace(string(data)))
		default:
			_, _ = io.Copy(io.Discard, part)
			_ = part.Close()
		}
	}

	return files, relativePaths, skipped, nil
}

func cleanupUploadedKnowledgeFiles(files []uploadedKnowledgeFile) {
	for _, file := range files {
		if file.TempPath != "" {
			_ = os.Remove(file.TempPath)
		}
	}
}

func (r *Router) handleDeleteKnowledgeSource(w http.ResponseWriter, req *http.Request) {
	if r.ragStore == nil {
		writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "knowledge store is disabled"})
		return
	}
	characterID := req.PathValue("id")
	sourceID := req.PathValue("source_id")
	source, err := r.ragStore.Get(characterID, sourceID)
	if err != nil {
		writeJSON(w, http.StatusNotFound, ErrorResponse{Error: err.Error()})
		return
	}
	charDir := r.charStore.CharDir(characterID)
	if err := r.ragStore.Delete(characterID, source.ID); err != nil {
		writeJSON(w, http.StatusNotFound, ErrorResponse{Error: err.Error()})
		return
	}
	if r.orch != nil {
		go func() {
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			defer cancel()
			if err := r.orch.DeleteKnowledgeSource(ctx, characterID, charDir, source.ID); err != nil {
				log.Printf("knowledge delete index failed character=%s source=%s: %v", characterID, source.ID, err)
			}
		}()
	}
	w.WriteHeader(http.StatusNoContent)
}

func (r *Router) handleReindexKnowledgeSource(w http.ResponseWriter, req *http.Request) {
	if r.ragStore == nil {
		writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "knowledge store is disabled"})
		return
	}
	characterID := req.PathValue("id")
	sourceID := req.PathValue("source_id")
	source, err := r.ragStore.MarkIndexing(characterID, sourceID)
	if err != nil {
		writeJSON(w, http.StatusNotFound, ErrorResponse{Error: err.Error()})
		return
	}
	if !source.Indexable {
		source, err = r.ragStore.MarkStoredReady(characterID, sourceID)
		if err != nil {
			writeJSON(w, http.StatusNotFound, ErrorResponse{Error: err.Error()})
			return
		}
		writeJSON(w, http.StatusAccepted, source)
		return
	}
	r.scheduleKnowledgeIndex(characterID, source.ID)
	writeJSON(w, http.StatusAccepted, source)
}

func (r *Router) handleInternalKnowledgeSearch(w http.ResponseWriter, req *http.Request) {
	if !authorizeInternalRequest(w, req) {
		return
	}
	if r.orch == nil {
		writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "inference service is not configured"})
		return
	}
	var body internalKnowledgeSearchRequest
	if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
		writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "invalid JSON: " + err.Error()})
		return
	}
	if strings.TrimSpace(body.Query) == "" {
		writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "query is required"})
		return
	}
	results, err := r.orch.SearchKnowledge(req.Context(), req.PathValue("id"), body.Query)
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, ErrorResponse{Error: err.Error()})
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{"results": results})
}

func authorizeInternalRequest(w http.ResponseWriter, req *http.Request) bool {
	expected := strings.TrimSpace(os.Getenv("AGENT_INTERNAL_TOKEN"))
	if expected == "" {
		return true
	}
	got := strings.TrimSpace(req.Header.Get("Authorization"))
	if got != "Bearer "+expected {
		writeJSON(w, http.StatusUnauthorized, ErrorResponse{Error: "invalid internal token"})
		return false
	}
	return true
}

func (r *Router) scheduleKnowledgeIndex(characterID, sourceID string) {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
		defer cancel()
		if err := r.indexKnowledgeSource(ctx, characterID, sourceID); err != nil {
			log.Printf("knowledge index failed character=%s source=%s: %v", characterID, sourceID, err)
		}
	}()
}

func (r *Router) indexKnowledgeSource(ctx context.Context, characterID, sourceID string) error {
	if r == nil || r.ragStore == nil {
		return errors.New("knowledge store is disabled")
	}
	source, err := r.ragStore.Get(characterID, sourceID)
	if err != nil {
		return err
	}
	if !source.Indexable {
		_, err := r.ragStore.MarkStoredReady(characterID, sourceID)
		return err
	}
	path, err := r.ragStore.SourcePath(characterID, source)
	if err != nil {
		_, _ = r.ragStore.MarkFailed(characterID, sourceID, err)
		return err
	}
	if r.cfg != nil && !r.cfg.Pipeline.RAG.IsEnabled() {
		err := errors.New("RAG is disabled")
		_, _ = r.ragStore.MarkFailed(characterID, sourceID, err)
		return err
	}
	if r.orch == nil {
		err := errors.New("inference service is not configured")
		_, _ = r.ragStore.MarkFailed(characterID, sourceID, err)
		return err
	}
	chunkCount, err := r.orch.IndexKnowledgeSource(ctx, characterID, r.charStore.CharDir(characterID), source, path)
	if err != nil {
		_, _ = r.ragStore.MarkFailed(characterID, sourceID, err)
		return err
	}
	_, err = r.ragStore.MarkReady(characterID, sourceID, chunkCount)
	return err
}