src/server/internal/api/tasks.go
6,727 bytes · 218 lines · capsule://quake0day/[email protected]
raw on github
package api
import (
"context"
"encoding/json"
"errors"
"net/http"
"os"
"strconv"
"strings"
"github.com/cyberverse/server/internal/agenttask"
)
type internalTaskEventRequest struct {
EventType string `json:"event_type"`
Status agenttask.Status `json:"status"`
Message string `json:"message"`
Progress int `json:"progress"`
Payload json.RawMessage `json:"payload"`
}
type internalTaskArtifactRequest struct {
Type string `json:"type"`
Title string `json:"title"`
MimeType string `json:"mime_type"`
Content string `json:"content"`
Metadata json.RawMessage `json:"metadata"`
}
func (r *Router) handleListSessionTasks(w http.ResponseWriter, req *http.Request) {
if r.taskSvc == nil || r.taskSvc.Store() == nil {
writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "agent task service is disabled"})
return
}
sessionID := req.PathValue("id")
if strings.TrimSpace(sessionID) == "" {
writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "session id is required"})
return
}
limit := parsePositiveInt(req.URL.Query().Get("limit"), 50, 200)
if session, err := r.sessionMgr.Get(sessionID); err == nil && isKanshanCharacter(session.CharacterID) {
if !r.authorizeKanshanSessionAccess(w, req, session) {
return
}
tasks, err := r.taskSvc.Store().ListSessionTasksForOwner(req.Context(), sessionID, session.OwnerIDSnapshot(), limit)
if err != nil {
writeJSON(w, http.StatusInternalServerError, ErrorResponse{Error: err.Error()})
return
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": tasks})
return
}
tasks, err := r.taskSvc.Store().ListSessionTasks(req.Context(), sessionID, limit)
if err != nil {
writeJSON(w, http.StatusInternalServerError, ErrorResponse{Error: err.Error()})
return
}
var ok bool
tasks, ok = r.filterVisibleTasks(w, req, tasks)
if !ok {
return
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": tasks})
}
func (r *Router) handleGetTask(w http.ResponseWriter, req *http.Request) {
if r.taskSvc == nil || r.taskSvc.Store() == nil {
writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "agent task service is disabled"})
return
}
task, err := r.taskSvc.Store().GetTask(req.Context(), req.PathValue("task_id"))
if err != nil {
writeTaskError(w, err)
return
}
if !r.authorizeTaskAccess(w, req, task) {
return
}
writeJSON(w, http.StatusOK, task)
}
func (r *Router) handleListTaskEvents(w http.ResponseWriter, req *http.Request) {
if r.taskSvc == nil || r.taskSvc.Store() == nil {
writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "agent task service is disabled"})
return
}
afterSeq, _ := strconv.ParseInt(req.URL.Query().Get("after_seq"), 10, 64)
limit := parsePositiveInt(req.URL.Query().Get("limit"), 200, 500)
task, err := r.taskSvc.Store().GetTask(req.Context(), req.PathValue("task_id"))
if err != nil {
writeTaskError(w, err)
return
}
if !r.authorizeTaskAccess(w, req, task) {
return
}
events, err := r.taskSvc.Store().ListEventsAfter(req.Context(), req.PathValue("task_id"), afterSeq, limit)
if err != nil {
writeTaskError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"events": events})
}
func (r *Router) handleGetTaskArtifact(w http.ResponseWriter, req *http.Request) {
if r.taskSvc == nil || r.taskSvc.Store() == nil {
writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "agent task service is disabled"})
return
}
task, err := r.taskSvc.Store().GetTask(req.Context(), req.PathValue("task_id"))
if err != nil {
writeTaskError(w, err)
return
}
if !r.authorizeTaskAccess(w, req, task) {
return
}
artifact, content, err := r.taskSvc.Store().GetArtifact(req.Context(), req.PathValue("task_id"), req.PathValue("artifact_id"))
if err != nil {
writeTaskError(w, err)
return
}
w.Header().Set("Content-Type", artifact.MimeType)
w.WriteHeader(http.StatusOK)
_, _ = w.Write(content)
}
func (r *Router) handleInternalTaskEvent(w http.ResponseWriter, req *http.Request) {
if !r.authorizeInternalTaskRequest(w, req) {
return
}
var body internalTaskEventRequest
if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "invalid JSON: " + err.Error()})
return
}
event, task, err := r.taskSvc.AppendEvent(req.Context(), req.PathValue("task_id"), agenttask.AppendEventInput{
EventType: body.EventType,
Status: body.Status,
Message: body.Message,
Progress: body.Progress,
Payload: body.Payload,
})
if err != nil {
writeTaskError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"event": event, "task": task})
}
func (r *Router) handleInternalTaskArtifact(w http.ResponseWriter, req *http.Request) {
if !r.authorizeInternalTaskRequest(w, req) {
return
}
var body internalTaskArtifactRequest
if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
writeJSON(w, http.StatusBadRequest, ErrorResponse{Error: "invalid JSON: " + err.Error()})
return
}
artifact, err := r.taskSvc.CreateArtifact(req.Context(), req.PathValue("task_id"), agenttask.CreateArtifactInput{
Type: body.Type,
Title: body.Title,
MimeType: body.MimeType,
Content: body.Content,
Metadata: body.Metadata,
})
if err != nil {
writeTaskError(w, err)
return
}
writeJSON(w, http.StatusCreated, artifact)
}
func (r *Router) authorizeInternalTaskRequest(w http.ResponseWriter, req *http.Request) bool {
if r.taskSvc == nil || r.taskSvc.Store() == nil {
writeJSON(w, http.StatusServiceUnavailable, ErrorResponse{Error: "agent task service is disabled"})
return false
}
expected := strings.TrimSpace(os.Getenv("AGENT_INTERNAL_TOKEN"))
if expected == "" {
return true
}
got := strings.TrimSpace(req.Header.Get("Authorization"))
if got != "Bearer "+expected {
writeJSON(w, http.StatusUnauthorized, ErrorResponse{Error: "invalid internal token"})
return false
}
return true
}
func parsePositiveInt(raw string, fallback, max int) int {
v, err := strconv.Atoi(raw)
if err != nil || v <= 0 {
return fallback
}
if max > 0 && v > max {
return max
}
return v
}
func writeTaskError(w http.ResponseWriter, err error) {
if errors.Is(err, agenttask.ErrNotFound) {
writeJSON(w, http.StatusNotFound, ErrorResponse{Error: err.Error()})
return
}
if errors.Is(err, agenttask.ErrTerminal) {
writeJSON(w, http.StatusConflict, ErrorResponse{Error: err.Error()})
return
}
if errors.Is(err, context.Canceled) {
writeJSON(w, http.StatusRequestTimeout, ErrorResponse{Error: err.Error()})
return
}
writeJSON(w, http.StatusInternalServerError, ErrorResponse{Error: err.Error()})
}