diff --git a/components/public-api/go.mod b/components/public-api/go.mod index cf7fd578d..8fb265dac 100644 --- a/components/public-api/go.mod +++ b/components/public-api/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/gin-contrib/cors v1.7.6 github.com/gin-gonic/gin v1.11.0 + github.com/google/uuid v1.6.0 github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.65.0 @@ -31,7 +32,7 @@ require ( github.com/go-playground/validator/v10 v10.30.1 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/goccy/go-yaml v1.19.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect diff --git a/components/public-api/handlers/integration_test.go b/components/public-api/handlers/integration_test.go index 5f1485dd4..765711341 100644 --- a/components/public-api/handlers/integration_test.go +++ b/components/public-api/handlers/integration_test.go @@ -27,6 +27,14 @@ func setupTestRouter() *gin.Engine { v1.POST("/sessions", CreateSession) v1.GET("/sessions/:id", GetSession) v1.DELETE("/sessions/:id", DeleteSession) + + v1.POST("/sessions/:id/runs", CreateRun) + v1.GET("/sessions/:id/runs", GetSessionRuns) + v1.POST("/sessions/:id/message", SendMessage) + v1.GET("/sessions/:id/output", GetSessionOutput) + v1.POST("/sessions/:id/start", StartSession) + v1.POST("/sessions/:id/stop", StopSession) + v1.POST("/sessions/:id/interrupt", InterruptSession) } return r @@ -109,8 +117,43 @@ func TestE2E_CreateSession(t *testing.T) { } // Verify request body was transformed correctly - if !strings.Contains(requestBody, "prompt") { - t.Errorf("Expected request body to contain 'prompt', got %s", requestBody) + if !strings.Contains(requestBody, "initialPrompt") { + t.Errorf("Expected request body to contain 'initialPrompt', got %s", requestBody) + } +} + +func TestE2E_CreateSession_WithDisplayName(t *testing.T) { + var receivedBody map[string]interface{} + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + decoder.Decode(&receivedBody) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"name": "session-123"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions", + strings.NewReader(`{"task": "Fix the bug", "display_name": "Bug Fix Session"}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusCreated { + t.Errorf("Expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + // Verify display_name was forwarded as displayName (camelCase) to backend + if receivedBody["displayName"] != "Bug Fix Session" { + t.Errorf("Expected displayName 'Bug Fix Session' in backend request, got %v", receivedBody["displayName"]) } } diff --git a/components/public-api/handlers/lifecycle.go b/components/public-api/handlers/lifecycle.go new file mode 100644 index 000000000..8b774c29e --- /dev/null +++ b/components/public-api/handlers/lifecycle.go @@ -0,0 +1,223 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + + "ambient-code-public-api/types" + + "github.com/gin-gonic/gin" +) + +// StartSession handles POST /v1/sessions/:id/start +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. +func StartSession(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return // getSessionPhase already wrote the error response + } + + if phase == "" { + log.Printf("Session %s has no phase, treating as unknown", sessionID) + c.JSON(http.StatusConflict, gin.H{"error": "Session state is unknown"}) + return + } + + if phase == "running" || phase == "pending" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is already running or pending"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/start", project, sessionID) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, nil) + if err != nil { + log.Printf("Backend request failed for start session %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + c.JSON(http.StatusAccepted, transformSession(backendResp)) +} + +// StopSession handles POST /v1/sessions/:id/stop +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. +func StopSession(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return + } + + if phase == "" { + log.Printf("Session %s has no phase, treating as unknown", sessionID) + c.JSON(http.StatusConflict, gin.H{"error": "Session state is unknown"}) + return + } + + if phase == "completed" || phase == "failed" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/stop", project, sessionID) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, nil) + if err != nil { + log.Printf("Backend request failed for stop session %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + c.JSON(http.StatusAccepted, transformSession(backendResp)) +} + +// InterruptSession handles POST /v1/sessions/:id/interrupt +func InterruptSession(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/interrupt", project, sessionID) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, []byte("{}")) + if err != nil { + log.Printf("Backend request failed for interrupt session %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + c.JSON(http.StatusOK, types.MessageResponse{Message: "Interrupt signal sent"}) +} + +// getSessionPhase fetches the session from the backend and returns its normalized phase. +// On error, it writes the appropriate error response to the gin context. +func getSessionPhase(c *gin.Context, project, sessionID string) (string, error) { + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) + if err != nil { + log.Printf("Backend request failed for get session phase %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return "", fmt.Errorf("backend unavailable") + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return "", fmt.Errorf("internal server error") + } + + if resp.StatusCode != http.StatusOK { + forwardErrorResponse(c, resp.StatusCode, body) + return "", fmt.Errorf("backend returned %d", resp.StatusCode) + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return "", fmt.Errorf("internal server error") + } + + phase := "" + if status, ok := backendResp["status"].(map[string]interface{}); ok { + if p, ok := status["phase"].(string); ok { + phase = normalizePhase(p) + } + } + + return phase, nil +} diff --git a/components/public-api/handlers/lifecycle_test.go b/components/public-api/handlers/lifecycle_test.go new file mode 100644 index 000000000..9908cf989 --- /dev/null +++ b/components/public-api/handlers/lifecycle_test.go @@ -0,0 +1,281 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "ambient-code-public-api/types" +) + +func makeSessionBackend(t *testing.T, phase string, lifecyclePath string, lifecycleStatus int) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // GET session (for phase check) + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/agentic-sessions/") { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "test-session", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Test task", + }, + "status": map[string]interface{}{ + "phase": phase, + }, + }) + return + } + + // POST lifecycle action + if r.Method == http.MethodPost && strings.Contains(r.URL.Path, lifecyclePath) { + w.WriteHeader(lifecycleStatus) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "test-session", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Test task", + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }) + return + } + + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Not found"}) + })) +} + +func TestE2E_StartSession(t *testing.T) { + backend := makeSessionBackend(t, "Completed", "/start", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } + + var response types.SessionResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.ID != "test-session" { + t.Errorf("Expected ID 'test-session', got %q", response.ID) + } +} + +func TestE2E_StartSession_AlreadyRunning(t *testing.T) { + backend := makeSessionBackend(t, "Running", "/start", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_StartSession_AlreadyPending(t *testing.T) { + backend := makeSessionBackend(t, "Pending", "/start", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for pending session, got %d", w.Code) + } +} + +func TestE2E_StopSession(t *testing.T) { + backend := makeSessionBackend(t, "Running", "/stop", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_StopSession_AlreadyCompleted(t *testing.T) { + backend := makeSessionBackend(t, "Completed", "/stop", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_StopSession_AlreadyFailed(t *testing.T) { + backend := makeSessionBackend(t, "Failed", "/stop", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for failed session, got %d", w.Code) + } +} + +func TestE2E_InterruptSession(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("Expected POST, got %s", r.Method) + } + if !strings.Contains(r.URL.Path, "/agui/interrupt") { + t.Errorf("Expected path to contain /agui/interrupt, got %s", r.URL.Path) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"message": "ok"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/interrupt", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.MessageResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Message != "Interrupt signal sent" { + t.Errorf("Expected message 'Interrupt signal sent', got %q", response.Message) + } +} + +func TestE2E_InterruptSession_BackendError(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadGateway) + json.NewEncoder(w).Encode(map[string]string{"error": "Runner unavailable"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/interrupt", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadGateway { + t.Errorf("Expected status 502, got %d", w.Code) + } +} + +func TestE2E_StartSession_InvalidSessionID(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/INVALID/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d", w.Code) + } +} + +func TestE2E_StopSession_BackendReturns404(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Session not found"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/nonexistent/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d", w.Code) + } +} diff --git a/components/public-api/handlers/output.go b/components/public-api/handlers/output.go new file mode 100644 index 000000000..647c92f5c --- /dev/null +++ b/components/public-api/handlers/output.go @@ -0,0 +1,227 @@ +package handlers + +import ( + "fmt" + "net/http" + + "ambient-code-public-api/types" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// GetSessionOutput handles GET /v1/sessions/:id/output +func GetSessionOutput(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + format := c.DefaultQuery("format", "transcript") + if format != "transcript" && format != "compact" && format != "events" { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid format %q, must be one of: transcript, compact, events", format)}) + return + } + + runIDFilter := c.Query("run_id") + if runIDFilter != "" { + if _, err := uuid.Parse(runIDFilter); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid run_id format, must be a valid UUID"}) + return + } + } + + events, statusCode, err := fetchSessionEvents(c, project, sessionID) + if err != nil { + if statusCode > 0 { + c.JSON(statusCode, gin.H{"error": "Request failed"}) + } else { + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + } + return + } + + // Filter by run_id if provided + if runIDFilter != "" { + filtered := make([]map[string]interface{}, 0) + for _, event := range events { + if runID, _ := event["runId"].(string); runID == runIDFilter { + filtered = append(filtered, event) + } + } + events = filtered + } + + switch format { + case "events": + c.JSON(http.StatusOK, types.EventsOutputResponse{ + SessionID: sessionID, + Format: "events", + Events: events, + }) + case "compact": + compacted := compactEvents(events) + c.JSON(http.StatusOK, types.EventsOutputResponse{ + SessionID: sessionID, + Format: "compact", + Events: compacted, + }) + case "transcript": + messages := extractTranscript(events) + c.JSON(http.StatusOK, types.TranscriptOutputResponse{ + SessionID: sessionID, + Format: "transcript", + Messages: messages, + }) + } +} + +// compactEvents merges consecutive TEXT_MESSAGE_CONTENT and TOOL_CALL_ARGS deltas. +func compactEvents(events []map[string]interface{}) []map[string]interface{} { + result := make([]map[string]interface{}, 0, len(events)) + + for i := 0; i < len(events); i++ { + event := events[i] + eventType, _ := event["type"].(string) + + if eventType == "TEXT_MESSAGE_CONTENT" { + messageID, _ := event["messageId"].(string) + merged := copyEvent(event) + delta, _ := merged["delta"].(string) + + for i+1 < len(events) { + next := events[i+1] + nextType, _ := next["type"].(string) + nextMsgID, _ := next["messageId"].(string) + if nextType == "TEXT_MESSAGE_CONTENT" && nextMsgID == messageID { + nextDelta, _ := next["delta"].(string) + delta += nextDelta + i++ + } else { + break + } + } + merged["delta"] = delta + result = append(result, merged) + } else if eventType == "TOOL_CALL_ARGS" { + toolCallID, _ := event["toolCallId"].(string) + merged := copyEvent(event) + delta, _ := merged["delta"].(string) + + for i+1 < len(events) { + next := events[i+1] + nextType, _ := next["type"].(string) + nextTCID, _ := next["toolCallId"].(string) + if nextType == "TOOL_CALL_ARGS" && nextTCID == toolCallID { + nextDelta, _ := next["delta"].(string) + delta += nextDelta + i++ + } else { + break + } + } + merged["delta"] = delta + result = append(result, merged) + } else { + result = append(result, event) + } + } + + return result +} + +// extractTranscript finds the last MESSAGES_SNAPSHOT event and extracts messages. +func extractTranscript(events []map[string]interface{}) []types.TranscriptMessage { + // Find last MESSAGES_SNAPSHOT event + var snapshotMessages []interface{} + for i := len(events) - 1; i >= 0; i-- { + eventType, _ := events[i]["type"].(string) + if eventType == "MESSAGES_SNAPSHOT" { + if msgs, ok := events[i]["messages"].([]interface{}); ok { + snapshotMessages = msgs + } + break + } + } + + if snapshotMessages == nil { + return []types.TranscriptMessage{} + } + + messages := make([]types.TranscriptMessage, 0, len(snapshotMessages)) + for _, raw := range snapshotMessages { + msg, ok := raw.(map[string]interface{}) + if !ok { + continue + } + + tm := types.TranscriptMessage{} + if id, ok := msg["id"].(string); ok { + tm.ID = id + } + if role, ok := msg["role"].(string); ok { + tm.Role = role + } + if content, ok := msg["content"].(string); ok { + tm.Content = content + } + if toolCallID, ok := msg["toolCallId"].(string); ok { + tm.ToolCallID = toolCallID + } + if name, ok := msg["name"].(string); ok { + tm.Name = name + } + if timestamp, ok := msg["timestamp"].(string); ok { + tm.Timestamp = timestamp + } + + // Extract tool calls if present + if toolCalls, ok := msg["toolCalls"].([]interface{}); ok { + for _, tcRaw := range toolCalls { + tc, ok := tcRaw.(map[string]interface{}) + if !ok { + continue + } + ttc := types.TranscriptToolCall{} + if id, ok := tc["id"].(string); ok { + ttc.ID = id + } + if name, ok := tc["name"].(string); ok { + ttc.Name = name + } + if args, ok := tc["args"].(string); ok { + ttc.Args = args + } + if result, ok := tc["result"].(string); ok { + ttc.Result = result + } + if status, ok := tc["status"].(string); ok { + ttc.Status = status + } + if duration, ok := tc["duration"]; ok { + ttc.Duration = toInt64(duration) + } + tm.ToolCalls = append(tm.ToolCalls, ttc) + } + } + + messages = append(messages, tm) + } + + return messages +} + +// copyEvent creates a shallow copy of an event map. +func copyEvent(event map[string]interface{}) map[string]interface{} { + copied := make(map[string]interface{}, len(event)) + for k, v := range event { + copied[k] = v + } + return copied +} diff --git a/components/public-api/handlers/output_test.go b/components/public-api/handlers/output_test.go new file mode 100644 index 000000000..8743c96b2 --- /dev/null +++ b/components/public-api/handlers/output_test.go @@ -0,0 +1,338 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "ambient-code-public-api/types" +) + +func TestCompactEvents(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "TEXT_MESSAGE_START", "runId": "r1", "messageId": "m1", "role": "assistant"}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "r1", "messageId": "m1", "delta": "Hello "}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "r1", "messageId": "m1", "delta": "world"}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "r1", "messageId": "m1", "delta": "!"}, + {"type": "TEXT_MESSAGE_END", "runId": "r1", "messageId": "m1"}, + {"type": "TOOL_CALL_START", "runId": "r1", "toolCallId": "tc1", "toolCallName": "read"}, + {"type": "TOOL_CALL_ARGS", "runId": "r1", "toolCallId": "tc1", "delta": `{"file"`}, + {"type": "TOOL_CALL_ARGS", "runId": "r1", "toolCallId": "tc1", "delta": `: "main.go"}`}, + {"type": "TOOL_CALL_END", "runId": "r1", "toolCallId": "tc1"}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + + compacted := compactEvents(events) + + // Should merge TEXT_MESSAGE_CONTENT: 3 deltas → 1 + // Should merge TOOL_CALL_ARGS: 2 deltas → 1 + // Other events pass through: RUN_STARTED, TEXT_MESSAGE_START, TEXT_MESSAGE_END, TOOL_CALL_START, TOOL_CALL_END, RUN_FINISHED = 6 + // Total: 6 + 1 + 1 = 8 + if len(compacted) != 8 { + t.Fatalf("Expected 8 compacted events, got %d", len(compacted)) + } + + // Find the merged text content event + for _, e := range compacted { + if e["type"] == "TEXT_MESSAGE_CONTENT" { + if e["delta"] != "Hello world!" { + t.Errorf("Expected merged delta 'Hello world!', got %q", e["delta"]) + } + } + if e["type"] == "TOOL_CALL_ARGS" { + if e["delta"] != `{"file": "main.go"}` { + t.Errorf("Expected merged delta '{\"file\": \"main.go\"}', got %q", e["delta"]) + } + } + } +} + +func TestCompactEvents_DifferentMessageIDs(t *testing.T) { + events := []map[string]interface{}{ + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "A"}, + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m2", "delta": "B"}, + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "C"}, + } + + compacted := compactEvents(events) + + // Different messageIds should NOT merge + if len(compacted) != 3 { + t.Fatalf("Expected 3 events (different messageIds), got %d", len(compacted)) + } +} + +func TestCompactEvents_Empty(t *testing.T) { + compacted := compactEvents([]map[string]interface{}{}) + if len(compacted) != 0 { + t.Errorf("Expected 0 events, got %d", len(compacted)) + } +} + +func TestExtractTranscript(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "TEXT_MESSAGE_START", "runId": "r1"}, + {"type": "MESSAGES_SNAPSHOT", "runId": "r1", "messages": []interface{}{ + map[string]interface{}{ + "id": "msg-1", + "role": "user", + "content": "Hello", + }, + map[string]interface{}{ + "id": "msg-2", + "role": "assistant", + "content": "Hi there!", + "toolCalls": []interface{}{ + map[string]interface{}{ + "id": "tc-1", + "name": "read", + "args": `{"file": "main.go"}`, + "status": "completed", + "duration": float64(100), + }, + }, + }, + map[string]interface{}{ + "id": "msg-3", + "role": "tool", + "content": "file content here", + "toolCallId": "tc-1", + "name": "read", + }, + }}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + + messages := extractTranscript(events) + + if len(messages) != 3 { + t.Fatalf("Expected 3 messages, got %d", len(messages)) + } + + if messages[0].Role != "user" || messages[0].Content != "Hello" { + t.Errorf("First message: role=%q content=%q", messages[0].Role, messages[0].Content) + } + + if messages[1].Role != "assistant" || messages[1].Content != "Hi there!" { + t.Errorf("Second message: role=%q content=%q", messages[1].Role, messages[1].Content) + } + if len(messages[1].ToolCalls) != 1 { + t.Fatalf("Expected 1 tool call, got %d", len(messages[1].ToolCalls)) + } + if messages[1].ToolCalls[0].Name != "read" { + t.Errorf("Expected tool call name 'read', got %q", messages[1].ToolCalls[0].Name) + } + if messages[1].ToolCalls[0].Duration != 100 { + t.Errorf("Expected tool call duration 100, got %d", messages[1].ToolCalls[0].Duration) + } + + if messages[2].Role != "tool" || messages[2].ToolCallID != "tc-1" { + t.Errorf("Third message: role=%q toolCallId=%q", messages[2].Role, messages[2].ToolCallID) + } +} + +func TestExtractTranscript_NoSnapshot(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "TEXT_MESSAGE_START", "runId": "r1"}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + + messages := extractTranscript(events) + if len(messages) != 0 { + t.Errorf("Expected 0 messages when no snapshot, got %d", len(messages)) + } +} + +func TestExtractTranscript_UsesLastSnapshot(t *testing.T) { + events := []map[string]interface{}{ + {"type": "MESSAGES_SNAPSHOT", "messages": []interface{}{ + map[string]interface{}{"id": "old", "role": "user", "content": "Old"}, + }}, + {"type": "MESSAGES_SNAPSHOT", "messages": []interface{}{ + map[string]interface{}{"id": "new", "role": "user", "content": "New"}, + }}, + } + + messages := extractTranscript(events) + if len(messages) != 1 { + t.Fatalf("Expected 1 message, got %d", len(messages)) + } + if messages[0].Content != "New" { + t.Errorf("Expected last snapshot content 'New', got %q", messages[0].Content) + } +} + +func makeExportBackend(t *testing.T, events []map[string]interface{}) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + eventsJSON, _ := json.Marshal(events) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "sessionId": "test-session", + "aguiEvents": json.RawMessage(eventsJSON), + }) + })) +} + +func TestE2E_GetOutput_Transcript(t *testing.T) { + events := []map[string]interface{}{ + {"type": "MESSAGES_SNAPSHOT", "messages": []interface{}{ + map[string]interface{}{"id": "m1", "role": "user", "content": "Hello"}, + }}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.TranscriptOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Format != "transcript" { + t.Errorf("Expected format 'transcript', got %q", response.Format) + } + if len(response.Messages) != 1 { + t.Fatalf("Expected 1 message, got %d", len(response.Messages)) + } +} + +func TestE2E_GetOutput_Events(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=events", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.EventsOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Format != "events" { + t.Errorf("Expected format 'events', got %q", response.Format) + } + if len(response.Events) != 2 { + t.Errorf("Expected 2 events, got %d", len(response.Events)) + } +} + +func TestE2E_GetOutput_Compact(t *testing.T) { + events := []map[string]interface{}{ + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "A"}, + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "B"}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=compact", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.EventsOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Format != "compact" { + t.Errorf("Expected format 'compact', got %q", response.Format) + } + if len(response.Events) != 1 { + t.Errorf("Expected 1 compacted event, got %d", len(response.Events)) + } +} + +func TestE2E_GetOutput_InvalidFormat(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=xml", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected 400 for invalid format, got %d", w.Code) + } +} + +func TestE2E_GetOutput_InvalidRunID(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?run_id=not-a-uuid", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected 400 for invalid run_id, got %d", w.Code) + } +} + +func TestE2E_GetOutput_RunIDFilter(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "11111111-1111-1111-1111-111111111111"}, + {"type": "RUN_FINISHED", "runId": "11111111-1111-1111-1111-111111111111"}, + {"type": "RUN_STARTED", "runId": "22222222-2222-2222-2222-222222222222"}, + {"type": "RUN_FINISHED", "runId": "22222222-2222-2222-2222-222222222222"}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=events&run_id=11111111-1111-1111-1111-111111111111", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.EventsOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if len(response.Events) != 2 { + t.Errorf("Expected 2 events after filtering, got %d", len(response.Events)) + } +} diff --git a/components/public-api/handlers/proxy.go b/components/public-api/handlers/proxy.go index b4e8c6ad7..23719e939 100644 --- a/components/public-api/handlers/proxy.go +++ b/components/public-api/handlers/proxy.go @@ -1,3 +1,4 @@ +// Package handlers implements HTTP request handlers for the public API gateway. package handlers import ( @@ -42,8 +43,9 @@ func getEnvOrDefault(key, defaultValue string) string { return defaultValue } -// ProxyRequest forwards a request to the backend and returns the response -func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Response, error) { +// ProxyRequest forwards a request to the backend and returns the response. +// The caller MUST call the returned context.CancelFunc after reading the response body. +func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Response, context.CancelFunc, error) { fullURL := fmt.Sprintf("%s%s", BackendURL, path) var bodyReader io.Reader @@ -54,11 +56,11 @@ func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Respo // Create context with explicit timeout (in addition to HTTP client timeout) // This ensures we respect context cancellation from the client ctx, cancel := context.WithTimeout(c.Request.Context(), BackendTimeout) - defer cancel() req, err := http.NewRequestWithContext(ctx, method, fullURL, bodyReader) if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + cancel() + return nil, nil, fmt.Errorf("failed to create request: %w", err) } // Forward the token @@ -77,15 +79,16 @@ func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Respo resp, err := HTTPClient.Do(req) if err != nil { - return nil, fmt.Errorf("backend request failed: %w", err) + cancel() + return nil, nil, fmt.Errorf("backend request failed: %w", err) } - return resp, nil + return resp, cancel, nil } // ProxyAndRespond proxies a request and writes the response directly func ProxyAndRespond(c *gin.Context, method, path string, body []byte) { - resp, err := ProxyRequest(c, method, path, body) + resp, cancel, err := ProxyRequest(c, method, path, body) if err != nil { // Log detailed error internally, return generic message to user // SECURITY: Never expose internal error details (may contain URLs, tokens, etc.) @@ -93,6 +96,7 @@ func ProxyAndRespond(c *gin.Context, method, path string, body []byte) { c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() // Read response body diff --git a/components/public-api/handlers/proxy_test.go b/components/public-api/handlers/proxy_test.go index 95cd54697..45f37f586 100644 --- a/components/public-api/handlers/proxy_test.go +++ b/components/public-api/handlers/proxy_test.go @@ -79,9 +79,10 @@ func TestProxyRequest_BackendUnavailable(t *testing.T) { c.Set(ContextKeyToken, "test-token") c.Set(ContextKeyProject, "test-project") - resp, err := ProxyRequest(c, http.MethodGet, "/api/projects/test/sessions", nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, "/api/projects/test/sessions", nil) if err == nil { + cancel() resp.Body.Close() t.Error("Expected error for unavailable backend") } diff --git a/components/public-api/handlers/runs.go b/components/public-api/handlers/runs.go new file mode 100644 index 000000000..4523503b0 --- /dev/null +++ b/components/public-api/handlers/runs.go @@ -0,0 +1,399 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "slices" + + "ambient-code-public-api/types" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// CreateRun handles POST /v1/sessions/:id/runs +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. +func CreateRun(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return // getSessionPhase already wrote the error response + } + + if phase != "running" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) + return + } + + var req types.CreateRunRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) + return + } + + // Build AG-UI RunAgentInput messages + messages := make([]map[string]interface{}, len(req.Messages)) + for i, msg := range req.Messages { + m := map[string]interface{}{ + "role": msg.Role, + "content": msg.Content, + } + if msg.ID != "" { + m["id"] = msg.ID + } else { + m["id"] = uuid.New().String() + } + messages[i] = m + } + + messagesJSON, err := json.Marshal(messages) + if err != nil { + log.Printf("Failed to marshal messages: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + // Build RunAgentInput + backendReq := map[string]interface{}{ + "messages": json.RawMessage(messagesJSON), + } + + if req.ThreadID != "" { + backendReq["threadId"] = req.ThreadID + } else { + backendReq["threadId"] = sessionID + } + + if req.RunID != "" { + backendReq["runId"] = req.RunID + } + + reqBody, err := json.Marshal(backendReq) + if err != nil { + log.Printf("Failed to marshal request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/run", project, sessionID) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, reqBody) + if err != nil { + log.Printf("Backend request failed for create run: %v", err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + runID, _ := backendResp["runId"].(string) + threadID, _ := backendResp["threadId"].(string) + + c.JSON(http.StatusAccepted, types.CreateRunResponse{ + RunID: runID, + ThreadID: threadID, + }) +} + +// SendMessage handles POST /v1/sessions/:id/message +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. +func SendMessage(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return // getSessionPhase already wrote the error response + } + + if phase != "running" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) + return + } + + var req types.SendMessageRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) + return + } + + runID := uuid.New().String() + messageID := uuid.New().String() + + messages := []map[string]interface{}{ + { + "id": messageID, + "role": "user", + "content": req.Content, + }, + } + + messagesJSON, err := json.Marshal(messages) + if err != nil { + log.Printf("Failed to marshal messages: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + backendReq := map[string]interface{}{ + "threadId": sessionID, + "runId": runID, + "messages": json.RawMessage(messagesJSON), + } + + reqBody, err := json.Marshal(backendReq) + if err != nil { + log.Printf("Failed to marshal request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/run", project, sessionID) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, reqBody) + if err != nil { + log.Printf("Backend request failed for send message: %v", err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + respRunID, _ := backendResp["runId"].(string) + respThreadID, _ := backendResp["threadId"].(string) + + c.JSON(http.StatusAccepted, types.SendMessageResponse{ + RunID: respRunID, + ThreadID: respThreadID, + }) +} + +// GetSessionRuns handles GET /v1/sessions/:id/runs +func GetSessionRuns(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + events, statusCode, err := fetchSessionEvents(c, project, sessionID) + if err != nil { + if statusCode > 0 { + c.JSON(statusCode, gin.H{"error": "Request failed"}) + } else { + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + } + return + } + + runs := deriveRunSummaries(events) + + c.JSON(http.StatusOK, types.SessionRunsResponse{ + SessionID: sessionID, + Runs: runs, + }) +} + +// fetchSessionEvents retrieves AG-UI events from the backend export endpoint. +// Returns the events array, an HTTP status code for errors, and any error. +func fetchSessionEvents(c *gin.Context, project, sessionID string) ([]map[string]interface{}, int, error) { + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/export", project, sessionID) + + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) + if err != nil { + log.Printf("Backend request failed for export: %v", err) + return nil, 0, fmt.Errorf("backend unavailable") + } + defer cancel() + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read export response: %v", err) + return nil, http.StatusInternalServerError, fmt.Errorf("internal server error") + } + + if resp.StatusCode != http.StatusOK { + // Try to extract error message from backend response + var errorResp map[string]interface{} + if jsonErr := json.Unmarshal(body, &errorResp); jsonErr == nil { + if errMsg, ok := errorResp["error"].(string); ok { + return nil, resp.StatusCode, fmt.Errorf("%s", errMsg) + } + } + return nil, resp.StatusCode, fmt.Errorf("request failed") + } + + var exportResp struct { + AGUIEvents json.RawMessage `json:"aguiEvents"` + } + if err := json.Unmarshal(body, &exportResp); err != nil { + log.Printf("Failed to parse export response: %v", err) + return nil, http.StatusInternalServerError, fmt.Errorf("internal server error") + } + + var events []map[string]interface{} + if err := json.Unmarshal(exportResp.AGUIEvents, &events); err != nil { + log.Printf("Failed to parse aguiEvents: %v", err) + return nil, http.StatusInternalServerError, fmt.Errorf("internal server error") + } + + return events, 0, nil +} + +// deriveRunSummaries groups events by runId and builds run summaries. +func deriveRunSummaries(events []map[string]interface{}) []types.RunSummary { + type runData struct { + summary types.RunSummary + order int + } + + runMap := make(map[string]*runData) + orderCounter := 0 + + for _, event := range events { + runID, _ := event["runId"].(string) + if runID == "" { + continue + } + + rd, exists := runMap[runID] + if !exists { + rd = &runData{ + summary: types.RunSummary{ + RunID: runID, + Status: "running", + }, + order: orderCounter, + } + orderCounter++ + runMap[runID] = rd + } + + rd.summary.EventCount++ + + eventType, _ := event["type"].(string) + timestamp := toInt64(event["timestamp"]) + + switch eventType { + case "RUN_STARTED": + if timestamp > 0 { + rd.summary.StartedAt = timestamp + } + case "RUN_FINISHED": + rd.summary.Status = "completed" + if timestamp > 0 { + rd.summary.FinishedAt = timestamp + } + case "RUN_ERROR": + rd.summary.Status = "error" + if timestamp > 0 { + rd.summary.FinishedAt = timestamp + } + case "TEXT_MESSAGE_START": + role, _ := event["role"].(string) + content, _ := event["content"].(string) + if role == "user" && content != "" && rd.summary.UserMessage == "" { + rd.summary.UserMessage = content + } + } + } + + // Build sorted slice + sorted := make([]*runData, 0, len(runMap)) + for _, rd := range runMap { + sorted = append(sorted, rd) + } + slices.SortFunc(sorted, func(a, b *runData) int { + return a.order - b.order + }) + runs := make([]types.RunSummary, 0, len(sorted)) + for _, rd := range sorted { + runs = append(runs, rd.summary) + } + + return runs +} + +// toInt64 converts a JSON number (float64) to int64. +func toInt64(v interface{}) int64 { + switch n := v.(type) { + case float64: + return int64(n) + case int64: + return n + case json.Number: + i, _ := n.Int64() + return i + default: + return 0 + } +} diff --git a/components/public-api/handlers/runs_test.go b/components/public-api/handlers/runs_test.go new file mode 100644 index 000000000..4d205d2fe --- /dev/null +++ b/components/public-api/handlers/runs_test.go @@ -0,0 +1,441 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "ambient-code-public-api/types" +) + +func TestDeriveRunSummaries(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "run-1", "timestamp": float64(1000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-1", "role": "user", "content": "Hello"}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "run-1", "delta": "Hi"}, + {"type": "RUN_FINISHED", "runId": "run-1", "timestamp": float64(2000)}, + {"type": "RUN_STARTED", "runId": "run-2", "timestamp": float64(3000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-2", "role": "user", "content": "Fix bug"}, + {"type": "RUN_ERROR", "runId": "run-2", "timestamp": float64(4000)}, + } + + runs := deriveRunSummaries(events) + + if len(runs) != 2 { + t.Fatalf("Expected 2 runs, got %d", len(runs)) + } + + // First run + if runs[0].RunID != "run-1" { + t.Errorf("Expected run-1, got %s", runs[0].RunID) + } + if runs[0].Status != "completed" { + t.Errorf("Expected completed, got %s", runs[0].Status) + } + if runs[0].StartedAt != 1000 { + t.Errorf("Expected started_at 1000, got %d", runs[0].StartedAt) + } + if runs[0].FinishedAt != 2000 { + t.Errorf("Expected finished_at 2000, got %d", runs[0].FinishedAt) + } + if runs[0].UserMessage != "Hello" { + t.Errorf("Expected user message 'Hello', got %q", runs[0].UserMessage) + } + if runs[0].EventCount != 4 { + t.Errorf("Expected 4 events, got %d", runs[0].EventCount) + } + + // Second run + if runs[1].RunID != "run-2" { + t.Errorf("Expected run-2, got %s", runs[1].RunID) + } + if runs[1].Status != "error" { + t.Errorf("Expected error, got %s", runs[1].Status) + } + if runs[1].UserMessage != "Fix bug" { + t.Errorf("Expected user message 'Fix bug', got %q", runs[1].UserMessage) + } +} + +func TestDeriveRunSummaries_Empty(t *testing.T) { + runs := deriveRunSummaries([]map[string]interface{}{}) + if len(runs) != 0 { + t.Errorf("Expected 0 runs, got %d", len(runs)) + } +} + +func TestDeriveRunSummaries_NoRunID(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED"}, + {"type": "TEXT_MESSAGE_START", "role": "user"}, + } + runs := deriveRunSummaries(events) + if len(runs) != 0 { + t.Errorf("Expected 0 runs for events without runId, got %d", len(runs)) + } +} + +func TestDeriveRunSummaries_RunningStatus(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "run-1", "timestamp": float64(1000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-1", "role": "user", "content": "Hello"}, + } + runs := deriveRunSummaries(events) + if len(runs) != 1 { + t.Fatalf("Expected 1 run, got %d", len(runs)) + } + if runs[0].Status != "running" { + t.Errorf("Expected running, got %s", runs[0].Status) + } +} + +func TestToInt64(t *testing.T) { + tests := []struct { + name string + input interface{} + expected int64 + }{ + {"float64", float64(1234), 1234}, + {"int64", int64(5678), 5678}, + {"json.Number", json.Number("9999"), 9999}, + {"string", "not-a-number", 0}, + {"nil", nil, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := toInt64(tt.input) + if result != tt.expected { + t.Errorf("toInt64(%v) = %d, want %d", tt.input, result, tt.expected) + } + }) + } +} + +func TestE2E_CreateRun(t *testing.T) { + var receivedBody map[string]interface{} + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // GET session (for phase check) + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/agentic-sessions/") { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{"name": "test-session"}, + "status": map[string]interface{}{"phase": "Running"}, + }) + return + } + + if r.Method != http.MethodPost { + t.Errorf("Expected POST, got %s", r.Method) + } + if !strings.Contains(r.URL.Path, "/agui/run") { + t.Errorf("Expected path to contain /agui/run, got %s", r.URL.Path) + } + + decoder := json.NewDecoder(r.Body) + decoder.Decode(&receivedBody) + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "runId": "test-run-id", + "threadId": "test-session", + }) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{"messages": [{"role": "user", "content": "Hello"}]}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } + + var response types.CreateRunResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.RunID != "test-run-id" { + t.Errorf("Expected run_id 'test-run-id', got %q", response.RunID) + } + if response.ThreadID != "test-session" { + t.Errorf("Expected thread_id 'test-session', got %q", response.ThreadID) + } + + // Verify messages were forwarded + if receivedBody["messages"] == nil { + t.Error("Expected messages in request body") + } +} + +func makeRunningSessionBackend(t *testing.T) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{"name": "test-session"}, + "status": map[string]interface{}{"phase": "Running"}, + }) + })) +} + +func TestE2E_CreateRun_InvalidBody(t *testing.T) { + backend := makeRunningSessionBackend(t) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{"messages": []}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_CreateRun_NoMessages(t *testing.T) { + backend := makeRunningSessionBackend(t) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_CreateRun_SessionNotRunning(t *testing.T) { + backend := makeSessionBackend(t, "Completed", "/agui/run", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{"messages": [{"role": "user", "content": "Hello"}]}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for completed session, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_GetSessionRuns(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(r.URL.Path, "/export") { + t.Errorf("Expected path to contain /export, got %s", r.URL.Path) + } + + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "run-abc", "timestamp": float64(1000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-abc", "role": "user", "content": "Hello"}, + {"type": "RUN_FINISHED", "runId": "run-abc", "timestamp": float64(2000)}, + } + eventsJSON, _ := json.Marshal(events) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "sessionId": "test-session", + "aguiEvents": json.RawMessage(eventsJSON), + }) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/runs", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.SessionRunsResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.SessionID != "test-session" { + t.Errorf("Expected session_id 'test-session', got %q", response.SessionID) + } + if len(response.Runs) != 1 { + t.Fatalf("Expected 1 run, got %d", len(response.Runs)) + } + if response.Runs[0].Status != "completed" { + t.Errorf("Expected completed status, got %q", response.Runs[0].Status) + } + if response.Runs[0].UserMessage != "Hello" { + t.Errorf("Expected user message 'Hello', got %q", response.Runs[0].UserMessage) + } +} + +func TestE2E_GetSessionRuns_BackendError(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Session not found"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/nonexistent/runs", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_SendMessage(t *testing.T) { + var receivedBody map[string]interface{} + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // GET session (for phase check) + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/agentic-sessions/") { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{"name": "test-session"}, + "status": map[string]interface{}{"phase": "Running"}, + }) + return + } + + decoder := json.NewDecoder(r.Body) + decoder.Decode(&receivedBody) + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "runId": "generated-run-id", + "threadId": "test-session", + }) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", + strings.NewReader(`{"content": "Fix the bug please"}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } + + var response types.SendMessageResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.RunID != "generated-run-id" { + t.Errorf("Expected run_id 'generated-run-id', got %q", response.RunID) + } + if response.ThreadID != "test-session" { + t.Errorf("Expected thread_id 'test-session', got %q", response.ThreadID) + } + + // Verify threadId was set to session ID + if receivedBody["threadId"] != "test-session" { + t.Errorf("Expected threadId 'test-session', got %v", receivedBody["threadId"]) + } + // Verify runId was generated + if receivedBody["runId"] == nil || receivedBody["runId"] == "" { + t.Error("Expected runId to be generated") + } +} + +func TestE2E_SendMessage_EmptyContent(t *testing.T) { + backend := makeRunningSessionBackend(t) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", + strings.NewReader(`{"content": ""}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400 for empty content, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_SendMessage_SessionNotRunning(t *testing.T) { + backend := makeSessionBackend(t, "Failed", "/agui/run", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", + strings.NewReader(`{"content": "Hello"}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for failed session, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index 758914e18..b6beea75a 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -6,6 +6,7 @@ import ( "io" "log" "net/http" + "strings" "ambient-code-public-api/types" @@ -21,11 +22,12 @@ func ListSessions(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions", project) - resp, err := ProxyRequest(c, http.MethodGet, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() // Read response body @@ -77,12 +79,13 @@ func GetSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) - resp, err := ProxyRequest(c, http.MethodGet, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { log.Printf("Backend request failed for session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -117,13 +120,16 @@ func CreateSession(c *gin.Context) { var req types.CreateSessionRequest if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) return } // Transform to backend format backendReq := map[string]interface{}{ - "prompt": req.Task, + "initialPrompt": req.Task, + } + if req.DisplayName != "" { + backendReq["displayName"] = req.DisplayName } if req.Model != "" { backendReq["model"] = req.Model @@ -131,15 +137,31 @@ func CreateSession(c *gin.Context) { if len(req.Repos) > 0 { repos := make([]map[string]interface{}, len(req.Repos)) for i, r := range req.Repos { - repos[i] = map[string]interface{}{ - "input": map[string]interface{}{ - "url": r.URL, - "branch": r.Branch, - }, + repo := map[string]interface{}{ + "url": r.URL, + } + if r.Branch != "" { + repo["branch"] = r.Branch } + repos[i] = repo } backendReq["repos"] = repos } + if req.ActiveWorkflow != nil && strings.TrimSpace(req.ActiveWorkflow.GitURL) != "" { + wf := map[string]interface{}{ + "gitUrl": req.ActiveWorkflow.GitURL, + } + if req.ActiveWorkflow.Branch != "" { + wf["branch"] = req.ActiveWorkflow.Branch + } + if req.ActiveWorkflow.Path != "" { + wf["path"] = req.ActiveWorkflow.Path + } + backendReq["activeWorkflow"] = wf + } + if len(req.EnvironmentVariables) > 0 { + backendReq["environmentVariables"] = req.EnvironmentVariables + } reqBody, err := json.Marshal(backendReq) if err != nil { @@ -150,12 +172,13 @@ func CreateSession(c *gin.Context) { path := fmt.Sprintf("/api/projects/%s/agentic-sessions", project) - resp, err := ProxyRequest(c, http.MethodPost, path, reqBody) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, reqBody) if err != nil { log.Printf("Backend request failed for create session: %v", err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) @@ -199,12 +222,13 @@ func DeleteSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) - resp, err := ProxyRequest(c, http.MethodDelete, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodDelete, path, nil) if err != nil { log.Printf("Backend request failed for delete session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusOK { @@ -221,17 +245,19 @@ func DeleteSession(c *gin.Context) { forwardErrorResponse(c, resp.StatusCode, body) } -// forwardErrorResponse forwards backend error with consistent JSON format +// forwardErrorResponse forwards backend error with consistent JSON format. +// SECURITY: Only forwards the "error" field to prevent leaking internal details. func forwardErrorResponse(c *gin.Context, statusCode int, body []byte) { - // Try to parse as JSON error response + // Try to parse as JSON and extract only the "error" field var errorResp map[string]interface{} if err := json.Unmarshal(body, &errorResp); err == nil { - // Backend returned valid JSON, forward it - c.JSON(statusCode, errorResp) - return + if errMsg, ok := errorResp["error"].(string); ok { + c.JSON(statusCode, gin.H{"error": errMsg}) + return + } } - // Backend returned non-JSON, wrap in standard error format + // Backend returned non-JSON or no "error" field, wrap in standard error format c.JSON(statusCode, gin.H{"error": "Request failed"}) } @@ -258,12 +284,37 @@ func transformSession(data map[string]interface{}) types.SessionResponse { // Extract spec if spec, ok := data["spec"].(map[string]interface{}); ok { - if prompt, ok := spec["prompt"].(string); ok { + if prompt, ok := spec["initialPrompt"].(string); ok { + session.Task = prompt + } else if prompt, ok := spec["prompt"].(string); ok { session.Task = prompt } if model, ok := spec["model"].(string); ok { session.Model = model } + if displayName, ok := spec["displayName"].(string); ok { + session.DisplayName = displayName + } + if repos, ok := spec["repos"].([]interface{}); ok { + for _, r := range repos { + repo, ok := r.(map[string]interface{}) + if !ok { + continue + } + sr := types.SessionRepo{} + if input, ok := repo["input"].(map[string]interface{}); ok { + if url, ok := input["url"].(string); ok { + sr.URL = url + } + if branch, ok := input["branch"].(string); ok { + sr.Branch = branch + } + } + if sr.URL != "" { + session.Repos = append(session.Repos, sr) + } + } + } } // Extract status @@ -290,7 +341,8 @@ func transformSession(data map[string]interface{}) types.SessionResponse { return session } -// normalizePhase converts K8s phase to simplified status +// normalizePhase converts K8s phase to simplified lowercase status. +// The public API contract guarantees status values are always lowercase. func normalizePhase(phase string) string { switch phase { case "Pending", "Creating", "Initializing": @@ -302,6 +354,6 @@ func normalizePhase(phase string) string { case "Failed", "Error": return "failed" default: - return phase + return strings.ToLower(phase) } } diff --git a/components/public-api/handlers/sessions_test.go b/components/public-api/handlers/sessions_test.go index 43e7ff920..3ea114706 100644 --- a/components/public-api/handlers/sessions_test.go +++ b/components/public-api/handlers/sessions_test.go @@ -1,6 +1,7 @@ package handlers import ( + "encoding/json" "net/http/httptest" "testing" @@ -23,8 +24,8 @@ func TestTransformSession(t *testing.T) { "creationTimestamp": "2026-01-29T10:00:00Z", }, "spec": map[string]interface{}{ - "prompt": "Fix the bug", - "model": "claude-sonnet-4", + "initialPrompt": "Fix the bug", + "model": "claude-sonnet-4", }, "status": map[string]interface{}{ "phase": "Running", @@ -39,6 +40,27 @@ func TestTransformSession(t *testing.T) { CreatedAt: "2026-01-29T10:00:00Z", }, }, + { + name: "Legacy prompt field fallback", + input: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "session-legacy", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Legacy prompt", + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }, + expected: types.SessionResponse{ + ID: "session-legacy", + Status: "running", + Task: "Legacy prompt", + CreatedAt: "2026-01-29T10:00:00Z", + }, + }, { name: "Completed session with result", input: map[string]interface{}{ @@ -104,6 +126,40 @@ func TestTransformSession(t *testing.T) { Task: "List item task", }, }, + { + name: "Session with displayName and repos", + input: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "session-with-repos", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Fix the bug", + "displayName": "My Cool Session", + "repos": []interface{}{ + map[string]interface{}{ + "input": map[string]interface{}{ + "url": "https://github.com/org/repo", + "branch": "main", + }, + }, + }, + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }, + expected: types.SessionResponse{ + ID: "session-with-repos", + Status: "running", + DisplayName: "My Cool Session", + Task: "Fix the bug", + Repos: []types.SessionRepo{ + {URL: "https://github.com/org/repo", Branch: "main"}, + }, + CreatedAt: "2026-01-29T10:00:00Z", + }, + }, { name: "Empty session", input: map[string]interface{}{}, @@ -141,6 +197,21 @@ func TestTransformSession(t *testing.T) { if result.Error != tt.expected.Error { t.Errorf("Error = %q, want %q", result.Error, tt.expected.Error) } + if result.DisplayName != tt.expected.DisplayName { + t.Errorf("DisplayName = %q, want %q", result.DisplayName, tt.expected.DisplayName) + } + if len(result.Repos) != len(tt.expected.Repos) { + t.Errorf("Repos count = %d, want %d", len(result.Repos), len(tt.expected.Repos)) + } else { + for i, r := range result.Repos { + if r.URL != tt.expected.Repos[i].URL { + t.Errorf("Repos[%d].URL = %q, want %q", i, r.URL, tt.expected.Repos[i].URL) + } + if r.Branch != tt.expected.Repos[i].Branch { + t.Errorf("Repos[%d].Branch = %q, want %q", i, r.Branch, tt.expected.Repos[i].Branch) + } + } + } }) } } @@ -159,7 +230,8 @@ func TestNormalizePhase(t *testing.T) { {"Succeeded", "completed"}, {"Failed", "failed"}, {"Error", "failed"}, - {"Unknown", "Unknown"}, + {"Unknown", "unknown"}, + {"Stopping", "stopping"}, } for _, tt := range tests { @@ -217,6 +289,20 @@ func TestForwardErrorResponse(t *testing.T) { expectedStatus: 500, expectJSON: true, // Should be wrapped in generic JSON }, + { + name: "Backend returns JSON with extra internal fields", + statusCode: 500, + body: []byte(`{"error": "Session failed", "internal_trace": "k8s.io/xyz:123", "namespace": "secret-ns"}`), + expectedStatus: 500, + expectJSON: true, // Should only forward "error" field + }, + { + name: "Backend returns JSON without error field", + statusCode: 500, + body: []byte(`{"message": "some internal detail"}`), + expectedStatus: 500, + expectJSON: true, // Should fall back to generic error + }, } for _, tt := range tests { @@ -241,6 +327,51 @@ func TestForwardErrorResponse(t *testing.T) { } } +func TestForwardErrorResponse_FiltersInternalFields(t *testing.T) { + gin.SetMode(gin.TestMode) + + // Backend returns JSON with extra internal fields that should be stripped + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/", nil) + + forwardErrorResponse(c, 500, []byte(`{"error": "Session failed", "internal_trace": "k8s.io/xyz:123", "namespace": "secret-ns"}`)) + + var response map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &response) + + if response["error"] != "Session failed" { + t.Errorf("Expected error 'Session failed', got %v", response["error"]) + } + if _, exists := response["internal_trace"]; exists { + t.Error("Expected internal_trace to be stripped from response") + } + if _, exists := response["namespace"]; exists { + t.Error("Expected namespace to be stripped from response") + } +} + +func TestForwardErrorResponse_NoErrorField(t *testing.T) { + gin.SetMode(gin.TestMode) + + // Backend returns JSON without an "error" field + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/", nil) + + forwardErrorResponse(c, 500, []byte(`{"message": "some internal detail"}`)) + + var response map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &response) + + if response["error"] != "Request failed" { + t.Errorf("Expected generic error 'Request failed', got %v", response["error"]) + } + if _, exists := response["message"]; exists { + t.Error("Expected message to not be forwarded") + } +} + func TestTransformSession_TypeSafety(t *testing.T) { // Test that transformSession handles incorrect types gracefully tests := []struct { diff --git a/components/public-api/main.go b/components/public-api/main.go index fb20ac418..1aeb39938 100644 --- a/components/public-api/main.go +++ b/components/public-api/main.go @@ -96,6 +96,21 @@ func main() { v1.POST("/sessions", handlers.CreateSession) v1.GET("/sessions/:id", handlers.GetSession) v1.DELETE("/sessions/:id", handlers.DeleteSession) + + // Runs + v1.POST("/sessions/:id/runs", handlers.CreateRun) + v1.GET("/sessions/:id/runs", handlers.GetSessionRuns) + + // Messaging + v1.POST("/sessions/:id/message", handlers.SendMessage) + + // Output + v1.GET("/sessions/:id/output", handlers.GetSessionOutput) + + // Lifecycle + v1.POST("/sessions/:id/start", handlers.StartSession) + v1.POST("/sessions/:id/stop", handlers.StopSession) + v1.POST("/sessions/:id/interrupt", handlers.InterruptSession) } // Get port from environment or default to 8081 @@ -143,8 +158,8 @@ func getAllowedOrigins() []string { // Default: allow common development origins return []string{ - "http://localhost:3000", // Next.js dev server - "http://localhost:8080", // Frontend in kind + "http://localhost:3000", // Next.js dev server + "http://localhost:8080", // Frontend in kind "https://*.apps-crc.testing", // CRC routes } } diff --git a/components/public-api/observability/logging.go b/components/public-api/observability/logging.go index 03acda5f5..d424c5af1 100644 --- a/components/public-api/observability/logging.go +++ b/components/public-api/observability/logging.go @@ -1,3 +1,4 @@ +// Package observability provides structured logging and tracing for the public API. package observability import ( diff --git a/components/public-api/openapi.yaml b/components/public-api/openapi.yaml new file mode 100644 index 000000000..dd9833d82 --- /dev/null +++ b/components/public-api/openapi.yaml @@ -0,0 +1,797 @@ +openapi: 3.0.3 +info: + title: Ambient Code Public API + description: Simplified, versioned REST API gateway for the Ambient Code Platform. + version: 1.0.0 +servers: + - url: https://public-api-ambient-code.apps.okd1.timslab/v1 + description: Lab OKD cluster + - url: http://localhost:8081/v1 + description: Local development + +security: + - BearerAuth: [] + +paths: + /sessions: + get: + summary: List sessions + operationId: listSessions + parameters: + - $ref: '#/components/parameters/ProjectHeader' + responses: + '200': + description: List of sessions + content: + application/json: + schema: + $ref: '#/components/schemas/SessionListResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '502': + $ref: '#/components/responses/BadGateway' + + post: + summary: Create a session + operationId: createSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateSessionRequest' + responses: + '201': + description: Session created + content: + application/json: + schema: + type: object + properties: + id: + type: string + example: session-abc123 + message: + type: string + example: Session created + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}: + get: + summary: Get session details + operationId: getSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '200': + description: Session details + content: + application/json: + schema: + $ref: '#/components/schemas/SessionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + + delete: + summary: Delete a session + operationId: deleteSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '204': + description: Session deleted + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/message: + post: + summary: Send a message to a session + description: > + Sends a user message to the session. The public API handles all + AG-UI protocol details (run ID generation, message envelope + construction, thread mapping). The resulting run executes + asynchronously; use GET /sessions/{id}/runs to track progress + and GET /sessions/{id}/output to retrieve results. + operationId: sendMessage + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SendMessageRequest' + responses: + '202': + description: Message accepted, run created + content: + application/json: + schema: + $ref: '#/components/schemas/SendMessageResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '422': + description: Session is not in a running state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/output: + get: + summary: Get session output + description: > + Returns session output in one of three formats controlled by the + `format` query parameter. + + + - **transcript** (default) — Assembled conversation messages extracted + from MESSAGES_SNAPSHOT events. Each message has a role, full content, + and optional tool calls. This is the smallest and most useful format + for most consumers. + + - **compact** — AG-UI events with streaming deltas merged. All + TEXT_MESSAGE_CONTENT deltas for the same message are concatenated + into a single event, and likewise for TOOL_CALL_ARGS. Significantly + smaller than raw events while preserving full event structure. + + - **events** — Raw AG-UI events exactly as persisted. Includes every + individual streaming delta. Can be very large even for short sessions. + + + All formats support optional `run_id` filtering. + operationId: getSessionOutput + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + - name: format + in: query + required: false + description: Output format + schema: + type: string + enum: [transcript, compact, events] + default: transcript + - name: run_id + in: query + required: false + description: Filter output to a specific run (must be a valid UUID) + schema: + type: string + format: uuid + responses: + '200': + description: > + Session output. Response schema depends on the `format` parameter. + content: + application/json: + schema: + oneOf: + - $ref: '#/components/schemas/TranscriptOutputResponse' + - $ref: '#/components/schemas/EventsOutputResponse' + discriminator: + propertyName: format + mapping: + transcript: '#/components/schemas/TranscriptOutputResponse' + compact: '#/components/schemas/EventsOutputResponse' + events: '#/components/schemas/EventsOutputResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + description: Session or run not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/runs: + post: + summary: Create a run + description: > + Creates a new AG-UI run with full control over the run input. + The caller provides the complete RunAgentInput payload including + run ID, thread ID, and messages. For a simpler interface that + handles AG-UI details automatically, use POST /sessions/{id}/message. + operationId: createRun + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateRunRequest' + responses: + '202': + description: Run accepted + content: + application/json: + schema: + $ref: '#/components/schemas/CreateRunResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '422': + description: Session is not in a running state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + get: + summary: List runs in a session + description: > + Returns a summary of all AG-UI runs in the session, including + status, timestamps, event counts, and the originating user message. + operationId: getSessionRuns + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '200': + description: List of runs + content: + application/json: + schema: + $ref: '#/components/schemas/SessionRunsResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/start: + post: + summary: Start (resume) a session + description: > + Resumes a stopped or completed session. The session transitions + back to Running state and can accept new messages. Returns 422 + if the session is already running or pending, or 409 if the + session state is unknown. + operationId: startSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '202': + description: Start accepted + content: + application/json: + schema: + $ref: '#/components/schemas/SessionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '409': + description: Session state is unknown + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '422': + description: Session is already running or pending + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/stop: + post: + summary: Stop a session + description: > + Stops a running session. The session's pod is terminated and the + session transitions to a completed state. This is a session-level + lifecycle action — use interrupt instead to cancel only the + current run without killing the session. Returns 422 if the + session is already completed or failed, or 409 if the session + state is unknown. + operationId: stopSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '202': + description: Stop accepted + content: + application/json: + schema: + $ref: '#/components/schemas/SessionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '409': + description: Session state is unknown + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '422': + description: Session is not in a running state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/interrupt: + post: + summary: Interrupt the current run + description: > + Sends an interrupt signal to cancel the agent's current execution + without killing the session. The session remains in Running state + and can accept new messages. Equivalent to the red "Stop" button + in the chat UI. + operationId: interruptSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '200': + description: Interrupt signal sent + content: + application/json: + schema: + $ref: '#/components/schemas/MessageResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + +components: + securitySchemes: + BearerAuth: + type: http + scheme: bearer + description: OpenShift token or access key + + parameters: + ProjectHeader: + name: X-Ambient-Project + in: header + required: true + description: Target project/namespace + schema: + type: string + pattern: '^[a-z0-9][a-z0-9-]*[a-z0-9]$' + example: my-project + + SessionID: + name: id + in: path + required: true + description: Session ID (valid Kubernetes name) + schema: + type: string + pattern: '^[a-z0-9][a-z0-9-]*[a-z0-9]$' + example: session-abc123 + + schemas: + SessionResponse: + type: object + properties: + id: + type: string + status: + type: string + enum: [pending, running, completed, failed] + display_name: + type: string + task: + type: string + model: + type: string + repos: + type: array + description: Repositories configured for this session + items: + $ref: '#/components/schemas/SessionRepo' + created_at: + type: string + format: date-time + completed_at: + type: string + format: date-time + result: + type: string + error: + type: string + + SessionListResponse: + type: object + properties: + items: + type: array + items: + $ref: '#/components/schemas/SessionResponse' + total: + type: integer + + CreateSessionRequest: + type: object + required: [task] + properties: + task: + type: string + description: The initial prompt / task for the agent + display_name: + type: string + description: Human-readable name for the session + model: + type: string + repos: + type: array + items: + type: object + required: [url] + properties: + url: + type: string + branch: + type: string + activeWorkflow: + $ref: '#/components/schemas/WorkflowRef' + environmentVariables: + type: object + additionalProperties: + type: string + description: Environment variables to inject into the session pod + + WorkflowRef: + type: object + required: [gitUrl] + description: Identifies a workflow by git repository location + properties: + gitUrl: + type: string + description: Git clone URL of the repository containing the workflow + branch: + type: string + description: Branch to check out (defaults to main) + path: + type: string + description: Path within the repository to the workflow directory + + CreateRunRequest: + type: object + required: [messages] + properties: + run_id: + type: string + format: uuid + description: Caller-provided run ID. Generated server-side if omitted. + thread_id: + type: string + description: AG-UI thread ID. Defaults to the session ID if omitted. + messages: + type: array + minItems: 1 + items: + $ref: '#/components/schemas/RunMessage' + + CreateRunResponse: + type: object + properties: + run_id: + type: string + format: uuid + thread_id: + type: string + + RunMessage: + type: object + required: [role, content] + properties: + id: + type: string + format: uuid + description: Message ID. Generated server-side if omitted. + role: + type: string + enum: [user, assistant, system] + content: + type: string + + SendMessageRequest: + type: object + required: [content] + properties: + content: + type: string + description: The user message to send to the agent + + SendMessageResponse: + type: object + properties: + run_id: + type: string + format: uuid + description: UUID identifying this run + thread_id: + type: string + description: Session ID (same as the path parameter) + + TranscriptOutputResponse: + type: object + description: > + Transcript format (default). Returns assembled conversation messages + extracted from MESSAGES_SNAPSHOT events emitted at the end of each run. + properties: + session_id: + type: string + format: + type: string + enum: [transcript] + messages: + type: array + description: Assembled conversation messages in chronological order + items: + $ref: '#/components/schemas/TranscriptMessage' + + TranscriptMessage: + type: object + description: > + A single assembled message from the conversation. Extracted from + MESSAGES_SNAPSHOT events which the runner emits at the end of each + run containing the full conversation up to that point. + properties: + id: + type: string + description: Message ID + role: + type: string + enum: [user, assistant, system, tool, developer] + content: + type: string + description: Full message content + tool_calls: + type: array + description: Tool calls made by the assistant (present when role is assistant) + items: + $ref: '#/components/schemas/TranscriptToolCall' + tool_call_id: + type: string + description: ID of the tool call this message is a response to (present when role is tool) + name: + type: string + description: Tool name (present when role is tool) + timestamp: + type: string + + TranscriptToolCall: + type: object + properties: + id: + type: string + name: + type: string + args: + type: string + description: JSON-encoded tool call arguments + result: + type: string + description: Tool call result (if completed) + status: + type: string + enum: [pending, running, completed, error] + duration: + type: integer + format: int64 + description: Execution time in milliseconds + + EventsOutputResponse: + type: object + description: > + Events format. Used for both `compact` and `events` formats. + In compact mode, streaming deltas (TEXT_MESSAGE_CONTENT, + TOOL_CALL_ARGS) are merged per message/tool call. In events + mode, raw events are returned as persisted. + properties: + session_id: + type: string + format: + type: string + enum: [compact, events] + events: + type: array + description: AG-UI protocol events in emission order + items: + $ref: '#/components/schemas/AGUIEvent' + + AGUIEvent: + type: object + description: > + A single AG-UI protocol event. The `type` field determines which + additional fields are present. All events carry `type` and `runId`; + other fields depend on the event type. + required: [type] + properties: + type: + type: string + enum: + - RUN_STARTED + - RUN_FINISHED + - RUN_ERROR + - TEXT_MESSAGE_START + - TEXT_MESSAGE_CONTENT + - TEXT_MESSAGE_END + - TOOL_CALL_START + - TOOL_CALL_ARGS + - TOOL_CALL_END + - STEP_STARTED + - STEP_FINISHED + - STATE_DELTA + - CUSTOM + description: AG-UI event type + runId: + type: string + format: uuid + description: Run this event belongs to + threadId: + type: string + description: Thread (session) this event belongs to + timestamp: + type: integer + format: int64 + description: Unix timestamp in milliseconds + messageId: + type: string + description: Message ID (present on TEXT_MESSAGE_* events) + role: + type: string + enum: [user, assistant, system] + description: Message role (present on TEXT_MESSAGE_START) + content: + type: string + description: Full message content (present on TEXT_MESSAGE_START for user messages) + delta: + type: string + description: > + Incremental content chunk (present on TEXT_MESSAGE_CONTENT, + TOOL_CALL_ARGS). In compact format, all deltas for the same + message/tool call are merged into a single event. + toolCallId: + type: string + description: Tool call ID (present on TOOL_CALL_* events) + toolCallName: + type: string + description: Tool name (present on TOOL_CALL_START) + stepName: + type: string + description: Step name (present on STEP_STARTED, STEP_FINISHED) + rawEvent: + type: object + description: Custom event payload (present on CUSTOM events, e.g. thinking blocks) + additionalProperties: true + + SessionRunsResponse: + type: object + properties: + session_id: + type: string + runs: + type: array + items: + $ref: '#/components/schemas/RunSummary' + + RunSummary: + type: object + properties: + run_id: + type: string + format: uuid + started_at: + type: integer + format: int64 + description: Unix timestamp in milliseconds + finished_at: + type: integer + format: int64 + description: Unix timestamp in milliseconds + status: + type: string + enum: [running, completed, error] + user_message: + type: string + description: The user message that initiated this run + event_count: + type: integer + description: Total number of AG-UI events in this run + + SessionRepo: + type: object + properties: + url: + type: string + branch: + type: string + + MessageResponse: + type: object + properties: + message: + type: string + example: Interrupt signal sent + + ErrorResponse: + type: object + properties: + error: + type: string + + responses: + BadRequest: + description: Invalid input + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + Unauthorized: + description: Missing or invalid authentication + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + NotFound: + description: Resource not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + BadGateway: + description: Backend unavailable + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' diff --git a/components/public-api/types/dto.go b/components/public-api/types/dto.go index 9faafa8d1..69b09b599 100644 --- a/components/public-api/types/dto.go +++ b/components/public-api/types/dto.go @@ -1,15 +1,18 @@ +// Package types defines data transfer objects for the public API. package types // SessionResponse is the simplified session response for the public API type SessionResponse struct { - ID string `json:"id"` - Status string `json:"status"` // "pending", "running", "completed", "failed" - Task string `json:"task"` - Model string `json:"model,omitempty"` - CreatedAt string `json:"createdAt"` - CompletedAt string `json:"completedAt,omitempty"` - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` + ID string `json:"id"` + Status string `json:"status"` // "pending", "running", "completed", "failed" + DisplayName string `json:"display_name,omitempty"` + Task string `json:"task"` + Model string `json:"model,omitempty"` + Repos []SessionRepo `json:"repos,omitempty"` + CreatedAt string `json:"created_at"` + CompletedAt string `json:"completed_at,omitempty"` + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` } // SessionListResponse is the response for listing sessions @@ -20,9 +23,19 @@ type SessionListResponse struct { // CreateSessionRequest is the request body for creating a session type CreateSessionRequest struct { - Task string `json:"task" binding:"required"` - Model string `json:"model,omitempty"` - Repos []Repo `json:"repos,omitempty"` + Task string `json:"task" binding:"required"` + DisplayName string `json:"display_name,omitempty"` + Model string `json:"model,omitempty"` + Repos []Repo `json:"repos,omitempty"` + ActiveWorkflow *WorkflowRef `json:"activeWorkflow,omitempty"` + EnvironmentVariables map[string]string `json:"environmentVariables,omitempty"` +} + +// WorkflowRef identifies a workflow by git repository location. +type WorkflowRef struct { + GitURL string `json:"gitUrl" binding:"required"` + Branch string `json:"branch,omitempty"` + Path string `json:"path,omitempty"` } // Repo represents a repository configuration @@ -31,8 +44,95 @@ type Repo struct { Branch string `json:"branch,omitempty"` } -// ErrorResponse is a standard error response -type ErrorResponse struct { - Error string `json:"error"` - Message string `json:"message,omitempty"` +// CreateRunRequest is the request body for creating an AG-UI run +type CreateRunRequest struct { + RunID string `json:"run_id,omitempty"` + ThreadID string `json:"thread_id,omitempty"` + Messages []RunMessage `json:"messages" binding:"required,min=1"` +} + +// RunMessage is a message in a run request +type RunMessage struct { + ID string `json:"id,omitempty"` + Role string `json:"role" binding:"required"` + Content string `json:"content" binding:"required"` +} + +// CreateRunResponse is the response for creating a run +type CreateRunResponse struct { + RunID string `json:"run_id"` + ThreadID string `json:"thread_id"` +} + +// SessionRunsResponse is the response for listing runs in a session +type SessionRunsResponse struct { + SessionID string `json:"session_id"` + Runs []RunSummary `json:"runs"` +} + +// RunSummary is a summary of a single run +type RunSummary struct { + RunID string `json:"run_id"` + StartedAt int64 `json:"started_at,omitempty"` + FinishedAt int64 `json:"finished_at,omitempty"` + Status string `json:"status"` + UserMessage string `json:"user_message,omitempty"` + EventCount int `json:"event_count"` +} + +// SendMessageRequest is the request body for sending a message to a session +type SendMessageRequest struct { + Content string `json:"content" binding:"required"` +} + +// SendMessageResponse is the response for sending a message +type SendMessageResponse struct { + RunID string `json:"run_id"` + ThreadID string `json:"thread_id"` +} + +// TranscriptOutputResponse is the response for transcript format output +type TranscriptOutputResponse struct { + SessionID string `json:"session_id"` + Format string `json:"format"` + Messages []TranscriptMessage `json:"messages"` +} + +// TranscriptMessage is a single message in transcript output +type TranscriptMessage struct { + ID string `json:"id,omitempty"` + Role string `json:"role"` + Content string `json:"content,omitempty"` + ToolCalls []TranscriptToolCall `json:"tool_calls,omitempty"` + ToolCallID string `json:"tool_call_id,omitempty"` + Name string `json:"name,omitempty"` + Timestamp string `json:"timestamp,omitempty"` +} + +// TranscriptToolCall is a tool call in a transcript message +type TranscriptToolCall struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Args string `json:"args,omitempty"` + Result string `json:"result,omitempty"` + Status string `json:"status,omitempty"` + Duration int64 `json:"duration,omitempty"` +} + +// EventsOutputResponse is the response for events/compact format output +type EventsOutputResponse struct { + SessionID string `json:"session_id"` + Format string `json:"format"` + Events []map[string]interface{} `json:"events"` +} + +// SessionRepo represents a repository configured for a session +type SessionRepo struct { + URL string `json:"url"` + Branch string `json:"branch,omitempty"` +} + +// MessageResponse is a simple message response +type MessageResponse struct { + Message string `json:"message"` } diff --git a/docs/internal/proposals/public-api-session-control.md b/docs/internal/proposals/public-api-session-control.md new file mode 100644 index 000000000..d7dd1f398 --- /dev/null +++ b/docs/internal/proposals/public-api-session-control.md @@ -0,0 +1,79 @@ +# Proposal: Public API Session Control Endpoints + +**Branch:** `feat/public-api-session-control` +**Date:** 2026-03-06 +**Status:** Draft +**Spec:** `components/public-api/openapi.yaml` + +--- + +## Summary + +Extend the public API from 4 session CRUD endpoints to a full session control surface. These endpoints let SDK/MCP clients manage the complete session lifecycle — send messages, retrieve output, start/stop sessions, and interrupt runs — without needing to understand the AG-UI protocol or construct complex request envelopes. + +All endpoints proxy to the existing Go backend. No new K8s operations are introduced in the public API. + +--- + +## Current State (Phase 1) + +The public API currently exposes only basic CRUD: + +| Method | Endpoint | Status | +|--------|----------|--------| +| GET | `/v1/sessions` | Implemented | +| POST | `/v1/sessions` | Implemented | +| GET | `/v1/sessions/{id}` | Implemented | +| DELETE | `/v1/sessions/{id}` | Implemented | + +--- + +## Proposed Endpoints + +### Messaging + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/v1/sessions/{id}/runs` | **Raw AG-UI run.** Accepts full `RunAgentInput` with caller-provided run/thread IDs and messages array. Direct proxy to backend `/agui/run`. | +| POST | `/v1/sessions/{id}/message` | **Simplified send.** Accepts `{"content": "..."}`, constructs the AG-UI `RunAgentInput` envelope server-side (generates run ID, thread ID, message ID). Proxies to backend `/agui/run`. | + +### Output Retrieval + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/v1/sessions/{id}/output` | Returns session output in one of three formats via `?format=` query param. | +| GET | `/v1/sessions/{id}/runs` | Lists all runs with status, timestamps, event counts, and originating user message. | + +**Output formats:** + +- **`transcript`** (default) — Assembled conversation messages from `MESSAGES_SNAPSHOT` events. Smallest, most useful for consumers. +- **`compact`** — AG-UI events with streaming deltas merged per message/tool call. Preserves event structure but significantly smaller than raw. +- **`events`** — Raw AG-UI events exactly as persisted. Can be very large. + +All formats support optional `?run_id=` filtering. + +### Lifecycle Control + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/v1/sessions/{id}/start` | Resume a stopped/completed session. Returns 422 if already running. | +| POST | `/v1/sessions/{id}/stop` | Stop a running session (kills pod). Returns 422 if already stopped. | +| POST | `/v1/sessions/{id}/interrupt` | Cancel the current run without killing the session. Equivalent to the red stop button in the UI. | + +--- + +## What This Enables + +Today, SDK/MCP clients can create sessions via the public API but must either: +- Hit the backend API directly for messaging and lifecycle control +- Construct AG-UI protocol envelopes manually + +With these additions, the public API becomes a complete session management interface. The `/message` endpoint is the key simplification — clients send plain text and the public API handles all AG-UI plumbing. + +--- + +## Open Question + +The backend API is externally routed on all current deployments (lab OKD, ROSA UAT). All proposed endpoints have direct backend equivalents (except `/message` envelope construction, transcript assembly, and run listing). If the backend remains externally accessible, clients can use it directly — making these public API additions a convenience layer rather than a necessity. + +The value depends on whether Phase 3 (backend internalization) from the [original proposal](acp-public-rest-api.md) proceeds. If the backend route is removed, these endpoints become required for external clients.