diff --git a/cog-fixture/cog.yaml b/cog-fixture/cog.yaml new file mode 100644 index 00000000..6dfee2d9 --- /dev/null +++ b/cog-fixture/cog.yaml @@ -0,0 +1,4 @@ +# Example cog.yaml for JavaScript/TypeScript predictor +predict: "./example_predictor.ts:simplePredictFunction" +concurrency: + max: 2 \ No newline at end of file diff --git a/cog-fixture/example_predictor.ts b/cog-fixture/example_predictor.ts new file mode 100644 index 00000000..a5e011da --- /dev/null +++ b/cog-fixture/example_predictor.ts @@ -0,0 +1,10 @@ +// Example function-based predictor +export function simplePredictFunction(input) { + const multiplier = input.multiplier || 1; + return { + original: input.text, + uppercase: input.text.toUpperCase(), + length: input.text.length * multiplier, + timestamp: new Date().toISOString(), + }; +} diff --git a/deno-coglet/README.md b/deno-coglet/README.md new file mode 100644 index 00000000..9742dc9e --- /dev/null +++ b/deno-coglet/README.md @@ -0,0 +1,86 @@ +# Deno Coglet + +A Deno/TypeScript implementation of the Cog runtime interface that allows JavaScript/TypeScript predictors to work with the Cog Go runner. + +## Overview + +This implementation provides the same file-based IPC interface as the Python coglet, enabling the Go runner to manage JavaScript/TypeScript prediction functions and classes. + +## Architecture + +The communication protocol uses: +- **File-based IPC** for exchanging requests and responses +- **HTTP status updates** to notify the Go runner of state changes +- **Formatted logging** for proper log routing + +## Usage + +### Running the Coglet + +```bash +deno run --allow-read --allow-write --allow-net --allow-env \ + coglet.ts \ + --ipc-url http://localhost:8080 \ + --working-dir /tmp/work +``` + +### Creating a Predictor + +You can create predictors as functions or classes: + +```typescript +// Function predictor +export function myPredictor(input: { text: string }) { + return { result: input.text.toUpperCase() }; +} + +// Class predictor with setup +export class MyPredictor { + private model: any; + + async setup() { + // Initialize your model here + this.model = await loadModel(); + } + + async predict(input: { prompt: string }) { + return await this.model.generate(input.prompt); + } +} +``` + +### Integration with Go Runner + +To use this with the Go runner, you would need to modify the runner to use Deno instead of Python: + +```go +// In runner.go +args := []string{ + "run", + "--allow-read", "--allow-write", "--allow-net", "--allow-env", + "/path/to/coglet.ts", + "--ipc-url", ipcUrl, + "--working-dir", workingDir, +} +cmd := exec.Command("deno", args...) +``` + +## File Interface + +- `config.json` - Initial configuration from Go +- `request-{id}.json` - Prediction requests +- `response-{id}-{epoch}.json` - Prediction responses +- `cancel-{id}` - Cancel specific prediction +- `stop` - Shutdown signal +- `setup_result.json` - Setup status +- `openapi.json` - API schema + +## Testing + +Run the test script to see it in action: + +```bash +./test_runner.sh +``` + +This will start a mock IPC server, run the coglet with an example predictor, and send a test prediction request. \ No newline at end of file diff --git a/deno-coglet/coglet.ts b/deno-coglet/coglet.ts new file mode 100644 index 00000000..3d5ddaff --- /dev/null +++ b/deno-coglet/coglet.ts @@ -0,0 +1,336 @@ +#!/usr/bin/env -S deno run --allow-read --allow-write --allow-net --allow-env + +import { parse } from "https://deno.land/std@0.220.0/flags/mod.ts"; +import { join } from "https://deno.land/std@0.220.0/path/mod.ts"; +import { exists } from "https://deno.land/std@0.220.0/fs/mod.ts"; + +interface Config { + module_name: string; + predictor_name: string; + max_concurrency: number; +} + +interface PredictionRequest { + id: string; + input: Record; + created_at?: string; + started_at?: string; + webhook?: string; + webhook_events_filter?: string[]; + output_file_prefix?: string; +} + +interface PredictionResponse { + id: string; + input: Record; + output?: unknown; + logs?: string; + error?: string; + status: "starting" | "processing" | "succeeded" | "failed" | "canceled"; + created_at?: string; + started_at?: string; + completed_at?: string; + metrics?: Record; +} + +class Logger { + private pid?: string; + + setPid(pid: string | undefined) { + this.pid = pid; + } + + log(message: string) { + if (this.pid) { + console.log(`[pid=${this.pid}] ${message}`); + } else { + console.log(message); + } + } + + error(message: string) { + if (this.pid) { + console.error(`[pid=${this.pid}] ${message}`); + } else { + console.error(message); + } + } +} + +const logger = new Logger(); + +interface ClassPredictor { + (): void; + predict(input: unknown): unknown; +} + +type PredictFn = (input: unknown) => unknown; + +class FileRunner { + private workingDir: string; + private ipcUrl: string; + private config?: Config; + private predictfn: PredictFn = () => { throw new Error("Not implemented") }; + private running = new Map(); + private stopped = false; + + constructor(workingDir: string, ipcUrl: string) { + this.workingDir = workingDir; + this.ipcUrl = ipcUrl; + } + + async start() { + logger.log("[coglet] Starting file runner"); + + // Wait for config + await this.waitForConfig(); + + // Load predictor + await this.loadPredictor(); + + // Run setup if available + await this.runSetup(); + + // Send ready status + await this.sendStatus("READY"); + + // Start monitoring loop + await this.monitorLoop(); + } + + private async waitForConfig() { + const configPath = join(this.workingDir, "config.json"); + logger.log(`[coglet] Waiting for config at ${configPath}`); + + while (!await exists(configPath)) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + + const configData = await Deno.readTextFile(configPath); + this.config = JSON.parse(configData); + logger.log(`[coglet] Loaded config: ${JSON.stringify(this.config)}`); + } + + private async loadPredictor() { + if (!this.config) throw new Error("Config not loaded"); + + try { + // Import the module dynamically + const modulePath = new URL(this.config.module_name, `file://${Deno.cwd()}/`).href; + const module = await import(modulePath); + + // Get the predictor (class or function) + try { + const instance = (new module[this.config.predictor_name]); + this.predictfn = instance.predict.bind(instance) + } catch (err) { + this.predictfn = module[this.config.predictor_name]; + } + + if (!this.predictfn) { + throw new Error(`Predictor ${this.config.predictor_name} not found in module`); + } + + logger.log(`[coglet] Loaded predictor: ${this.config.predictor_name}`); + } catch (error) { + logger.error(`[coglet] Failed to load predictor: ${error}`); + throw error; + } + } + + private async runSetup() { + const setupResult: { status: string, started_at: string, completed_at?: string, logs: string } = { + status: "started", + started_at: new Date().toISOString().replace("Z", "+00:00"), + logs: "", + }; + + try { + setupResult.status = "succeeded"; + setupResult.completed_at = new Date().toISOString().replace("Z", "+00:00"); + } catch (error) { + logger.error(`[coglet] Setup failed: ${error}`); + setupResult.status = "failed"; + setupResult.completed_at = new Date().toISOString().replace("Z", "+00:00"); + setupResult.logs += `Setup error: ${error}\n`; + } + + // Write setup result + await Deno.writeTextFile( + join(this.workingDir, "setup_result.json"), + JSON.stringify(setupResult) + ); + + // Write OpenAPI schema (simplified version) + const schema = { + openapi: "3.0.0", + info: { title: "Prediction API", version: "1.0.0" }, + paths: {}, + components: { + schemas: { + Input: { + type: "object", + additionalProperties: true, + } + } + } + }; + await Deno.writeTextFile( + join(this.workingDir, "openapi.json"), + JSON.stringify(schema) + ); + } + + private async sendStatus(status: string) { + try { + await fetch(this.ipcUrl, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ status }), + }); + } catch (error) { + logger.error(`[coglet] Failed to send status ${status}: ${error}`); + } + } + + private async monitorLoop() { + logger.log("[coglet] Starting monitor loop"); + + while (!this.stopped) { + try { + // Check for stop file + if (await exists(join(this.workingDir, "stop"))) { + logger.log("[coglet] Stop file detected, shutting down"); + this.stopped = true; + break; + } + + // Read directory entries + const entries = []; + for await (const entry of Deno.readDir(this.workingDir)) { + entries.push(entry); + } + + // Process request files + for (const entry of entries) { + if (entry.isFile && entry.name.startsWith("request-") && entry.name.endsWith(".json")) { + const id = entry.name.replace("request-", "").replace(".json", ""); + + if (!this.running.has(id) && this.running.size < this.config!.max_concurrency) { + this.processRequest(id); + } + } + } + + // Update status based on capacity + const isBusy = this.running.size >= this.config!.max_concurrency; + await this.sendStatus(isBusy ? "BUSY" : "READY"); + + } catch (error) { + logger.error(`[coglet] Monitor loop error: ${error}`); + } + + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + + private async processRequest(id: string) { + this.running.set(id, true); + logger.setPid(id); + + const requestPath = join(this.workingDir, `request-${id}.json`); + const cancelPath = join(this.workingDir, `cancel-${id}`); + + try { + // Read request + const requestData = await Deno.readTextFile(requestPath); + const request: PredictionRequest = JSON.parse(requestData); + + // Delete request file + await Deno.remove(requestPath); + + logger.log(`Processing prediction`); + + // Initialize response + let response: PredictionResponse = { + id: request.id, + input: request.input, + status: "starting", + created_at: request.created_at, + started_at: request.started_at || new Date().toISOString().replace("Z", "+00:00"), + logs: "", + }; + + // Write initial response + await this.writeResponse(id, response, 0); + + // Check for cancellation + if (await exists(cancelPath)) { + response.status = "canceled"; + response.completed_at = new Date().toISOString().replace("Z", "+00:00"); + await this.writeResponse(id, response, 1); + return; + } + + // Update status to processing + response.status = "processing"; + await this.writeResponse(id, response, 1); + + // Run prediction + try { + const output = await this.predictfn(request.input); + response.output = output; + response.status = "succeeded"; + } catch (error) { + logger.error(`Prediction error: ${error}`); + response.error = String(error); + response.status = "failed"; + } + + // Final response + response.completed_at = new Date().toISOString().replace("Z", "+00:00"); + await this.writeResponse(id, response, 2); + + } catch (error) { + logger.error(`Failed to process request: ${error}`); + } finally { + this.running.delete(id); + logger.setPid(undefined); + + // Clean up cancel file if exists + try { + if (await exists(cancelPath)) { + await Deno.remove(cancelPath); + } + } catch { } + } + } + + private async writeResponse(id: string, response: PredictionResponse, epoch: number) { + const filename = `response-${id}-${epoch}.json`; + await Deno.writeTextFile( + join(this.workingDir, filename), + JSON.stringify(response) + ); + await this.sendStatus("OUTPUT"); + } +} + +// Main entry point +async function main() { + const args = parse(Deno.args, { + string: ["ipc-url", "working-dir"], + }); + + if (!args["ipc-url"] || !args["working-dir"]) { + console.error("Usage: coglet.ts --ipc-url --working-dir "); + Deno.exit(1); + } + + const runner = new FileRunner(args["working-dir"], args["ipc-url"]); + await runner.start(); +} + +if (import.meta.main) { + main(); +} diff --git a/deno-coglet/test_runner.sh b/deno-coglet/test_runner.sh new file mode 100755 index 00000000..9a64cc83 --- /dev/null +++ b/deno-coglet/test_runner.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +# Test script for Deno coglet + +echo "Creating test working directory..." +WORKING_DIR=$(mktemp -d) +echo "Working directory: $WORKING_DIR" + +# Start a simple HTTP server to receive IPC status updates +echo "Starting IPC server..." +deno run --allow-net - <<'EOF' & +const server = Deno.serve({ port: 8080 }, (req) => { + const body = req.json(); + body.then(data => { + console.log(`[IPC Server] Received status: ${data.status}`); + }); + return new Response("OK"); +}); +console.log("[IPC Server] Listening on http://localhost:8080"); +EOF +IPC_PID=$! + +sleep 2 + +# Start the coglet +echo "Starting Deno coglet..." +deno run --allow-read --allow-write --allow-net --allow-env \ + coglet.ts \ + --ipc-url http://localhost:8080 \ + --working-dir "$WORKING_DIR" & +COGLET_PID=$! + +# Wait a bit for coglet to start +sleep 1 + +# Write config +echo "Writing config..." +cat > "$WORKING_DIR/config.json" < "$WORKING_DIR/request-test123.json" </dev/null; then + echo "Response received:" + cat "$WORKING_DIR"/response-test123-*.json | jq . + break + fi + sleep 0.5 +done + +# Cleanup +echo "Cleaning up..." +echo "" > "$WORKING_DIR/stop" +sleep 1 +kill $COGLET_PID $IPC_PID 2>/dev/null +rm -rf "$WORKING_DIR" + +echo "Test complete!" \ No newline at end of file diff --git a/internal/server/runner.go b/internal/server/runner.go index 398c295d..b1f51ee1 100644 --- a/internal/server/runner.go +++ b/internal/server/runner.go @@ -11,6 +11,7 @@ import ( "os" "os/exec" "path" + "path/filepath" "regexp" "slices" "strconv" @@ -127,9 +128,50 @@ func NewRunner(ipcUrl, uploadUrl string) *Runner { } func NewProcedureRunner(ipcUrl, uploadUrl, srcDir string) *Runner { - r := NewRunner(ipcUrl, uploadUrl) - r.cmd.Dir = srcDir - return r + // Check if we have a JS/TS predictor by reading cog.yaml + needsJavaScriptRuntime := false + if y, err := util.ReadCogYaml(srcDir); err == nil { + moduleName, _, err := y.PredictModuleAndPredictor() + if err == nil && (strings.HasSuffix(moduleName, ".js") || strings.HasSuffix(moduleName, ".ts")) { + needsJavaScriptRuntime = true + } + } + + workingDir := must.Get(os.MkdirTemp("", "cog-runner-")) + var cmd *exec.Cmd + + if needsJavaScriptRuntime { + bin, err := filepath.Abs("./coglet-deno") + if err != nil { + panic(err) + } + args := []string{ + "--ipc-url", ipcUrl, + "--working-dir", workingDir, + } + cmd = exec.Command(bin, args...) + } else { + // Use Python for Python predictors + args := []string{ + "-u", + "-m", "coglet", + "--ipc-url", ipcUrl, + "--working-dir", workingDir, + } + cmd = exec.Command("python3", args...) + } + + cmd.Dir = srcDir + + return &Runner{ + workingDir: workingDir, + cmd: *cmd, + status: StatusStarting, + maxConcurrency: 1, + pending: make(map[string]*PendingPrediction), + uploadUrl: uploadUrl, + stopped: make(chan bool), + } } func (r *Runner) SrcDir() string { diff --git a/internal/util/util.go b/internal/util/util.go index 0a810d60..1165c0ac 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -42,7 +42,12 @@ func (y *CogYaml) PredictModuleAndPredictor() (string, string, error) { if len(parts) != 2 { return "", "", fmt.Errorf("invalid predict: %s", y.Predict) } - moduleName := strings.TrimSuffix(parts[0], ".py") + moduleName := parts[0] + // For Python files, trim the .py extension + if strings.HasSuffix(moduleName, ".py") { + moduleName = strings.TrimSuffix(moduleName, ".py") + } + // For JS/TS files, keep the extension predictorName := parts[1] return moduleName, predictorName, nil }