
worker: fix AnswerPostprocessor swallowing SSE responses #63

Open
andreypfau wants to merge 1 commit into TelegramMessenger:main from AlphaCompute:fix/sse-postprocessor

Conversation

@andreypfau

Fixes #49.

AnswerPostprocessor::add_next_answer_slice assumed the LLM backend returns a single JSON document per response. That holds for non-streaming completions, but with stream: true vLLM emits SSE frames (data: {json}\n\n). The existing parser ran those through nlohmann::json's operator>>, which throws on the data: prefix; the catch block bailed out without advancing pos, so every subsequent slice re-threw and the whole stream was eventually dropped in finalize() with unprocessed data in answer. Clients saw HTTP 200 with content-length 0.

The fix peeks the first non-whitespace byte of the buffer. If it's d (i.e. data:), the buffer is parsed as a sequence of complete SSE events — split on \n\n, extract the JSON payload after data:, pass [DONE] and parse failures through verbatim, run the existing usage/cost logic unchanged, and emit data: {json}\n\n. Any other first byte falls through to the original std::stringstream path so non-streaming responses are untouched. finalize() synthesises a trailing \n\n when the stream ends mid-event so the final frame isn't lost.
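The event-splitting step can be sketched as follows (a simplified standalone sketch; `split_sse_payloads` is a hypothetical name, and the real patch additionally re-serialises each payload and runs the usage/cost logic): split on \n\n, strip the data: prefix, pass [DONE] through verbatim, and report how many bytes were consumed so the caller can advance its position past complete events while leaving a partial trailing event buffered.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical sketch of the SSE-splitting step. Extracts the payload
// after "data:" from each complete event (terminated by "\n\n") in buf.
// "[DONE]" passes through verbatim; an incomplete trailing event is left
// in the buffer, and `consumed` tells the caller how far to advance.
static std::vector<std::string> split_sse_payloads(const std::string& buf,
                                                   size_t& consumed) {
    std::vector<std::string> payloads;
    size_t pos = 0;
    for (;;) {
        size_t end = buf.find("\n\n", pos);
        if (end == std::string::npos) break;           // incomplete event: wait for more bytes
        std::string event = buf.substr(pos, end - pos);
        pos = end + 2;                                 // skip the "\n\n" delimiter
        size_t data = event.find("data:");
        if (data == std::string::npos) continue;       // comment/keep-alive frame: skip
        std::string payload = event.substr(data + 5);
        if (!payload.empty() && payload.front() == ' ')
            payload.erase(0, 1);                       // trim optional space after "data:"
        payloads.push_back(payload);
    }
    consumed = pos;
    return payloads;
}
```

Because `consumed` stops at the last complete delimiter, the byte-by-byte delivery scenario works the same as whole-event delivery: partial events simply accumulate until their \n\n arrives.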

Verification

Repro + regression test as a standalone binary: https://gist.github.com/andreypfau/02d0987cf1bc1ebff447f0c6b3cf5f8b

Three scenarios (whole SSE events, byte-by-byte SSE, raw non-streaming JSON) against both the pre-fix and post-fix ValidateRequest.cpp:

| scenario | pre-fix | post-fix |
| --- | --- | --- |
| sse_whole_events | 0 bytes forwarded, completion=0 | 541 bytes, completion=7 |
| sse_byte_split | 0 bytes forwarded, completion=0 | 541 bytes, completion=7 |
| raw_json_non_stream | 349 bytes, completion=7 | 349 bytes, completion=7 |

The pre-fix run logs "worker request: unprocessed data in answer: bytes=476" — the untouched SSE buffer that got dropped. Post-fix, usage is aggregated correctly from the trailing chunk, and the raw-JSON path is byte-identical to the pre-fix output.

Follow-ups

ByteTokenCounter in runners/helpers/CountTokens.cpp has the same std::stringstream + operator>> pattern but is dead code — create_token_counter is not called from any runner. Worth consolidating through a shared SSE helper in a separate patch if the counter is brought back into use.

AnswerPostprocessor assumed the LLM backend returns a single JSON
document per response. That holds for non-streaming completions, but
with stream:true vLLM emits SSE frames ("data: {json}\n\n"). The
existing parser ran those through nlohmann::json's operator>>, which
throws on the "data:" prefix; the catch bailed out without advancing
pos, so every subsequent slice re-threw and the whole stream was
eventually dropped in finalize() with "unprocessed data in answer".

Fix: peek the first non-whitespace byte. If it's 'd' (i.e. "data:"),
parse the buffer as a sequence of complete SSE events — split on \n\n,
extract the JSON payload after "data:", pass [DONE] and parse-failures
through verbatim, run the existing usage/cost logic, and emit a
"data: {json}\n\n" event. Any other first byte falls through to the
original std::stringstream path, so non-streaming responses are
untouched. finalize() synthesises a trailing \n\n when the stream ends
mid-event so the final frame isn't lost.

Repro and regression tests: https://gist.github.com/andreypfau/02d0987cf1bc1ebff447f0c6b3cf5f8b
Issue: TelegramMessenger#49


Development

Successfully merging this pull request may close these issues.

Worker AnswerPostprocessor drops stream:true responses
