diff --git a/.changeset/e2e-encryption.md b/.changeset/e2e-encryption.md new file mode 100644 index 000000000..302bc31b4 --- /dev/null +++ b/.changeset/e2e-encryption.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"@workflow/world-vercel": patch +--- + +Wire AES-GCM encryption into serialization layer with stream support diff --git a/packages/core/package.json b/packages/core/package.json index ff7b0107d..4ab1e53de 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -64,10 +64,6 @@ "types": "./dist/serialization-format.d.ts", "default": "./dist/serialization-format.js" }, - "./encryption": { - "types": "./dist/encryption.d.ts", - "default": "./dist/encryption.js" - }, "./_workflow": "./dist/workflow/index.js" }, "scripts": { diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 2ed8667c0..0c34a7c95 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -156,11 +156,11 @@ export class Run { const encryptionKey = await this.world.getEncryptionKeyForRun?.( this.runId ); - return await hydrateWorkflowReturnValue( + return (await hydrateWorkflowReturnValue( run.output, this.runId, encryptionKey - ); + )) as TResult; } if (run.status === 'cancelled') { diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index f0a4cf74c..2a388483a 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -300,19 +300,19 @@ const stepHandler = getWorldHandlers().createQueueHandler( // operations (e.g., stream loading) are added to `ops` and executed later // via Promise.all(ops) - their timing is not included in this measurement. const ops: Promise[] = []; - const hydratedInput = await trace( + const hydratedInput = (await trace( 'step.hydrate', {}, async (hydrateSpan) => { const startTime = Date.now(); const encryptionKey = await world.getEncryptionKeyForRun?.(workflowRunId); - const result = await hydrateStepArguments( + const result = (await hydrateStepArguments( step.input, workflowRunId, encryptionKey, ops - ); + )) as any; const durationMs = Date.now() - startTime; hydrateSpan?.setAttributes({ ...Attribute.StepArgumentsCount(result.args.length), @@ -320,7 +320,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( }); return result; } - ); + )) as any; const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 186c11396..60ca13806 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -1932,12 +1932,17 @@ describe('step function serialization', () => { enumerable: false, configurable: false, }); - const dehydrated = await dehydrateStepArguments([fnWithStepId], globalThis); - const ops: Promise[] = []; + const dehydrated = await dehydrateStepArguments( + [fnWithStepId], + mockRunId, + undefined, + globalThis + ); const hydrated = await hydrateStepArguments( dehydrated, - ops, mockRunId, + undefined, + [], globalThis ); const result = hydrated[0]; @@ -1959,12 +1964,17 @@ describe('step function serialization', () => { enumerable: false, configurable: false, }); - const dehydrated = await dehydrateStepArguments([fnWithStepId], globalThis); - const ops: Promise[] = []; + const dehydrated = await dehydrateStepArguments( + [fnWithStepId], + mockRunId, + undefined, + globalThis + ); const hydrated = await hydrateStepArguments( dehydrated, - ops, mockRunId, + undefined, + [], globalThis ); const result = hydrated[0]; @@ -3146,3 +3156,218 @@ describe('getDeserializeStream legacy fallback', () => { expect(results[1]).toBe('world'); }); }); + +describe('encryption integration', () => { + // Real 32-byte AES-256 test key + const testKey = new Uint8Array([ + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, + 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, + 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, + ]); + // A different key for wrong-key tests + const wrongKey = new Uint8Array(32); + wrongKey.fill(0xff); + + it('should encrypt workflow arguments when key is provided', async () => { + const testRunId = 'wrun_test123'; + const testValue = { message: 'secret data', count: 42 }; + + const encrypted = await dehydrateWorkflowArguments( + testValue, + testRunId, + testKey, + [], + globalThis, + false + ); + + // Should be a Uint8Array with 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + }); + + it('should decrypt workflow arguments with correct key', async () => { + const testRunId = 'wrun_test123'; + const testValue = { message: 'secret data', count: 42 }; + + const encrypted = await dehydrateWorkflowArguments( + testValue, + testRunId, + testKey, + [], + globalThis, + false + ); + + const decrypted = await hydrateWorkflowArguments( + encrypted, + testRunId, + testKey, + globalThis, + {} + ); + + expect(decrypted).toEqual(testValue); + }); + + it('should fail to decrypt with wrong key', async () => { + const testRunId = 'wrun_test123'; + const testValue = { message: 'secret data' }; + + const encrypted = await dehydrateWorkflowArguments( + testValue, + testRunId, + testKey, + [], + globalThis, + false + ); + + // AES-GCM auth tag check should fail with wrong key + await expect( + hydrateWorkflowArguments(encrypted, testRunId, wrongKey, globalThis, {}) + ).rejects.toThrow(); + }); + + it('should not encrypt when no key is provided', async () => { + const testRunId = 'wrun_test123'; + const testValue = { message: 'plain data' }; + + const serialized = await dehydrateWorkflowArguments( + testValue, + testRunId, + undefined, + [], + globalThis, + false + ); + + // Should be a Uint8Array with 'devl' prefix (not encrypted) + expect(serialized).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (serialized as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('devl'); + }); + + it('should handle unencrypted data when key is provided', async () => { + const testRunId = 'wrun_test123'; + const testValue = { message: 'plain data' }; + + // Serialize without encryption + const serialized = await dehydrateWorkflowArguments( + testValue, + testRunId, + undefined, + [], + globalThis, + false + ); + + // Hydrate with key — should still work because data isn't encrypted + const hydrated = await hydrateWorkflowArguments( + serialized, + testRunId, + testKey, + globalThis, + {} + ); + + expect(hydrated).toEqual(testValue); + }); + + it('should encrypt step arguments', async () => { + const testRunId = 'wrun_test123'; + const testValue = ['arg1', { nested: 'value' }, 123]; + + const encrypted = await dehydrateStepArguments( + testValue, + testRunId, + testKey, + globalThis, + false + ); + + // Should have 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + + // Should round-trip correctly + const decrypted = await hydrateStepArguments( + encrypted, + testRunId, + testKey, + [], + globalThis + ); + + expect(decrypted).toEqual(testValue); + }); + + it('should encrypt step return values', async () => { + const testRunId = 'wrun_test123'; + const testValue = { result: 'success', data: [1, 2, 3] }; + + const encrypted = await dehydrateStepReturnValue( + testValue, + testRunId, + testKey, + [], + globalThis + ); + + // Should have 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + + // Should round-trip correctly + const decrypted = await hydrateStepReturnValue( + encrypted, + testRunId, + testKey, + globalThis + ); + + expect(decrypted).toEqual(testValue); + }); + + it('should encrypt workflow return values', async () => { + const testRunId = 'wrun_test123'; + const testValue = { final: 'result', timestamp: Date.now() }; + + const encrypted = await dehydrateWorkflowReturnValue( + testValue, + testRunId, + testKey, + globalThis + ); + + // Should have 'encr' prefix + expect(encrypted).toBeInstanceOf(Uint8Array); + const prefix = new TextDecoder().decode( + (encrypted as Uint8Array).subarray(0, 4) + ); + expect(prefix).toBe('encr'); + + // Should round-trip correctly + const decrypted = await hydrateWorkflowReturnValue( + encrypted, + testRunId, + testKey, + [], + globalThis, + {} + ); + + expect(decrypted).toEqual(testValue); + }); +}); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index d5282490b..573410c5d 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -4,6 +4,10 @@ import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; import { getSerializationClass } from './class-serialization.js'; +import { + decrypt as aesGcmDecrypt, + encrypt as aesGcmEncrypt, +} from './encryption.js'; import { createFlushableState, flushablePipe, @@ -53,6 +57,8 @@ import { export const SerializationFormat = { /** devalue stringify/parse with TextEncoder/TextDecoder */ DEVALUE_V1: 'devl', + /** Encrypted payload (inner payload has its own format prefix) */ + ENCRYPTED: 'encr', } as const; export type SerializationFormatType = @@ -95,6 +101,38 @@ export function encodeWithFormatPrefix( return result; } +/** + * Peek at the format prefix without consuming it. + * Useful for checking if data is encrypted before deciding how to process it. + * + * @param data - The format-prefixed data + * @returns The format identifier, or null if data is legacy/non-binary + */ +export function peekFormatPrefix( + data: Uint8Array | unknown +): SerializationFormatType | null { + if (!(data instanceof Uint8Array) || data.length < FORMAT_PREFIX_LENGTH) { + return null; + } + const prefixBytes = data.subarray(0, FORMAT_PREFIX_LENGTH); + const format = formatDecoder.decode(prefixBytes); + const knownFormats = Object.values(SerializationFormat) as string[]; + if (!knownFormats.includes(format)) { + return null; + } + return format as SerializationFormatType; +} + +/** + * Check if data is encrypted (has 'encr' format prefix). + * + * @param data - The format-prefixed data + * @returns true if data has the encrypted format prefix + */ +export function isEncrypted(data: Uint8Array | unknown): boolean { + return peekFormatPrefix(data) === SerializationFormat.ENCRYPTED; +} + /** * Decode a format-prefixed payload. * @@ -314,7 +352,7 @@ export function getDeserializeStream( export class WorkflowServerReadableStream extends ReadableStream { #reader?: ReadableStreamDefaultReader; - constructor(name: string, startIndex?: number) { + constructor(name: string, startIndex?: number, runId?: string) { if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } @@ -339,7 +377,37 @@ export class WorkflowServerReadableStream extends ReadableStream { this.#reader = undefined; controller.close(); } else { - controller.enqueue(result.value); + // Decrypt chunk if encrypted + let chunk = result.value; + if (isEncrypted(chunk)) { + if (!runId) { + controller.error( + new WorkflowRuntimeError( + 'Encrypted stream data encountered but no runId provided for decryption context.' + ) + ); + return; + } + const world = getWorld(); + const key = await world.getEncryptionKeyForRun?.(runId); + if (!key) { + controller.error( + new WorkflowRuntimeError( + 'Encrypted stream data encountered but no encryption key available. ' + + 'Ensure encryption is configured.' + ) + ); + return; + } + runtimeLogger.debug('[encryption] stream decrypt', { + runId, + inputBytes: chunk.byteLength, + }); + // Strip 'encr' prefix — prefix is a core framing concern + const { payload } = decodeFormatPrefix(chunk); + chunk = await aesGcmDecrypt(key, payload); + } + controller.enqueue(chunk); } }, }); @@ -377,7 +445,26 @@ export class WorkflowServerWritableStream extends WritableStream { // Copy chunks to flush, but don't clear buffer until write succeeds // This prevents data loss if the write operation fails - const chunksToFlush = buffer.slice(); + let chunksToFlush = buffer.slice(); + + // Encrypt chunks if world supports encryption + const key = await world.getEncryptionKeyForRun?.(runId); + if (key) { + runtimeLogger.debug('[encryption] stream encrypt', { + runId, + chunks: chunksToFlush.length, + totalBytes: chunksToFlush.reduce((sum, c) => sum + c.byteLength, 0), + }); + chunksToFlush = await Promise.all( + chunksToFlush.map(async (chunk) => { + const encrypted = await aesGcmEncrypt(key, chunk); + return encodeWithFormatPrefix( + SerializationFormat.ENCRYPTED, + encrypted + ) as Uint8Array; + }) + ); + } // Use writeToStreamMulti if available for batch writes if ( @@ -725,7 +812,7 @@ export function getExternalReducers( const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); const name = `strm_${streamId}`; - const readable = new WorkflowServerReadableStream(name); + const readable = new WorkflowServerReadableStream(name, undefined, runId); ops.push(readable.pipeTo(value)); return { name }; @@ -811,12 +898,6 @@ function getStepReducers( let type = value[STREAM_TYPE_SYMBOL]; if (!name) { - if (!runId) { - throw new Error( - 'ReadableStream cannot be serialized without a valid runId' - ); - } - const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; type = getStreamType(value); @@ -845,16 +926,10 @@ function getStepReducers( let name = value[STREAM_NAME_SYMBOL]; if (!name) { - if (!runId) { - throw new Error( - 'WritableStream cannot be serialized without a valid runId' - ); - } - const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; ops.push( - new WorkflowServerReadableStream(name) + new WorkflowServerReadableStream(name, undefined, runId) .pipeThrough( getDeserializeStream(getStepRevivers(global, ops, runId)) ) @@ -1031,7 +1106,8 @@ export function getExternalRevivers( const readable = new WorkflowServerReadableStream( value.name, - value.startIndex + value.startIndex, + runId ); if (value.type === 'bytes') { // For byte streams, use flushable pipe with lock polling @@ -1299,7 +1375,11 @@ function getStepRevivers( return response.body; } - const readable = new WorkflowServerReadableStream(value.name); + const readable = new WorkflowServerReadableStream( + value.name, + undefined, + runId + ); if (value.type === 'bytes') { // For byte streams, use flushable pipe with lock polling const state = createFlushableState(); @@ -1337,12 +1417,6 @@ function getStepRevivers( } }, WritableStream: (value) => { - if (!runId) { - throw new Error( - 'WritableStream cannot be revived without a valid runId' - ); - } - const serialize = getSerializeStream(getStepReducers(global, ops, runId)); const serverWritable = new WorkflowServerWritableStream( value.name, @@ -1366,20 +1440,84 @@ function getStepRevivers( }; } +// ============================================================================ +// Encryption Helpers +// ============================================================================ + +/** + * Encrypt data if the world supports encryption. + * Returns original data if encryption is not available. + * + * @param data - Serialized data to encrypt + * @param key - Encryption key (undefined to skip encryption) + * @param context - Encryption context with runId + * @returns Encrypted data if encryption available, original data otherwise + */ +export async function maybeEncrypt( + data: Uint8Array, + key: Uint8Array | undefined +): Promise { + if (!key) return data; + const encrypted = await aesGcmEncrypt(key, data); + return encodeWithFormatPrefix( + SerializationFormat.ENCRYPTED, + encrypted + ) as Uint8Array; +} + +/** + * Decrypt data if it has the 'encr' prefix. + * Throws if encrypted but no key is available. + * + * @param data - Data that may be encrypted + * @param key - Encryption key (undefined if no key available) + * @param context - Encryption context with runId + * @returns Decrypted data if encrypted, original data otherwise + */ +export async function maybeDecrypt( + data: Uint8Array | unknown, + key: Uint8Array | undefined +): Promise { + if (!(data instanceof Uint8Array)) { + return data; + } + + if (isEncrypted(data)) { + if (!key) { + throw new WorkflowRuntimeError( + 'Encrypted data encountered but no encryption key available. ' + + 'Ensure VERCEL_DEPLOYMENT_KEY is set.' + ); + } + // Strip the 'encr' format prefix — the prefix is a core framing concern + const { payload } = decodeFormatPrefix(data); + return aesGcmDecrypt(key, payload); + } + + return data; +} + +// ============================================================================ +// Dehydrate / Hydrate Functions +// ============================================================================ + /** * Called from the `start()` function to serialize the workflow arguments * into a format that can be saved to the database and then hydrated from * within the workflow execution environment. * - * @param value - * @param global - * @param runId + * @param value - The value to serialize + * @param runId - The workflow run ID (required for encryption context) + * @param key - Encryption key (undefined to skip encryption) + * @param ops - Promise array for stream operations + * @param global - Global object for serialization context + * @param v1Compat - Enable legacy v1 compatibility mode * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateWorkflowArguments( value: unknown, runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, ops: Promise[] = [], global: Record = globalThis, v1Compat = false @@ -1390,7 +1528,13 @@ export async function dehydrateWorkflowArguments( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, key); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('workflow arguments', error), @@ -1404,25 +1548,30 @@ export async function dehydrateWorkflowArguments( * arguments from the database at the start of workflow execution. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param global - * @param extraRevivers + * @param runId - Run ID for decryption context + * @param key - Encryption key (undefined to skip decryption) + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated value */ export async function hydrateWorkflowArguments( value: Uint8Array | unknown, _runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, key); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getWorkflowRevivers(global), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); @@ -1440,14 +1589,16 @@ export async function hydrateWorkflowArguments( * Called at the end of a completed workflow execution to serialize the * return value into a format that can be saved to the database. * - * @param value - * @param global + * @param value - The value to serialize + * @param runId - Run ID for encryption context + * @param key - Encryption key (undefined to skip encryption) + * @param global - Global object for serialization context * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateWorkflowReturnValue( value: unknown, _runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, global: Record = globalThis, v1Compat = false ): Promise { @@ -1457,7 +1608,13 @@ export async function dehydrateWorkflowReturnValue( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, key); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('workflow return value', error), @@ -1472,28 +1629,32 @@ export async function dehydrateWorkflowReturnValue( * return value of a completed workflow run. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param ops - * @param global - * @param extraRevivers - * @param runId + * @param runId - Run ID for decryption context + * @param key - Encryption key (undefined to skip decryption) + * @param ops - Promise array for stream operations + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated return value, ready to be consumed by the client */ export async function hydrateWorkflowReturnValue( value: Uint8Array | unknown, runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, ops: Promise[] = [], global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, key); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getExternalRevivers(global, ops, runId), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); @@ -1512,14 +1673,17 @@ export async function hydrateWorkflowReturnValue( * Dehydrates values from within the workflow execution environment * into a format that can be saved to the database. * - * @param value - * @param global + * @param value - The value to serialize + * @param runId - Run ID for encryption context + * @param key - Encryption key (undefined to skip encryption) + * @param global - Global object for serialization context + * @param v1Compat - Enable legacy v1 compatibility mode * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateStepArguments( value: unknown, _runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, global: Record = globalThis, v1Compat = false ): Promise { @@ -1529,7 +1693,13 @@ export async function dehydrateStepArguments( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, key); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('step arguments', error), @@ -1543,28 +1713,32 @@ export async function dehydrateStepArguments( * from the database at the start of the step execution. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param ops - * @param global - * @param extraRevivers - * @param runId + * @param runId - Run ID for decryption context + * @param key - Encryption key (undefined to skip decryption) + * @param ops - Promise array for stream operations + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated value, ready to be consumed by the step user-code function */ export async function hydrateStepArguments( value: Uint8Array | unknown, runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, ops: Promise[] = [], global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, key); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getStepRevivers(global, ops, runId), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); @@ -1583,16 +1757,18 @@ export async function hydrateStepArguments( * Dehydrates values from within the step execution environment * into a format that can be saved to the database. * - * @param value - * @param ops - * @param global - * @param runId + * @param value - The value to serialize + * @param runId - Run ID for encryption context + * @param key - Encryption key (undefined to skip encryption) + * @param ops - Promise array for stream operations + * @param global - Global object for serialization context + * @param v1Compat - Enable legacy v1 compatibility mode * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateStepReturnValue( value: unknown, runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, ops: Promise[] = [], global: Record = globalThis, v1Compat = false @@ -1603,7 +1779,13 @@ export async function dehydrateStepReturnValue( return revive(str); } const payload = new TextEncoder().encode(str); - return encodeWithFormatPrefix(SerializationFormat.DEVALUE_V1, payload); + const serialized = encodeWithFormatPrefix( + SerializationFormat.DEVALUE_V1, + payload + ) as Uint8Array; + + // Encrypt if world supports encryption + return maybeEncrypt(serialized, key); } catch (error) { throw new WorkflowRuntimeError( formatSerializationError('step return value', error), @@ -1617,25 +1799,30 @@ export async function dehydrateStepReturnValue( * Hydrates the return value of a step from the database. * * @param value - Binary serialized data (Uint8Array) with format prefix - * @param global - * @param extraRevivers + * @param runId - Run ID for decryption context + * @param key - Encryption key (undefined to skip decryption) + * @param global - Global object for deserialization context + * @param extraRevivers - Additional revivers for custom types * @returns The hydrated return value of a step, ready to be consumed by the workflow handler */ export async function hydrateStepReturnValue( value: Uint8Array | unknown, _runId: string, - _key: Uint8Array | undefined, + key: Uint8Array | undefined, global: Record = globalThis, extraRevivers: Record any> = {} -) { - if (!(value instanceof Uint8Array)) { - return unflatten(value as any[], { +): Promise { + // Decrypt if needed + const decrypted = await maybeDecrypt(value, key); + + if (!(decrypted instanceof Uint8Array)) { + return unflatten(decrypted as any[], { ...getWorkflowRevivers(global), ...extraRevivers, }); } - const { format, payload } = decodeFormatPrefix(value); + const { format, payload } = decodeFormatPrefix(decrypted); if (format === SerializationFormat.DEVALUE_V1) { const str = new TextDecoder().decode(payload); diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index a84b11e12..bd85da410 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -627,12 +627,12 @@ export async function runWorkflow( ); } - const args = await hydrateWorkflowArguments( + const args = (await hydrateWorkflowArguments( workflowRun.input, workflowRun.runId, encryptionKey, vmGlobalThis - ); + )) as any[]; span?.setAttributes({ ...Attribute.WorkflowArgumentsCount(args.length), diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index f4e7a3036..fc1d9b087 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -101,7 +101,7 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { ctx.globalThis ) .then((payload) => { - next.resolve(payload); + next.resolve(payload as T); }) .catch((error) => { next.reject(error); @@ -152,7 +152,7 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { ctx.globalThis ) .then((payload) => { - resolvers.resolve(payload); + resolvers.resolve(payload as T); }) .catch((error) => { resolvers.reject(error);