diff --git a/backend/src/dsl/compiler.ts b/backend/src/dsl/compiler.ts index 18369925..cbb3e98b 100644 --- a/backend/src/dsl/compiler.ts +++ b/backend/src/dsl/compiler.ts @@ -1,7 +1,7 @@ import { WorkflowGraphDto, WorkflowNodeDto } from '../workflows/dto/workflow-graph.dto'; // Ensure all worker components are registered before accessing the registry import '../../../worker/src/components'; -import { componentRegistry } from '@shipsec/component-sdk'; +import { componentRegistry, type ComponentPortMetadata } from '@shipsec/component-sdk'; import { extractPorts } from '@shipsec/component-sdk/zod-ports'; import { WorkflowAction, @@ -101,8 +101,12 @@ export function compileWorkflowGraph(graph: WorkflowGraphDto): WorkflowDefinitio const groupIdValue = config.groupId; const maxConcurrencyValue = config.maxConcurrency; + const mode = (config.mode as WorkflowNodeMetadata['mode']) ?? 'normal'; + const toolConfig = config.toolConfig as WorkflowNodeMetadata['toolConfig']; + nodesMetadata[node.id] = { ref: node.id, + mode, label: node.data?.label, joinStrategy, streamId: @@ -113,6 +117,7 @@ export function compileWorkflowGraph(graph: WorkflowGraphDto): WorkflowDefinitio typeof maxConcurrencyValue === 'number' && Number.isFinite(maxConcurrencyValue) ? maxConcurrencyValue : undefined, + toolConfig, }; } @@ -148,7 +153,8 @@ export function compileWorkflowGraph(graph: WorkflowGraphDto): WorkflowDefinitio const params: Record = { ...rawParams }; const inputOverrides: Record = { ...rawInputOverrides }; - let inputs = componentRegistry.getMetadata(node.type)?.inputs ?? []; + let inputs: ComponentPortMetadata[] = + (componentRegistry.getMetadata(node.type)?.inputs as ComponentPortMetadata[]) ?? []; if (component?.resolvePorts) { try { const resolved = component.resolvePorts(params); diff --git a/backend/src/dsl/types.ts b/backend/src/dsl/types.ts index 6fbf235a..8f7409a5 100644 --- a/backend/src/dsl/types.ts +++ b/backend/src/dsl/types.ts @@ -55,6 +55,13 @@ export const WorkflowNodeMetadataSchema = z.object({ maxConcurrency: z.number().int().positive().optional(), groupId: z.string().optional(), streamId: z.string().optional(), + mode: z.enum(['normal', 'tool']).default('normal'), + toolConfig: z + .object({ + boundInputIds: z.array(z.string()).default([]), + exposedInputIds: z.array(z.string()).default([]), + }) + .optional(), }); export type WorkflowNodeMetadata = z.infer; diff --git a/backend/src/mcp/__tests__/mcp-internal.integration.spec.ts b/backend/src/mcp/__tests__/mcp-internal.integration.spec.ts new file mode 100644 index 00000000..10e1cd83 --- /dev/null +++ b/backend/src/mcp/__tests__/mcp-internal.integration.spec.ts @@ -0,0 +1,94 @@ +import { describe, it, expect, beforeAll, afterAll } from 'bun:test'; +import { Test, TestingModule } from '@nestjs/testing'; +import { INestApplication } from '@nestjs/common'; +import request from 'supertest'; +import { McpModule } from '../mcp.module'; +import { TOOL_REGISTRY_REDIS } from '../tool-registry.service'; +import { Pool } from 'pg'; + +// Simple Mock Redis +class MockRedis { + data = new Map>(); + async hset(key: string, field: string, value: string) { + if (!this.data.has(key)) this.data.set(key, new Map()); + this.data.get(key)!.set(field, value); + return 1; + } + async hget(key: string, field: string) { + return this.data.get(key)?.get(field) || null; + } + async quit() {} +} + +describe('MCP Internal API (Integration)', () => { + let app: INestApplication; + let redis: MockRedis; + const INTERNAL_TOKEN = 'test-internal-token'; + + beforeAll(async () => { + process.env.INTERNAL_SERVICE_TOKEN = INTERNAL_TOKEN; + process.env.NODE_ENV = 'test'; + process.env.SKIP_INGEST_SERVICES = 'true'; + process.env.SHIPSEC_SKIP_MIGRATION_CHECK = 'true'; + + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [McpModule], + }) + .overrideProvider(Pool) + .useValue({ + connect: async () => ({ + query: async () => ({ rows: [] }), + release: () => {}, + }), + on: () => {}, + }) + .overrideProvider(TOOL_REGISTRY_REDIS) + .useValue(new MockRedis()) + .compile(); + + app = moduleFixture.createNestApplication(); + await app.init(); + + redis = moduleFixture.get(TOOL_REGISTRY_REDIS); + }); + + afterAll(async () => { + await app.close(); + }); + + it('registers a component tool via internal API', async () => { + const payload = { + runId: 'run-test-1', + nodeId: 'node-test-1', + toolName: 'test_tool', + componentId: 'core.test', + description: 'Test Tool', + inputSchema: { type: 'object', properties: {} }, + credentials: { apiKey: 'secret' }, + }; + + const response = await request(app.getHttpServer()) + .post('/internal/mcp/register-component') + .set('x-internal-token', INTERNAL_TOKEN) + .send(payload); + + expect(response.status).toBe(201); + expect(response.body).toEqual({ success: true }); + + // Verify it's in Redis + const toolJson = await redis.hget('mcp:run:run-test-1:tools', 'node-test-1'); + expect(toolJson).not.toBeNull(); + const tool = JSON.parse(toolJson!); + expect(tool.toolName).toBe('test_tool'); + expect(tool.status).toBe('ready'); + }); + + it('rejects identity-less internal requests', async () => { + const response = await request(app.getHttpServer()) + .post('/internal/mcp/register-component') + .send({}); + + // Should be caught by global AuthGuard + expect(response.status).toBe(403); + }); +}); diff --git a/backend/src/mcp/dto/mcp.dto.ts b/backend/src/mcp/dto/mcp.dto.ts new file mode 100644 index 00000000..a01cdd0f --- /dev/null +++ b/backend/src/mcp/dto/mcp.dto.ts @@ -0,0 +1,40 @@ +import { ToolInputSchema } from '@shipsec/component-sdk'; + +/** + * Input for registering a component tool + */ +export class RegisterComponentToolInput { + runId!: string; + nodeId!: string; + toolName!: string; + componentId!: string; + description!: string; + inputSchema!: ToolInputSchema; + credentials!: Record; +} + +/** + * Input for registering a remote MCP + */ +export class RegisterRemoteMcpInput { + runId!: string; + nodeId!: string; + toolName!: string; + description!: string; + inputSchema!: ToolInputSchema; + endpoint!: string; + authToken?: string; +} + +/** + * Input for registering a local MCP (stdio container) + */ +export class RegisterLocalMcpInput { + runId!: string; + nodeId!: string; + toolName!: string; + description!: string; + inputSchema!: ToolInputSchema; + endpoint!: string; + containerId!: string; +} diff --git a/backend/src/mcp/internal-mcp.controller.ts b/backend/src/mcp/internal-mcp.controller.ts new file mode 100644 index 00000000..ed3604bf --- /dev/null +++ b/backend/src/mcp/internal-mcp.controller.ts @@ -0,0 +1,30 @@ +import { Body, Controller, Post } from '@nestjs/common'; +import { ToolRegistryService } from './tool-registry.service'; +import { + RegisterComponentToolInput, + RegisterLocalMcpInput, + RegisterRemoteMcpInput, +} from './dto/mcp.dto'; + +@Controller('internal/mcp') +export class InternalMcpController { + constructor(private readonly toolRegistry: ToolRegistryService) {} + + @Post('register-component') + async registerComponent(@Body() body: RegisterComponentToolInput) { + await this.toolRegistry.registerComponentTool(body); + return { success: true }; + } + + @Post('register-remote') + async registerRemote(@Body() body: RegisterRemoteMcpInput) { + await this.toolRegistry.registerRemoteMcp(body); + return { success: true }; + } + + @Post('register-local') + async registerLocal(@Body() body: RegisterLocalMcpInput) { + await this.toolRegistry.registerLocalMcp(body); + return { success: true }; + } +} diff --git a/backend/src/mcp/mcp.module.ts b/backend/src/mcp/mcp.module.ts index 026b38f3..8a5659f1 100644 --- a/backend/src/mcp/mcp.module.ts +++ b/backend/src/mcp/mcp.module.ts @@ -2,10 +2,12 @@ import { Global, Module } from '@nestjs/common'; import Redis from 'ioredis'; import { ToolRegistryService, TOOL_REGISTRY_REDIS } from './tool-registry.service'; import { SecretsModule } from '../secrets/secrets.module'; +import { InternalMcpController } from './internal-mcp.controller'; @Global() @Module({ imports: [SecretsModule], + controllers: [InternalMcpController], providers: [ { provide: TOOL_REGISTRY_REDIS, diff --git a/backend/src/mcp/tool-registry.service.ts b/backend/src/mcp/tool-registry.service.ts index 5a518d7b..61b4f857 100644 --- a/backend/src/mcp/tool-registry.service.ts +++ b/backend/src/mcp/tool-registry.service.ts @@ -13,6 +13,11 @@ import { Injectable, Logger, Inject, OnModuleDestroy } from '@nestjs/common'; import type Redis from 'ioredis'; import { type ToolInputSchema } from '@shipsec/component-sdk'; import { SecretsEncryptionService } from '../secrets/secrets.encryption'; +import { + RegisterComponentToolInput, + RegisterLocalMcpInput, + RegisterRemoteMcpInput, +} from './dto/mcp.dto'; export const TOOL_REGISTRY_REDIS = Symbol('TOOL_REGISTRY_REDIS'); @@ -67,45 +72,6 @@ export interface RegisteredTool { registeredAt: string; } -/** - * Input for registering a component tool - */ -export interface RegisterComponentToolInput { - runId: string; - nodeId: string; - toolName: string; - componentId: string; - description: string; - inputSchema: ToolInputSchema; - credentials: Record; -} - -/** - * Input for registering a remote MCP - */ -export interface RegisterRemoteMcpInput { - runId: string; - nodeId: string; - toolName: string; - description: string; - inputSchema: ToolInputSchema; - endpoint: string; - authToken?: string; -} - -/** - * Input for registering a local MCP (stdio container) - */ -export interface RegisterLocalMcpInput { - runId: string; - nodeId: string; - toolName: string; - description: string; - inputSchema: ToolInputSchema; - endpoint: string; - containerId: string; -} - const REGISTRY_TTL_SECONDS = 60 * 60; // 1 hour @Injectable() diff --git a/e2e-tests/mcp-tool-mode.test.ts b/e2e-tests/mcp-tool-mode.test.ts new file mode 100644 index 00000000..8cd8b880 --- /dev/null +++ b/e2e-tests/mcp-tool-mode.test.ts @@ -0,0 +1,131 @@ +/** + * E2E Tests - MCP Tool Mode + * + * Validates that an MCP server can be started in Docker, registered in the tool registry, + * and cleaned up properly. + */ + +import { describe, test, expect, beforeAll, afterAll } from 'bun:test'; + +const API_BASE = 'http://127.0.0.1:3211/api/v1'; +const HEADERS = { + 'Content-Type': 'application/json', + 'x-internal-token': 'local-internal-token', +}; + +const runE2E = process.env.RUN_E2E === 'true'; + +// Helper function to poll workflow run status +async function pollRunStatus(runId: string, timeoutMs = 60000): Promise<{ status: string }> { + const startTime = Date.now(); + while (Date.now() - startTime < timeoutMs) { + const res = await fetch(`${API_BASE}/workflows/runs/${runId}/status`, { headers: HEADERS }); + const s = await res.json(); + if (['COMPLETED', 'FAILED', 'CANCELLED'].includes(s.status)) return s; + await new Promise(resolve => setTimeout(resolve, 1000)); + } + throw new Error(`Workflow run ${runId} timed out`); +} + +async function createWorkflow(workflow: any): Promise { + const res = await fetch(`${API_BASE}/workflows`, { + method: 'POST', + headers: HEADERS, + body: JSON.stringify(workflow), + }); + if (!res.ok) { + const text = await res.text(); + throw new Error(`Failed to create workflow: ${res.status} ${text}`); + } + const { id } = await res.json(); + return id; +} + +async function runWorkflow(workflowId: string): Promise { + const res = await fetch(`${API_BASE}/workflows/${workflowId}/run`, { + method: 'POST', + headers: HEADERS, + body: JSON.stringify({ inputs: {} }), + }); + if (!res.ok) { + const text = await res.text(); + throw new Error(`Failed to run workflow: ${res.status} ${text}`); + } + const { runId } = await res.json(); + return runId; +} + +const e2eDescribe = runE2E ? describe : describe.skip; + +e2eDescribe('MCP Tool Mode E2E', () => { + + test('starts an MCP server in Docker and registers it', async () => { + // We use a simple alpine image as a mock MCP server that just stays alive + // In a real scenario, this would be mcp/server-everything or similar. + const workflow = { + name: 'Test: MCP Docker Registration', + nodes: [ + { + id: 'start', + type: 'core.workflow.entrypoint', + position: { x: 0, y: 0 }, + data: { label: 'Start', config: { params: { runtimeInputs: [] } } }, + }, + { + id: 'mcp', + type: 'core.mcp.server', + // Set tool mode + mode: 'tool', + position: { x: 200, y: 0 }, + data: { + label: 'MCP Server', + config: { + params: { + image: 'alpine', + command: ['sh', '-c', 'sleep 3600'], // Just stay alive + port: 8080, + }, + }, + }, + }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'mcp' }, + ], + }; + + const workflowId = await createWorkflow(workflow); + const runId = await runWorkflow(workflowId); + + const result = await pollRunStatus(runId); + expect(result.status).toBe('COMPLETED'); + + // Verify registration in backend internal API (or check Redis if we had access) + // We can use the internal health/debug endpoint if it exists, + // but for now we'll check if the trace event has the registration info. + const traceRes = await fetch(`${API_BASE}/workflows/runs/${runId}/trace`, { headers: HEADERS }); + const trace = await traceRes.json(); + + // Check for COMPLETED (mapped from NODE_COMPLETED) event for 'mcp' node + console.log(' [Debug] Fetched trace events:', trace.events.map((e: any) => `${e.nodeId}:${e.type}`)); + const mcpEvent = trace.events.find((e: any) => e.nodeId === 'mcp' && e.type === 'COMPLETED'); + expect(mcpEvent).toBeDefined(); + + if (mcpEvent) { + console.log(' [Debug] MCP Node Output:', JSON.stringify(mcpEvent.outputSummary, null, 2)); + expect(mcpEvent.outputSummary.endpoint).toBeDefined(); + expect(mcpEvent.outputSummary.containerId).toBeDefined(); + } + + // Cleanup: Kill the container after the test + const { execSync } = require('child_process'); + try { + console.log(` [Cleanup] Killing container for run ${runId}...`); + execSync(`docker rm -f $(docker ps -aq --filter "label=shipsec.runId=${runId}")`, { stdio: 'inherit' }); + console.log(' [Cleanup] Done.'); + } catch (e: any) { + console.warn(' [Cleanup] Failed to kill container (it might have already been removed):', e.message); + } + }, 120000); + +}); diff --git a/packages/component-sdk/src/port-meta.ts b/packages/component-sdk/src/port-meta.ts index 851f004a..a905837f 100644 --- a/packages/component-sdk/src/port-meta.ts +++ b/packages/component-sdk/src/port-meta.ts @@ -35,6 +35,8 @@ export interface PortMeta { schemaName?: string; /** Mark this schema as a credential type */ isCredential?: boolean; + /** True if this port should be hidden from the UI (defaults to false) */ + hidden?: boolean; } const METADATA_STORE = new WeakMap(); diff --git a/packages/component-sdk/src/runner.ts b/packages/component-sdk/src/runner.ts index 6a98c38e..00079543 100644 --- a/packages/component-sdk/src/runner.ts +++ b/packages/component-sdk/src/runner.ts @@ -114,6 +114,8 @@ async function runComponentInDocker( '--rm', '-i', '--network', network, + '--label', `shipsec.runId=${context.runId}`, + '--label', `shipsec.nodeRef=${context.componentRef}`, // Mount the directory containing both input and output '-v', `${outputDir}:${CONTAINER_OUTPUT_PATH}`, ]; @@ -130,6 +132,12 @@ async function runComponentInDocker( } } + if (runner.ports) { + for (const [hostPort, containerPort] of Object.entries(runner.ports)) { + dockerArgs.push('-p', `${hostPort}:${containerPort}`); + } + } + for (const [key, value] of Object.entries(env)) { dockerArgs.push('-e', `${key}=${value}`); } @@ -147,7 +155,27 @@ async function runComponentInDocker( const useTerminal = Boolean(context.terminalCollector); let capturedStdout = ''; - + + if (runner.detached) { + // For detached mode, we use -d instead of -i and return the container ID + const detachedArgs = dockerArgs.map(arg => arg === '-i' ? '-d' : arg); + if (!detachedArgs.includes('-d')) { + detachedArgs.splice(1, 0, '-d'); + } + + // In detached mode, we don't want --rm because we want the container to persist for the registry + const persistentArgs = detachedArgs.filter(arg => arg !== '--rm'); + + capturedStdout = await runDockerWithStandardIO(persistentArgs, params, context, timeoutSeconds, runner.stdinJson, true); + + // In detached mode, we return the container ID as part of a specialized output + return { + containerId: capturedStdout.trim(), + status: 'running', + endpoint: env.ENDPOINT || `http://localhost:${env.PORT || 8080}` + } as unknown as O; + } + if (useTerminal) { // Remove -i flag for PTY mode (stdin not needed with TTY) const argsWithoutStdin = dockerArgs.filter(arg => arg !== '-i'); @@ -180,7 +208,7 @@ async function runComponentInDocker( * @param context Execution context for logging */ async function readOutputFromFile( - filePath: string, + filePath: string, stdout: string, context: ExecutionContext ): Promise { @@ -208,7 +236,7 @@ async function readOutputFromFile( // This allows components that just write to stdout to continue working. if (stdout.trim().length > 0) { context.logger.info(`[Docker] No output file found, using stdout fallback (${stdout.length} bytes)`); - + // Try to parse stdout as JSON try { const output = JSON.parse(stdout.trim()); @@ -236,6 +264,7 @@ async function runDockerWithStandardIO( context: ExecutionContext, timeoutSeconds: number, stdinJson?: boolean, + detached?: boolean, ): Promise { const dockerPath = await resolveDockerPath(context); return new Promise((resolve, reject) => { @@ -263,7 +292,7 @@ async function runDockerWithStandardIO( stdoutEmitter(data); const chunk = data.toString(); stdout += chunk; // Capture for fallback - + // Send to log collector (which has chunking support) const logEntry = { runId: context.runId, @@ -274,7 +303,7 @@ async function runDockerWithStandardIO( timestamp: new Date().toISOString(), }; context.logCollector?.(logEntry); - + // NOTE: We intentionally do NOT emit stdout as trace progress events. // Output data is written to /shipsec-output/result.json by the container. // Stdout should only contain logs and progress messages from the component. @@ -337,7 +366,7 @@ async function runDockerWithStandardIO( context.logger.info(`[Docker] Completed successfully`); context.emitProgress('Docker container completed'); - + // Return captured stdout for fallback processing resolve(stdout); }); @@ -411,7 +440,7 @@ async function runDockerWithPty( code: error.code, } : String(error) }; - + console.log('diag', diag); context.logger.warn( `[Docker][PTY] Failed to spawn PTY: ${error instanceof Error ? error.message : String(error)}. Diagnostic: ${JSON.stringify(diag)}`, diff --git a/packages/component-sdk/src/types.ts b/packages/component-sdk/src/types.ts index 7161228d..459bdcd5 100644 --- a/packages/component-sdk/src/types.ts +++ b/packages/component-sdk/src/types.ts @@ -36,6 +36,8 @@ export interface DockerRunnerConfig { }>; // Optional volume mounts timeoutSeconds?: number; stdinJson?: boolean; // Whether to write params as JSON to container's stdin (default: true) + detached?: boolean; // If true, start container and return immediately without waiting for exit + ports?: Record; // Port mapping host -> container } @@ -251,6 +253,8 @@ export interface ComponentPortMetadata { isBranching?: boolean; /** Custom color for branching ports: 'green' | 'red' | 'amber' | 'blue' | 'purple' | 'slate' */ branchColor?: 'green' | 'red' | 'amber' | 'blue' | 'purple' | 'slate'; + /** True if this port should be hidden from the UI */ + hidden?: boolean; } export type ComponentParameterType = diff --git a/packages/component-sdk/src/zod-ports.ts b/packages/component-sdk/src/zod-ports.ts index f37d567c..edec932c 100644 --- a/packages/component-sdk/src/zod-ports.ts +++ b/packages/component-sdk/src/zod-ports.ts @@ -14,7 +14,7 @@ export interface ValidationResult { error?: string; } -type ZodDef = { type?: string; typeName?: string; [key: string]: any }; +type ZodDef = { type?: string; typeName?: string;[key: string]: any }; const LEGACY_TYPE_MAP: Record = { ZodString: 'string', @@ -131,6 +131,7 @@ export function extractPorts( valuePriority: metadata.valuePriority, isBranching: metadata.isBranching, branchColor: metadata.branchColor, + hidden: metadata.hidden, }); } diff --git a/worker/src/components/core/mcp-server.ts b/worker/src/components/core/mcp-server.ts new file mode 100644 index 00000000..23e850d5 --- /dev/null +++ b/worker/src/components/core/mcp-server.ts @@ -0,0 +1,120 @@ +import { z } from 'zod'; +import { + componentRegistry, + defineComponent, + inputs, + outputs, + parameters, + param, + port, + runComponentWithRunner, +} from '@shipsec/component-sdk'; + +const inputSchema = inputs({}); + +const outputSchema = outputs({ + endpoint: port(z.string().describe('The URL of the MCP server'), { label: 'Endpoint' }), + containerId: port(z.string().optional().describe('The Docker container ID'), { + label: 'Container ID', + hidden: true, + }), +}); + +const parameterSchema = parameters({ + image: param(z.string().describe('Docker image for the MCP server'), { + label: 'Docker Image', + editor: 'text', + placeholder: 'mcp/myserver:latest', + }), + command: param(z.array(z.string()).default([]).describe('Entrypoint command'), { + label: 'Command', + editor: 'variable-list', + }), + args: param(z.array(z.string()).default([]).describe('Arguments for the command'), { + label: 'Arguments', + editor: 'variable-list', + }), + env: param(z.record(z.string(), z.string()).default({}).describe('Environment variables'), { + label: 'Environment Variables', + editor: 'json', + }), + port: param(z.number().default(8080).describe('Internal port the server listens on'), { + label: 'Port', + editor: 'number', + }), +}); + +const definition = defineComponent({ + id: 'core.mcp.server', + label: 'MCP Server', + category: 'it_ops', + // The runner configuration here is a placeholder. + // The actual runner config is constructed dynamically in the execute method + // because `this.runner` is not interpolated when used directly in `execute`. + runner: { + kind: 'docker', + image: 'placeholder', + command: [], + detached: true, + }, + inputs: inputSchema, + outputs: outputSchema, + parameters: parameterSchema, + docs: 'Starts an external MCP server in a Docker container and registers it as a tool source.', + ui: { + slug: 'mcp-server', + version: '1.0.0', + type: 'process', + category: 'it_ops', + description: 'Run an external Model Context Protocol (MCP) server.', + icon: 'Server', + author: { + name: 'ShipSecAI', + type: 'shipsecai', + }, + isLatest: true, + }, + async execute({ params }, context) { + let containerId: string | undefined; + const serverPort = params.port || 8080; // Determine the port once + + if (params.image) { + // Manually construct runner config to resolve parameters, + // as `this.runner` is not interpolated when used directly in `execute`. + const runnerConfig = { + kind: 'docker' as const, // Explicitly type as 'docker' literal + image: params.image, + // Combine command and args into a single array for the Docker command + command: [...(params.command || []), ...(params.args || [])], + env: params.env, + detached: true, + // Map the internal server port to the same host port + ports: { [serverPort]: serverPort }, + }; + + // For local docker MCP servers, we start the container using the runner. + const result = await runComponentWithRunner( + runnerConfig, // Pass the dynamically constructed runner config + async () => ({}), + params, + context, + ); + + if (result && typeof result === 'object' && 'containerId' in result) { + containerId = (result as any).containerId; + } + } + + const port = params.port || 8080; + return { + endpoint: `http://localhost:${port}`, + containerId, + }; + }, +}); + +componentRegistry.register(definition); + +export type McpServerInput = typeof inputSchema; +export type McpServerParams = typeof parameterSchema; +export type McpServerOutput = typeof outputSchema; diff --git a/worker/src/components/index.ts b/worker/src/components/index.ts index 00139859..d43eb5d2 100644 --- a/worker/src/components/index.ts +++ b/worker/src/components/index.ts @@ -27,6 +27,7 @@ import './core/destination-artifact'; import './core/destination-s3'; import './core/text-block'; import './core/workflow-call'; +import './core/mcp-server'; // Manual Action components import './manual-action/manual-approval'; import './manual-action/manual-selection'; diff --git a/worker/src/temporal/activities/mcp.activity.ts b/worker/src/temporal/activities/mcp.activity.ts new file mode 100644 index 00000000..85cfa25a --- /dev/null +++ b/worker/src/temporal/activities/mcp.activity.ts @@ -0,0 +1,116 @@ +import { + componentRegistry, + ConfigurationError, + getCredentialInputIds, + getToolMetadata, + ServiceError, +} from '@shipsec/component-sdk'; +import { + RegisterComponentToolActivityInput, + RegisterLocalMcpActivityInput, + RegisterRemoteMcpActivityInput, +} from '../types'; + +const DEFAULT_API_BASE_URL = + process.env.STUDIO_API_BASE_URL ?? + process.env.SHIPSEC_API_BASE_URL ?? + process.env.API_BASE_URL ?? + 'http://localhost:3211'; + +function normalizeBaseUrl(url: string): string { + return url.endsWith('/') ? url.slice(0, -1) : url; +} + +async function callInternalApi(path: string, body: any) { + const internalToken = process.env.INTERNAL_SERVICE_TOKEN; + if (!internalToken) { + throw new ConfigurationError( + 'INTERNAL_SERVICE_TOKEN env var must be set to call internal MCP registry', + { + configKey: 'INTERNAL_SERVICE_TOKEN', + }, + ); + } + + const baseUrl = normalizeBaseUrl(DEFAULT_API_BASE_URL); + const response = await fetch(`${baseUrl}/internal/mcp/${path}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Internal-Token': internalToken, + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const raw = await response.text().catch(() => ''); + throw new ServiceError(`Failed to call internal MCP registry (${path}): ${raw}`, { + statusCode: response.status, + details: { statusText: response.statusText }, + }); + } + + return response.json(); +} + +export async function registerComponentToolActivity( + input: RegisterComponentToolActivityInput, +): Promise { + await callInternalApi('register-component', input); +} + +export async function registerRemoteMcpActivity( + input: RegisterRemoteMcpActivityInput, +): Promise { + await callInternalApi('register-remote', input); +} + +export async function registerLocalMcpActivity( + input: RegisterLocalMcpActivityInput, +): Promise { + const port = input.port || 8080; + // Use provided endpoint/containerId or fall back to defaults + const endpoint = input.endpoint || `http://localhost:${port}`; + const containerId = input.containerId || `docker-${input.image.replace(/[^a-zA-Z0-9]/g, '-')}`; + + await callInternalApi('register-local', { + ...input, + endpoint, + containerId, + }); +} + +export async function prepareAndRegisterToolActivity(input: { + runId: string; + nodeId: string; + componentId: string; + inputs: Record; + params: Record; +}): Promise { + const component = componentRegistry.get(input.componentId); + if (!component) { + throw new ServiceError(`Component ${input.componentId} not found`); + } + + const metadata = getToolMetadata(component); + const credentialIds = getCredentialInputIds(component); + + // Extract credentials from inputs/params + const allInputs = { ...input.inputs, ...input.params }; + const credentials: Record = {}; + for (const id of credentialIds) { + if (id in allInputs) { + credentials[id] = allInputs[id]; + } + } + + await callInternalApi('register-component', { + runId: input.runId, + nodeId: input.nodeId, + toolName: metadata.name, + componentId: input.componentId, + description: metadata.description, + inputSchema: metadata.inputSchema, + credentials, + }); +} diff --git a/worker/src/temporal/types.ts b/worker/src/temporal/types.ts index 3a17c016..11743289 100644 --- a/worker/src/temporal/types.ts +++ b/worker/src/temporal/types.ts @@ -40,6 +40,11 @@ export interface WorkflowNodeMetadata { maxConcurrency?: number; groupId?: string; streamId?: string; + mode?: 'normal' | 'tool'; + toolConfig?: { + boundInputIds: string[]; + exposedInputIds: string[]; + }; } export interface WorkflowFailureMetadata { @@ -173,3 +178,48 @@ export interface PrepareRunPayloadActivityInput { parentRunId?: string; parentNodeRef?: string; } + +// MCP Activity types + +export interface RegisterComponentToolActivityInput { + runId: string; + nodeId: string; + toolName: string; + componentId: string; + description: string; + inputSchema: any; + credentials: Record; +} + +export interface RegisterRemoteMcpActivityInput { + runId: string; + nodeId: string; + toolName: string; + description: string; + inputSchema: any; + endpoint: string; + authToken?: string; +} + +export interface RegisterLocalMcpActivityInput { + runId: string; + nodeId: string; + toolName: string; + description: string; + inputSchema: any; + image: string; + command?: string; + args?: string; + env?: Record; + port: number; + endpoint: string; + containerId: string; +} + +export interface PrepareAndRegisterToolActivityInput { + runId: string; + nodeId: string; + componentId: string; + inputs: Record; + params: Record; +} diff --git a/worker/src/temporal/workers/dev.worker.ts b/worker/src/temporal/workers/dev.worker.ts index 92f5f23a..740111fe 100644 --- a/worker/src/temporal/workers/dev.worker.ts +++ b/worker/src/temporal/workers/dev.worker.ts @@ -25,6 +25,12 @@ import { } from '../activities/human-input.activity'; import { prepareRunPayloadActivity } from '../activities/run-dispatcher.activity'; import { recordTraceEventActivity, initializeTraceActivity } from '../activities/trace.activity'; +import { + registerComponentToolActivity, + registerLocalMcpActivity, + registerRemoteMcpActivity, + prepareAndRegisterToolActivity, +} from '../activities/mcp.activity'; // ... (existing imports) @@ -211,6 +217,9 @@ async function main() { createHumanInputRequestActivity, cancelHumanInputRequestActivity, recordTraceEventActivity, + registerComponentToolActivity, + registerLocalMcpActivity, + registerRemoteMcpActivity, }).join(', ')}`, ); @@ -243,6 +252,10 @@ async function main() { cancelHumanInputRequestActivity, expireHumanInputRequestActivity, recordTraceEventActivity, + registerComponentToolActivity, + registerLocalMcpActivity, + registerRemoteMcpActivity, + prepareAndRegisterToolActivity, }, bundlerOptions: { ignoreModules: ['child_process'], diff --git a/worker/src/temporal/workflows/index.ts b/worker/src/temporal/workflows/index.ts index bca583bf..b92d124a 100644 --- a/worker/src/temporal/workflows/index.ts +++ b/worker/src/temporal/workflows/index.ts @@ -20,6 +20,10 @@ import type { RunWorkflowActivityOutput, WorkflowAction, PrepareRunPayloadActivityInput, + RegisterComponentToolActivityInput, + RegisterLocalMcpActivityInput, + RegisterRemoteMcpActivityInput, + PrepareAndRegisterToolActivityInput, } from '../types'; const { @@ -28,6 +32,9 @@ const { finalizeRunActivity, createHumanInputRequestActivity, expireHumanInputRequestActivity, + registerLocalMcpActivity, + registerRemoteMcpActivity, + prepareAndRegisterToolActivity, } = proxyActivities<{ runComponentActivity(input: RunComponentActivityInput): Promise; setRunMetadataActivity(input: { @@ -53,6 +60,10 @@ const { resolveUrl: string; }>; expireHumanInputRequestActivity(requestId: string): Promise; + registerComponentToolActivity(input: RegisterComponentToolActivityInput): Promise; + registerLocalMcpActivity(input: RegisterLocalMcpActivityInput): Promise; + registerRemoteMcpActivity(input: RegisterRemoteMcpActivityInput): Promise; + prepareAndRegisterToolActivity(input: PrepareAndRegisterToolActivityInput): Promise; }>({ startToCloseTimeout: '10 minutes', }); @@ -519,6 +530,62 @@ export async function shipsecWorkflowRun( const retryOptions = mapRetryPolicy(action.retryPolicy); + const isToolMode = nodeMetadata?.mode === 'tool'; + + if (isToolMode) { + if (action.componentId === 'core.mcp.server') { + const { runComponentActivity: runMcp } = proxyActivities<{ + runComponentActivity( + input: RunComponentActivityInput, + ): Promise; + }>({ + startToCloseTimeout: '10 minutes', + retry: retryOptions, + }); + + const mcpOutput = await runMcp(activityInput); + const output = mcpOutput.output as any; + const endpoint = output.endpoint; + const containerId = output.containerId; + + if (mergedParams.type === 'docker' || mergedParams.image) { + await registerLocalMcpActivity({ + runId: input.runId, + nodeId: action.ref, + toolName: + (mergedParams.image as string)?.split('/').pop()?.split(':')[0] || 'mcp_server', + description: `Local MCP Server (${mergedParams.image})`, + inputSchema: {}, + image: mergedParams.image as string, + port: (mergedParams.port as number) || 8080, + endpoint, + containerId, + }); + } else { + await registerRemoteMcpActivity({ + runId: input.runId, + nodeId: action.ref, + toolName: 'remote_mcp', + description: `Remote MCP Server (${endpoint})`, + inputSchema: {}, + endpoint: endpoint as string, + authToken: mergedParams.authToken as string, + }); + } + } else { + await prepareAndRegisterToolActivity({ + runId: input.runId, + nodeId: action.ref, + componentId: action.componentId, + inputs: mergedInputs, + params: mergedParams, + }); + } + + // Tool registration is successful, proceed as a successful node execution + return { activePorts: ['default'] }; + } + const { runComponentActivity: runComponentWithRetry } = proxyActivities<{ runComponentActivity( input: RunComponentActivityInput,