From e26a4f4bc3dbedd4e5f524d97b0f43721229954b Mon Sep 17 00:00:00 2001
From: Gregor Martynus <39992+gr2m@users.noreply.github.com>
Date: Thu, 5 Feb 2026 22:08:01 -0800
Subject: [PATCH] use @ai-sdk/durable-agent

---
 packages/ai/package.json                      |    5 +-
 packages/ai/src/agent.ts                      |    4 +
 packages/ai/src/agent/do-stream-step.test.ts  |  128 -
 packages/ai/src/agent/do-stream-step.ts       |  665 ------
 packages/ai/src/agent/durable-agent.test.ts   | 2054 -----------------
 packages/ai/src/agent/durable-agent.ts        | 1203 ----------
 .../ai/src/agent/stream-text-iterator.test.ts |  429 ----
 packages/ai/src/agent/stream-text-iterator.ts |  476 ----
 packages/ai/src/agent/tools-to-model-tools.ts |   13 -
 packages/ai/src/agent/types.ts                |   36 -
 pnpm-lock.yaml                                |   77 +
 11 files changed, 84 insertions(+), 5006 deletions(-)
 create mode 100644 packages/ai/src/agent.ts
 delete mode 100644 packages/ai/src/agent/do-stream-step.test.ts
 delete mode 100644 packages/ai/src/agent/do-stream-step.ts
 delete mode 100644 packages/ai/src/agent/durable-agent.test.ts
 delete mode 100644 packages/ai/src/agent/durable-agent.ts
 delete mode 100644 packages/ai/src/agent/stream-text-iterator.test.ts
 delete mode 100644 packages/ai/src/agent/stream-text-iterator.ts
 delete mode 100644 packages/ai/src/agent/tools-to-model-tools.ts
 delete mode 100644 packages/ai/src/agent/types.ts

diff --git a/packages/ai/package.json b/packages/ai/package.json
index 14682c1b4d..bfacc98c50 100644
--- a/packages/ai/package.json
+++ b/packages/ai/package.json
@@ -31,8 +31,8 @@
       "default": "./dist/index.js"
     },
     "./agent": {
-      "types": "./dist/agent/durable-agent.d.ts",
-      "default": "./dist/agent/durable-agent.js"
+      "types": "./dist/agent.d.ts",
+      "default": "./dist/agent.js"
     },
     "./anthropic": {
       "types": "./dist/providers/anthropic.d.ts",
@@ -72,6 +72,7 @@
     "workflow": "^4.1.0-beta.52"
   },
   "dependencies": {
+    "@ai-sdk/durable-agent": "0.0.0-27b68fc9-20260206061543",
     "@ai-sdk/provider": "^2.0.0 || ^3.0.0",
     "zod": "catalog:"
   },
diff --git a/packages/ai/src/agent.ts b/packages/ai/src/agent.ts
new file mode 100644
index 0000000000..f542b72362
--- /dev/null
+++ b/packages/ai/src/agent.ts
@@ -0,0 +1,4 @@
+/**
+ * Re-export DurableAgent from @ai-sdk/durable-agent
+ */
+export * from '@ai-sdk/durable-agent';
diff --git a/packages/ai/src/agent/do-stream-step.test.ts b/packages/ai/src/agent/do-stream-step.test.ts
deleted file mode 100644
index b9220a2888..0000000000
--- a/packages/ai/src/agent/do-stream-step.test.ts
+++ /dev/null
@@ -1,128 +0,0 @@
-import { describe, expect, it } from 'vitest';
-import { normalizeFinishReason } from './do-stream-step.js';
-
-describe('normalizeFinishReason', () => {
-  describe('string finish reasons', () => {
-    it('should pass through "stop"', () => {
-      expect(normalizeFinishReason('stop')).toBe('stop');
-    });
-
-    it('should pass through "tool-calls"', () => {
-      expect(normalizeFinishReason('tool-calls')).toBe('tool-calls');
-    });
-
-    it('should pass through "length"', () => {
-      expect(normalizeFinishReason('length')).toBe('length');
-    });
-
-    it('should pass through "content-filter"', () => {
-      expect(normalizeFinishReason('content-filter')).toBe('content-filter');
-    });
-
-    it('should pass through "error"', () => {
-      expect(normalizeFinishReason('error')).toBe('error');
-    });
-
-    it('should pass through "other"', () => {
-      expect(normalizeFinishReason('other')).toBe('other');
-    });
-
-    it('should pass through "unknown"', () => {
-      expect(normalizeFinishReason('unknown')).toBe('unknown');
-    });
-  });
-
-  describe('object finish reasons', () => {
-    it('should 
extract "stop" from object', () => { - expect(normalizeFinishReason({ type: 'stop' })).toBe('stop'); - }); - - it('should extract "tool-calls" from object', () => { - expect(normalizeFinishReason({ type: 'tool-calls' })).toBe('tool-calls'); - }); - - it('should extract "length" from object', () => { - expect(normalizeFinishReason({ type: 'length' })).toBe('length'); - }); - - it('should extract "content-filter" from object', () => { - expect(normalizeFinishReason({ type: 'content-filter' })).toBe( - 'content-filter' - ); - }); - - it('should extract "error" from object', () => { - expect(normalizeFinishReason({ type: 'error' })).toBe('error'); - }); - - it('should extract "other" from object', () => { - expect(normalizeFinishReason({ type: 'other' })).toBe('other'); - }); - - it('should extract "unknown" from object', () => { - expect(normalizeFinishReason({ type: 'unknown' })).toBe('unknown'); - }); - - it('should return "unknown" for object without type property', () => { - expect(normalizeFinishReason({})).toBe('unknown'); - }); - - it('should return "unknown" for object with null type', () => { - expect(normalizeFinishReason({ type: null })).toBe('unknown'); - }); - - it('should return "unknown" for object with undefined type', () => { - expect(normalizeFinishReason({ type: undefined })).toBe('unknown'); - }); - - it('should handle object with additional properties', () => { - expect( - normalizeFinishReason({ - type: 'stop', - reason: 'end_turn', - metadata: { foo: 'bar' }, - }) - ).toBe('stop'); - }); - }); - - describe('edge cases', () => { - it('should return "unknown" for undefined', () => { - expect(normalizeFinishReason(undefined)).toBe('unknown'); - }); - - it('should return "unknown" for null', () => { - expect(normalizeFinishReason(null)).toBe('unknown'); - }); - - it('should return "unknown" for number', () => { - expect(normalizeFinishReason(42)).toBe('unknown'); - }); - - it('should return "unknown" for boolean', () => { - expect(normalizeFinishReason(true)).toBe('unknown'); - }); - - it('should return "unknown" for array', () => { - expect(normalizeFinishReason(['stop'])).toBe('unknown'); - }); - - it('should handle empty string', () => { - expect(normalizeFinishReason('')).toBe(''); - }); - }); - - describe('bug reproduction', () => { - it('should handle object format that caused [object Object] error', () => { - const normalized = normalizeFinishReason({ type: 'stop' }); - expect(normalized).toBe('stop'); - expect(typeof normalized).toBe('string'); - }); - - it('should handle tool-calls object format', () => { - const normalized = normalizeFinishReason({ type: 'tool-calls' }); - expect(normalized).toBe('tool-calls'); - expect(typeof normalized).toBe('string'); - }); - }); -}); diff --git a/packages/ai/src/agent/do-stream-step.ts b/packages/ai/src/agent/do-stream-step.ts deleted file mode 100644 index b0d89212f5..0000000000 --- a/packages/ai/src/agent/do-stream-step.ts +++ /dev/null @@ -1,665 +0,0 @@ -import type { - LanguageModelV2CallOptions, - LanguageModelV2Prompt, - LanguageModelV2StreamPart, - LanguageModelV2ToolCall, - LanguageModelV2ToolChoice, - SharedV2ProviderOptions, -} from '@ai-sdk/provider'; -import { - type FinishReason, - gateway, - generateId, - type StepResult, - type StopCondition, - type ToolChoice, - type ToolSet, - type UIMessageChunk, -} from 'ai'; -import type { - ProviderOptions, - StreamTextTransform, - TelemetrySettings, -} from './durable-agent.js'; -import type { CompatibleLanguageModel } from './types.js'; - -export type FinishPart = 
Extract<LanguageModelV2StreamPart, { type: 'finish' }>;
-
-export type ModelStopCondition = StopCondition<ToolSet>;
-
-/**
- * Provider-executed tool result captured from the stream.
- */
-export interface ProviderExecutedToolResult {
-  toolCallId: string;
-  toolName: string;
-  result: unknown;
-  isError?: boolean;
-}
-
-/**
- * Convert a Uint8Array to a base64 string safely.
- * Uses a loop instead of spread operator to avoid stack overflow on large arrays.
- */
-function uint8ArrayToBase64(data: Uint8Array): string {
-  let binary = '';
-  for (let i = 0; i < data.length; i++) {
-    binary += String.fromCharCode(data[i]);
-  }
-  return btoa(binary);
-}
-
-/**
- * Options for the doStreamStep function.
- */
-export interface DoStreamStepOptions {
-  sendStart?: boolean;
-  maxOutputTokens?: number;
-  temperature?: number;
-  topP?: number;
-  topK?: number;
-  presencePenalty?: number;
-  frequencyPenalty?: number;
-  stopSequences?: string[];
-  seed?: number;
-  maxRetries?: number;
-  abortSignal?: AbortSignal;
-  headers?: Record<string, string>;
-  providerOptions?: ProviderOptions;
-  toolChoice?: ToolChoice<ToolSet>;
-  includeRawChunks?: boolean;
-  experimental_telemetry?: TelemetrySettings;
-  transforms?: Array<StreamTextTransform<ToolSet>>;
-  responseFormat?: LanguageModelV2CallOptions['responseFormat'];
-  /**
-   * If true, collects and returns all UIMessageChunks written to the stream.
-   * This is used by DurableAgent when collectUIMessages is enabled.
-   */
-  collectUIChunks?: boolean;
-}
-
-/**
- * Convert AI SDK ToolChoice to LanguageModelV2ToolChoice
- */
-function toLanguageModelToolChoice(
-  toolChoice: ToolChoice<ToolSet> | undefined
-): LanguageModelV2ToolChoice | undefined {
-  if (toolChoice === undefined) {
-    return undefined;
-  }
-  if (toolChoice === 'auto') {
-    return { type: 'auto' };
-  }
-  if (toolChoice === 'none') {
-    return { type: 'none' };
-  }
-  if (toolChoice === 'required') {
-    return { type: 'required' };
-  }
-  if (typeof toolChoice === 'object' && toolChoice.type === 'tool') {
-    return { type: 'tool', toolName: toolChoice.toolName };
-  }
-  return undefined;
-}
-
-export async function doStreamStep(
-  conversationPrompt: LanguageModelV2Prompt,
-  modelInit: string | (() => Promise<CompatibleLanguageModel>),
-  writable: WritableStream<UIMessageChunk>,
-  tools?: LanguageModelV2CallOptions['tools'],
-  options?: DoStreamStepOptions
-) {
-  'use step';
-
-  // Model can be LanguageModelV2 (AI SDK v5) or LanguageModelV3 (AI SDK v6)
-  // Both have compatible doStream interfaces for our use case
-  let model: CompatibleLanguageModel | undefined;
-  if (typeof modelInit === 'string') {
-    // gateway() returns LanguageModelV2 in AI SDK v5 and LanguageModelV3 in AI SDK v6
-    // Both are compatible at runtime for doStream operations
-    model = gateway(modelInit) as CompatibleLanguageModel;
-  } else if (typeof modelInit === 'function') {
-    // User-provided model factory - could return V2 or V3
-    model = await modelInit();
-  } else {
-    throw new Error(
-      'Invalid "model initialization" argument. Must be a string or a function that returns a LanguageModel instance.' 
- ); - } - - // Build call options with all generation settings - const callOptions: LanguageModelV2CallOptions = { - prompt: conversationPrompt, - tools, - ...(options?.maxOutputTokens !== undefined && { - maxOutputTokens: options.maxOutputTokens, - }), - ...(options?.temperature !== undefined && { - temperature: options.temperature, - }), - ...(options?.topP !== undefined && { topP: options.topP }), - ...(options?.topK !== undefined && { topK: options.topK }), - ...(options?.presencePenalty !== undefined && { - presencePenalty: options.presencePenalty, - }), - ...(options?.frequencyPenalty !== undefined && { - frequencyPenalty: options.frequencyPenalty, - }), - ...(options?.stopSequences !== undefined && { - stopSequences: options.stopSequences, - }), - ...(options?.seed !== undefined && { seed: options.seed }), - ...(options?.abortSignal !== undefined && { - abortSignal: options.abortSignal, - }), - ...(options?.headers !== undefined && { headers: options.headers }), - ...(options?.providerOptions !== undefined && { - providerOptions: options.providerOptions as SharedV2ProviderOptions, - }), - ...(options?.toolChoice !== undefined && { - toolChoice: toLanguageModelToolChoice(options.toolChoice), - }), - ...(options?.includeRawChunks !== undefined && { - includeRawChunks: options.includeRawChunks, - }), - ...(options?.responseFormat !== undefined && { - responseFormat: options.responseFormat, - }), - }; - - const result = await model.doStream(callOptions); - - let finish: FinishPart | undefined; - const toolCalls: LanguageModelV2ToolCall[] = []; - // Map of tool call ID to provider-executed tool result - const providerExecutedToolResults = new Map< - string, - ProviderExecutedToolResult - >(); - const chunks: LanguageModelV2StreamPart[] = []; - const includeRawChunks = options?.includeRawChunks ?? false; - const collectUIChunks = options?.collectUIChunks ?? false; - const uiChunks: UIMessageChunk[] = []; - - // Build the stream pipeline - let stream: ReadableStream = result.stream; - - // Apply custom transforms if provided - if (options?.transforms && options.transforms.length > 0) { - let terminated = false; - const stopStream = () => { - terminated = true; - }; - - for (const transform of options.transforms) { - if (!terminated) { - stream = stream.pipeThrough( - transform({ - tools: {} as ToolSet, // Note: toolSet not available inside step boundary due to serialization - stopStream, - }) - ); - } - } - } - - await stream - .pipeThrough( - new TransformStream({ - async transform(chunk, controller) { - if (chunk.type === 'tool-call') { - toolCalls.push({ - ...chunk, - input: chunk.input || '{}', - }); - } else if (chunk.type === 'tool-result') { - // Capture provider-executed tool results - if (chunk.providerExecuted) { - providerExecutedToolResults.set(chunk.toolCallId, { - toolCallId: chunk.toolCallId, - toolName: chunk.toolName, - result: chunk.result, - isError: chunk.isError, - }); - } - } else if (chunk.type === 'finish') { - finish = chunk; - } - chunks.push(chunk); - controller.enqueue(chunk); - }, - }) - ) - .pipeThrough( - new TransformStream({ - start: (controller) => { - if (options?.sendStart) { - controller.enqueue({ - type: 'start', - // Note that if useChat is used client-side, useChat will generate a different - // messageId. It's hard to work around this. 
- messageId: generateId(), - }); - } - controller.enqueue({ - type: 'start-step', - }); - }, - flush: (controller) => { - controller.enqueue({ - type: 'finish-step', - }); - }, - transform: async (part, controller) => { - const partType = part.type; - switch (partType) { - case 'text-start': { - controller.enqueue({ - type: 'text-start', - id: part.id, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - break; - } - - case 'text-delta': { - controller.enqueue({ - type: 'text-delta', - id: part.id, - delta: part.delta, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - break; - } - - case 'text-end': { - controller.enqueue({ - type: 'text-end', - id: part.id, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - break; - } - - case 'reasoning-start': { - controller.enqueue({ - type: 'reasoning-start', - id: part.id, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - break; - } - - case 'reasoning-delta': { - controller.enqueue({ - type: 'reasoning-delta', - id: part.id, - delta: part.delta, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - - break; - } - - case 'reasoning-end': { - controller.enqueue({ - type: 'reasoning-end', - id: part.id, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - break; - } - - case 'file': { - // Convert data to URL, handling Uint8Array, URL, and string cases - let url: string; - const fileData = part.data as Uint8Array | string | URL; - if (fileData instanceof Uint8Array) { - // Convert Uint8Array to base64 and create data URL - const base64 = uint8ArrayToBase64(fileData); - url = `data:${part.mediaType};base64,${base64}`; - } else if (fileData instanceof URL) { - // Use URL directly (could be a data URL or remote URL) - url = fileData.href; - } else if ( - fileData.startsWith('data:') || - fileData.startsWith('http:') || - fileData.startsWith('https:') - ) { - // Already a URL string - url = fileData; - } else { - // Assume it's base64-encoded data - url = `data:${part.mediaType};base64,${fileData}`; - } - controller.enqueue({ - type: 'file', - mediaType: part.mediaType, - url, - }); - break; - } - - case 'source': { - if (part.sourceType === 'url') { - controller.enqueue({ - type: 'source-url', - sourceId: part.id, - url: part.url, - title: part.title, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - } - - if (part.sourceType === 'document') { - controller.enqueue({ - type: 'source-document', - sourceId: part.id, - mediaType: part.mediaType, - title: part.title, - filename: part.filename, - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - } - break; - } - - case 'tool-input-start': { - controller.enqueue({ - type: 'tool-input-start', - toolCallId: part.id, - toolName: part.toolName, - ...(part.providerExecuted != null - ? 
{ providerExecuted: part.providerExecuted } - : {}), - }); - break; - } - - case 'tool-input-delta': { - controller.enqueue({ - type: 'tool-input-delta', - toolCallId: part.id, - inputTextDelta: part.delta, - }); - break; - } - - case 'tool-input-end': { - // End of tool input streaming - no UI chunk needed - break; - } - - case 'tool-call': { - controller.enqueue({ - type: 'tool-input-available', - toolCallId: part.toolCallId, - toolName: part.toolName, - input: JSON.parse(part.input || '{}'), - ...(part.providerExecuted != null - ? { providerExecuted: part.providerExecuted } - : {}), - ...(part.providerMetadata != null - ? { providerMetadata: part.providerMetadata } - : {}), - }); - break; - } - - case 'tool-result': { - controller.enqueue({ - type: 'tool-output-available', - toolCallId: part.toolCallId, - output: part.result, - ...(part.providerExecuted != null - ? { providerExecuted: part.providerExecuted } - : {}), - }); - break; - } - - case 'error': { - const error = part.error; - controller.enqueue({ - type: 'error', - errorText: - error instanceof Error ? error.message : String(error), - }); - - break; - } - - case 'stream-start': { - // Stream start is internal, no UI chunk needed - break; - } - - case 'response-metadata': { - // Response metadata is internal, no UI chunk needed - break; - } - - case 'finish': { - // Finish is handled separately - break; - } - - case 'raw': { - // Raw chunks are only included if explicitly requested - if (includeRawChunks) { - // Raw chunks contain provider-specific data - // We don't have a direct mapping to UIMessageChunk - // but we can log or handle them if needed - } - break; - } - - default: { - // Handle any other chunk types gracefully - // const exhaustiveCheck: never = partType; - // console.warn(`Unknown chunk type: ${partType}`); - } - } - }, - }) - ) - .pipeThrough( - // Optionally collect UIMessageChunks for later conversion to UIMessage[] - new TransformStream({ - transform: (chunk, controller) => { - if (collectUIChunks) { - uiChunks.push(chunk); - } - controller.enqueue(chunk); - }, - }) - ) - .pipeTo(writable, { preventClose: true }); - - const step = chunksToStep(chunks, toolCalls, conversationPrompt, finish); - return { - toolCalls, - finish, - step, - uiChunks: collectUIChunks ? uiChunks : undefined, - providerExecutedToolResults, - }; -} - -/** - * Normalize the finish reason to the AI SDK FinishReason type. - * AI SDK v6 may return an object with a 'type' property, - * while AI SDK v5 returns a plain string. This function handles both. - * - * @internal Exported for testing - */ -export function normalizeFinishReason(rawFinishReason: unknown): FinishReason { - // Handle object-style finish reason (possible in some AI SDK versions/providers) - if (typeof rawFinishReason === 'object' && rawFinishReason !== null) { - const objReason = rawFinishReason as { type?: string }; - return (objReason.type as FinishReason) ?? 'unknown'; - } - // Handle string finish reason (standard format) - if (typeof rawFinishReason === 'string') { - return rawFinishReason as FinishReason; - } - return 'unknown'; -} - -// This is a stand-in for logic in the AI-SDK streamText code which aggregates -// chunks into a single step result. 
-function chunksToStep(
-  chunks: LanguageModelV2StreamPart[],
-  toolCalls: LanguageModelV2ToolCall[],
-  conversationPrompt: LanguageModelV2Prompt,
-  finish?: FinishPart
-): StepResult<ToolSet> {
-  // Transform chunks to a single step result
-  const text = chunks
-    .filter(
-      (chunk): chunk is Extract<LanguageModelV2StreamPart, { type: 'text-delta' }> =>
-        chunk.type === 'text-delta'
-    )
-    .map((chunk) => chunk.delta)
-    .join('');
-
-  const reasoning = chunks.filter(
-    (chunk): chunk is Extract<LanguageModelV2StreamPart, { type: 'reasoning-delta' }> =>
-      chunk.type === 'reasoning-delta'
-  );
-
-  const reasoningText = reasoning.map((chunk) => chunk.delta).join('');
-
-  // Extract warnings from stream-start chunk
-  const streamStart = chunks.find(
-    (chunk): chunk is Extract<LanguageModelV2StreamPart, { type: 'stream-start' }> =>
-      chunk.type === 'stream-start'
-  );
-
-  // Extract response metadata from response-metadata chunk
-  const responseMetadata = chunks.find(
-    (chunk): chunk is Extract<LanguageModelV2StreamPart, { type: 'response-metadata' }> =>
-      chunk.type === 'response-metadata'
-  );
-
-  // Extract files from file chunks
-  // File chunks contain mediaType and data (base64 string or Uint8Array)
-  // GeneratedFile requires both base64 and uint8Array properties
-  const files = chunks
-    .filter(
-      (chunk): chunk is Extract<LanguageModelV2StreamPart, { type: 'file' }> =>
-        chunk.type === 'file'
-    )
-    .map((chunk) => {
-      const data = chunk.data;
-      // If data is already a Uint8Array, convert to base64; otherwise use as-is
-      if (data instanceof Uint8Array) {
-        // Convert Uint8Array to base64 string
-        const base64 = uint8ArrayToBase64(data);
-        return {
-          mediaType: chunk.mediaType,
-          base64,
-          uint8Array: data,
-        };
-      } else {
-        // Data is base64 string, decode to Uint8Array
-        const binaryString = atob(data);
-        const bytes = new Uint8Array(binaryString.length);
-        for (let i = 0; i < binaryString.length; i++) {
-          bytes[i] = binaryString.charCodeAt(i);
-        }
-        return {
-          mediaType: chunk.mediaType,
-          base64: data,
-          uint8Array: bytes,
-        };
-      }
-    });
-
-  // Extract sources from source chunks
-  const sources = chunks
-    .filter(
-      (chunk): chunk is Extract<LanguageModelV2StreamPart, { type: 'source' }> =>
-        chunk.type === 'source'
-    )
-    .map((chunk) => chunk);
-
-  const stepResult: StepResult<ToolSet> = {
-    content: [
-      ...(text ? [{ type: 'text' as const, text }] : []),
-      ...toolCalls.map((toolCall) => ({
-        type: 'tool-call' as const,
-        toolCallId: toolCall.toolCallId,
-        toolName: toolCall.toolName,
-        input: JSON.parse(toolCall.input),
-        dynamic: true as const,
-      })),
-    ],
-    text,
-    reasoning: reasoning.map((chunk) => ({
-      type: 'reasoning' as const,
-      text: chunk.delta,
-    })),
-    reasoningText: reasoningText || undefined,
-    files,
-    sources,
-    toolCalls: toolCalls.map((toolCall) => ({
-      type: 'tool-call' as const,
-      toolCallId: toolCall.toolCallId,
-      toolName: toolCall.toolName,
-      input: JSON.parse(toolCall.input),
-      dynamic: true as const,
-    })),
-    staticToolCalls: [],
-    dynamicToolCalls: toolCalls.map((toolCall) => ({
-      type: 'tool-call' as const,
-      toolCallId: toolCall.toolCallId,
-      toolName: toolCall.toolName,
-      input: JSON.parse(toolCall.input),
-      dynamic: true as const,
-    })),
-    toolResults: [],
-    staticToolResults: [],
-    dynamicToolResults: [],
-    finishReason: normalizeFinishReason(finish?.finishReason),
-    usage: finish?.usage || { inputTokens: 0, outputTokens: 0, totalTokens: 0 },
-    warnings: streamStart?.warnings,
-    request: {
-      body: JSON.stringify({
-        prompt: conversationPrompt,
-        tools: toolCalls.map((toolCall) => ({
-          type: 'tool-call' as const,
-          toolCallId: toolCall.toolCallId,
-          toolName: toolCall.toolName,
-          input: JSON.parse(toolCall.input),
-          dynamic: true as const,
-        })),
-      }),
-    },
-    response: {
-      id: responseMetadata?.id ?? 'unknown',
-      timestamp: responseMetadata?.timestamp ?? 
new Date(), - modelId: responseMetadata?.modelId ?? 'unknown', - messages: [], - }, - providerMetadata: finish?.providerMetadata || {}, - }; - - return stepResult; -} diff --git a/packages/ai/src/agent/durable-agent.test.ts b/packages/ai/src/agent/durable-agent.test.ts deleted file mode 100644 index 383207c222..0000000000 --- a/packages/ai/src/agent/durable-agent.test.ts +++ /dev/null @@ -1,2054 +0,0 @@ -/** - * Tests for DurableAgent - * - * These tests focus on error handling in tool execution, - * particularly for FatalError conversion to tool result errors, - * and verifying that messages are properly passed to tool execute functions. - */ -import type { - LanguageModelV2, - LanguageModelV2Prompt, - LanguageModelV2ToolCall, - LanguageModelV2ToolResultPart, -} from '@ai-sdk/provider'; -import type { StepResult, ToolSet } from 'ai'; -import { describe, expect, it, vi } from 'vitest'; -import { FatalError } from 'workflow'; -import { z } from 'zod'; - -// Mock the streamTextIterator -vi.mock('./stream-text-iterator.js', () => ({ - streamTextIterator: vi.fn(), -})); - -// Import after mocking -const { DurableAgent } = await import('./durable-agent.js'); - -import type { - PrepareStepCallback, - ToolCallRepairFunction, -} from './durable-agent.js'; -import type { StreamTextIteratorYieldValue } from './stream-text-iterator.js'; - -/** - * Creates a mock LanguageModelV2 for testing - */ -function createMockModel(): LanguageModelV2 { - return { - specificationVersion: 'v2' as const, - provider: 'test', - modelId: 'test-model', - doGenerate: vi.fn(), - doStream: vi.fn(), - supportedUrls: {}, - }; -} - -/** - * Type for the mock iterator used in tests - */ -type MockIterator = AsyncGenerator< - StreamTextIteratorYieldValue, - LanguageModelV2Prompt, - LanguageModelV2ToolResultPart[] ->; - -describe('DurableAgent', () => { - describe('tool execution error handling', () => { - it('should convert FatalError to tool error result', async () => { - const errorMessage = 'This is a fatal error'; - const tools: ToolSet = { - testTool: { - description: 'A test tool', - inputSchema: z.object({}), - execute: async () => { - throw new FatalError(errorMessage); - }, - }, - }; - - // We need to test the executeTool function indirectly through the agent - // Create a mock model that will trigger tool calls - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - // Create a mock writable stream - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - // Mock the streamTextIterator to return tool calls and then complete - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 'testTool', - input: '{}', - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - // Execute the stream - this should not throw even though the tool throws FatalError - await expect( - agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }) - ).resolves.not.toThrow(); - - // Verify that the iterator was called with tool results including the 
error - expect(mockIterator.next).toHaveBeenCalledTimes(2); - const toolResultsCall = mockIterator.next.mock.calls[1][0]; - expect(toolResultsCall).toBeDefined(); - expect(toolResultsCall).toHaveLength(1); - expect(toolResultsCall[0]).toMatchObject({ - type: 'tool-result', - toolCallId: 'test-call-id', - toolName: 'testTool', - output: { - type: 'error-text', - value: errorMessage, - }, - }); - }); - - it('should re-throw non-FatalError errors for retry', async () => { - const errorMessage = 'This is a retryable error'; - const tools: ToolSet = { - testTool: { - description: 'A test tool', - inputSchema: z.object({}), - execute: async () => { - throw new Error(errorMessage); - }, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 'testTool', - input: '{}', - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - }, - }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - // Execute should throw because non-FatalErrors are re-thrown - await expect( - agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }) - ).rejects.toThrow(errorMessage); - }); - - it('should successfully execute tools that return normally', async () => { - const toolResult = { success: true, data: 'test result' }; - const tools: ToolSet = { - testTool: { - description: 'A test tool', - inputSchema: z.object({}), - execute: async () => toolResult, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 'testTool', - input: '{}', - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - // Verify that the iterator was called with successful tool results - expect(mockIterator.next).toHaveBeenCalledTimes(2); - const toolResultsCall = mockIterator.next.mock.calls[1][0]; - expect(toolResultsCall).toBeDefined(); - expect(toolResultsCall).toHaveLength(1); - expect(toolResultsCall[0]).toMatchObject({ - type: 'tool-result', - toolCallId: 'test-call-id', - toolName: 'testTool', - output: { - // Object results use 'json' type with raw value (not stringified) - type: 'json', - value: toolResult, - }, - }); - }); - - it('should skip local execution for provider-executed tools', async () => { - // This tool should NOT be called because the tool 
call is provider-executed - const executeFn = vi.fn(); - const tools: ToolSet = { - // This is a local tool - should never be called for provider-executed calls - localTool: { - description: 'A local tool', - inputSchema: z.object({}), - execute: executeFn, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - - // Create a provider-executed tool result map - const providerExecutedToolResults = new Map(); - providerExecutedToolResults.set('provider-call-id', { - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - result: 'Search results for: test query', - isError: false, - }); - - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - input: '{"query":"test query"}', - providerExecuted: true, // This is a provider-executed tool - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - providerExecutedToolResults, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - // The local tool execute function should NOT have been called - expect(executeFn).not.toHaveBeenCalled(); - - // Verify that the iterator was called with the provider-executed tool result - expect(mockIterator.next).toHaveBeenCalledTimes(2); - const toolResultsCall = mockIterator.next.mock.calls[1][0]; - expect(toolResultsCall).toBeDefined(); - expect(toolResultsCall).toHaveLength(1); - expect(toolResultsCall[0]).toMatchObject({ - type: 'tool-result', - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - output: { - // String results use 'text' type with raw value - type: 'text', - value: 'Search results for: test query', - }, - }); - }); - - it('should handle mixed provider-executed and local tools', async () => { - const localToolResult = { local: 'result' }; - const localExecuteFn = vi.fn().mockResolvedValue(localToolResult); - const tools: ToolSet = { - localTool: { - description: 'A local tool', - inputSchema: z.object({}), - execute: localExecuteFn, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - - // Create a provider-executed tool result map - const providerExecutedToolResults = new Map(); - providerExecutedToolResults.set('provider-call-id', { - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - result: { searchResults: ['result1', 'result2'] }, - isError: false, - }); - - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - // Local tool call - should be executed locally - { - toolCallId: 'local-call-id', - toolName: 'localTool', - input: '{}', - providerExecuted: false, - } 
as LanguageModelV2ToolCall, - // Provider-executed tool call - should use stream result - { - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - input: '{"query":"test"}', - providerExecuted: true, - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - providerExecutedToolResults, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - // The local tool execute function SHOULD have been called - expect(localExecuteFn).toHaveBeenCalledTimes(1); - - // Verify that the iterator was called with both tool results - expect(mockIterator.next).toHaveBeenCalledTimes(2); - const toolResultsCall = mockIterator.next.mock.calls[1][0]; - expect(toolResultsCall).toBeDefined(); - expect(toolResultsCall).toHaveLength(2); - - // First result should be from local tool (object result uses 'json' type) - expect(toolResultsCall[0]).toMatchObject({ - type: 'tool-result', - toolCallId: 'local-call-id', - toolName: 'localTool', - output: { - type: 'json', - value: localToolResult, - }, - }); - - // Second result should be from provider-executed tool (object result uses 'json' type) - expect(toolResultsCall[1]).toMatchObject({ - type: 'tool-result', - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - output: { - type: 'json', - value: { searchResults: ['result1', 'result2'] }, - }, - }); - }); - - it('should handle provider-executed tool errors with isError flag', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - - // Create a provider-executed tool result with isError: true - const providerExecutedToolResults = new Map(); - providerExecutedToolResults.set('provider-call-id', { - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - result: 'Search failed: Rate limit exceeded', - isError: true, - }); - - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - input: '{"query":"test query"}', - providerExecuted: true, - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - providerExecutedToolResults, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - // Verify that the iterator was called with error-text output type - expect(mockIterator.next).toHaveBeenCalledTimes(2); - const toolResultsCall = mockIterator.next.mock.calls[1][0]; - expect(toolResultsCall).toBeDefined(); - expect(toolResultsCall).toHaveLength(1); - expect(toolResultsCall[0]).toMatchObject({ - type: 'tool-result', - toolCallId: 'provider-call-id', - toolName: 'WebSearch', - output: { - // String error results use 'error-text' type with raw value - type: 'error-text', - value: 'Search failed: Rate limit exceeded', - }, - }); - }); - - it('should warn and return empty result when 
provider-executed tool result is missing', async () => { - const consoleWarnSpy = vi - .spyOn(console, 'warn') - .mockImplementation(() => {}); - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - - // Empty map - no provider results available - const providerExecutedToolResults = new Map(); - - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'missing-result-id', - toolName: 'WebSearch', - input: '{"query":"test query"}', - providerExecuted: true, - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - providerExecutedToolResults, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - // Verify warning was logged - expect(consoleWarnSpy).toHaveBeenCalledWith( - expect.stringContaining('Provider-executed tool "WebSearch"') - ); - expect(consoleWarnSpy).toHaveBeenCalledWith( - expect.stringContaining('missing-result-id') - ); - - // Verify empty result was returned - const toolResultsCall = mockIterator.next.mock.calls[1][0]; - expect(toolResultsCall).toBeDefined(); - expect(toolResultsCall).toHaveLength(1); - expect(toolResultsCall[0]).toMatchObject({ - type: 'tool-result', - toolCallId: 'missing-result-id', - toolName: 'WebSearch', - output: { - type: 'text', - value: '', - }, - }); - - consoleWarnSpy.mockRestore(); - }); - }); - - describe('prepareStep callback', () => { - it('should pass prepareStep callback to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const prepareStep: PrepareStepCallback = vi.fn().mockReturnValue({}); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - prepareStep, - }); - - // Verify streamTextIterator was called with prepareStep - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - prepareStep, - }) - ); - }); - - it('should allow prepareStep to modify messages', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const injectedMessage = { - role: 'user' as const, - content: [{ type: 'text' as const, text: 'injected 
message' }], - }; - - const prepareStep: PrepareStepCallback = ({ messages }) => { - return { - messages: [...messages, injectedMessage], - }; - }; - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - prepareStep, - }); - - // Verify prepareStep was passed to the iterator - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - prepareStep: expect.any(Function), - }) - ); - }); - - it('should allow prepareStep to change model dynamically', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const prepareStep: PrepareStepCallback = ({ stepNumber }) => { - // Switch to a different model after step 0 - if (stepNumber > 0) { - return { - model: 'anthropic/claude-sonnet-4.5', - }; - } - return {}; - }; - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - prepareStep, - }); - - // Verify prepareStep was passed to the iterator - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - prepareStep: expect.any(Function), - }) - ); - }); - - it('should provide step information to prepareStep callback', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const prepareStepCalls: Array<{ - model: unknown; - stepNumber: number; - steps: unknown[]; - messages: LanguageModelV2Prompt; - }> = []; - - const prepareStep: PrepareStepCallback = (info) => { - prepareStepCalls.push({ - model: info.model, - stepNumber: info.stepNumber, - steps: info.steps, - messages: info.messages, - }); - return {}; - }; - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - prepareStep, - }); - - // Verify prepareStep was passed and the function captures expected params - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - prepareStep: expect.any(Function), - }) - ); - }); - }); - - describe('tool execution with messages', () => { - it('should pass conversation messages to tool execute function', async () => { - // Track what messages were passed to the tool - let receivedMessages: unknown; - let receivedToolCallId: string | undefined; - - const tools: ToolSet = { - testTool: { - description: 'A test tool', - inputSchema: z.object({ query: z.string() }), - execute: async (_input, options) => { - receivedMessages = options.messages; - receivedToolCallId = options.toolCallId; - return { result: 'success' }; - }, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); 
- - // Mock conversation messages that would be accumulated by the iterator - const conversationMessages: LanguageModelV2Prompt = [ - { - role: 'user', - content: [{ type: 'text', text: 'What is the weather?' }], - }, - { - role: 'assistant', - content: [ - { - type: 'tool-call', - toolCallId: 'test-call-id', - toolName: 'testTool', - input: { query: 'weather' }, - }, - ], - }, - ]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 'testTool', - input: '{"query":"weather"}', - } as LanguageModelV2ToolCall, - ], - messages: conversationMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'What is the weather?' }], - writable: mockWritable, - }); - - // Verify that messages were passed to the tool - expect(receivedToolCallId).toBe('test-call-id'); - expect(receivedMessages).toBeDefined(); - expect(Array.isArray(receivedMessages)).toBe(true); - expect(receivedMessages).toEqual(conversationMessages); - }); - - it('should pass messages to multiple tools in parallel execution', async () => { - // Track messages received by each tool - const receivedByTools: Record = {}; - - const tools: ToolSet = { - weatherTool: { - description: 'Get weather', - inputSchema: z.object({ city: z.string() }), - execute: async (_input, options) => { - receivedByTools['weatherTool'] = options.messages; - return { temp: 72 }; - }, - }, - newsTool: { - description: 'Get news', - inputSchema: z.object({ topic: z.string() }), - execute: async (_input, options) => { - receivedByTools['newsTool'] = options.messages; - return { headlines: ['News 1'] }; - }, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const conversationMessages: LanguageModelV2Prompt = [ - { - role: 'user', - content: [{ type: 'text', text: 'Weather and news please' }], - }, - { - role: 'assistant', - content: [ - { - type: 'tool-call', - toolCallId: 'weather-call', - toolName: 'weatherTool', - input: { city: 'NYC' }, - }, - { - type: 'tool-call', - toolCallId: 'news-call', - toolName: 'newsTool', - input: { topic: 'tech' }, - }, - ], - }, - ]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'weather-call', - toolName: 'weatherTool', - input: '{"city":"NYC"}', - } as LanguageModelV2ToolCall, - { - toolCallId: 'news-call', - toolName: 'newsTool', - input: '{"topic":"tech"}', - } as LanguageModelV2ToolCall, - ], - messages: conversationMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'Weather and news please' }], - writable: mockWritable, - }); - - // Both tools should have received the same conversation messages - expect(receivedByTools['weatherTool']).toEqual(conversationMessages); - expect(receivedByTools['newsTool']).toEqual(conversationMessages); - }); - - it('should 
pass updated messages on subsequent tool call rounds', async () => { - // Track messages received in each round - const messagesPerRound: unknown[] = []; - - const tools: ToolSet = { - searchTool: { - description: 'Search for info', - inputSchema: z.object({ query: z.string() }), - execute: async (_input, options) => { - messagesPerRound.push(options.messages); - return { found: true }; - }, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - // First round messages - const firstRoundMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'Search for cats' }] }, - { - role: 'assistant', - content: [ - { - type: 'tool-call', - toolCallId: 'search-1', - toolName: 'searchTool', - input: { query: 'cats' }, - }, - ], - }, - ]; - - // Second round messages (includes first tool result) - const secondRoundMessages: LanguageModelV2Prompt = [ - ...firstRoundMessages, - { - role: 'tool', - content: [ - { - type: 'tool-result', - toolCallId: 'search-1', - toolName: 'searchTool', - output: { type: 'text', value: '{"found":true}' }, - }, - ], - }, - { - role: 'assistant', - content: [ - { - type: 'tool-call', - toolCallId: 'search-2', - toolName: 'searchTool', - input: { query: 'dogs' }, - }, - ], - }, - ]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - // First tool call round - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'search-1', - toolName: 'searchTool', - input: '{"query":"cats"}', - } as LanguageModelV2ToolCall, - ], - messages: firstRoundMessages, - }, - }) - // Second tool call round - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'search-2', - toolName: 'searchTool', - input: '{"query":"dogs"}', - } as LanguageModelV2ToolCall, - ], - messages: secondRoundMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'Search for cats' }], - writable: mockWritable, - }); - - // Verify messages grow with each round - expect(messagesPerRound).toHaveLength(2); - expect(messagesPerRound[0]).toEqual(firstRoundMessages); - expect(messagesPerRound[1]).toEqual(secondRoundMessages); - // Second round should have more messages than first - expect((messagesPerRound[1] as unknown[]).length).toBeGreaterThan( - (messagesPerRound[0] as unknown[]).length - ); - }); - }); - - describe('generation settings', () => { - it('should pass generation settings from constructor to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - temperature: 0.7, - maxOutputTokens: 1000, - topP: 0.9, - seed: 42, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - 
expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - generationSettings: expect.objectContaining({ - temperature: 0.7, - maxOutputTokens: 1000, - topP: 0.9, - seed: 42, - }), - }) - ); - }); - - it('should allow stream options to override constructor generation settings', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - temperature: 0.7, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - temperature: 0.3, // Override - maxOutputTokens: 500, // New setting - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - generationSettings: expect.objectContaining({ - temperature: 0.3, - maxOutputTokens: 500, - }), - }) - ); - }); - }); - - describe('maxSteps', () => { - it('should pass maxSteps to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - maxSteps: 5, - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - maxSteps: 5, - }) - ); - }); - }); - - describe('toolChoice', () => { - it('should pass toolChoice from constructor to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - toolChoice: 'required', - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - toolChoice: 'required', - }) - ); - }); - - it('should allow stream options to override constructor toolChoice', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - toolChoice: 'auto', - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - 
writable: mockWritable, - toolChoice: 'none', - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - toolChoice: 'none', - }) - ); - }); - }); - - describe('activeTools', () => { - it('should filter tools when activeTools is specified', async () => { - const tools: ToolSet = { - tool1: { - description: 'Tool 1', - inputSchema: z.object({}), - execute: async () => ({}), - }, - tool2: { - description: 'Tool 2', - inputSchema: z.object({}), - execute: async () => ({}), - }, - tool3: { - description: 'Tool 3', - inputSchema: z.object({}), - execute: async () => ({}), - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - // Clear previous mock calls - vi.mocked(streamTextIterator).mockClear(); - - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - activeTools: ['tool1', 'tool3'], - }); - - // Verify only active tools are passed (get the most recent call) - const calls = vi.mocked(streamTextIterator).mock.calls; - const lastCall = calls[calls.length - 1][0]; - expect(Object.keys(lastCall.tools).sort()).toEqual(['tool1', 'tool3']); - }); - }); - - describe('callbacks', () => { - it('should pass onError callback to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const onError = vi.fn(); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - onError, - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - onError, - }) - ); - }); - - it('should call onError when tool execution fails', async () => { - const toolError = new Error('Tool execution failed'); - const tools: ToolSet = { - failingTool: { - description: 'A tool that fails', - inputSchema: z.object({}), - execute: async () => { - throw toolError; - }, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 'failingTool', - input: '{}', - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const 
onError = vi.fn(); - - await expect( - agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - onError, - }) - ).rejects.toThrow('Tool execution failed'); - - expect(onError).toHaveBeenCalledWith({ error: toolError }); - }); - - it('should call onFinish with steps and messages when streaming completes', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockStep: StepResult = { - content: [{ type: 'text', text: 'Hello' }], - text: 'Hello', - reasoningText: undefined, - reasoning: [], - files: [], - sources: [], - toolCalls: [], - toolResults: [], - finishReason: 'stop', - usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 }, - request: {}, - response: { - id: 'test-id', - modelId: 'test-model', - timestamp: new Date(), - }, - warnings: [], - // We're missing some properties that aren't relevant for the test - } as unknown as StepResult; - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [], - messages: mockMessages, - step: mockStep, - }, - }) - .mockResolvedValueOnce({ done: true, value: mockMessages }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const onFinish = vi.fn(); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - onFinish, - }); - - expect(onFinish).toHaveBeenCalledWith( - expect.objectContaining({ - steps: expect.any(Array), - messages: expect.any(Array), - experimental_context: undefined, - }) - ); - }); - - it('should call onAbort when abort signal is already aborted', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const onAbort = vi.fn(); - const abortController = new AbortController(); - abortController.abort(); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - abortSignal: abortController.signal, - onAbort, - }); - - expect(onAbort).toHaveBeenCalledWith({ steps: [] }); - }); - }); - - describe('experimental_context', () => { - it('should pass experimental_context to tool execute function', async () => { - let receivedContext: unknown; - - const tools: ToolSet = { - testTool: { - description: 'A test tool', - inputSchema: z.object({}), - execute: async (_input, options) => { - receivedContext = options.experimental_context; - return { result: 'success' }; - }, - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - 
]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 'testTool', - input: '{}', - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - context: { userId: '123', sessionId: 'abc' }, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - experimental_context: { userId: '123', sessionId: 'abc' }, - }); - - expect(receivedContext).toEqual({ userId: '123', sessionId: 'abc' }); - }); - }); - - describe('stream result', () => { - it('should return messages and steps in result', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const mockStep: StepResult = { - content: [{ type: 'text', text: 'Hello' }], - text: 'Hello', - reasoningText: undefined, - reasoning: [], - files: [], - sources: [], - toolCalls: [], - toolResults: [], - finishReason: 'stop', - usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 }, - request: {}, - response: { - id: 'test-id', - modelId: 'test-model', - timestamp: new Date(), - }, - warnings: [], - // We're missing some properties that aren't relevant for the test - } as unknown as StepResult; - const finalMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - { role: 'assistant', content: [{ type: 'text', text: 'Hello' }] }, - ]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [], - messages: finalMessages, - step: mockStep, - }, - }) - .mockResolvedValueOnce({ done: true, value: finalMessages }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const result = await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - expect(result.messages).toBeDefined(); - expect(result.steps).toHaveLength(1); - expect(result.steps[0]).toEqual(mockStep); - }); - }); - - describe('tool call repair', () => { - it('should use repair function when tool call fails to parse', async () => { - const repairFn: ToolCallRepairFunction = vi - .fn() - .mockReturnValue({ - toolCallId: 'test-call-id', - toolName: 'testTool', - input: '{"name":"repaired"}', // Fixed input with valid schema - }); - - const tools: ToolSet = { - testTool: { - description: 'A test tool', - inputSchema: z.object({ name: z.string() }), - execute: async () => ({ result: 'success' }), - }, - }; - - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const mockMessages: LanguageModelV2Prompt = [ - { role: 'user', content: [{ type: 'text', text: 'test' }] }, - ]; - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi - .fn() - .mockResolvedValueOnce({ - done: false, - value: { - toolCalls: [ - { - toolCallId: 'test-call-id', - toolName: 
'testTool', - input: 'invalid json', // This will fail to parse - } as LanguageModelV2ToolCall, - ], - messages: mockMessages, - }, - }) - .mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - experimental_repairToolCall: repairFn, - }); - - // Verify repair function was called - expect(repairFn).toHaveBeenCalledWith( - expect.objectContaining({ - toolCall: expect.objectContaining({ - toolCallId: 'test-call-id', - toolName: 'testTool', - }), - tools, - error: expect.any(Error), - messages: mockMessages, - }) - ); - }); - }); - - describe('includeRawChunks', () => { - it('should pass includeRawChunks to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - includeRawChunks: true, - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - includeRawChunks: true, - }) - ); - }); - }); - - describe('experimental_telemetry', () => { - it('should pass telemetry settings from constructor to streamTextIterator', async () => { - const mockModel = createMockModel(); - - const telemetrySettings = { - isEnabled: true, - functionId: 'test-agent', - metadata: { version: '1.0' }, - }; - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - experimental_telemetry: telemetrySettings, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - expect.objectContaining({ - experimental_telemetry: telemetrySettings, - }) - ); - }); - - it('should allow stream options to override constructor telemetry', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - experimental_telemetry: { functionId: 'constructor-id' }, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const streamTelemetry = { functionId: 'stream-id', isEnabled: false }; - - await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - experimental_telemetry: streamTelemetry, - }); - - expect(streamTextIterator).toHaveBeenCalledWith( - 
expect.objectContaining({ - experimental_telemetry: streamTelemetry, - }) - ); - }); - }); - - describe('collectUIMessages', () => { - it('should return undefined uiMessages when collectUIMessages is false', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const result = await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - collectUIMessages: false, - }); - - expect(result.uiMessages).toBeUndefined(); - }); - - it('should return undefined uiMessages when collectUIMessages is not set', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const result = await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - }); - - expect(result.uiMessages).toBeUndefined(); - }); - - it('should pass collectUIChunks to streamTextIterator when collectUIMessages is true', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const mockWritable = new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - let capturedCollectUIChunks: boolean | undefined; - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockImplementation((opts) => { - capturedCollectUIChunks = opts.collectUIChunks; - return mockIterator as unknown as MockIterator; - }); - - const result = await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - collectUIMessages: true, - }); - - // When collectUIMessages is true, collectUIChunks should be passed to streamTextIterator - expect(capturedCollectUIChunks).toBe(true); - - // uiMessages should be defined (even if empty, since we're mocking) - expect(result.uiMessages).toBeDefined(); - expect(Array.isArray(result.uiMessages)).toBe(true); - }); - - it('should work correctly when collectUIMessages is true and sendFinish is false', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const writtenChunks: unknown[] = []; - const closeFn = vi.fn(); - const mockWritable = new WritableStream({ - write: (chunk) => { - writtenChunks.push(chunk); - }, - close: closeFn, - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const result = 
await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - collectUIMessages: true, - sendFinish: false, - }); - - // uiMessages should still be defined even when sendFinish is false - expect(result.uiMessages).toBeDefined(); - expect(Array.isArray(result.uiMessages)).toBe(true); - - // The original writable should have been closed (since preventClose defaults to false) - expect(closeFn).toHaveBeenCalled(); - - // No finish chunk should have been written to the client - expect( - writtenChunks.find((c: any) => c.type === 'finish') - ).toBeUndefined(); - }); - - it('should not write finish chunk but still return uiMessages when sendFinish is false', async () => { - const mockModel = createMockModel(); - - const agent = new DurableAgent({ - model: async () => mockModel, - tools: {}, - }); - - const writtenChunks: unknown[] = []; - const mockWritable = new WritableStream({ - write: (chunk) => { - writtenChunks.push(chunk); - }, - close: vi.fn(), - }); - - const { streamTextIterator } = await import('./stream-text-iterator.js'); - const mockIterator = { - next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }), - }; - vi.mocked(streamTextIterator).mockReturnValue( - mockIterator as unknown as MockIterator - ); - - const result = await agent.stream({ - messages: [{ role: 'user', content: 'test' }], - writable: mockWritable, - collectUIMessages: true, - sendFinish: false, - preventClose: true, - }); - - // uiMessages should be available even with sendFinish=false and preventClose=true - expect(result.uiMessages).toBeDefined(); - expect(Array.isArray(result.uiMessages)).toBe(true); - }); - }); -}); diff --git a/packages/ai/src/agent/durable-agent.ts b/packages/ai/src/agent/durable-agent.ts deleted file mode 100644 index 6de6205a5d..0000000000 --- a/packages/ai/src/agent/durable-agent.ts +++ /dev/null @@ -1,1203 +0,0 @@ -import type { - LanguageModelV2CallOptions, - LanguageModelV2Prompt, - LanguageModelV2StreamPart, - LanguageModelV2ToolCall, - LanguageModelV2ToolResultPart, - SharedV2ProviderOptions, -} from '@ai-sdk/provider'; -import { - asSchema, - type FinishReason, - type LanguageModelResponseMetadata, - type LanguageModelUsage, - type ModelMessage, - Output, - readUIMessageStream, - type StepResult, - type StopCondition, - type StreamTextOnStepFinishCallback, - type ToolChoice, - type ToolSet, - type UIMessage, - type UIMessageChunk, -} from 'ai'; -import { convertToLanguageModelPrompt, standardizePrompt } from 'ai/internal'; -import { FatalError } from 'workflow'; -import { streamTextIterator } from './stream-text-iterator.js'; -import type { CompatibleLanguageModel } from './types.js'; - -// Re-export for consumers -export type { CompatibleLanguageModel } from './types.js'; - -/** - * Re-export the Output helper for structured output specifications. - * Use `Output.object({ schema })` for structured output or `Output.text()` for text output. - */ -export { Output }; - -/** - * Output specification interface for structured outputs. - * Use `Output.object({ schema })` or `Output.text()` to create an output specification. 
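- * - * @example - * A brief illustration (hedged; not from the original source) of specs that satisfy this interface: - * ```typescript - * import { Output } from '@workflow/ai'; - * import { z } from 'zod'; - * - * const textSpec = Output.text(); // plain text output - * const objectSpec = Output.object({ - * schema: z.object({ answer: z.string() }), - * }); // structured output validated against the schema - * ```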
- */ -export interface OutputSpecification<OUTPUT = unknown, PARTIAL = unknown> { - readonly type: 'object' | 'text'; - responseFormat: LanguageModelV2CallOptions['responseFormat']; - parsePartial(options: { - text: string; - }): Promise<{ partial: PARTIAL } | undefined>; - parseOutput( - options: { text: string }, - context: { - response: LanguageModelResponseMetadata; - usage: LanguageModelUsage; - finishReason: FinishReason; - } - ): Promise<OUTPUT>; -} - -/** - * Provider-specific options type. This is equivalent to SharedV2ProviderOptions from @ai-sdk/provider. - */ -export type ProviderOptions = SharedV2ProviderOptions; - -/** - * Telemetry settings for observability. - */ -export interface TelemetrySettings { - /** - * Enable or disable telemetry. Defaults to true. - */ - isEnabled?: boolean; - - /** - * Identifier for this function. Used to group telemetry data by function. - */ - functionId?: string; - - /** - * Additional information to include in the telemetry data. - */ - metadata?: Record< - string, - | string - | number - | boolean - | Array<string | number | boolean> - | null - | undefined - >; - - /** - * Custom tracer for the telemetry. - */ - tracer?: unknown; -} - -/** - * A transformation that is applied to the stream. - */ -export type StreamTextTransform<TTools extends ToolSet = ToolSet> = (options: { - tools: TTools; - stopStream: () => void; -}) => TransformStream; - -/** - * Function to repair a tool call that failed to parse. - */ -export type ToolCallRepairFunction<TTools extends ToolSet = ToolSet> = (options: { - toolCall: LanguageModelV2ToolCall; - tools: TTools; - error: unknown; - messages: LanguageModelV2Prompt; -}) => Promise<LanguageModelV2ToolCall | null> | LanguageModelV2ToolCall | null;
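- -/** - * A minimal sketch of a repair function (illustrative, not part of the original - * module): it assumes the model sometimes wraps valid JSON in extra prose and - * tries to extract the outermost object; returning null gives up on the call. - * @example - * ```typescript - * const repairToolCall: ToolCallRepairFunction = ({ toolCall, error }) => { - * console.warn(`Repairing ${toolCall.toolName}:`, error); - * const start = toolCall.input.indexOf('{'); - * const end = toolCall.input.lastIndexOf('}'); - * if (start === -1 || end <= start) return null; // nothing to salvage - * return { ...toolCall, input: toolCall.input.slice(start, end + 1) }; - * }; - * ``` - */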
- -/** - * Custom download function for URLs. - * The function receives an array of URLs with information about whether - * the model supports them directly. - */ -export type DownloadFunction = ( - options: { - url: URL; - isUrlSupportedByModel: boolean; - }[] -) => PromiseLike< - ({ data: Uint8Array; mediaType: string | undefined } | null)[] ->; - -/** - * Generation settings that can be passed to the model. - * These map directly to LanguageModelV2CallOptions. - */ -export interface GenerationSettings { - /** - * Maximum number of tokens to generate. - */ - maxOutputTokens?: number; - - /** - * Temperature setting. The range depends on the provider and model. - * It is recommended to set either `temperature` or `topP`, but not both. - */ - temperature?: number; - - /** - * Nucleus sampling. This is a number between 0 and 1. - * E.g. 0.1 would mean that only tokens with the top 10% probability mass are considered. - * It is recommended to set either `temperature` or `topP`, but not both. - */ - topP?: number; - - /** - * Only sample from the top K options for each subsequent token. - * Used to remove "long tail" low probability responses. - * Recommended for advanced use cases only. You usually only need to use temperature. - */ - topK?: number; - - /** - * Presence penalty setting. It affects the likelihood of the model to - * repeat information that is already in the prompt. - * The presence penalty is a number between -1 (increase repetition) - * and 1 (maximum penalty, decrease repetition). 0 means no penalty. - */ - presencePenalty?: number; - - /** - * Frequency penalty setting. It affects the likelihood of the model - * to repeatedly use the same words or phrases. - * The frequency penalty is a number between -1 (increase repetition) - * and 1 (maximum penalty, decrease repetition). 0 means no penalty. - */ - frequencyPenalty?: number; - - /** - * Stop sequences. If set, the model will stop generating text when one of the stop sequences is generated. - * Providers may have limits on the number of stop sequences. - */ - stopSequences?: string[]; - - /** - * The seed (integer) to use for random sampling. If set and supported - * by the model, calls will generate deterministic results. - */ - seed?: number; - - /** - * Maximum number of retries. Set to 0 to disable retries. - * Note: In workflow context, retries are typically handled by the workflow step mechanism. - * @default 2 - */ - maxRetries?: number; - - /** - * Abort signal for cancelling the operation. - */ - abortSignal?: AbortSignal; - - /** - * Additional HTTP headers to be sent with the request. - * Only applicable for HTTP-based providers. - */ - headers?: Record<string, string>; - - /** - * Additional provider-specific options. They are passed through - * to the provider from the AI SDK and enable provider-specific - * functionality that can be fully encapsulated in the provider. - */ - providerOptions?: ProviderOptions; -} - -/** - * Information passed to the prepareStep callback. - */ -export interface PrepareStepInfo { - /** - * The current model configuration (string or function). - * The function should return a LanguageModel instance (V2 or V3 depending on AI SDK version). - */ - model: string | (() => Promise<CompatibleLanguageModel>); - - /** - * The current step number (0-indexed). - */ - stepNumber: number; - - /** - * All previous steps with their results. - */ - steps: StepResult<ToolSet>[]; - - /** - * The messages that will be sent to the model. - * This is the LanguageModelV2Prompt format used internally. - */ - messages: LanguageModelV2Prompt; - - /** - * The context passed via the experimental_context setting (experimental). - */ - experimental_context: unknown; -} - -/** - * Return type from the prepareStep callback. - * All properties are optional; only return the ones you want to override. - */ -export interface PrepareStepResult extends Partial<GenerationSettings> { - /** - * Override the model for this step. - * The function should return a LanguageModel instance (V2 or V3 depending on AI SDK version). - */ - model?: string | (() => Promise<CompatibleLanguageModel>); - - /** - * Override the system message for this step. - */ - system?: string; - - /** - * Override the messages for this step. - * Use this for context management or message injection. - */ - messages?: LanguageModelV2Prompt; - - /** - * Override the tool choice for this step. - */ - toolChoice?: ToolChoice<ToolSet>; - - /** - * Override the active tools for this step. - * Limits the tools that are available for the model to call. - */ - activeTools?: string[]; - - /** - * Context that is passed into tool execution. Experimental. - * Changing the context will affect the context in this step and all subsequent steps. - */ - experimental_context?: unknown; -} - -/** - * Callback function called before each step in the agent loop. - * Use this to modify settings, manage context, or implement dynamic behavior. - */ -export type PrepareStepCallback = ( - info: PrepareStepInfo -) => PrepareStepResult | Promise<PrepareStepResult>; - -/** - * Configuration options for creating a {@link DurableAgent} instance. - */ -export interface DurableAgentOptions extends GenerationSettings { - /** - * The model provider to use for the agent. - * - * This should be a string compatible with the Vercel AI Gateway (e.g., 'anthropic/claude-opus'), - * or a step function that returns a LanguageModel instance (V2 or V3 depending on AI SDK version). - */ - model: string | (() => Promise<CompatibleLanguageModel>); - - /** - * A set of tools available to the agent. - * Tools can be implemented as workflow steps for automatic retries and persistence, - * or as regular workflow-level logic using core library features like sleep() and Hooks.
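- * - * @example - * A hedged sketch of a workflow-level tool using the core library's sleep() - * (the reminder payload and durations are illustrative): - * ```typescript - * import { sleep } from 'workflow'; - * - * const tools = { - * remindLater: { - * description: 'Send a reminder after a delay', - * inputSchema: z.object({ minutes: z.number() }), - * execute: async ({ minutes }) => { - * await sleep(`${minutes}m`); // durable sleep, survives restarts - * return { reminded: true }; - * }, - * }, - * }; - * ```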
- */ - tools?: ToolSet; - - /** - * Optional system prompt to guide the agent's behavior. - */ - system?: string; - - /** - * The tool choice strategy. Default: 'auto'. - */ - toolChoice?: ToolChoice<ToolSet>; - - /** - * Optional telemetry configuration (experimental). - */ - experimental_telemetry?: TelemetrySettings; -} - -/** - * Callback that is called when the LLM response and all requested tool executions are finished. - */ -export type StreamTextOnFinishCallback< - TTools extends ToolSet = ToolSet, - OUTPUT = never, -> = (event: { - /** - * Details for all steps. - */ - readonly steps: StepResult<TTools>[]; - - /** - * The final messages including all tool calls and results. - */ - readonly messages: ModelMessage[]; - - /** - * Context that is passed into tool execution. - */ - readonly experimental_context: unknown; - - /** - * The generated structured output. It uses the `experimental_output` specification. - * Only available when `experimental_output` is specified. - */ - readonly experimental_output: OUTPUT; -}) => PromiseLike<void> | void; - -/** - * Callback that is invoked when an error occurs during streaming. - */ -export type StreamTextOnErrorCallback = (event: { - error: unknown; -}) => PromiseLike<void> | void; - -/** - * Callback that is set using the `onAbort` option. - */ -export type StreamTextOnAbortCallback<TTools extends ToolSet = ToolSet> = - (event: { - /** - * Details for all previously finished steps. - */ - readonly steps: StepResult<TTools>[]; - }) => PromiseLike<void> | void; - -/** - * Options for the {@link DurableAgent.stream} method. - */ -export interface DurableAgentStreamOptions< - TTools extends ToolSet = ToolSet, - OUTPUT = never, - PARTIAL_OUTPUT = never, -> extends Partial<GenerationSettings> { - /** - * The conversation messages to process. Should follow the AI SDK's ModelMessage format. - */ - messages: ModelMessage[]; - - /** - * Optional system prompt override. If provided, overrides the system prompt from the constructor. - */ - system?: string; - - /** - * The stream to which the agent writes message chunks. For example, use `getWritable()` to write to the workflow's default output stream. - */ - writable: WritableStream<UIMessageChunk>; - - /** - * If true, prevents the writable stream from being closed after streaming completes. - * Defaults to false (stream will be closed). - */ - preventClose?: boolean; - - /** - * If true, sends a 'start' chunk at the beginning of the stream. - * Defaults to true. - */ - sendStart?: boolean; - - /** - * If true, sends a 'finish' chunk at the end of the stream. - * Defaults to true. - */ - sendFinish?: boolean; - - /** - * Condition for stopping the generation when there are tool results in the last step. - * When the condition is an array, any of the conditions can be met to stop the generation.
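- * - * @example - * For instance, with the AI SDK's stepCountIs helper (illustrative): - * ```typescript - * import { stepCountIs } from 'ai'; - * - * await agent.stream({ - * messages, - * writable: getWritable(), - * stopWhen: stepCountIs(5), // stop once 5 steps have run - * }); - * ```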
- */ - stopWhen?: - | StopCondition<NoInfer<TTools>> - | Array<StopCondition<NoInfer<TTools>>>; - - /** - * Maximum number of sequential LLM calls (steps), e.g. when you use tool calls. - * A maximum number can be set to prevent infinite loops in the case of misconfigured tools. - * By default, it's unlimited (the agent loops until completion). - */ - maxSteps?: number; - - /** - * The tool choice strategy. Default: 'auto'. - * Overrides the toolChoice from the constructor if provided. - */ - toolChoice?: ToolChoice<NoInfer<TTools>>; - - /** - * Limits the tools that are available for the model to call without - * changing the tool call and result types in the result. - */ - activeTools?: Array<keyof NoInfer<TTools>>; - - /** - * Optional telemetry configuration (experimental). - */ - experimental_telemetry?: TelemetrySettings; - - /** - * Context that is passed into tool execution. - * Experimental (can break in patch releases). - * @default undefined - */ - experimental_context?: unknown; - - /** - * Optional specification for parsing structured outputs from the LLM response. - * Use `Output.object({ schema })` for structured output or `Output.text()` for text output. - * - * @example - * ```typescript - * import { Output } from '@workflow/ai'; - * import { z } from 'zod'; - * - * const result = await agent.stream({ - * messages: [...], - * writable: getWritable(), - * experimental_output: Output.object({ - * schema: z.object({ - * sentiment: z.enum(['positive', 'negative', 'neutral']), - * confidence: z.number(), - * }), - * }), - * }); - * - * console.log(result.experimental_output); // { sentiment: 'positive', confidence: 0.95 } - * ``` - */ - experimental_output?: OutputSpecification<OUTPUT, PARTIAL_OUTPUT>; - - /** - * Whether to include raw chunks from the provider in the stream. - * When enabled, you will receive raw chunks with type 'raw' that contain the unprocessed data from the provider. - * This allows access to cutting-edge provider features not yet wrapped by the AI SDK. - * Defaults to false. - */ - includeRawChunks?: boolean; - - /** - * A function that attempts to repair a tool call that failed to parse. - */ - experimental_repairToolCall?: ToolCallRepairFunction<TTools>; - - /** - * Optional stream transformations. - * They are applied in the order they are provided. - * The stream transformations must maintain the stream structure for streamText to work correctly. - */ - experimental_transform?: - | StreamTextTransform<TTools> - | Array<StreamTextTransform<TTools>>; - - /** - * Custom download function to use for URLs. - * By default, files are downloaded if the model does not support the URL for the given media type. - */ - experimental_download?: DownloadFunction; - - /** - * Callback function to be called after each step completes. - */ - onStepFinish?: StreamTextOnStepFinishCallback<TTools>; - - /** - * Callback that is invoked when an error occurs during streaming. - * You can use it to log errors. - */ - onError?: StreamTextOnErrorCallback; - - /** - * Callback that is called when the LLM response and all requested tool executions - * (for tools that have an `execute` function) are finished. - */ - onFinish?: StreamTextOnFinishCallback<TTools, OUTPUT>; - - /** - * Callback that is called when the operation is aborted. - */ - onAbort?: StreamTextOnAbortCallback<TTools>; - - /** - * Callback function called before each step in the agent loop. - * Use this to modify settings, manage context, or inject messages dynamically. - * - * @example - * ```typescript - * prepareStep: async ({ messages, stepNumber }) => { - * // Inject messages from a queue - * const queuedMessages = await getQueuedMessages(); - * if (queuedMessages.length > 0) { - * return { - * messages: [...messages, ...queuedMessages], - * }; - * } - * return {}; - * } - * ``` - */ - prepareStep?: PrepareStepCallback; - - /** - * If true, accumulates UIMessage[] during streaming. - * The accumulated messages will be available in the `uiMessages` property of the result. - * This is useful when you need the final UIMessage representation after streaming completes, - * without having to re-read the stream. - * - * @default false - */ - collectUIMessages?: boolean; -}
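- -/** - * A hedged end-to-end sketch of calling stream() and consuming its result - * (assumes a workflow context that provides getWritable(); values are illustrative): - * ```typescript - * const result = await agent.stream({ - * messages: [{ role: 'user', content: 'Summarize the report' }], - * writable: getWritable(), - * maxSteps: 8, - * collectUIMessages: true, - * }); - * - * console.log(result.steps.length); // number of LLM steps taken - * console.log(result.messages.at(-1)); // final assistant message - * console.log(result.uiMessages); // populated because collectUIMessages is true - * ``` - */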
- -/** - * Result of the DurableAgent.stream method. - */ -export interface DurableAgentStreamResult< - TTools extends ToolSet = ToolSet, - OUTPUT = never, -> { - /** - * The final messages including all tool calls and results. - */ - messages: ModelMessage[]; - - /** - * Details for all steps. - */ - steps: StepResult<TTools>[]; - - /** - * The generated structured output. It uses the `experimental_output` specification. - * Only available when `experimental_output` is specified. - */ - experimental_output: OUTPUT; - - /** - * The accumulated UI messages from the stream. - * Only available when `collectUIMessages` is set to `true` in the stream options. - */ - uiMessages?: UIMessage[]; -} - -/** - * A class for building durable AI agents within workflows. - * - * DurableAgent enables you to create AI-powered agents that can maintain state - * across workflow steps, call tools, and gracefully handle interruptions and resumptions. - * It integrates seamlessly with the AI SDK and the Workflow DevKit for - * production-grade reliability. - * - * @example - * ```typescript - * const agent = new DurableAgent({ - * model: 'anthropic/claude-opus', - * tools: { - * getWeather: { - * description: 'Get weather for a location', - * inputSchema: z.object({ location: z.string() }), - * execute: getWeatherStep, - * }, - * }, - * system: 'You are a helpful weather assistant.', - * }); - * - * await agent.stream({ - * messages: [{ role: 'user', content: 'What is the weather?' }], - * writable: getWritable(), - * }); - * ``` - */ -export class DurableAgent<TBaseTools extends ToolSet = ToolSet> { - private model: string | (() => Promise<CompatibleLanguageModel>); - private tools: TBaseTools; - private system?: string; - private generationSettings: GenerationSettings; - private toolChoice?: ToolChoice<ToolSet>; - private telemetry?: TelemetrySettings; - - constructor(options: DurableAgentOptions & { tools?: TBaseTools }) { - this.model = options.model; - this.tools = (options.tools ?? {}) as TBaseTools;
- this.system = options.system; - this.toolChoice = options.toolChoice as ToolChoice<ToolSet>; - this.telemetry = options.experimental_telemetry; - - // Extract generation settings - this.generationSettings = { - maxOutputTokens: options.maxOutputTokens, - temperature: options.temperature, - topP: options.topP, - topK: options.topK, - presencePenalty: options.presencePenalty, - frequencyPenalty: options.frequencyPenalty, - stopSequences: options.stopSequences, - seed: options.seed, - maxRetries: options.maxRetries, - abortSignal: options.abortSignal, - headers: options.headers, - providerOptions: options.providerOptions, - }; - } - - generate() { - throw new Error('Not implemented'); - } - - async stream< - TTools extends TBaseTools = TBaseTools, - OUTPUT = never, - PARTIAL_OUTPUT = never, - >( - options: DurableAgentStreamOptions<TTools, OUTPUT, PARTIAL_OUTPUT> - ): Promise<DurableAgentStreamResult<TTools, OUTPUT>> { - const prompt = await standardizePrompt({ - system: options.system || this.system, - messages: options.messages, - }); - - const modelPrompt = await convertToLanguageModelPrompt({ - prompt, - supportedUrls: {}, - download: options.experimental_download, - }); - - // Merge generation settings: constructor defaults < stream options - const mergedGenerationSettings: GenerationSettings = { - ...this.generationSettings, - ...(options.maxOutputTokens !== undefined && { - maxOutputTokens: options.maxOutputTokens, - }), - ...(options.temperature !== undefined && { - temperature: options.temperature, - }), - ...(options.topP !== undefined && { topP: options.topP }), - ...(options.topK !== undefined && { topK: options.topK }), - ...(options.presencePenalty !== undefined && { - presencePenalty: options.presencePenalty, - }), - ...(options.frequencyPenalty !== undefined && { - frequencyPenalty: options.frequencyPenalty, - }), - ...(options.stopSequences !== undefined && { - stopSequences: options.stopSequences, - }), - ...(options.seed !== undefined && { seed: options.seed }), - ...(options.maxRetries !== undefined && { - maxRetries: options.maxRetries, - }), - ...(options.abortSignal !== undefined && { - abortSignal: options.abortSignal, - }), - ...(options.headers !== undefined && { headers: options.headers }), - ...(options.providerOptions !== undefined && { - providerOptions: options.providerOptions, - }), - }; - - // Determine effective tool choice - const effectiveToolChoice = options.toolChoice ?? this.toolChoice; - - // Filter tools if activeTools is specified - const effectiveTools = - options.activeTools && options.activeTools.length > 0 - ? filterTools(this.tools, options.activeTools as string[]) - : this.tools; - - // Initialize context - let experimentalContext = options.experimental_context; - - const steps: StepResult<TTools>[] = []; - - // Check for abort before starting - if (mergedGenerationSettings.abortSignal?.aborted) { - if (options.onAbort) { - await options.onAbort({ steps }); - } - return { - messages: options.messages as unknown as ModelMessage[], - steps, - experimental_output: undefined as OUTPUT, - uiMessages: undefined, - }; - } - - // Track collected UI chunks if collectUIMessages is enabled - const collectUIChunks = options.collectUIMessages ?? false; - const allUIChunks: UIMessageChunk[] = []; - - const iterator = streamTextIterator({ - model: this.model, - tools: effectiveTools as ToolSet, - writable: options.writable, - prompt: modelPrompt, - stopConditions: options.stopWhen, - maxSteps: options.maxSteps, - sendStart: options.sendStart ??
true, - onStepFinish: options.onStepFinish, - onError: options.onError, - prepareStep: options.prepareStep, - generationSettings: mergedGenerationSettings, - toolChoice: effectiveToolChoice as ToolChoice, - experimental_context: experimentalContext, - experimental_telemetry: options.experimental_telemetry ?? this.telemetry, - includeRawChunks: options.includeRawChunks ?? false, - experimental_transform: options.experimental_transform as - | StreamTextTransform - | Array>, - responseFormat: options.experimental_output?.responseFormat, - collectUIChunks, - }); - - // Track the final conversation messages from the iterator - let finalMessages: LanguageModelV2Prompt | undefined; - let encounteredError: unknown; - let wasAborted = false; - - try { - let result = await iterator.next(); - while (!result.done) { - // Check for abort during iteration - if (mergedGenerationSettings.abortSignal?.aborted) { - wasAborted = true; - if (options.onAbort) { - await options.onAbort({ steps }); - } - break; - } - - const { - toolCalls, - messages: iterMessages, - step, - context, - uiChunks, - providerExecutedToolResults, - } = result.value; - if (step) { - // The step result is compatible with StepResult since we're using the same tools - steps.push(step as unknown as StepResult); - } - // Update context if changed by prepareStep - if (context !== undefined) { - experimentalContext = context; - } - // Collect UI chunks if enabled - if (uiChunks && uiChunks.length > 0) { - allUIChunks.push(...uiChunks); - } - - // Only execute tools if there are tool calls - if (toolCalls.length > 0) { - // Separate provider-executed tool calls from client-executed ones - const clientToolCalls = toolCalls.filter( - (tc) => !tc.providerExecuted - ); - const providerToolCalls = toolCalls.filter( - (tc) => tc.providerExecuted - ); - - // Execute client tools - const clientToolResults = await Promise.all( - clientToolCalls.map( - (toolCall): Promise => - executeTool( - toolCall, - effectiveTools as ToolSet, - iterMessages, - experimentalContext, - options.experimental_repairToolCall as ToolCallRepairFunction - ) - ) - ); - - // For provider-executed tools, use the results from the stream - const providerToolResults: LanguageModelV2ToolResultPart[] = - providerToolCalls.map((toolCall) => { - const streamResult = providerExecutedToolResults?.get( - toolCall.toolCallId - ); - if (streamResult) { - // Use the appropriate output type based on the result and error status - // AI SDK supports 'text'/'error-text' for strings and 'json'/'error-json' for objects - const result = streamResult.result; - const isString = typeof result === 'string'; - - return { - type: 'tool-result' as const, - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - output: isString - ? streamResult.isError - ? { type: 'error-text' as const, value: result } - : { type: 'text' as const, value: result } - : streamResult.isError - ? { - type: 'error-json' as const, - value: - result as LanguageModelV2ToolResultPart['output'] extends { - type: 'json'; - value: infer V; - } - ? V - : never, - } - : { - type: 'json' as const, - value: - result as LanguageModelV2ToolResultPart['output'] extends { - type: 'json'; - value: infer V; - } - ? V - : never, - }, - }; - } - // If no result from stream, return an empty result - // This can happen if the provider didn't send a tool-result stream part - console.warn( - `[DurableAgent] Provider-executed tool "${toolCall.toolName}" (${toolCall.toolCallId}) ` + - `did not receive a result from the stream. 
This may indicate a provider issue.` - ); - return { - type: 'tool-result' as const, - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - output: { - type: 'text' as const, - value: '', - }, - }; - }); - - // Combine results in the original order - const toolResults = toolCalls.map((tc) => { - const clientResult = clientToolResults.find( - (r) => r.toolCallId === tc.toolCallId - ); - if (clientResult) return clientResult; - const providerResult = providerToolResults.find( - (r) => r.toolCallId === tc.toolCallId - ); - if (providerResult) return providerResult; - // This should never happen, but return empty result as fallback - return { - type: 'tool-result' as const, - toolCallId: tc.toolCallId, - toolName: tc.toolName, - output: { type: 'text' as const, value: '' }, - }; - }); - - result = await iterator.next(toolResults); - } else { - // Final step with no tool calls - just advance the iterator - result = await iterator.next([]); - } - } - - // When the iterator completes normally, result.value contains the final conversation prompt - if (result.done) { - finalMessages = result.value; - } - } catch (error) { - encounteredError = error; - // Check if this is an abort error - if (error instanceof Error && error.name === 'AbortError') { - wasAborted = true; - if (options.onAbort) { - await options.onAbort({ steps }); - } - } else if (options.onError) { - // Call onError for non-abort errors (including tool execution errors) - await options.onError({ error }); - } - // Don't throw yet - we want to call onFinish first - } - - const sendFinish = options.sendFinish ?? true; - const preventClose = options.preventClose ?? false; - - // Handle stream closing - if (sendFinish || !preventClose) { - await closeStream(options.writable, preventClose, sendFinish); - } - - // Use the final messages from the iterator, or fall back to original messages - const messages = (finalMessages ?? - options.messages) as unknown as ModelMessage[]; - - // Parse structured output if experimental_output is specified - let experimentalOutput: OUTPUT = undefined as OUTPUT; - if (options.experimental_output && steps.length > 0) { - const lastStep = steps[steps.length - 1]; - const text = lastStep.text; - if (text) { - try { - experimentalOutput = await options.experimental_output.parseOutput( - { text }, - { - response: lastStep.response, - usage: lastStep.usage, - finishReason: lastStep.finishReason, - } - ); - } catch (parseError) { - // If there's already an error, don't override it - // If not, set this as the error - if (!encounteredError) { - encounteredError = parseError; - } - } - } - } - - // Call onFinish callback if provided (always call, even on errors, but not on abort) - if (options.onFinish && !wasAborted) { - await options.onFinish({ - steps, - messages: messages as ModelMessage[], - experimental_context: experimentalContext, - experimental_output: experimentalOutput, - }); - } - - // Re-throw any error that occurred - if (encounteredError) { - throw encounteredError; - } - - // Collect accumulated UI messages if requested - // This requires a step function since it performs stream operations - const uiMessages = collectUIChunks - ? await convertChunksToUIMessages(allUIChunks) - : undefined; - - return { - messages: messages as ModelMessage[], - steps, - experimental_output: experimentalOutput, - uiMessages, - }; - } -} - -/** - * Filter tools to only include the specified active tools. 
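- * - * @example - * ```typescript - * // Illustrative: names not present in the tool set are silently ignored. - * filterTools({ a: toolA, b: toolB }, ['a', 'missing']); // => { a: toolA } - * ```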
- */ -function filterTools( - tools: TTools, - activeTools: string[] -): ToolSet { - const filtered: ToolSet = {}; - for (const toolName of activeTools) { - if (toolName in tools) { - filtered[toolName] = tools[toolName]; - } - } - return filtered; -} - -async function writeFinishChunk(writable: WritableStream) { - 'use step'; - - const writer = writable.getWriter(); - try { - await writer.write({ type: 'finish' }); - } finally { - writer.releaseLock(); - } -} - -async function closeStream( - writable: WritableStream, - preventClose?: boolean, - sendFinish?: boolean -) { - 'use step'; - - // Conditionally write the finish chunk - if (sendFinish) { - await writeFinishChunk(writable); - } - - // Conditionally close the stream - if (!preventClose) { - await writable.close(); - } -} - -/** - * Convert UIMessageChunks to UIMessage[] using the AI SDK's readUIMessageStream. - * This must be a step function because it performs stream operations. - * - * @param chunks - The collected UIMessageChunks to convert - * @returns The accumulated UIMessage array - */ -async function convertChunksToUIMessages( - chunks: UIMessageChunk[] -): Promise { - 'use step'; - - if (chunks.length === 0) { - return []; - } - - // Create a readable stream from the collected chunks. - // AI SDK only supports conversion from UIMessageChunk[] to UIMessage[] - // as a streaming operation, so we need to wrap the chunks in a stream. - const chunkStream = new ReadableStream({ - start: (controller) => { - for (const chunk of chunks) { - controller.enqueue(chunk); - } - controller.close(); - }, - }); - - // Use the AI SDK's readUIMessageStream to convert chunks to messages - const messageStream = readUIMessageStream({ - stream: chunkStream, - onError: (error) => { - console.error('Error processing UI message chunks:', error); - }, - }); - - // Collect all message updates and return the final state - const messages: UIMessage[] = []; - for await (const message of messageStream) { - // readUIMessageStream yields updated versions of the message as it's built - // We want to collect the final state of each message - // Messages are identified by their id, so we update in place - const existingIndex = messages.findIndex((m) => m.id === message.id); - if (existingIndex >= 0) { - messages[existingIndex] = message; - } else { - messages.push(message); - } - } - - return messages; -} - -async function executeTool( - toolCall: LanguageModelV2ToolCall, - tools: ToolSet, - messages: LanguageModelV2Prompt, - experimentalContext?: unknown, - repairToolCall?: ToolCallRepairFunction -): Promise { - const tool = tools[toolCall.toolName]; - if (!tool) throw new Error(`Tool "${toolCall.toolName}" not found`); - if (typeof tool.execute !== 'function') - throw new Error( - `Tool "${toolCall.toolName}" does not have an execute function` - ); - const schema = asSchema(tool.inputSchema); - - let parsedInput: unknown; - try { - const input = await schema.validate?.(JSON.parse(toolCall.input || '{}')); - if (!input?.success) { - // Try to repair the tool call if a repair function is provided - if (repairToolCall) { - const repairedToolCall = await repairToolCall({ - toolCall, - tools, - error: input?.error, - messages, - }); - if (repairedToolCall) { - // Retry with repaired tool call - return executeTool( - repairedToolCall, - tools, - messages, - experimentalContext, - undefined // Don't pass repair function to prevent infinite loops - ); - } - } - throw new Error( - `Invalid input for tool "${toolCall.toolName}": ${input?.error?.message}` - ); - } - 
parsedInput = input.value; - } catch (parseError) { - // Try to repair the tool call if a repair function is provided - if (repairToolCall) { - const repairedToolCall = await repairToolCall({ - toolCall, - tools, - error: parseError, - messages, - }); - if (repairedToolCall) { - // Retry with repaired tool call - return executeTool( - repairedToolCall, - tools, - messages, - experimentalContext, - undefined // Don't pass repair function to prevent infinite loops - ); - } - } - throw parseError; - } - - try { - // Extract execute function to avoid binding `this` to the tool object. - // If we called `tool.execute(...)` directly, JavaScript would bind `this` - // to `tool`, which contains non-serializable properties like `inputSchema`. - // When the execute function is a workflow step (marked with 'use step'), - // the step system captures `this` for serialization, causing failures. - const { execute } = tool; - const toolResult = await execute(parsedInput, { - toolCallId: toolCall.toolCallId, - // Pass the conversation messages to the tool so it has context about the conversation - messages, - // Pass experimental context to the tool - experimental_context: experimentalContext, - }); - - // Use the appropriate output type based on the result - // AI SDK supports 'text' for strings and 'json' for objects - const output = - typeof toolResult === 'string' - ? { type: 'text' as const, value: toolResult } - : { type: 'json' as const, value: toolResult }; - - return { - type: 'tool-result' as const, - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - output, - }; - } catch (error) { - // If it's a FatalError, convert it to a tool error result that gets sent back to the LLM - // This mimics AI SDK behavior where tool call failures are propagated back to the model - if (FatalError.is(error)) { - return { - type: 'tool-result', - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - output: { - type: 'error-text', - value: error.message, - }, - }; - } - // This should technically never happen, since any error that's not FatalError would be caught in the step boundary and re-try the step - throw error; - } -} diff --git a/packages/ai/src/agent/stream-text-iterator.test.ts b/packages/ai/src/agent/stream-text-iterator.test.ts deleted file mode 100644 index 9104291137..0000000000 --- a/packages/ai/src/agent/stream-text-iterator.test.ts +++ /dev/null @@ -1,429 +0,0 @@ -/** - * Tests for streamTextIterator - * - * These tests verify that providerMetadata from tool calls is correctly - * mapped to providerOptions in the conversation prompt, which is critical - * for providers like Gemini that require thoughtSignature to be preserved - * across multi-turn tool calls. 
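- * - * The mapping under test, sketched with illustrative shapes: - * - * Stream part emitted by the provider: - * { type: 'tool-call', toolCallId: 'call-1', toolName: 'testTool', - * providerMetadata: { google: { thoughtSignature: 'sig' } } } - * - * Assistant tool-call part appended to the next prompt: - * { type: 'tool-call', toolCallId: 'call-1', toolName: 'testTool', - * providerOptions: { google: { thoughtSignature: 'sig' } } }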
- */ -import type { - LanguageModelV2Prompt, - LanguageModelV2ToolCall, - LanguageModelV2ToolResultPart, -} from '@ai-sdk/provider'; -import type { StepResult, ToolSet, UIMessageChunk } from 'ai'; -import { describe, expect, it, vi, beforeEach } from 'vitest'; - -// Mock doStreamStep -vi.mock('./do-stream-step.js', () => ({ - doStreamStep: vi.fn(), -})); - -// Import after mocking -const { streamTextIterator } = await import('./stream-text-iterator.js'); -const { doStreamStep } = await import('./do-stream-step.js'); - -/** - * Helper to create a mock writable stream - */ -function createMockWritable(): WritableStream { - return new WritableStream({ - write: vi.fn(), - close: vi.fn(), - }); -} - -/** - * Helper to create a minimal step result for testing - */ -function createMockStepResult( - overrides: Partial> = {} -): StepResult { - return { - content: [], - text: '', - reasoning: [], - reasoningText: undefined, - files: [], - sources: [], - toolCalls: [], - staticToolCalls: [], - dynamicToolCalls: [], - toolResults: [], - staticToolResults: [], - dynamicToolResults: [], - finishReason: 'stop', - usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, - warnings: [], - request: { body: '' }, - response: { - id: 'test', - timestamp: new Date(), - modelId: 'test', - messages: [], - }, - providerMetadata: {}, - ...overrides, - }; -} - -describe('streamTextIterator', () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - describe('providerMetadata to providerOptions mapping', () => { - it('should preserve providerMetadata as providerOptions in tool-call messages', async () => { - const mockWritable = createMockWritable(); - const mockModel = vi.fn(); - - // Capture the conversation prompt passed to subsequent doStreamStep calls - let capturedPrompt: LanguageModelV2Prompt | undefined; - - const toolCallWithMetadata: LanguageModelV2ToolCall = { - type: 'tool-call', - toolCallId: 'call-1', - toolName: 'testTool', - input: '{"query":"test"}', - providerMetadata: { - google: { - thoughtSignature: 'sig_abc123_test_signature', - }, - }, - }; - - // First call returns tool-calls with providerMetadata - // Second call (after tool results) should receive the updated prompt - vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithMetadata], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) - .mockImplementationOnce(async (prompt) => { - // Capture the prompt on the second call to verify providerOptions - capturedPrompt = prompt; - return { - toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; - }); - - const iterator = streamTextIterator({ - prompt: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], - tools: { - testTool: { - description: 'A test tool', - execute: async () => ({ result: 'success' }), - }, - } as ToolSet, - writable: mockWritable, - model: mockModel as any, - }); - - // First iteration - get tool calls - const firstResult = await iterator.next(); - expect(firstResult.done).toBe(false); - expect(firstResult.value.toolCalls).toHaveLength(1); - - // Provide tool results and continue - const toolResults: LanguageModelV2ToolResultPart[] = [ - { - type: 'tool-result', - toolCallId: 'call-1', - toolName: 'testTool', - output: { type: 'text', value: '{"result":"success"}' }, - }, - ]; - - // Second iteration - should trigger second doStreamStep call - const secondResult = await iterator.next(toolResults); - - // Verify the captured 
prompt contains providerOptions - expect(capturedPrompt).toBeDefined(); - - // Find the assistant message with tool calls - const assistantMessage = capturedPrompt?.find( - (msg) => msg.role === 'assistant' - ); - expect(assistantMessage).toBeDefined(); - - // Verify the tool-call part has providerOptions mapped from providerMetadata - const toolCallPart = (assistantMessage?.content as any[])?.find( - (part) => part.type === 'tool-call' - ); - expect(toolCallPart).toBeDefined(); - expect(toolCallPart.providerOptions).toEqual({ - google: { - thoughtSignature: 'sig_abc123_test_signature', - }, - }); - }); - - it('should not add providerOptions when providerMetadata is undefined', async () => { - const mockWritable = createMockWritable(); - const mockModel = vi.fn(); - - let capturedPrompt: LanguageModelV2Prompt | undefined; - - const toolCallWithoutMetadata: LanguageModelV2ToolCall = { - type: 'tool-call', - toolCallId: 'call-1', - toolName: 'testTool', - input: '{"query":"test"}', - // No providerMetadata - }; - - vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithoutMetadata], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) - .mockImplementationOnce(async (prompt) => { - capturedPrompt = prompt; - return { - toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; - }); - - const iterator = streamTextIterator({ - prompt: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], - tools: { - testTool: { - description: 'A test tool', - execute: async () => ({ result: 'success' }), - }, - } as ToolSet, - writable: mockWritable, - model: mockModel as any, - }); - - const firstResult = await iterator.next(); - expect(firstResult.done).toBe(false); - - const toolResults: LanguageModelV2ToolResultPart[] = [ - { - type: 'tool-result', - toolCallId: 'call-1', - toolName: 'testTool', - output: { type: 'text', value: '{"result":"success"}' }, - }, - ]; - - await iterator.next(toolResults); - - const assistantMessage = capturedPrompt?.find( - (msg) => msg.role === 'assistant' - ); - const toolCallPart = (assistantMessage?.content as any[])?.find( - (part) => part.type === 'tool-call' - ); - - expect(toolCallPart).toBeDefined(); - expect(toolCallPart.providerOptions).toBeUndefined(); - }); - - it('should preserve providerMetadata for multiple parallel tool calls', async () => { - const mockWritable = createMockWritable(); - const mockModel = vi.fn(); - - let capturedPrompt: LanguageModelV2Prompt | undefined; - - const toolCalls: LanguageModelV2ToolCall[] = [ - { - type: 'tool-call', - toolCallId: 'call-1', - toolName: 'weatherTool', - input: '{"city":"NYC"}', - providerMetadata: { - google: { thoughtSignature: 'sig_weather_123' }, - }, - }, - { - type: 'tool-call', - toolCallId: 'call-2', - toolName: 'newsTool', - input: '{"topic":"tech"}', - providerMetadata: { - google: { thoughtSignature: 'sig_news_456' }, - }, - }, - ]; - - vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls, - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) - .mockImplementationOnce(async (prompt) => { - capturedPrompt = prompt; - return { - toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; - }); - - const iterator = streamTextIterator({ - prompt: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], - tools: { - weatherTool: { - description: 
'Weather tool', - execute: async () => ({ temp: 72 }), - }, - newsTool: { - description: 'News tool', - execute: async () => ({ headlines: [] }), - }, - } as ToolSet, - writable: mockWritable, - model: mockModel as any, - }); - - const firstResult = await iterator.next(); - expect(firstResult.done).toBe(false); - expect(firstResult.value.toolCalls).toHaveLength(2); - - const toolResults: LanguageModelV2ToolResultPart[] = [ - { - type: 'tool-result', - toolCallId: 'call-1', - toolName: 'weatherTool', - output: { type: 'text', value: '{"temp":72}' }, - }, - { - type: 'tool-result', - toolCallId: 'call-2', - toolName: 'newsTool', - output: { type: 'text', value: '{"headlines":[]}' }, - }, - ]; - - await iterator.next(toolResults); - - const assistantMessage = capturedPrompt?.find( - (msg) => msg.role === 'assistant' - ); - const toolCallParts = (assistantMessage?.content as any[])?.filter( - (part) => part.type === 'tool-call' - ); - - expect(toolCallParts).toHaveLength(2); - - // Verify each tool call has its own providerOptions - const weatherToolCall = toolCallParts?.find( - (part) => part.toolName === 'weatherTool' - ); - expect(weatherToolCall?.providerOptions).toEqual({ - google: { thoughtSignature: 'sig_weather_123' }, - }); - - const newsToolCall = toolCallParts?.find( - (part) => part.toolName === 'newsTool' - ); - expect(newsToolCall?.providerOptions).toEqual({ - google: { thoughtSignature: 'sig_news_456' }, - }); - }); - - it('should handle mixed tool calls with and without providerMetadata', async () => { - const mockWritable = createMockWritable(); - const mockModel = vi.fn(); - - let capturedPrompt: LanguageModelV2Prompt | undefined; - - const toolCalls: LanguageModelV2ToolCall[] = [ - { - type: 'tool-call', - toolCallId: 'call-1', - toolName: 'toolWithMeta', - input: '{}', - providerMetadata: { - vertex: { thoughtSignature: 'sig_vertex_789' }, - }, - }, - { - type: 'tool-call', - toolCallId: 'call-2', - toolName: 'toolWithoutMeta', - input: '{}', - // No providerMetadata - }, - ]; - - vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls, - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) - .mockImplementationOnce(async (prompt) => { - capturedPrompt = prompt; - return { - toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; - }); - - const iterator = streamTextIterator({ - prompt: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], - tools: { - toolWithMeta: { - description: 'Tool with metadata', - execute: async () => ({ ok: true }), - }, - toolWithoutMeta: { - description: 'Tool without metadata', - execute: async () => ({ ok: true }), - }, - } as ToolSet, - writable: mockWritable, - model: mockModel as any, - }); - - await iterator.next(); - - const toolResults: LanguageModelV2ToolResultPart[] = [ - { - type: 'tool-result', - toolCallId: 'call-1', - toolName: 'toolWithMeta', - output: { type: 'text', value: '{"ok":true}' }, - }, - { - type: 'tool-result', - toolCallId: 'call-2', - toolName: 'toolWithoutMeta', - output: { type: 'text', value: '{"ok":true}' }, - }, - ]; - - await iterator.next(toolResults); - - const assistantMessage = capturedPrompt?.find( - (msg) => msg.role === 'assistant' - ); - const toolCallParts = (assistantMessage?.content as any[])?.filter( - (part) => part.type === 'tool-call' - ); - - const toolWithMeta = toolCallParts?.find( - (part) => part.toolName === 'toolWithMeta' - ); - 
expect(toolWithMeta?.providerOptions).toEqual({
-        vertex: { thoughtSignature: 'sig_vertex_789' },
-      });
-
-      const toolWithoutMeta = toolCallParts?.find(
-        (part) => part.toolName === 'toolWithoutMeta'
-      );
-      expect(toolWithoutMeta?.providerOptions).toBeUndefined();
-    });
-  });
-});
diff --git a/packages/ai/src/agent/stream-text-iterator.ts b/packages/ai/src/agent/stream-text-iterator.ts
deleted file mode 100644
index eb7770d69d..0000000000
--- a/packages/ai/src/agent/stream-text-iterator.ts
+++ /dev/null
@@ -1,476 +0,0 @@
-import type {
-  LanguageModelV2CallOptions,
-  LanguageModelV2Prompt,
-  LanguageModelV2ToolCall,
-  LanguageModelV2ToolResultPart,
-} from '@ai-sdk/provider';
-import type {
-  FinishReason,
-  StepResult,
-  StreamTextOnStepFinishCallback,
-  ToolChoice,
-  ToolSet,
-  UIMessageChunk,
-} from 'ai';
-import {
-  doStreamStep,
-  type ModelStopCondition,
-  type ProviderExecutedToolResult,
-} from './do-stream-step.js';
-import type {
-  GenerationSettings,
-  PrepareStepCallback,
-  StreamTextOnErrorCallback,
-  StreamTextTransform,
-  TelemetrySettings,
-} from './durable-agent.js';
-import { toolsToModelTools } from './tools-to-model-tools.js';
-import type { CompatibleLanguageModel } from './types.js';
-
-// Re-export for consumers
-export type { ProviderExecutedToolResult } from './do-stream-step.js';
-
-/**
- * The value yielded by the stream text iterator when tool calls are requested.
- * Contains both the tool calls and the current conversation messages.
- */
-export interface StreamTextIteratorYieldValue {
-  /** The tool calls requested by the model */
-  toolCalls: LanguageModelV2ToolCall[];
-  /** The conversation messages up to (and including) the tool call request */
-  messages: LanguageModelV2Prompt;
-  /** The step result from the current step */
-  step?: StepResult<ToolSet>;
-  /** The current experimental context */
-  context?: unknown;
-  /** The UIMessageChunks written during this step (only when collectUIChunks is enabled) */
-  uiChunks?: UIMessageChunk[];
-  /** Provider-executed tool results (keyed by tool call ID) */
-  providerExecutedToolResults?: Map<string, ProviderExecutedToolResult>;
-}
-
-// This runs in the workflow context
-export async function* streamTextIterator({
-  prompt,
-  tools = {},
-  writable,
-  model,
-  stopConditions,
-  maxSteps,
-  sendStart = true,
-  onStepFinish,
-  onError,
-  prepareStep,
-  generationSettings,
-  toolChoice,
-  experimental_context,
-  experimental_telemetry,
-  includeRawChunks = false,
-  experimental_transform,
-  responseFormat,
-  collectUIChunks = false,
-}: {
-  prompt: LanguageModelV2Prompt;
-  tools: ToolSet;
-  writable: WritableStream<UIMessageChunk>;
-  model: string | (() => Promise<CompatibleLanguageModel>);
-  stopConditions?: ModelStopCondition[] | ModelStopCondition;
-  maxSteps?: number;
-  sendStart?: boolean;
-  onStepFinish?: StreamTextOnStepFinishCallback<ToolSet>;
-  onError?: StreamTextOnErrorCallback;
-  prepareStep?: PrepareStepCallback;
-  generationSettings?: GenerationSettings;
-  toolChoice?: ToolChoice<ToolSet>;
-  experimental_context?: unknown;
-  experimental_telemetry?: TelemetrySettings;
-  includeRawChunks?: boolean;
-  experimental_transform?:
-    | StreamTextTransform<ToolSet>
-    | Array<StreamTextTransform<ToolSet>>;
-  responseFormat?: LanguageModelV2CallOptions['responseFormat'];
-  /** If true, collects UIMessageChunks for later conversion to UIMessage[] */
-  collectUIChunks?: boolean;
-}): AsyncGenerator<
-  StreamTextIteratorYieldValue,
-  LanguageModelV2Prompt,
-  LanguageModelV2ToolResultPart[]
-> {
-  let conversationPrompt = [...prompt]; // Create a mutable copy
-  let currentModel: string | (() => Promise<CompatibleLanguageModel>) = model;
-  let currentGenerationSettings = generationSettings ?? {};
-  let currentToolChoice = toolChoice;
-  let currentContext = experimental_context;
-  let currentActiveTools: string[] | undefined;
-
-  const steps: StepResult<ToolSet>[] = [];
-  let done = false;
-  let isFirstIteration = true;
-  let stepNumber = 0;
-  let lastStep: StepResult<ToolSet> | undefined;
-  let lastStepWasToolCalls = false;
-  let lastStepUIChunks: UIMessageChunk[] | undefined;
-  let allAccumulatedUIChunks: UIMessageChunk[] = [];
-
-  // Default maxSteps to Infinity to preserve backwards compatibility
-  // (agent loops until completion unless explicitly limited)
-  const effectiveMaxSteps = maxSteps ?? Infinity;
-
-  // Convert transforms to array
-  const transforms = experimental_transform
-    ? Array.isArray(experimental_transform)
-      ? experimental_transform
-      : [experimental_transform]
-    : [];
-
-  while (!done) {
-    // Check if we've exceeded the maximum number of steps
-    if (stepNumber >= effectiveMaxSteps) {
-      break;
-    }
-
-    // Check for abort signal
-    if (currentGenerationSettings.abortSignal?.aborted) {
-      break;
-    }
-
-    // Call prepareStep callback before each step if provided
-    if (prepareStep) {
-      const prepareResult = await prepareStep({
-        model: currentModel,
-        stepNumber,
-        steps,
-        messages: conversationPrompt,
-        experimental_context: currentContext,
-      });
-
-      // Apply any overrides from prepareStep
-      if (prepareResult.model !== undefined) {
-        currentModel = prepareResult.model;
-      }
-      if (prepareResult.system !== undefined) {
-        // Update or prepend system message in the conversation prompt
-        if (
-          conversationPrompt.length > 0 &&
-          conversationPrompt[0].role === 'system'
-        ) {
-          // Replace existing system message
-          conversationPrompt[0] = {
-            role: 'system',
-            content: prepareResult.system,
-          };
-        } else {
-          // Prepend new system message
-          conversationPrompt.unshift({
-            role: 'system',
-            content: prepareResult.system,
-          });
-        }
-      }
-      if (prepareResult.messages !== undefined) {
-        conversationPrompt = [...prepareResult.messages];
-      }
-      if (prepareResult.experimental_context !== undefined) {
-        currentContext = prepareResult.experimental_context;
-      }
-      if (prepareResult.activeTools !== undefined) {
-        currentActiveTools = prepareResult.activeTools;
-      }
-      // Apply generation settings overrides
-      if (prepareResult.maxOutputTokens !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          maxOutputTokens: prepareResult.maxOutputTokens,
-        };
-      }
-      if (prepareResult.temperature !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          temperature: prepareResult.temperature,
-        };
-      }
-      if (prepareResult.topP !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          topP: prepareResult.topP,
-        };
-      }
-      if (prepareResult.topK !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          topK: prepareResult.topK,
-        };
-      }
-      if (prepareResult.presencePenalty !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          presencePenalty: prepareResult.presencePenalty,
-        };
-      }
-      if (prepareResult.frequencyPenalty !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          frequencyPenalty: prepareResult.frequencyPenalty,
-        };
-      }
-      if (prepareResult.stopSequences !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          stopSequences: prepareResult.stopSequences,
-        };
-      }
-      if (prepareResult.seed !== undefined) {
-        currentGenerationSettings = {
-          ...currentGenerationSettings,
-          seed: prepareResult.seed,
-        };
-      }
-      if 
(prepareResult.maxRetries !== undefined) { - currentGenerationSettings = { - ...currentGenerationSettings, - maxRetries: prepareResult.maxRetries, - }; - } - if (prepareResult.headers !== undefined) { - currentGenerationSettings = { - ...currentGenerationSettings, - headers: prepareResult.headers, - }; - } - if (prepareResult.providerOptions !== undefined) { - currentGenerationSettings = { - ...currentGenerationSettings, - providerOptions: prepareResult.providerOptions, - }; - } - if (prepareResult.toolChoice !== undefined) { - currentToolChoice = prepareResult.toolChoice; - } - } - - try { - // Filter tools if activeTools is specified - const effectiveTools = - currentActiveTools && currentActiveTools.length > 0 - ? filterToolSet(tools, currentActiveTools) - : tools; - - const { - toolCalls, - finish, - step, - uiChunks: stepUIChunks, - providerExecutedToolResults, - } = await doStreamStep( - conversationPrompt, - currentModel, - writable, - toolsToModelTools(effectiveTools), - { - sendStart: sendStart && isFirstIteration, - ...currentGenerationSettings, - toolChoice: currentToolChoice, - includeRawChunks, - experimental_telemetry, - transforms, - responseFormat, - collectUIChunks, - } - ); - isFirstIteration = false; - stepNumber++; - steps.push(step); - lastStep = step; - lastStepWasToolCalls = false; - lastStepUIChunks = stepUIChunks; - - // Aggregate UIChunks from this step (may include tool output chunks later) - let allStepUIChunks = [ - ...allAccumulatedUIChunks, - ...(stepUIChunks ?? []), - ]; - - // Normalize finishReason - AI SDK v6 returns { unified, raw }, v5 returns a string - const finishReason = normalizeFinishReason(finish?.finishReason); - - if (finishReason === 'tool-calls') { - lastStepWasToolCalls = true; - - // Add assistant message with tool calls to the conversation - // Note: providerMetadata from the tool call is mapped to providerOptions - // in the prompt format, following the AI SDK convention. This is critical - // for providers like Gemini that require thoughtSignature to be preserved - // across multi-turn tool calls. - conversationPrompt.push({ - role: 'assistant', - content: toolCalls.map((toolCall) => ({ - type: 'tool-call', - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - input: JSON.parse(toolCall.input), - ...(toolCall.providerMetadata != null - ? { providerOptions: toolCall.providerMetadata } - : {}), - })), - }); - - // Yield the tool calls along with the current conversation messages - // This allows executeTool to pass the conversation context to tool execute functions - // Also include provider-executed tool results so they can be used instead of local execution - const toolResults = yield { - toolCalls, - messages: conversationPrompt, - step, - context: currentContext, - uiChunks: allStepUIChunks, - providerExecutedToolResults, - }; - - const toolOutputChunks = await writeToolOutputToUI( - writable, - toolResults, - collectUIChunks - ); - // Merge tool output chunks into allStepUIChunks for the next iteration - if (collectUIChunks && toolOutputChunks.length > 0) { - allStepUIChunks = [...(allStepUIChunks ?? []), ...toolOutputChunks]; - // Also accumulate for future steps - allAccumulatedUIChunks = [ - ...allAccumulatedUIChunks, - ...toolOutputChunks, - ]; - } - - conversationPrompt.push({ - role: 'tool', - content: toolResults, - }); - - if (stopConditions) { - const stopConditionList = Array.isArray(stopConditions) - ? 
stopConditions - : [stopConditions]; - if (stopConditionList.some((test) => test({ steps }))) { - done = true; - } - } - } else if (finishReason === 'stop') { - // Add assistant message with text content to the conversation - const textContent = step.content.filter( - (item) => item.type === 'text' - ) as Array<{ type: 'text'; text: string }>; - - if (textContent.length > 0) { - conversationPrompt.push({ - role: 'assistant', - content: textContent, - }); - } - - done = true; - } else if (finishReason === 'length') { - // Model hit max tokens - stop but don't throw - done = true; - } else if (finishReason === 'content-filter') { - // Content filter triggered - stop but don't throw - done = true; - } else if (finishReason === 'error') { - // Model error - stop but don't throw - done = true; - } else if (finishReason === 'other') { - // Other reason - stop but don't throw - done = true; - } else if (finishReason === 'unknown') { - // Unknown reason - stop but don't throw - done = true; - } else if (!finishReason) { - // No finish reason - this might happen on incomplete streams - done = true; - } else { - throw new Error( - `Unexpected finish reason: ${typeof finish?.finishReason === 'object' ? JSON.stringify(finish?.finishReason) : finish?.finishReason}` - ); - } - - if (onStepFinish) { - await onStepFinish(step); - } - } catch (error) { - if (onError) { - await onError({ error }); - } - throw error; - } - } - - // Yield the final step if it wasn't already yielded (tool-calls steps are yielded inside the loop) - if (lastStep && !lastStepWasToolCalls) { - const finalUIChunks = [ - ...allAccumulatedUIChunks, - ...(lastStepUIChunks ?? []), - ]; - yield { - toolCalls: [], - messages: conversationPrompt, - step: lastStep, - context: currentContext, - uiChunks: finalUIChunks, - }; - } - - return conversationPrompt; -} - -async function writeToolOutputToUI( - writable: WritableStream, - toolResults: LanguageModelV2ToolResultPart[], - collectUIChunks?: boolean -): Promise { - 'use step'; - const writer = writable.getWriter(); - const chunks: UIMessageChunk[] = []; - try { - for (const result of toolResults) { - const chunk: UIMessageChunk = { - type: 'tool-output-available' as const, - toolCallId: result.toolCallId, - output: result.output.value, - }; - if (collectUIChunks) { - chunks.push(chunk); - } - await writer.write(chunk); - } - } finally { - writer.releaseLock(); - } - return chunks; -} - -/** - * Filter a tool set to only include the specified active tools. - */ -function filterToolSet(tools: ToolSet, activeTools: string[]): ToolSet { - const filtered: ToolSet = {}; - for (const toolName of activeTools) { - if (toolName in tools) { - filtered[toolName] = tools[toolName]; - } - } - return filtered; -} - -/** - * Normalize finishReason from different AI SDK versions. - * - AI SDK v6: returns { unified: 'tool-calls', raw: 'tool_use' } - * - AI SDK v5: returns 'tool-calls' string directly - */ -function normalizeFinishReason(raw: unknown): FinishReason | undefined { - if (raw == null) return undefined; - if (typeof raw === 'string') return raw as FinishReason; - if (typeof raw === 'object') { - const obj = raw as { unified?: FinishReason; type?: FinishReason }; - return obj.unified ?? obj.type ?? 
'unknown'; - } - return undefined; -} diff --git a/packages/ai/src/agent/tools-to-model-tools.ts b/packages/ai/src/agent/tools-to-model-tools.ts deleted file mode 100644 index 9a30fb3cb6..0000000000 --- a/packages/ai/src/agent/tools-to-model-tools.ts +++ /dev/null @@ -1,13 +0,0 @@ -import type { LanguageModelV2FunctionTool } from '@ai-sdk/provider'; -import { asSchema, type ToolSet } from 'ai'; - -export function toolsToModelTools( - tools: ToolSet -): LanguageModelV2FunctionTool[] { - return Object.entries(tools).map(([name, tool]) => ({ - type: 'function', - name, - description: tool.description, - inputSchema: asSchema(tool.inputSchema).jsonSchema, - })); -} diff --git a/packages/ai/src/agent/types.ts b/packages/ai/src/agent/types.ts deleted file mode 100644 index 5eeb2f9d7f..0000000000 --- a/packages/ai/src/agent/types.ts +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Shared types for AI SDK v5 and v6 compatibility. - */ -import type { - LanguageModelV2, - LanguageModelV2CallOptions, - LanguageModelV2StreamPart, -} from '@ai-sdk/provider'; - -/** - * Compatible language model type that works with both AI SDK v5 and v6. - * - * AI SDK v5 uses LanguageModelV2, while AI SDK v6 uses LanguageModelV3. - * Both have compatible `doStream` interfaces for our use case. - * - * This type represents the union of both model versions, allowing code - * to work seamlessly with either AI SDK version. - * - * Note: V3 models accept LanguageModelV2CallOptions at runtime due to - * structural compatibility between V2 and V3 prompt/options formats. - */ -export type CompatibleLanguageModel = - | LanguageModelV2 - | { - readonly specificationVersion: 'v3'; - readonly provider: string; - readonly modelId: string; - /** - * Stream method compatible with both V2 and V3 models. - * V3 models accept V2-style call options due to structural compatibility - * at runtime - the prompt and options structures are essentially identical. 
- */ - doStream(options: LanguageModelV2CallOptions): PromiseLike<{ - stream: ReadableStream; - }>; - }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ed77517b01..52a4863187 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -310,6 +310,9 @@ importers: packages/ai: dependencies: + '@ai-sdk/durable-agent': + specifier: 0.0.0-27b68fc9-20260206061543 + version: 0.0.0-27b68fc9-20260206061543(zod@4.1.11) '@ai-sdk/provider': specifier: ^2.0.0 || ^3.0.0 version: 2.0.0 @@ -2201,6 +2204,12 @@ packages: peerDependencies: zod: ^3.25.76 || ^4.1.8 + '@ai-sdk/durable-agent@0.0.0-27b68fc9-20260206061543': + resolution: {integrity: sha512-2+8ECb7C3GNW04Fa4l3chU8MrdmNLP44+czCozAWiqA0KZnPylGfOFCwR3v0zHkE/mha00X+LcmRI91g0hQKiA==} + engines: {node: '>=18'} + peerDependencies: + zod: ^3.25.76 || ^4.1.8 + '@ai-sdk/gateway@2.0.0': resolution: {integrity: sha512-Gj0PuawK7NkZuyYgO/h5kDK/l6hFOjhLdTq3/Lli1FTl47iGmwhH1IZQpAL3Z09BeFYWakcwUmn02ovIm2wy9g==} engines: {node: '>=18'} @@ -2219,6 +2228,12 @@ packages: peerDependencies: zod: ^3.25.76 || ^4.1.8 + '@ai-sdk/gateway@3.0.36': + resolution: {integrity: sha512-2r1Q6azvqMYxQ1hqfWZmWg4+8MajoldD/ty65XdhCaCoBfvDu7trcvxXDfTSU+3/wZ1JIDky46SWYFOHnTbsBw==} + engines: {node: '>=18'} + peerDependencies: + zod: ^3.25.76 || ^4.1.8 + '@ai-sdk/google@2.0.43': resolution: {integrity: sha512-qO6giuoYCX/SdZScP/3VO5Xnbd392zm3HrTkhab/efocZU8J/VVEAcAUE1KJh0qOIAYllofRtpJIUGkRK8Q5rw==} engines: {node: '>=18'} @@ -2261,10 +2276,20 @@ packages: peerDependencies: zod: ^3.25.76 || ^4.1.8 + '@ai-sdk/provider-utils@4.0.13': + resolution: {integrity: sha512-HHG72BN4d+OWTcq2NwTxOm/2qvk1duYsnhCDtsbYwn/h/4zeqURu1S0+Cn0nY2Ysq9a9HGKvrYuMn9bgFhR2Og==} + engines: {node: '>=18'} + peerDependencies: + zod: ^3.25.76 || ^4.1.8 + '@ai-sdk/provider@2.0.0': resolution: {integrity: sha512-6o7Y2SeO9vFKB8lArHXehNuusnpddKPk7xqL7T2/b+OvXMRIXUO1rR4wcv1hAFUAT9avGZshty3Wlua/XA7TvA==} engines: {node: '>=18'} + '@ai-sdk/provider@3.0.7': + resolution: {integrity: sha512-VkPLrutM6VdA924/mG8OS+5frbVTcu6e046D2bgDo00tehBANR1QBJ/mPcZ9tXMFOsVcm6SQArOregxePzTFPw==} + engines: {node: '>=18'} + '@ai-sdk/react@2.0.115': resolution: {integrity: sha512-Etu7gWSEi2dmXss1PoR5CAZGwGShXsF9+Pon1eRO6EmatjYaBMhq1CfHPyYhGzWrint8jJIK2VaAhiMef29qZw==} engines: {node: '>=18'} @@ -7359,6 +7384,9 @@ packages: '@standard-schema/spec@1.0.0': resolution: {integrity: sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA==} + '@standard-schema/spec@1.1.0': + resolution: {integrity: sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==} + '@standard-schema/utils@0.3.0': resolution: {integrity: sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g==} @@ -8010,6 +8038,10 @@ packages: resolution: {integrity: sha512-fnYhv671l+eTTp48gB4zEsTW/YtRgRPnkI2nT7x6qw5rkI1Lq2hTmQIpHPgyThI0znLK+vX2n9XxKdXZ7BUbbw==} engines: {node: '>= 20'} + '@vercel/oidc@3.1.0': + resolution: {integrity: sha512-Fw28YZpRnA3cAHHDlkt7xQHiJ0fcL+NRcIqsocZQUSmbzeIKRpwttJjik5ZGanXP+vlA4SbTg+AbA3bP363l+w==} + engines: {node: '>= 20'} + '@vercel/otel@1.13.0': resolution: {integrity: sha512-esRkt470Y2jRK1B1g7S1vkt4Csu44gp83Zpu8rIyPoqy2BKgk4z7ik1uSMswzi45UogLHFl6yR5TauDurBQi4Q==} engines: {node: '>=18'} @@ -8376,6 +8408,12 @@ packages: peerDependencies: zod: ^3.25.76 || ^4.1.8 + ai@6.0.73: + resolution: {integrity: sha512-p2/ICXIjAM4+bIFHEkAB+l58zq+aTmxAkotsb6doNt/CEms72zt6gxv2ky1fQDwU4ecMOcmMh78VJUSEKECzlg==} + engines: {node: '>=18'} + peerDependencies: + zod: 
^3.25.76 || ^4.1.8 + ajv-formats@2.1.1: resolution: {integrity: sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA==} peerDependencies: @@ -10585,11 +10623,13 @@ packages: glob@10.5.0: resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@11.1.0: resolution: {integrity: sha512-vuNwKSaKiqm7g0THUBu2x7ckSs3XJLXE+2ssL7/MfTGPLLcrJQ/4Uq1CjPTtO5cCIiRxqvN6Twy1qOwhL0Xjcw==} engines: {node: 20 || >=22} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@13.0.0: @@ -15459,6 +15499,13 @@ snapshots: zod: 4.1.11 optional: true + '@ai-sdk/durable-agent@0.0.0-27b68fc9-20260206061543(zod@4.1.11)': + dependencies: + '@ai-sdk/provider': 3.0.7 + '@ai-sdk/provider-utils': 4.0.13(zod@4.1.11) + ai: 6.0.73(zod@4.1.11) + zod: 4.1.11 + '@ai-sdk/gateway@2.0.0(zod@4.1.11)': dependencies: '@ai-sdk/provider': 2.0.0 @@ -15487,6 +15534,13 @@ snapshots: '@vercel/oidc': 3.0.5 zod: 4.1.11 + '@ai-sdk/gateway@3.0.36(zod@4.1.11)': + dependencies: + '@ai-sdk/provider': 3.0.7 + '@ai-sdk/provider-utils': 4.0.13(zod@4.1.11) + '@vercel/oidc': 3.1.0 + zod: 4.1.11 + '@ai-sdk/google@2.0.43(zod@4.1.11)': dependencies: '@ai-sdk/provider': 2.0.0 @@ -15544,10 +15598,21 @@ snapshots: eventsource-parser: 3.0.6 zod: 4.1.11 + '@ai-sdk/provider-utils@4.0.13(zod@4.1.11)': + dependencies: + '@ai-sdk/provider': 3.0.7 + '@standard-schema/spec': 1.1.0 + eventsource-parser: 3.0.6 + zod: 4.1.11 + '@ai-sdk/provider@2.0.0': dependencies: json-schema: 0.4.0 + '@ai-sdk/provider@3.0.7': + dependencies: + json-schema: 0.4.0 + '@ai-sdk/react@2.0.115(react@19.2.3)(zod@4.1.11)': dependencies: '@ai-sdk/provider-utils': 3.0.19(zod@4.1.11) @@ -21637,6 +21702,8 @@ snapshots: '@standard-schema/spec@1.0.0': {} + '@standard-schema/spec@1.1.0': {} + '@standard-schema/utils@0.3.0': {} '@sveltejs/acorn-typescript@1.0.6(acorn@8.15.0)': @@ -22485,6 +22552,8 @@ snapshots: '@vercel/oidc@3.0.5': {} + '@vercel/oidc@3.1.0': {} + '@vercel/otel@1.13.0(@opentelemetry/api-logs@0.57.2)(@opentelemetry/api@1.9.0)(@opentelemetry/instrumentation@0.57.2(@opentelemetry/api@1.9.0))(@opentelemetry/resources@1.30.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-logs@0.57.2(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-metrics@1.30.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.30.1(@opentelemetry/api@1.9.0))': dependencies: '@opentelemetry/api': 1.9.0 @@ -23055,6 +23124,14 @@ snapshots: '@opentelemetry/api': 1.9.0 zod: 4.1.11 + ai@6.0.73(zod@4.1.11): + dependencies: + '@ai-sdk/gateway': 3.0.36(zod@4.1.11) + '@ai-sdk/provider': 3.0.7 + '@ai-sdk/provider-utils': 4.0.13(zod@4.1.11) + '@opentelemetry/api': 1.9.0 + zod: 4.1.11 + ajv-formats@2.1.1(ajv@8.17.1): optionalDependencies: ajv: 8.17.1
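
Note for reviewers: the providerMetadata-to-providerOptions mapping pinned down
by the deleted stream-text-iterator tests is now expected to come from the
@ai-sdk/durable-agent dependency added above. As a reference, a minimal sketch
of that mapping as the removed code implemented it (the helper name
toAssistantToolCallMessage is hypothetical, for illustration only):

import type {
  LanguageModelV2Prompt,
  LanguageModelV2ToolCall,
} from '@ai-sdk/provider';

// Sketch: build the assistant message that follows a tool-call step.
// providerMetadata reported by the model (e.g. Gemini's thoughtSignature)
// is carried over as providerOptions on the tool-call prompt part so it
// survives multi-turn tool calls.
function toAssistantToolCallMessage(
  toolCalls: LanguageModelV2ToolCall[]
): LanguageModelV2Prompt[number] {
  return {
    role: 'assistant',
    content: toolCalls.map((toolCall) => ({
      type: 'tool-call' as const,
      toolCallId: toolCall.toolCallId,
      toolName: toolCall.toolName,
      input: JSON.parse(toolCall.input),
      // Only attach providerOptions when the model reported metadata.
      ...(toolCall.providerMetadata != null
        ? { providerOptions: toolCall.providerMetadata }
        : {}),
    })),
  };
}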