worker: fix AnswerPostprocessor swallowing SSE responses #63
Open
andreypfau wants to merge 1 commit into TelegramMessenger:main
AnswerPostprocessor assumed the LLM backend returns a single JSON
document per response. That holds for non-streaming completions, but
with stream:true vLLM emits SSE frames ("data: {json}\n\n"). The
existing parser ran those through nlohmann::json's operator>>, which
throws on the "data:" prefix; the catch bailed out without advancing
pos, so every subsequent slice re-threw and the whole stream was
eventually dropped in finalize() with "unprocessed data in answer".
Fix: peek the first non-whitespace byte. If it's 'd' (i.e. "data:"),
parse the buffer as a sequence of complete SSE events — split on \n\n,
extract the JSON payload after "data:", pass [DONE] and parse-failures
through verbatim, run the existing usage/cost logic, and emit a
"data: {json}\n\n" event. Any other first byte falls through to the
original std::stringstream path, so non-streaming responses are
untouched. finalize() synthesises a trailing \n\n when the stream ends
mid-event so the final frame isn't lost.
Repro and regression tests: https://gist.github.com/andreypfau/02d0987cf1bc1ebff447f0c6b3cf5f8b
Issue: TelegramMessenger#49
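
The SSE path described in the commit message above can be sketched roughly as follows. This is an illustration only, not the code in the patch: looks_like_sse and take_complete_events are hypothetical names, the payload is treated as an opaque string (the real code parses it with nlohmann::json and runs the usage/cost logic), and events with several "data:" lines are not handled.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper: true if the first non-whitespace byte is 'd',
// which the patch treats as the start of a "data:" SSE frame.
bool looks_like_sse(std::string_view buf) {
  for (char c : buf) {
    if (c == ' ' || c == '\t' || c == '\r' || c == '\n') continue;
    return c == 'd';
  }
  return false;  // empty or whitespace-only buffer
}

// Hypothetical helper: extracts the payload of every complete event
// (terminated by "\n\n") starting at pos, and advances pos past them.
// An incomplete trailing event is left in place for the next slice.
std::vector<std::string> take_complete_events(const std::string& buf,
                                              std::size_t& pos) {
  assert(pos <= buf.size());
  std::vector<std::string> payloads;
  for (;;) {
    std::size_t end = buf.find("\n\n", pos);
    if (end == std::string::npos) break;  // wait for more bytes
    std::string_view event(buf.data() + pos, end - pos);
    pos = end + 2;  // always advance, even if the payload is unusable
    constexpr std::string_view prefix = "data:";
    if (event.substr(0, prefix.size()) == prefix) {
      event.remove_prefix(prefix.size());
      if (!event.empty() && event.front() == ' ') event.remove_prefix(1);
    }
    payloads.emplace_back(event);
  }
  return payloads;
}
```

The point of the sketch is that pos moves forward for every complete event regardless of what the payload contains, so one unparseable frame cannot stall the rest of the stream.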
Fixes #49.
AnswerPostprocessor::add_next_answer_slice assumed the LLM backend returns a single JSON document per response. That holds for non-streaming completions, but with stream: true vLLM emits SSE frames ("data: {json}\n\n"). The existing parser ran those through nlohmann::json's operator>>, which throws on the "data:" prefix; the catch block bailed out without advancing pos, so every subsequent slice re-threw and the whole stream was eventually dropped in finalize() with "unprocessed data in answer". Clients saw HTTP 200 with content-length 0.

The fix peeks the first non-whitespace byte of the buffer. If it's 'd' (i.e. "data:"), the buffer is parsed as a sequence of complete SSE events: split on \n\n, extract the JSON payload after "data:", pass [DONE] and parse failures through verbatim, run the existing usage/cost logic unchanged, and emit "data: {json}\n\n". Any other first byte falls through to the original std::stringstream path, so non-streaming responses are untouched. finalize() synthesises a trailing \n\n when the stream ends mid-event so the final frame isn't lost.

Verification

Repro + regression test as a standalone binary: https://gist.github.com/andreypfau/02d0987cf1bc1ebff447f0c6b3cf5f8b

It runs three scenarios (whole SSE events, byte-by-byte SSE, raw non-streaming JSON) against both the pre-fix and post-fix ValidateRequest.cpp.

Pre-fix logs "worker request: unprocessed data in answer: bytes=476", i.e. the untouched SSE buffer that got dropped. Post-fix, usage is aggregated correctly from the trailing chunk, and the raw-JSON path is byte-identical to the pre-fix output.

Follow-ups
ByteTokenCounter in runners/helpers/CountTokens.cpp has the same std::stringstream + operator>> pattern but is dead code: create_token_counter is not called from any runner. Worth consolidating through a shared SSE helper in a separate patch if the counter is brought back into use.
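
For discussion, a shared SSE helper of the kind mentioned above might look something like the sketch below. SseFramer, feed and finish are hypothetical names and nothing here exists in the repository; it only does framing (buffering slices and cutting on \n\n) and assumes callers parse the "data:" payload themselves.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical shared helper, not part of this patch.
class SseFramer {
 public:
  // Appends a slice of any size and returns the events it completed,
  // each including its terminating "\n\n".
  std::vector<std::string> feed(std::string_view slice) {
    buf_.append(slice);
    return drain();
  }

  // Call once at end of stream: if bytes are still buffered, synthesise
  // the missing "\n\n" so the last frame is emitted instead of dropped.
  std::vector<std::string> finish() {
    if (!buf_.empty()) buf_ += "\n\n";
    return drain();
  }

 private:
  std::vector<std::string> drain() {
    std::vector<std::string> events;
    std::size_t pos = 0;
    std::size_t end;
    while ((end = buf_.find("\n\n", pos)) != std::string::npos) {
      events.push_back(buf_.substr(pos, end + 2 - pos));
      pos = end + 2;
    }
    buf_.erase(0, pos);  // keep only the incomplete tail
    // Invariant: no complete event is left in the buffer.
    assert(buf_.find("\n\n") == std::string::npos);
    return events;
  }

  std::string buf_;
};
```

Both the whole-event and byte-by-byte scenarios from the verification section reduce to repeated feed() calls followed by one finish(), which is why a single helper could serve AnswerPostprocessor and ByteTokenCounter if the latter is revived.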