diff --git a/.changeset/proud-dingos-peel.md b/.changeset/proud-dingos-peel.md new file mode 100644 index 000000000..05f967941 --- /dev/null +++ b/.changeset/proud-dingos-peel.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-postgres": patch +--- + +Fix racing conditions in Postgres streamer diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 49d9794e9..24d0b6aa0 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -68,9 +68,7 @@ export function createStreamer( const STREAM_TOPIC = 'workflow_event_chunk'; const listenSubscription = postgres.listen(STREAM_TOPIC, async (msg) => { - const parsed = await Promise.resolve(msg) - .then(JSON.parse) - .then(StreamPublishMessage.parse); + const parsed = StreamPublishMessage.parse(JSON.parse(msg)); const key = `strm:${parsed.streamId}` as const; if (!events.listenerCount(key)) { @@ -116,7 +114,7 @@ export function createStreamer( chunkData: toBuffer(chunk), eof: false, }); - postgres.notify( + await postgres.notify( STREAM_TOPIC, JSON.stringify( StreamPublishMessage.encode({ @@ -153,7 +151,7 @@ export function createStreamer( // Notify for each chunk (could be batched in future if needed) for (const chunkId of chunkIds) { - postgres.notify( + await postgres.notify( STREAM_TOPIC, JSON.stringify( StreamPublishMessage.encode({ @@ -179,7 +177,7 @@ export function createStreamer( chunkData: Buffer.from([]), eof: true, }); - postgres.notify( + await postgres.notify( 'workflow_event_chunk', JSON.stringify( StreamPublishMessage.encode({