Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ For the release process and tag conventions, see [RELEASING.md](RELEASING.md).

## Unreleased

### Fixed

- **`get-stream` yields lines on newline, not buffer-fill (ILO-489).** The client-side streaming builtins (`get-stream`, `get-stream-h`, `pst-stream`, `pst-stream-h`, ILO-448) wrapped minreq's `ResponseLazy` in `BufReader::lines()`, which blocks on a full ~8 KiB read-buffer fill before surfacing any line. So a slow SSE upstream that flushes one short event then idles had its lines batched until the buffer filled or the connection closed - functionally correct but latency was buffer-bound, not event-bound. The line splitter now consumes `ResponseLazy`'s byte iterator incrementally and emits each line the instant its `\n` arrives (trailing `\r` stripped for CRLF / chunked encoding), so each event surfaces promptly. EOF still yields a trailing newline-less partial line; mid-stream errors still surface as `ILO-R009 http-stream read error: ...`. Unblocks ILO-482's previously flaky end-to-end streaming test.

### Added

- **`ilo httpd` resolves `use` imports (ILO-481).** Handler files loaded by `ilo httpd` now have their `use` imports resolved at startup, relative to the handler's own directory, matching the existing `ilo run` / `ilo check` semantics. Previously `httpd` lexed, parsed, and verified only the single handler file and silently skipped import resolution, so a handler could not `use` a sibling module - `ilo-lang/crew`'s `crew-server` had to inline ~140 lines of store logic to work around it. A missing module now surfaces a real import diagnostic and the server refuses to start instead of failing later with a generic verifier error. See `docs/streaming.md`.
Expand Down
2 changes: 1 addition & 1 deletion skills/ilo/ilo-builtins-io.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Timeout variants round up to the nearest second. Err on timeout or connection fa

**WASM targets**: `get` and `pst` work in WASM (`wasm32-wasi`, `wasm32-unknown-unknown`) via `fetch` host import (module `"ilo_http"`); host must export the import set — see `src/interpreter/http_wasm.rs`. `put`, `pat`, `del`, `hed`, `opt`, `get-to`, `pst-to`, `getx`, `pstx`, `get-many`, and `*-stream` are native-only. `--allow-net` enforced on WASM same as native.

**Streaming (ILO-46)**: `get-stream url[+hs]`, `pst-stream url body[+hs]` → lazy `L t` line iter, drained via `@line (get-stream url){...}`. Body never buffered. SSE / log tails / NDJSON. Mid-stream → `ILO-R009`. Tree+VM.
**Streaming (ILO-46)**: `get-stream url[+hs]`, `pst-stream url body[+hs]` → lazy `L t` line iter, drained via `@line (get-stream url){...}`. Body never buffered; each line yields the instant its newline arrives (event-bound, not buffer-bound), so a slow SSE upstream surfaces each event promptly. SSE / log tails / NDJSON. Mid-stream → `ILO-R009`. Tree+VM.

`getx url` / `pstx url body` (rich response, `R (M t _) t`): Ok-map with `status` (n), `headers` (M t t), `body` (t). Non-2xx is still Ok with status surfaced on the map; only transport failure is Err. Optional trailing request-headers map (M t t), same as `get`/`pst`. Use these when you need conditional requests (304), status-code branching (429), response-header reads (ETag, Link, X-RateLimit-*), or redirect following. Body-only `get`/`pst` stay cheaper for fire-and-forget; `getx`/`pstx` are the heavier variant. Response header names are lowercased on the Ok-map.

Expand Down
88 changes: 79 additions & 9 deletions src/interpreter/http_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,7 @@ impl HttpBackend for NativeHttpBackend {
req = req.with_header(k.as_str(), v.as_str());
}
let resp = req.send_lazy().map_err(|e| e.to_string())?;
// BufReader's `lines()` strips both `\n` and the trailing `\r` for
// `\r\n` chunked encoding, which is exactly what we want for SSE-
// style line consumption.
use std::io::BufRead;
let reader = std::io::BufReader::new(resp);
Ok(Box::new(reader.lines()))
Ok(Box::new(LazyLineSplitter::new(resp)))
}

fn post_stream(
Expand All @@ -170,9 +165,84 @@ impl HttpBackend for NativeHttpBackend {
req = req.with_header(k.as_str(), v.as_str());
}
let resp = req.send_lazy().map_err(|e| e.to_string())?;
use std::io::BufRead;
let reader = std::io::BufReader::new(resp);
Ok(Box::new(reader.lines()))
Ok(Box::new(LazyLineSplitter::new(resp)))
}
}

/// Splits a `minreq::ResponseLazy` byte stream into lines, yielding each line
/// the instant a `\n` is seen rather than waiting for a read buffer to fill.
///
/// `ResponseLazy` is an `Iterator<Item = io::Result<(u8, usize)>>` that decodes
/// the (possibly chunked) body one byte at a time. Consuming it directly, and
/// emitting a `String` the moment a newline arrives, gives event-bound latency
/// for SSE-style upstreams that flush a short line then idle, instead of the
/// buffer-bound latency a `BufReader::lines()` wrapper imposes (it blocks on a
/// full ~8 KiB fill before surfacing any line). See ILO-489.
///
/// `\n` terminates a line; a trailing `\r` (CRLF / chunked encoding) is
/// stripped to match the previous `BufReader::lines()` behaviour. At EOF any
/// buffered partial line (no final newline) is yielded once, then iteration
/// ends. Read errors surface as `Err(io::Error)`, consistent with the
/// `ILO-R009 http-stream read error` path the interpreter already raises.
#[cfg(all(feature = "http", not(target_arch = "wasm32")))]
struct LazyLineSplitter {
bytes: minreq::ResponseLazy,
buf: Vec<u8>,
done: bool,
}

#[cfg(all(feature = "http", not(target_arch = "wasm32")))]
impl LazyLineSplitter {
fn new(bytes: minreq::ResponseLazy) -> Self {
LazyLineSplitter {
bytes,
buf: Vec::new(),
done: false,
}
}

/// Turn the accumulated `buf` into a `String`, stripping a trailing `\r`,
/// and reset the buffer for the next line.
fn take_line(&mut self) -> std::result::Result<String, std::io::Error> {
if self.buf.last() == Some(&b'\r') {
self.buf.pop();
}
let line = String::from_utf8_lossy(&self.buf).into_owned();
self.buf.clear();
Ok(line)
}
}

#[cfg(all(feature = "http", not(target_arch = "wasm32")))]
impl Iterator for LazyLineSplitter {
type Item = std::result::Result<String, std::io::Error>;

fn next(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
loop {
match self.bytes.next() {
Some(Ok((b'\n', _))) => return Some(self.take_line()),
Some(Ok((byte, _))) => self.buf.push(byte),
Some(Err(e)) => {
self.done = true;
let io_err = match e {
minreq::Error::IoError(e) => e,
other => std::io::Error::other(other.to_string()),
};
return Some(Err(io_err));
}
None => {
// EOF: emit any buffered trailing partial line once.
self.done = true;
if self.buf.is_empty() {
return None;
}
return Some(self.take_line());
}
}
}
}
}

Expand Down
146 changes: 145 additions & 1 deletion tests/http_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::process::Command;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

fn ilo() -> Command {
Command::new(env!("CARGO_BIN_EXE_ilo"))
Expand Down Expand Up @@ -285,6 +286,149 @@ fn backend_get_stream_native_iterates_lines() {
assert_eq!(collected, vec!["one", "two", "three"]);
}

// ── prompt-yield: first line arrives before the second is sent (ILO-489) ────

/// Spawn a server that emits one short chunk, flushes, waits `gap`, then emits
/// a second chunk, *without* closing the connection until told. Reports (over
/// `tx`) the instant the second line is written, so the test can prove the
/// client surfaced line 1 strictly before line 2 hit the wire.
///
/// This is the regression for ILO-489: a `BufReader::lines()` wrapper would
/// block on a full read-buffer fill and not yield line 1 until line 2 (or the
/// connection close) arrived. The incremental byte-iterator splitter yields
/// line 1 the instant its `\n` lands.
fn spawn_slow_drip_server(
gap: Duration,
) -> (SocketAddr, mpsc::Receiver<Instant>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
let addr = listener.local_addr().expect("local_addr");
let (tx, rx) = mpsc::channel::<Instant>();

let handle = thread::spawn(move || {
let Ok((mut stream, _)) = listener.accept() else {
return;
};
let mut buf = [0u8; 8192];
let _ = stream.read(&mut buf);

let head = b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\n\r\n";
let _ = stream.write_all(head);
let _ = stream.flush();

// First line, flushed immediately.
let _ = stream.write_all(b"6\r\nfirst\n\r\n");
let _ = stream.flush();

// Idle, mimicking an SSE upstream between events.
thread::sleep(gap);

// Mark the instant line 2 goes on the wire, then send it.
let _ = tx.send(Instant::now());
let _ = stream.write_all(b"7\r\nsecond\n\r\n");
let _ = stream.write_all(b"0\r\n\r\n");
let _ = stream.flush();
});

(addr, rx, handle)
}

#[test]
fn get_stream_yields_first_line_before_second_is_sent() {
use ilo::interpreter::http_wasm::default_backend;

let gap = Duration::from_millis(300);
let (addr, rx, srv) = spawn_slow_drip_server(gap);
let url = format!("http://{addr}/events");

let backend = default_backend();
let mut iter = backend
.get_stream(&url, &[])
.expect("backend get_stream should open the connection");

// Pull the first line. With prompt yielding this returns as soon as
// "first\n" arrives, well before the server sends line 2.
let first = iter
.next()
.expect("expected a first line")
.expect("first line read ok");
let first_seen = Instant::now();
assert_eq!(first, "first");

// The server only stamps `line2_sent_at` after its `gap` sleep. Prove the
// client saw line 1 strictly before line 2 hit the wire (with margin).
let line2_sent_at = rx
.recv()
.expect("server should report when it sends line 2");
assert!(
first_seen < line2_sent_at,
"first line should arrive before line 2 is sent: first_seen={first_seen:?} line2_sent_at={line2_sent_at:?}"
);
assert!(
line2_sent_at.duration_since(first_seen) > gap / 2,
"first line should arrive with comfortable margin before line 2 ({:?} elapsed, gap {:?})",
line2_sent_at.duration_since(first_seen),
gap,
);

// Drain the rest so the connection closes cleanly.
let second = iter
.next()
.expect("expected a second line")
.expect("second line read ok");
assert_eq!(second, "second");
assert!(iter.next().is_none(), "stream should end after two lines");

srv.join().expect("server thread panicked");
}

// ── trailing partial line (no final newline) is yielded at EOF ──────────────

/// Spawn a server whose final chunk has *no* trailing newline, then closes.
/// The splitter must still surface that partial line once before ending.
fn spawn_no_final_newline_server() -> (SocketAddr, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
let addr = listener.local_addr().expect("local_addr");

let handle = thread::spawn(move || {
let Ok((mut stream, _)) = listener.accept() else {
return;
};
let mut buf = [0u8; 8192];
let _ = stream.read(&mut buf);

let head = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n";
let _ = stream.write_all(head);
// "done\n" then "tail" with no newline, then terminate.
let _ = stream.write_all(b"5\r\ndone\n\r\n");
let _ = stream.write_all(b"4\r\ntail\r\n");
let _ = stream.write_all(b"0\r\n\r\n");
let _ = stream.flush();
});

(addr, handle)
}

#[test]
fn get_stream_yields_trailing_partial_line() {
use ilo::interpreter::http_wasm::default_backend;

let (addr, srv) = spawn_no_final_newline_server();
let url = format!("http://{addr}/partial");

let backend = default_backend();
let iter = backend
.get_stream(&url, &[])
.expect("backend get_stream should open the connection");
let collected: Vec<String> = iter.map(|r| r.expect("line read ok")).collect();
srv.join().expect("server thread panicked");

assert_eq!(
collected,
vec!["done", "tail"],
"trailing newline-less line should be yielded at EOF"
);
}

// Silence unused-import warning under cfgs that drop one of the helpers.
#[allow(dead_code)]
fn _force_use(_s: &TcpStream) {}
Loading