diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b411818..ad2276a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ For the release process and tag conventions, see [RELEASING.md](RELEASING.md). ## Unreleased +### Fixed + +- **`get-stream` yields lines on newline, not buffer-fill (ILO-489).** The client-side streaming builtins (`get-stream`, `get-stream-h`, `pst-stream`, `pst-stream-h`, ILO-448) wrapped minreq's `ResponseLazy` in `BufReader::lines()`, which blocks on a full ~8 KiB read-buffer fill before surfacing any line. So a slow SSE upstream that flushes one short event then idles had its lines batched until the buffer filled or the connection closed - functionally correct but latency was buffer-bound, not event-bound. The line splitter now consumes `ResponseLazy`'s byte iterator incrementally and emits each line the instant its `\n` arrives (trailing `\r` stripped for CRLF / chunked encoding), so each event surfaces promptly. EOF still yields a trailing newline-less partial line; mid-stream errors still surface as `ILO-R009 http-stream read error: ...`. Unblocks ILO-482's previously flaky end-to-end streaming test. + ### Added - **`ilo httpd` resolves `use` imports (ILO-481).** Handler files loaded by `ilo httpd` now have their `use` imports resolved at startup, relative to the handler's own directory, matching the existing `ilo run` / `ilo check` semantics. Previously `httpd` lexed, parsed, and verified only the single handler file and silently skipped import resolution, so a handler could not `use` a sibling module - `ilo-lang/crew`'s `crew-server` had to inline ~140 lines of store logic to work around it. A missing module now surfaces a real import diagnostic and the server refuses to start instead of failing later with a generic verifier error. See `docs/streaming.md`. diff --git a/skills/ilo/ilo-builtins-io.md b/skills/ilo/ilo-builtins-io.md index aba8e03f..8854920b 100644 --- a/skills/ilo/ilo-builtins-io.md +++ b/skills/ilo/ilo-builtins-io.md @@ -30,7 +30,7 @@ Timeout variants round up to the nearest second. Err on timeout or connection fa **WASM targets**: `get` and `pst` work in WASM (`wasm32-wasi`, `wasm32-unknown-unknown`) via `fetch` host import (module `"ilo_http"`); host must export the import set — see `src/interpreter/http_wasm.rs`. `put`, `pat`, `del`, `hed`, `opt`, `get-to`, `pst-to`, `getx`, `pstx`, `get-many`, and `*-stream` are native-only. `--allow-net` enforced on WASM same as native. -**Streaming (ILO-46)**: `get-stream url[+hs]`, `pst-stream url body[+hs]` → lazy `L t` line iter, drained via `@line (get-stream url){...}`. Body never buffered. SSE / log tails / NDJSON. Mid-stream → `ILO-R009`. Tree+VM. +**Streaming (ILO-46)**: `get-stream url[+hs]`, `pst-stream url body[+hs]` → lazy `L t` line iter, drained via `@line (get-stream url){...}`. Body never buffered; each line yields the instant its newline arrives (event-bound, not buffer-bound), so a slow SSE upstream surfaces each event promptly. SSE / log tails / NDJSON. Mid-stream → `ILO-R009`. Tree+VM. `getx url` / `pstx url body` (rich response, `R (M t _) t`): Ok-map with `status` (n), `headers` (M t t), `body` (t). Non-2xx is still Ok with status surfaced on the map; only transport failure is Err. Optional trailing request-headers map (M t t), same as `get`/`pst`. Use these when you need conditional requests (304), status-code branching (429), response-header reads (ETag, Link, X-RateLimit-*), or redirect following. Body-only `get`/`pst` stay cheaper for fire-and-forget; `getx`/`pstx` are the heavier variant. Response header names are lowercased on the Ok-map. diff --git a/src/interpreter/http_wasm.rs b/src/interpreter/http_wasm.rs index ebe7951c..99a087d9 100644 --- a/src/interpreter/http_wasm.rs +++ b/src/interpreter/http_wasm.rs @@ -150,12 +150,7 @@ impl HttpBackend for NativeHttpBackend { req = req.with_header(k.as_str(), v.as_str()); } let resp = req.send_lazy().map_err(|e| e.to_string())?; - // BufReader's `lines()` strips both `\n` and the trailing `\r` for - // `\r\n` chunked encoding, which is exactly what we want for SSE- - // style line consumption. - use std::io::BufRead; - let reader = std::io::BufReader::new(resp); - Ok(Box::new(reader.lines())) + Ok(Box::new(LazyLineSplitter::new(resp))) } fn post_stream( @@ -170,9 +165,84 @@ impl HttpBackend for NativeHttpBackend { req = req.with_header(k.as_str(), v.as_str()); } let resp = req.send_lazy().map_err(|e| e.to_string())?; - use std::io::BufRead; - let reader = std::io::BufReader::new(resp); - Ok(Box::new(reader.lines())) + Ok(Box::new(LazyLineSplitter::new(resp))) + } +} + +/// Splits a `minreq::ResponseLazy` byte stream into lines, yielding each line +/// the instant a `\n` is seen rather than waiting for a read buffer to fill. +/// +/// `ResponseLazy` is an `Iterator>` that decodes +/// the (possibly chunked) body one byte at a time. Consuming it directly, and +/// emitting a `String` the moment a newline arrives, gives event-bound latency +/// for SSE-style upstreams that flush a short line then idle, instead of the +/// buffer-bound latency a `BufReader::lines()` wrapper imposes (it blocks on a +/// full ~8 KiB fill before surfacing any line). See ILO-489. +/// +/// `\n` terminates a line; a trailing `\r` (CRLF / chunked encoding) is +/// stripped to match the previous `BufReader::lines()` behaviour. At EOF any +/// buffered partial line (no final newline) is yielded once, then iteration +/// ends. Read errors surface as `Err(io::Error)`, consistent with the +/// `ILO-R009 http-stream read error` path the interpreter already raises. +#[cfg(all(feature = "http", not(target_arch = "wasm32")))] +struct LazyLineSplitter { + bytes: minreq::ResponseLazy, + buf: Vec, + done: bool, +} + +#[cfg(all(feature = "http", not(target_arch = "wasm32")))] +impl LazyLineSplitter { + fn new(bytes: minreq::ResponseLazy) -> Self { + LazyLineSplitter { + bytes, + buf: Vec::new(), + done: false, + } + } + + /// Turn the accumulated `buf` into a `String`, stripping a trailing `\r`, + /// and reset the buffer for the next line. + fn take_line(&mut self) -> std::result::Result { + if self.buf.last() == Some(&b'\r') { + self.buf.pop(); + } + let line = String::from_utf8_lossy(&self.buf).into_owned(); + self.buf.clear(); + Ok(line) + } +} + +#[cfg(all(feature = "http", not(target_arch = "wasm32")))] +impl Iterator for LazyLineSplitter { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + if self.done { + return None; + } + loop { + match self.bytes.next() { + Some(Ok((b'\n', _))) => return Some(self.take_line()), + Some(Ok((byte, _))) => self.buf.push(byte), + Some(Err(e)) => { + self.done = true; + let io_err = match e { + minreq::Error::IoError(e) => e, + other => std::io::Error::other(other.to_string()), + }; + return Some(Err(io_err)); + } + None => { + // EOF: emit any buffered trailing partial line once. + self.done = true; + if self.buf.is_empty() { + return None; + } + return Some(self.take_line()); + } + } + } } } diff --git a/tests/http_streaming.rs b/tests/http_streaming.rs index 13797087..daafb98d 100644 --- a/tests/http_streaming.rs +++ b/tests/http_streaming.rs @@ -10,8 +10,9 @@ use std::io::{BufRead, BufReader, Read, Write}; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::process::Command; +use std::sync::mpsc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; fn ilo() -> Command { Command::new(env!("CARGO_BIN_EXE_ilo")) @@ -285,6 +286,149 @@ fn backend_get_stream_native_iterates_lines() { assert_eq!(collected, vec!["one", "two", "three"]); } +// ── prompt-yield: first line arrives before the second is sent (ILO-489) ──── + +/// Spawn a server that emits one short chunk, flushes, waits `gap`, then emits +/// a second chunk, *without* closing the connection until told. Reports (over +/// `tx`) the instant the second line is written, so the test can prove the +/// client surfaced line 1 strictly before line 2 hit the wire. +/// +/// This is the regression for ILO-489: a `BufReader::lines()` wrapper would +/// block on a full read-buffer fill and not yield line 1 until line 2 (or the +/// connection close) arrived. The incremental byte-iterator splitter yields +/// line 1 the instant its `\n` lands. +fn spawn_slow_drip_server( + gap: Duration, +) -> (SocketAddr, mpsc::Receiver, thread::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local_addr"); + let (tx, rx) = mpsc::channel::(); + + let handle = thread::spawn(move || { + let Ok((mut stream, _)) = listener.accept() else { + return; + }; + let mut buf = [0u8; 8192]; + let _ = stream.read(&mut buf); + + let head = b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\n\r\n"; + let _ = stream.write_all(head); + let _ = stream.flush(); + + // First line, flushed immediately. + let _ = stream.write_all(b"6\r\nfirst\n\r\n"); + let _ = stream.flush(); + + // Idle, mimicking an SSE upstream between events. + thread::sleep(gap); + + // Mark the instant line 2 goes on the wire, then send it. + let _ = tx.send(Instant::now()); + let _ = stream.write_all(b"7\r\nsecond\n\r\n"); + let _ = stream.write_all(b"0\r\n\r\n"); + let _ = stream.flush(); + }); + + (addr, rx, handle) +} + +#[test] +fn get_stream_yields_first_line_before_second_is_sent() { + use ilo::interpreter::http_wasm::default_backend; + + let gap = Duration::from_millis(300); + let (addr, rx, srv) = spawn_slow_drip_server(gap); + let url = format!("http://{addr}/events"); + + let backend = default_backend(); + let mut iter = backend + .get_stream(&url, &[]) + .expect("backend get_stream should open the connection"); + + // Pull the first line. With prompt yielding this returns as soon as + // "first\n" arrives, well before the server sends line 2. + let first = iter + .next() + .expect("expected a first line") + .expect("first line read ok"); + let first_seen = Instant::now(); + assert_eq!(first, "first"); + + // The server only stamps `line2_sent_at` after its `gap` sleep. Prove the + // client saw line 1 strictly before line 2 hit the wire (with margin). + let line2_sent_at = rx + .recv() + .expect("server should report when it sends line 2"); + assert!( + first_seen < line2_sent_at, + "first line should arrive before line 2 is sent: first_seen={first_seen:?} line2_sent_at={line2_sent_at:?}" + ); + assert!( + line2_sent_at.duration_since(first_seen) > gap / 2, + "first line should arrive with comfortable margin before line 2 ({:?} elapsed, gap {:?})", + line2_sent_at.duration_since(first_seen), + gap, + ); + + // Drain the rest so the connection closes cleanly. + let second = iter + .next() + .expect("expected a second line") + .expect("second line read ok"); + assert_eq!(second, "second"); + assert!(iter.next().is_none(), "stream should end after two lines"); + + srv.join().expect("server thread panicked"); +} + +// ── trailing partial line (no final newline) is yielded at EOF ────────────── + +/// Spawn a server whose final chunk has *no* trailing newline, then closes. +/// The splitter must still surface that partial line once before ending. +fn spawn_no_final_newline_server() -> (SocketAddr, thread::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local_addr"); + + let handle = thread::spawn(move || { + let Ok((mut stream, _)) = listener.accept() else { + return; + }; + let mut buf = [0u8; 8192]; + let _ = stream.read(&mut buf); + + let head = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n"; + let _ = stream.write_all(head); + // "done\n" then "tail" with no newline, then terminate. + let _ = stream.write_all(b"5\r\ndone\n\r\n"); + let _ = stream.write_all(b"4\r\ntail\r\n"); + let _ = stream.write_all(b"0\r\n\r\n"); + let _ = stream.flush(); + }); + + (addr, handle) +} + +#[test] +fn get_stream_yields_trailing_partial_line() { + use ilo::interpreter::http_wasm::default_backend; + + let (addr, srv) = spawn_no_final_newline_server(); + let url = format!("http://{addr}/partial"); + + let backend = default_backend(); + let iter = backend + .get_stream(&url, &[]) + .expect("backend get_stream should open the connection"); + let collected: Vec = iter.map(|r| r.expect("line read ok")).collect(); + srv.join().expect("server thread panicked"); + + assert_eq!( + collected, + vec!["done", "tail"], + "trailing newline-less line should be yielded at EOF" + ); +} + // Silence unused-import warning under cfgs that drop one of the helpers. #[allow(dead_code)] fn _force_use(_s: &TcpStream) {}