From d4720e57f770328941e992fe1a0e032d9053ca1b Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 29 May 2026 17:10:55 -0400 Subject: [PATCH 1/9] feat: add spam-stream subcommand for streaming tx specs (draft) Reads newline-delimited JSON FunctionCallDefinitions from stdin or a file and spams them via the existing TestScenario pipeline. Reuses agent pools, rate limiting, nonce management, and receipt tracking. See docs/stream-mode.md for the design note and scope. --- .../cli/src/commands/contender_subcommand.rs | 11 + crates/cli/src/commands/error.rs | 3 + crates/cli/src/commands/mod.rs | 1 + crates/cli/src/commands/spam_stream.rs | 538 ++++++++++++++++++ crates/cli/src/main.rs | 4 + docs/stream-mode.md | 187 ++++++ 6 files changed, 744 insertions(+) create mode 100644 crates/cli/src/commands/spam_stream.rs create mode 100644 docs/stream-mode.md diff --git a/crates/cli/src/commands/contender_subcommand.rs b/crates/cli/src/commands/contender_subcommand.rs index 92f7e25d..ec886b46 100644 --- a/crates/cli/src/commands/contender_subcommand.rs +++ b/crates/cli/src/commands/contender_subcommand.rs @@ -9,6 +9,7 @@ use crate::default_scenarios::BuiltinScenarioCli; use super::admin::AdminCommand; use super::spam::SpamCliArgs; +use super::spam_stream::SpamStreamCliArgs; use super::ReportFormat; #[derive(Debug, Subcommand)] @@ -34,6 +35,16 @@ pub enum ContenderSubcommand { builtin_scenario_config: Option, }, + #[command( + name = "spam-stream", + long_about = "Read newline-delimited JSON tx specs from stdin or a file and spam them. \ + Each line is a FunctionCallDefinition (same fields as scenario TOML `[[spam.tx]]`)." + )] + SpamStream { + #[command(flatten)] + args: Box, + }, + #[command( name = "setup", long_about = "Deploy contracts and execute one-time setup txs." diff --git a/crates/cli/src/commands/error.rs b/crates/cli/src/commands/error.rs index 9ab196f0..766febf6 100644 --- a/crates/cli/src/commands/error.rs +++ b/crates/cli/src/commands/error.rs @@ -78,6 +78,9 @@ pub enum ArgsError { #[error("failed to parse url")] UrlParse(#[from] url::ParseError), + + #[error("{0}")] + Custom(String), } #[derive(Debug, Error)] diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 6b834134..d3e6c834 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -8,6 +8,7 @@ pub mod replay; pub mod rpc; mod setup; mod spam; +pub mod spam_stream; use clap::{Parser, ValueEnum}; use contender_core::db::DbOps; diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs new file mode 100644 index 00000000..71e68fb2 --- /dev/null +++ b/crates/cli/src/commands/spam_stream.rs @@ -0,0 +1,538 @@ +//! `spam-stream` subcommand: read newline-delimited JSON tx specs from stdin +//! or a file, and spam them through the contender pipeline. +//! +//! This is the entry point for the "stream mode" prototype. See +//! `docs/stream-mode.md` for the design note. + +use crate::{ + commands::{ + common::{HELP_HEADING_COMMON, HELP_HEADING_PAYLOAD, HELP_HEADING_RUNTIME}, + error::ArgsError, + Result, + }, + error::CliError, + util::fund_accounts, + LATENCY_HIST as HIST, PROM, +}; +use alloy::{ + consensus::TxType, + network::{AnyTxEnvelope, Ethereum, NetworkTransactionBuilder}, + primitives::{utils::format_ether, U256}, + providers::Provider, + transports::http::reqwest::Url, +}; +use clap::Args; +use contender_core::{ + generator::{ + agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, + types::SpamRequest, FunctionCallDefinition, Generator, PlanConfig, RandSeed, + }, + spammer::tx_actor::{ActorContext, CacheTx}, + test_scenario::{TestScenario, TestScenarioParams}, + BundleType, +}; +use contender_sqlite::SqliteDb; +use contender_testfile::TestConfig; +use std::{ + path::PathBuf, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::mpsc, +}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; + +/// Default agent-pool name when the user does not specify one. +const DEFAULT_POOL: &str = "executors"; +/// Channel buffer between the reader task and the spam loop. +const STREAM_BUFFER: usize = 256; + +#[derive(Clone, Debug, Args)] +pub struct SpamStreamCliArgs { + /// RPC URL to send transactions to. + #[arg( + env = "RPC_URL", + short = 'r', + long, + default_value = "http://localhost:8545", + help_heading = HELP_HEADING_COMMON, + )] + pub rpc_url: Url, + + /// Funder private key. Used to fund the pool of executor accounts before spam begins. + #[arg( + env = "CONTENDER_PRIVATE_KEY", + short = 'p', + long = "priv-key", + help_heading = HELP_HEADING_COMMON, + )] + pub private_key: Option, + + /// Source of the JSON-lines stream: either `stdin` or a file path. + #[arg( + long = "from", + default_value = "stdin", + help_heading = HELP_HEADING_COMMON, + )] + pub from: String, + + /// Pool name used to source signers for each tx. Stream specs that omit + /// `from`/`from_pool` will default to this pool. + #[arg( + long = "from-pool", + default_value = DEFAULT_POOL, + help_heading = HELP_HEADING_PAYLOAD, + )] + pub from_pool: String, + + /// Number of accounts to generate in the pool. + #[arg( + long = "pool-size", + default_value_t = 10, + help_heading = HELP_HEADING_PAYLOAD, + )] + pub pool_size: usize, + + /// Target transactions per second. `0` means "send as fast as the stream + /// can be parsed". + #[arg( + long, + default_value_t = 0, + help_heading = HELP_HEADING_RUNTIME, + )] + pub tps: u64, + + /// Minimum balance to keep in each pool account, in wei. + #[arg( + long, + default_value_t = U256::from(10_000_000_000_000_000u128), + help_heading = HELP_HEADING_RUNTIME, + )] + pub min_balance: U256, + + /// Seed for deterministic pool-signer generation. + #[arg( + env = "CONTENDER_SEED", + long, + help_heading = HELP_HEADING_RUNTIME, + )] + pub seed: Option, + + /// Skip funding the executor accounts. Useful when the pool was funded + /// out-of-band or every spec carries a `from` address with an existing balance. + #[arg(long, default_value_t = false, help_heading = HELP_HEADING_RUNTIME)] + pub skip_funding: bool, +} + +/// Asynchronously reads JSON lines from `from` (stdin or file path), parses each +/// into a `FunctionCallDefinition`, and forwards it to `tx`. The task exits when +/// EOF is reached or the receiver drops. +pub fn spawn_stream_reader( + from: &str, + tx: mpsc::Sender, +) -> Result> { + let handle: tokio::task::JoinHandle<()> = if from == "stdin" { + tokio::spawn(async move { + let reader = BufReader::new(tokio::io::stdin()); + forward_lines(reader, tx).await; + }) + } else { + let path = PathBuf::from(from); + if !path.exists() { + return Err(CliError::Args(ArgsError::Custom(format!( + "stream source file not found: {}", + path.display() + )))); + } + tokio::spawn(async move { + match tokio::fs::File::open(&path).await { + Ok(f) => { + let reader = BufReader::new(f); + forward_lines(reader, tx).await; + } + Err(e) => warn!("failed to open stream source {}: {e}", path.display()), + } + }) + }; + Ok(handle) +} + +async fn forward_lines(reader: R, tx: mpsc::Sender) +where + R: tokio::io::AsyncBufRead + Unpin, +{ + let mut lines = reader.lines(); + let mut line_no: u64 = 0; + loop { + match lines.next_line().await { + Ok(Some(line)) => { + line_no += 1; + let trimmed = line.trim(); + if trimmed.is_empty() || trimmed.starts_with('#') { + continue; + } + match serde_json::from_str::(trimmed) { + Ok(spec) => { + if tx.send(spec).await.is_err() { + // receiver dropped — stop reading + return; + } + } + Err(e) => warn!("stream: skipping malformed line {line_no}: {e}"), + } + } + Ok(None) => return, // EOF + Err(e) => { + warn!("stream: read error: {e}"); + return; + } + } + } +} + +/// Build a one-step `TestConfig` that references `from_pool`, so the scenario +/// builds an agent store containing that pool with `pool_size` signers. +/// The decoy entry is never executed; we bypass `load_txs` entirely. +fn build_decoy_config(from_pool: &str) -> TestConfig { + let decoy = FunctionCallDefinition::new("0x0000000000000000000000000000000000000000") + .with_from_pool(from_pool); + TestConfig { + env: None, + create: None, + setup: None, + spam: Some(vec![SpamRequest::Tx(Box::new(decoy))]), + } +} + +/// Drive the stream loop: pull specs, build/sign/send txs, cache in the +/// tx_actor for receipt tracking. Returns when the stream channel closes. +async fn drive_stream( + scenario: &mut TestScenario, + mut rx: mpsc::Receiver, + run_id: u64, + fallback_pool: String, + tps: u64, + cancel: CancellationToken, +) -> Result +where + S: SeedGenerator + Send + Sync + Clone, + P: PlanConfig + Templater + Send + Sync + Clone, +{ + // Rate limiter: only ticks when tps > 0. + let mut ticker = if tps > 0 { + let period = Duration::from_secs_f64(1.0 / tps as f64); + let mut int = tokio::time::interval(period); + int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + Some(int) + } else { + None + }; + + let mut sent: usize = 0; + let mut idx: usize = 0; + let placeholder_map = std::collections::HashMap::::new(); + + loop { + tokio::select! { + _ = cancel.cancelled() => { + info!("stream cancelled; flushing."); + break; + } + spec = rx.recv() => { + let Some(mut spec) = spec else { + debug!("stream EOF received"); + break; + }; + + // Apply default pool if the spec doesn't pick one. + if spec.from.is_none() && spec.from_pool.is_none() { + spec.from_pool = Some(fallback_pool.clone()); + } + + // Rate limit (only when --tps > 0). + if let Some(int) = ticker.as_mut() { + int.tick().await; + } + + match send_one(scenario, &spec, idx, &placeholder_map, run_id).await { + Ok(()) => { + sent += 1; + } + Err(e) => warn!("stream: failed to send tx (idx {idx}): {e}"), + } + idx += 1; + } + } + } + + Ok(sent) +} + +/// Build a single transaction from a stream spec, sign it, send it, and cache +/// it in the tx_actor for receipt tracking. +async fn send_one( + scenario: &mut TestScenario, + spec: &FunctionCallDefinition, + idx: usize, + placeholder_map: &std::collections::HashMap, + _run_id: u64, +) -> Result<()> +where + S: SeedGenerator + Send + Sync + Clone, + P: PlanConfig + Templater + Send + Sync + Clone, +{ + // 1. Resolve `from`/`from_pool` and access list against the scenario's + // agent store + templater. This produces a strict FunctionCallDefinition. + let strict = scenario + .make_strict_call(spec, idx) + .map_err(contender_core::Error::Generator)?; + + // 2. Render the strict definition into a TransactionRequest (encodes + // calldata, threads access_list, sets value/gas_limit). + let tx_req = scenario + .get_templater() + .template_function_call(&strict, placeholder_map) + .map_err(contender_core::Error::Templater)?; + + // 3. Fetch a gas price and assign nonce/gas-limit + sign. + let gas_price = scenario.rpc_client.get_gas_price().await?; + let (prepared, wallet) = scenario.prepare_tx_request(&tx_req, gas_price, 0).await?; + // Build & sign via the alloy Ethereum network. The op-alloy-network re-export + // can create trait-resolution ambiguity, so we fully-qualify the trait + // method and convert the error string-ly instead of relying on From. + let envelope = + >::build( + prepared, &wallet, + ) + .await + .map_err(|e| CliError::Args(ArgsError::Custom(format!("build envelope: {e}"))))?; + let tx_hash = envelope.tx_hash().to_owned(); + + // 4. Send via the same txs_client the regular spammer uses. + let any_envelope = AnyTxEnvelope::Ethereum(envelope); + let start_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let res = scenario.txs_client.send_tx_envelope(any_envelope).await; + let error = match res { + Ok(_) => { + info!("stream tx[{idx}]: {tx_hash} sent"); + None + } + Err(e) => { + let msg = e + .as_error_resp() + .map(|err| err.message.to_string()) + .unwrap_or_else(|| format!("{e}")); + warn!("stream tx[{idx}]: {tx_hash} failed: {msg}"); + Some(msg) + } + }; + + // 5. Cache in the tx_actor so its flush loop polls for the receipt. + scenario + .tx_actor() + .cache_run_tx(CacheTx { + tx_hash, + start_timestamp_ms: start_ms, + end_timestamp_ms: None, + kind: spec.kind.clone(), + error, + }) + .await?; + + Ok(()) +} + +/// Top-level entry point invoked from `main.rs`. +pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { + let seed = if let Some(s) = &args.seed { + RandSeed::seed_from_str(s) + } else { + RandSeed::new() + }; + + // Build a decoy TestConfig + agent store so the scenario sets up the + // requested pool with the requested number of signers. + let config = build_decoy_config(&args.from_pool); + let agent_spec = AgentSpec::default() + .create_accounts(0) + .setup_accounts(0) + .spam_accounts(args.pool_size); + + let funder = if let Some(key) = &args.private_key { + let s = key.trim().trim_start_matches("0x"); + Some( + alloy::signers::local::PrivateKeySigner::from_slice( + &alloy::hex::decode(s) + .map_err(|e| CliError::Args(ArgsError::Custom(format!("bad priv-key: {e}"))))?, + ) + .map_err(|e| CliError::Args(ArgsError::Custom(format!("bad priv-key: {e}"))))?, + ) + } else { + None + }; + // TestScenario needs at least one user signer for signer_map. Use a + // throwaway signer if none was provided (we only sign with pool accounts). + let user_signers = if let Some(s) = &funder { + vec![s.clone()] + } else { + vec![alloy::signers::local::PrivateKeySigner::random()] + }; + + let cancel = CancellationToken::new(); + let params = TestScenarioParams { + rpc_url: args.rpc_url.clone(), + builder_rpc_url: None, + txs_rpc_url: None, + signers: user_signers, + agent_spec, + tx_type: TxType::Eip1559, + bundle_type: BundleType::default(), + pending_tx_timeout: Duration::from_secs(60), + extra_msg_handles: None, + sync_nonces_after_batch: false, + rpc_batch_size: 0, + gas_price: None, + scenario_label: Some(format!("stream-{}", args.from_pool)), + send_raw_tx_sync: false, + flashblocks_ws_url: None, + }; + + let mut scenario: TestScenario = TestScenario::new( + config, + Arc::new(db.clone()), + seed, + params, + None, + (&PROM, &HIST).into(), + &cancel, + ) + .await?; + + // Fund the pool from the funder key if provided. + if !args.skip_funding { + if let Some(funder) = &funder { + let addrs = scenario.agent_store.all_signer_addresses(); + if !addrs.is_empty() { + info!( + "funding {} pool account(s) to {} ETH min from {}", + addrs.len(), + format_ether(args.min_balance), + funder.address() + ); + fund_accounts( + &addrs, + funder, + &scenario.rpc_client, + args.min_balance, + TxType::Legacy, + &Default::default(), + ) + .await?; + // Re-sync nonces for the freshly-funded accounts so the first + // sent tx uses the correct nonce. + scenario.sync_nonces().await?; + } + } else { + warn!("no funder key supplied; pool accounts must already be funded"); + } + } + + // Set up tx_actor context so cached txs flush to the DB. + let start_block = scenario.rpc_client.get_block_number().await?; + let run_id = 0u64; + let actor_ctx = + ActorContext::new(start_block, run_id).with_pending_tx_timeout(Duration::from_secs(60)); + scenario.tx_actor().init_ctx(actor_ctx).await?; + + // Spawn the reader and run the drive loop. + let (tx, rx) = mpsc::channel::(STREAM_BUFFER); + let _reader = spawn_stream_reader(&args.from, tx)?; + + let drive_cancel = cancel.clone(); + let sent = tokio::select! { + res = drive_stream(&mut scenario, rx, run_id, args.from_pool.clone(), args.tps, drive_cancel) => { + res? + } + _ = tokio::signal::ctrl_c() => { + info!("CTRL-C: stopping stream loop"); + cancel.cancel(); + 0 + } + }; + + info!("stream complete: {sent} tx(s) sent; draining pending receipts..."); + + tokio::select! { + _ = scenario.dump_tx_cache(run_id) => {} + _ = tokio::signal::ctrl_c() => { + info!("CTRL-C during drain; exiting"); + } + } + scenario.shutdown().await; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_minimal_spec() { + let line = + r#"{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit":21000}"#; + let spec: FunctionCallDefinition = serde_json::from_str(line).unwrap(); + assert_eq!(spec.to, "0xdeAD000000000000000000000000000000000000"); + assert_eq!(spec.value.as_deref(), Some("1")); + assert_eq!(spec.gas_limit, Some(21000)); + assert!(spec.from.is_none() && spec.from_pool.is_none()); + } + + #[test] + fn parses_spec_with_signature_and_args() { + let line = r#"{ + "to": "0x4200000000000000000000000000000000000022", + "signature": "validateMessage(bytes32)", + "args": ["0x0102030405060708091011121314151617181920212223242526272829303132"], + "gas_limit": 200000 + }"#; + let spec: FunctionCallDefinition = serde_json::from_str(line).unwrap(); + assert_eq!(spec.signature.as_deref(), Some("validateMessage(bytes32)")); + assert_eq!(spec.args.as_ref().unwrap().len(), 1); + assert_eq!(spec.gas_limit, Some(200000)); + } + + #[tokio::test] + async fn forward_lines_skips_blank_and_comments_and_emits_specs() { + let (tx, mut rx) = mpsc::channel::(8); + let input = b"\n# this is a comment\n{\"to\":\"0xdeAD000000000000000000000000000000000000\",\"value\":\"1\"}\n\n{\"to\":\"0xdeAD000000000000000000000000000000000001\",\"value\":\"2\"}\n"; + let reader = BufReader::new(&input[..]); + forward_lines(reader, tx).await; + let mut received = vec![]; + while let Ok(spec) = rx.try_recv() { + received.push(spec); + } + assert_eq!(received.len(), 2); + assert_eq!(received[0].value.as_deref(), Some("1")); + assert_eq!(received[1].value.as_deref(), Some("2")); + } + + #[tokio::test] + async fn forward_lines_skips_malformed_lines() { + let (tx, mut rx) = mpsc::channel::(8); + let input = b"not json at all\n{\"to\":\"0xdeAD000000000000000000000000000000000000\"}\n"; + let reader = BufReader::new(&input[..]); + forward_lines(reader, tx).await; + let mut received = vec![]; + while let Ok(spec) = rx.try_recv() { + received.push(spec); + } + assert_eq!(received.len(), 1); + } +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 4e0af9a5..f98b519f 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -140,6 +140,10 @@ async fn run() -> Result<(), contender_cli::Error> { commands::spam(&db, &spam_args, SpamCampaignContext::default()).await?; } + ContenderSubcommand::SpamStream { args } => { + contender_cli::commands::spam_stream::spam_stream(&db, *args).await?; + } + ContenderSubcommand::Server => { contender_cli::server::run() .await diff --git a/docs/stream-mode.md b/docs/stream-mode.md new file mode 100644 index 00000000..9186284e --- /dev/null +++ b/docs/stream-mode.md @@ -0,0 +1,187 @@ +# Stream Mode (`spam-stream`) + +> **Status:** prototype / draft. This document accompanies the +> initial implementation of `spam-stream`. Expect the CLI surface and the +> stream format to evolve based on review feedback. + +## Motivation + +Contender is a TPS spammer driven by static scenario TOML files. It cycles +through `[[spam]]` entries, fuzzes args, and sends at a configured rate. + +Some use cases need to feed *dynamically discovered* tx specs into the spam +loop. For example, an interop relayer reads message-emitted events on chain A +and needs to execute the corresponding `validateMessage(...)` call on chain B +— with the right access list, calldata, and target. Today these workflows +end up reimplementing rate limiting, signer pools, nonce management, and +receipt tracking outside contender. + +Stream mode lets contender act as the "sender" half of those workflows: any +upstream process can pipe JSON tx specs into contender and reuse the existing +agent pools, rate limiter, `tx_actor` receipt tracking, gas-price caching, +and Prometheus latency metrics. + +## CLI + +```bash +contender spam-stream \ + -r https://chain-b \ + -p $FUNDING_KEY \ + --from \ + --from-pool executors --pool-size 10 \ + --tps 5 +``` + +Key flags: + +| Flag | Default | Meaning | +|------|---------|---------| +| `-r, --rpc-url` | `http://localhost:8545` | Target RPC. | +| `-p, --priv-key` | none | Funder key (funds the pool before spam starts). | +| `--from` | `stdin` | `stdin` or a file path. | +| `--from-pool` | `executors` | Pool name. Specs that omit `from`/`from_pool` use this pool. | +| `--pool-size` | `10` | Accounts generated in the pool. | +| `--tps` | `0` | `0` = consume as fast as channel emits. | +| `--min-balance` | `0.01 ETH` (wei) | Min pool-account balance during funding. | +| `--skip-funding` | `false` | Skip pre-spam funding. | +| `--seed` | random | Deterministic pool generation. | + +## Stream format + +Newline-delimited JSON, one [`FunctionCallDefinition`](../crates/core/src/generator/function_def.rs) +per line. Same field names as scenario TOML. + +Minimal: + +```json +{"to":"0xdeAD000000000000000000000000000000000000","value":"1 wei","gas_limit":21000} +``` + +Full (interop-style): + +```json +{ + "to": "0x4200000000000000000000000000000000000022", + "signature": "validateMessage(bytes32)", + "args": ["0x0102030405060708091011121314151617181920212223242526272829303132"], + "access_list": [ + { + "address": "0x4200000000000000000000000000000000000022", + "storageKeys": ["0x0100000000000000000000000000000000000000000000000000000000000000"] + } + ], + "gas_limit": 200000 +} +``` + +Empty lines and lines beginning with `#` are ignored. Malformed JSON lines +log a warning and the loop continues. + +## Architecture + +Stream mode does **not** introduce a new spammer trait or a new tx pipeline. +It reuses the existing `TestScenario` machinery and wires a JSON-line reader +to it through an mpsc channel. + +``` +stdin/file -> reader task -> mpsc + | + v + drive_stream loop + | + v + for each spec: + scenario.make_strict_call (Generator trait, resolves from_pool + access_list) + scenario.config.template_function_call (Templater, builds TransactionRequest) + scenario.prepare_tx_request (assigns nonce, gas limit, signs key from pool) + scenario.txs_client.send_tx_envelope + scenario.tx_actor().cache_run_tx (queues for receipt polling) +``` + +Code map: + +- [`crates/cli/src/commands/spam_stream.rs`](../crates/cli/src/commands/spam_stream.rs) + is the new subcommand. All logic lives there. +- A "decoy" one-step `TestConfig` is constructed so the existing + `AgentPools::build_agent_store` produces a pool with the requested name and + size. The decoy spam step is never executed; we bypass `load_txs` entirely. +- The reader task is a small wrapper around `tokio::io::BufReader::lines()` + that forwards parsed specs over an mpsc channel and exits on EOF. +- The drive loop honors `--tps` via `tokio::time::interval`. + +## Reuse vs. new code + +| Existing piece | Reused as-is | +|----------------|--------------| +| `TestScenario` constructor (signer map, nonce sync, txs_client) | yes | +| `Generator::make_strict_call` (resolves `from_pool`, access list, EIP-7702) | yes | +| `Templater::template_function_call` (calldata encoding, access list threading) | yes | +| `TestScenario::prepare_tx_request` (nonce, gas limit, complete_tx_request) | yes | +| Pool generation via `AgentPools::build_agent_store` | yes | +| `TxActorHandle::cache_run_tx` + flush loop (DB writes, receipt polling) | yes | +| `fund_accounts` helper | yes | +| Prometheus latency histograms via Tower middleware | yes (inherited from `TestScenario`) | +| The `Spammer` trait + `TimedSpammer`/`BlockwiseSpammer` | **not** reused | + +The reason we skip `TimedSpammer` is that its `on_spam` loop drives ticks +from a pre-loaded `Vec>` returned by +`get_spam_tx_chunks`. Stream mode wants a stream-shaped tick: pull one spec, +send one tx. Bolting the channel into `TimedSpammer` would require a generic +`SpamSource` abstraction across the existing spammers. Out of scope for the +prototype; a candidate for a follow-up if the prototype lands. + +## Scope of the prototype + +In scope: + +- `--from stdin|FILE`, `--from-pool`, `--pool-size`, `--tps`, `--priv-key`, `--rpc-url`, `--seed`, `--min-balance`, `--skip-funding`. +- Same `FunctionCallDefinition` schema as scenario TOML (incl. `access_list`, + `gas_limit`, `signature`, `args`, `value`, `from`, `from_pool`, `kind`). +- Pool funding from `--priv-key` before spam begins. +- Receipt tracking + DB persistence via the existing tx_actor flush loop. +- Graceful CTRL-C and stream EOF handling (drain pending receipts before exit). + +Out of scope (future work): + +- Bundle support (`[[spam.bundle]]` analogue in the stream). +- Blob transactions (EIP-4844) and authorization transactions (EIP-7702) — + the `FunctionCallDefinition` fields are deserialized but not exercised in + prototype tests. +- Fuzzing in stream mode — fuzz happens upstream of the stream. +- Gas-bump / nonce-shift retry logic from the regular spammer's + `handle_tx_outcome`. The stream loop currently logs send errors and moves + on; the upstream is expected to resubmit if it cares. +- `--rpc-batch-size`, `--send-raw-tx-sync` integration. +- Recording the run in the `spam_runs` table — stream runs use `run_id = 0` + and rely on the tx_actor's cache only. +- A generic `SpamSource` trait so `TimedSpammer` can consume a stream too. + +## Validation + +Unit tests live alongside the implementation: + +``` +cargo test -p contender_cli spam_stream +``` + +Smoke test: + +```bash +echo '{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit":21000}' | \ + contender spam-stream -r $RPC -p $FUNDING_KEY --from stdin --tps 1 +``` + +The tx should land on the target chain; the funder needs at least enough ETH +to fund the executor pool. + +## Open questions + +1. Should stream mode get its own `Spammer` impl in `contender_core` so + campaigns can reuse it? Today the prototype lives entirely in `cli/`. +2. Is the JSON spec the right shape, or should we standardize on something + like a tagged envelope (`{"v":1,"tx":{...}}`) so we can evolve it later? +3. How should errors propagate back to the upstream producer? Currently the + only feedback is the DB + logs. A structured response stream (stdout JSON + lines mirroring the input) would be valuable for reactive callers. +4. Should `--tps 0` (drain-as-fast) bound concurrency by pool size, or is + "one in flight at a time" acceptable for the relayer case? From c7ddec89d8877eb2f72ea49fa1ccf5b0bbbcc6aa Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Tue, 2 Jun 2026 09:16:09 -0400 Subject: [PATCH 2/9] feat(spam-stream): versioned stdout envelope + drop decoy TestConfig - Address review feedback on PR #589: - #2/#3: emit structured, versioned/tagged JSON envelope on stdout (one tx_result event per spec) so the schema can evolve; default for spam-stream mode (logs stay on stderr). - #5: replace the decoy zero-address TestConfig with a direct AgentStore pool, injecting signers into the scenario and syncing nonces from the RPC. --- crates/cli/src/commands/spam_stream.rs | 123 +++++++++++++++++++++---- docs/stream-mode.md | 39 ++++++-- 2 files changed, 137 insertions(+), 25 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index 71e68fb2..e37e9854 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -23,9 +23,10 @@ use alloy::{ }; use clap::Args; use contender_core::{ + agent_controller::{AgentClass, AgentStore}, generator::{ agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, - types::SpamRequest, FunctionCallDefinition, Generator, PlanConfig, RandSeed, + FunctionCallDefinition, Generator, PlanConfig, RandSeed, }, spammer::tx_actor::{ActorContext, CacheTx}, test_scenario::{TestScenario, TestScenarioParams}, @@ -33,6 +34,7 @@ use contender_core::{ }; use contender_sqlite::SqliteDb; use contender_testfile::TestConfig; +use serde::Serialize; use std::{ path::PathBuf, sync::Arc, @@ -49,6 +51,52 @@ use tracing::{debug, info, warn}; const DEFAULT_POOL: &str = "executors"; /// Channel buffer between the reader task and the spam loop. const STREAM_BUFFER: usize = 256; +/// Schema version of the structured stdout output. Bump when the envelope +/// shape changes in a backward-incompatible way. +const OUTPUT_VERSION: u32 = 1; + +/// Versioned, tagged envelope written to stdout (one JSON line per event) so +/// downstream consumers can evolve with the schema. The `version` field pins +/// the schema and the `type` tag (via `payload`) discriminates event kinds. +#[derive(Debug, Serialize)] +struct StreamEvent { + version: u32, + #[serde(flatten)] + payload: StreamPayload, +} + +#[derive(Debug, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum StreamPayload { + /// Emitted once per input spec after the send attempt. + TxResult { + /// Zero-based index of the spec in the stream. + idx: usize, + /// Transaction hash (present even when the send RPC call failed). + tx_hash: String, + /// Unix-epoch milliseconds when the send was attempted. + start_timestamp_ms: u128, + /// Optional `kind` carried over from the input spec. + #[serde(skip_serializing_if = "Option::is_none")] + kind: Option, + /// Send-time error from the RPC, if any. + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + }, +} + +impl StreamEvent { + fn emit(payload: StreamPayload) { + let event = StreamEvent { + version: OUTPUT_VERSION, + payload, + }; + // Serialization of these fixed-shape payloads cannot fail. + if let Ok(line) = serde_json::to_string(&event) { + println!("{line}"); + } + } +} #[derive(Clone, Debug, Args)] pub struct SpamStreamCliArgs { @@ -193,18 +241,16 @@ where } } -/// Build a one-step `TestConfig` that references `from_pool`, so the scenario -/// builds an agent store containing that pool with `pool_size` signers. -/// The decoy entry is never executed; we bypass `load_txs` entirely. -fn build_decoy_config(from_pool: &str) -> TestConfig { - let decoy = FunctionCallDefinition::new("0x0000000000000000000000000000000000000000") - .with_from_pool(from_pool); - TestConfig { - env: None, - create: None, - setup: None, - spam: Some(vec![SpamRequest::Tx(Box::new(decoy))]), +/// Build an `AgentStore` holding a single spam pool named `from_pool` with +/// `pool_size` signers, derived from `seed`. Stream mode provisions the pool +/// directly instead of round-tripping through a scenario `TestConfig`, since +/// it never executes any pre-defined spam steps. +fn build_pool_agent_store(from_pool: &str, pool_size: usize, seed: &RandSeed) -> AgentStore { + let mut store = AgentStore::new(); + if pool_size > 0 { + store.add_new_agent(from_pool, pool_size, seed, AgentClass::Spammer); } + store } /// Drive the stream loop: pull specs, build/sign/send txs, cache in the @@ -333,7 +379,17 @@ where } }; - // 5. Cache in the tx_actor so its flush loop polls for the receipt. + // 5. Emit a structured result line to stdout (mirrors the input stream so + // reactive callers can correlate sends with their specs). + StreamEvent::emit(StreamPayload::TxResult { + idx, + tx_hash: tx_hash.to_string(), + start_timestamp_ms: start_ms, + kind: spec.kind.clone(), + error: error.clone(), + }); + + // 6. Cache in the tx_actor so its flush loop polls for the receipt. scenario .tx_actor() .cache_run_tx(CacheTx { @@ -356,9 +412,11 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { RandSeed::new() }; - // Build a decoy TestConfig + agent store so the scenario sets up the - // requested pool with the requested number of signers. - let config = build_decoy_config(&args.from_pool); + // Provision the executor pool directly. Stream mode never runs pre-defined + // create/setup/spam steps, so the scenario starts from an empty config and + // we inject the pool's signers below. + let config = TestConfig::new(); + let pool_store = build_pool_agent_store(&args.from_pool, args.pool_size, &seed); let agent_spec = AgentSpec::default() .create_accounts(0) .setup_accounts(0) @@ -414,6 +472,16 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { ) .await?; + // Register the pool's signers with the scenario so it can sign with them + // and track their nonces, then sync nonces from the RPC. + for (_name, store) in pool_store.all_agents() { + for signer in &store.signers { + scenario.signer_map.insert(signer.address(), signer.clone()); + } + } + scenario.agent_store = pool_store; + scenario.sync_nonces().await?; + // Fund the pool from the funder key if provided. if !args.skip_funding { if let Some(funder) = &funder { @@ -483,6 +551,29 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { mod tests { use super::*; + #[test] + fn tx_result_envelope_is_versioned_and_tagged() { + let event = StreamEvent { + version: OUTPUT_VERSION, + payload: StreamPayload::TxResult { + idx: 3, + tx_hash: "0xabc".to_string(), + start_timestamp_ms: 1_733_155_200_000, + kind: Some("validate".to_string()), + error: None, + }, + }; + let json: serde_json::Value = + serde_json::from_str(&serde_json::to_string(&event).unwrap()).unwrap(); + assert_eq!(json["version"], 1); + assert_eq!(json["type"], "tx_result"); + assert_eq!(json["idx"], 3); + assert_eq!(json["tx_hash"], "0xabc"); + assert_eq!(json["kind"], "validate"); + // `error` is omitted when None. + assert!(json.get("error").is_none()); + } + #[test] fn parses_minimal_spec() { let line = diff --git a/docs/stream-mode.md b/docs/stream-mode.md index 9186284e..4dc6d133 100644 --- a/docs/stream-mode.md +++ b/docs/stream-mode.md @@ -102,13 +102,29 @@ Code map: - [`crates/cli/src/commands/spam_stream.rs`](../crates/cli/src/commands/spam_stream.rs) is the new subcommand. All logic lives there. -- A "decoy" one-step `TestConfig` is constructed so the existing - `AgentPools::build_agent_store` produces a pool with the requested name and - size. The decoy spam step is never executed; we bypass `load_txs` entirely. +- The scenario starts from an empty `TestConfig`; the executor pool is + provisioned directly via `AgentStore::add_new_agent` and its signers are + registered with the scenario, then nonces are synced from the RPC. - The reader task is a small wrapper around `tokio::io::BufReader::lines()` that forwards parsed specs over an mpsc channel and exits on EOF. - The drive loop honors `--tps` via `tokio::time::interval`. +## Structured output + +`spam-stream` writes a structured, newline-delimited JSON event stream to +**stdout** (human-readable logs go to stderr via `tracing`). Each event is a +versioned, tagged envelope so the schema can evolve without breaking +consumers: + +```json +{"version":1,"type":"tx_result","idx":0,"tx_hash":"0x...","start_timestamp_ms":1733155200000,"kind":"validate","error":null} +``` + +- `version` pins the schema (bump on breaking changes). +- `type` discriminates the event kind (currently only `tx_result`). +- One `tx_result` is emitted per input spec after the send attempt; `error` is + present only when the send RPC call failed. + ## Reuse vs. new code | Existing piece | Reused as-is | @@ -178,10 +194,15 @@ to fund the executor pool. 1. Should stream mode get its own `Spammer` impl in `contender_core` so campaigns can reuse it? Today the prototype lives entirely in `cli/`. -2. Is the JSON spec the right shape, or should we standardize on something - like a tagged envelope (`{"v":1,"tx":{...}}`) so we can evolve it later? -3. How should errors propagate back to the upstream producer? Currently the - only feedback is the DB + logs. A structured response stream (stdout JSON - lines mirroring the input) would be valuable for reactive callers. + *(Deferred to a follow-up: refactoring the `Spammer` trait is out of scope + for the prototype.)* +2. ~~Is the JSON spec the right shape, or should we standardize on a tagged + envelope so we can evolve it later?~~ **Resolved:** the stdout output is now + a versioned, tagged envelope (`{"version":1,"type":"tx_result",...}`). See + "Structured output" above. +3. ~~How should errors propagate back to the upstream producer?~~ **Resolved:** + `spam-stream` emits a structured `tx_result` event per spec on stdout + (including send errors), in addition to the DB + logs. 4. Should `--tps 0` (drain-as-fast) bound concurrency by pool size, or is - "one in flight at a time" acceptable for the relayer case? + "one in flight at a time" acceptable for the relayer case? *(Deferred: + parallel sends judged not worth the effort for the prototype.)* From 3e53066389b68698cb68df85752c86f8288c649f Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Tue, 2 Jun 2026 10:13:35 -0400 Subject: [PATCH 3/9] fix(spam-stream): record run, cache gas price, surface backpressure/summary Address review gaps on the prototype: - record a real run via insert_run so dumped receipts aren't orphaned under run_id 0 (run_txs has a foreign key into runs) - cache the gas price, refreshing every 6s instead of once per tx - emit `backpressure` and a terminal `summary` event; track sent vs failed - reject blob (4844) / setCode (7702) specs up front - unify the stdin/file reader and drop dead run_id plumbing --- crates/cli/src/commands/spam_stream.rs | 208 ++++++++++++++++++------- docs/stream-mode.md | 19 ++- 2 files changed, 164 insertions(+), 63 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index e37e9854..0368a73d 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -24,6 +24,7 @@ use alloy::{ use clap::Args; use contender_core::{ agent_controller::{AgentClass, AgentStore}, + db::{DbOps, SpamDuration, SpamRunRequest}, generator::{ agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, FunctionCallDefinition, Generator, PlanConfig, RandSeed, @@ -36,12 +37,13 @@ use contender_sqlite::SqliteDb; use contender_testfile::TestConfig; use serde::Serialize; use std::{ + collections::HashMap, path::PathBuf, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use tokio::{ - io::{AsyncBufReadExt, BufReader}, + io::{AsyncBufRead, AsyncBufReadExt, BufReader}, sync::mpsc, }; use tokio_util::sync::CancellationToken; @@ -54,6 +56,9 @@ const STREAM_BUFFER: usize = 256; /// Schema version of the structured stdout output. Bump when the envelope /// shape changes in a backward-incompatible way. const OUTPUT_VERSION: u32 = 1; +/// How often to refresh the cached gas price during the stream loop. Avoids an +/// RPC round-trip on every single tx. +const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(6); /// Versioned, tagged envelope written to stdout (one JSON line per event) so /// downstream consumers can evolve with the schema. The `version` field pins @@ -83,6 +88,10 @@ enum StreamPayload { #[serde(skip_serializing_if = "Option::is_none")] error: Option, }, + /// Buffer saturated; the reader is blocking. Signals the producer to slow down. + Backpressure { queued: usize, capacity: usize }, + /// Emitted once when the stream finishes (EOF or cancellation). + Summary { sent: usize, failed: usize }, } impl StreamEvent { @@ -182,30 +191,29 @@ pub fn spawn_stream_reader( from: &str, tx: mpsc::Sender, ) -> Result> { - let handle: tokio::task::JoinHandle<()> = if from == "stdin" { - tokio::spawn(async move { - let reader = BufReader::new(tokio::io::stdin()); - forward_lines(reader, tx).await; - }) - } else { - let path = PathBuf::from(from); - if !path.exists() { - return Err(CliError::Args(ArgsError::Custom(format!( - "stream source file not found: {}", - path.display() - )))); - } - tokio::spawn(async move { - match tokio::fs::File::open(&path).await { - Ok(f) => { - let reader = BufReader::new(f); - forward_lines(reader, tx).await; + // Validate file existence eagerly so a bad path is a clean CLI error rather + // than a silent warning from inside the spawned task. + if from != "stdin" && !PathBuf::from(from).exists() { + return Err(CliError::Args(ArgsError::Custom(format!( + "stream source file not found: {from}" + )))); + } + + let from = from.to_owned(); + Ok(tokio::spawn(async move { + let reader: Box = if from == "stdin" { + Box::new(BufReader::new(tokio::io::stdin())) + } else { + match tokio::fs::File::open(&from).await { + Ok(f) => Box::new(BufReader::new(f)), + Err(e) => { + warn!("failed to open stream source {from}: {e}"); + return; } - Err(e) => warn!("failed to open stream source {}: {e}", path.display()), } - }) - }; - Ok(handle) + }; + forward_lines(reader, tx).await; + })) } async fn forward_lines(reader: R, tx: mpsc::Sender) @@ -214,6 +222,8 @@ where { let mut lines = reader.lines(); let mut line_no: u64 = 0; + // Emit the backpressure event once per saturation episode, not per blocked send. + let mut backpressured = false; loop { match lines.next_line().await { Ok(Some(line)) => { @@ -223,12 +233,24 @@ where continue; } match serde_json::from_str::(trimmed) { - Ok(spec) => { - if tx.send(spec).await.is_err() { - // receiver dropped — stop reading - return; + Ok(spec) => match tx.try_send(spec) { + Ok(()) => backpressured = false, + Err(mpsc::error::TrySendError::Full(spec)) => { + if !backpressured { + backpressured = true; + let capacity = tx.max_capacity(); + StreamEvent::emit(StreamPayload::Backpressure { + queued: capacity.saturating_sub(tx.capacity()), + capacity, + }); + } + // Block until a slot frees, applying real backpressure. + if tx.send(spec).await.is_err() { + return; // receiver dropped + } } - } + Err(mpsc::error::TrySendError::Closed(_)) => return, + }, Err(e) => warn!("stream: skipping malformed line {line_no}: {e}"), } } @@ -258,11 +280,10 @@ fn build_pool_agent_store(from_pool: &str, pool_size: usize, seed: &RandSeed) -> async fn drive_stream( scenario: &mut TestScenario, mut rx: mpsc::Receiver, - run_id: u64, fallback_pool: String, tps: u64, cancel: CancellationToken, -) -> Result +) -> Result<(usize, usize)> where S: SeedGenerator + Send + Sync + Clone, P: PlanConfig + Templater + Send + Sync + Clone, @@ -277,9 +298,14 @@ where None }; + // Cache the gas price instead of fetching it per tx; refreshed below. + let mut gas_price = scenario.rpc_client.get_gas_price().await?; + let mut last_gas_refresh = Instant::now(); + let mut sent: usize = 0; + let mut failed: usize = 0; let mut idx: usize = 0; - let placeholder_map = std::collections::HashMap::::new(); + let placeholder_map = HashMap::::new(); loop { tokio::select! { @@ -303,18 +329,27 @@ where int.tick().await; } - match send_one(scenario, &spec, idx, &placeholder_map, run_id).await { - Ok(()) => { - sent += 1; + if last_gas_refresh.elapsed() >= GAS_REFRESH_INTERVAL { + if let Ok(gp) = scenario.rpc_client.get_gas_price().await { + gas_price = gp; + } + last_gas_refresh = Instant::now(); + } + + match send_one(scenario, &spec, idx, &placeholder_map, gas_price).await { + Ok(true) => sent += 1, + Ok(false) => failed += 1, + Err(e) => { + failed += 1; + warn!("stream: failed to send tx (idx {idx}): {e}"); } - Err(e) => warn!("stream: failed to send tx (idx {idx}): {e}"), } idx += 1; } } } - Ok(sent) + Ok((sent, failed)) } /// Build a single transaction from a stream spec, sign it, send it, and cache @@ -323,13 +358,21 @@ async fn send_one( scenario: &mut TestScenario, spec: &FunctionCallDefinition, idx: usize, - placeholder_map: &std::collections::HashMap, - _run_id: u64, -) -> Result<()> + placeholder_map: &HashMap, + gas_price: u128, +) -> Result where S: SeedGenerator + Send + Sync + Clone, P: PlanConfig + Templater + Send + Sync + Clone, { + // Stream mode only builds EIP-1559 txs (no blob gas price, no auth list), so + // reject blob (4844) / setCode (7702) specs up front instead of silently + // producing an invalid tx. + if spec.blob_data.is_some() || spec.authorization_address.is_some() { + warn!("stream tx[{idx}]: blob/7702 specs are unsupported in stream mode; skipping"); + return Ok(false); + } + // 1. Resolve `from`/`from_pool` and access list against the scenario's // agent store + templater. This produces a strict FunctionCallDefinition. let strict = scenario @@ -343,8 +386,7 @@ where .template_function_call(&strict, placeholder_map) .map_err(contender_core::Error::Templater)?; - // 3. Fetch a gas price and assign nonce/gas-limit + sign. - let gas_price = scenario.rpc_client.get_gas_price().await?; + // 3. Assign nonce/gas-limit + sign using the cached gas price. let (prepared, wallet) = scenario.prepare_tx_request(&tx_req, gas_price, 0).await?; // Build & sign via the alloy Ethereum network. The op-alloy-network re-export // can create trait-resolution ambiguity, so we fully-qualify the trait @@ -379,6 +421,8 @@ where } }; + let submitted = error.is_none(); + // 5. Emit a structured result line to stdout (mirrors the input stream so // reactive callers can correlate sends with their specs). StreamEvent::emit(StreamPayload::TxResult { @@ -401,7 +445,7 @@ where }) .await?; - Ok(()) + Ok(submitted) } /// Top-level entry point invoked from `main.rs`. @@ -511,9 +555,31 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { } } + // Register a run row so receipts dump under a real run_id; run_txs has a FK + // into runs, so a bogus id (e.g. 0) would orphan them. + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as usize; + let run = SpamRunRequest { + timestamp, + tx_count: 0, // unbounded stream; the real count lands as txs flush + scenario_name: format!("stream-{}", args.from_pool), + campaign_id: None, + campaign_name: None, + stage_name: None, + rpc_url: args.rpc_url.to_string(), + txs_per_duration: args.tps, + duration: SpamDuration::Seconds(0), + pending_timeout: Duration::from_secs(60), + }; + let run_id = db + .insert_run(&run) + .map_err(|e| contender_core::Error::Db(e.into()))?; + info!(run_id, "created stream run"); + // Set up tx_actor context so cached txs flush to the DB. let start_block = scenario.rpc_client.get_block_number().await?; - let run_id = 0u64; let actor_ctx = ActorContext::new(start_block, run_id).with_pending_tx_timeout(Duration::from_secs(60)); scenario.tx_actor().init_ctx(actor_ctx).await?; @@ -522,19 +588,24 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { let (tx, rx) = mpsc::channel::(STREAM_BUFFER); let _reader = spawn_stream_reader(&args.from, tx)?; - let drive_cancel = cancel.clone(); - let sent = tokio::select! { - res = drive_stream(&mut scenario, rx, run_id, args.from_pool.clone(), args.tps, drive_cancel) => { - res? - } - _ = tokio::signal::ctrl_c() => { - info!("CTRL-C: stopping stream loop"); - cancel.cancel(); - 0 - } - }; + // CTRL-C cancels the loop; drive_stream observes the token and returns its counts. + let ctrlc_cancel = cancel.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + ctrlc_cancel.cancel(); + }); + + let (sent, failed) = drive_stream( + &mut scenario, + rx, + args.from_pool.clone(), + args.tps, + cancel.clone(), + ) + .await?; + StreamEvent::emit(StreamPayload::Summary { sent, failed }); - info!("stream complete: {sent} tx(s) sent; draining pending receipts..."); + info!("stream complete: {sent} sent, {failed} failed; draining pending receipts..."); tokio::select! { _ = scenario.dump_tx_cache(run_id) => {} @@ -574,6 +645,31 @@ mod tests { assert!(json.get("error").is_none()); } + #[test] + fn summary_and_backpressure_envelopes_are_tagged() { + let summary = serde_json::to_value(StreamEvent { + version: OUTPUT_VERSION, + payload: StreamPayload::Summary { sent: 9, failed: 1 }, + }) + .unwrap(); + assert_eq!(summary["version"], 1); + assert_eq!(summary["type"], "summary"); + assert_eq!(summary["sent"], 9); + assert_eq!(summary["failed"], 1); + + let backpressure = serde_json::to_value(StreamEvent { + version: OUTPUT_VERSION, + payload: StreamPayload::Backpressure { + queued: 256, + capacity: 256, + }, + }) + .unwrap(); + assert_eq!(backpressure["type"], "backpressure"); + assert_eq!(backpressure["queued"], 256); + assert_eq!(backpressure["capacity"], 256); + } + #[test] fn parses_minimal_spec() { let line = diff --git a/docs/stream-mode.md b/docs/stream-mode.md index 4dc6d133..27a1dc55 100644 --- a/docs/stream-mode.md +++ b/docs/stream-mode.md @@ -121,9 +121,14 @@ consumers: ``` - `version` pins the schema (bump on breaking changes). -- `type` discriminates the event kind (currently only `tx_result`). +- `type` discriminates the event kind: `tx_result`, `backpressure`, or `summary`. - One `tx_result` is emitted per input spec after the send attempt; `error` is present only when the send RPC call failed. +- A `backpressure` event is emitted when the input buffer saturates and the + reader blocks, so the producer can slow down: + `{"version":1,"type":"backpressure","queued":256,"capacity":256}` +- A single `summary` event closes the stream (on EOF or CTRL-C): + `{"version":1,"type":"summary","sent":42,"failed":3}` ## Reuse vs. new code @@ -154,22 +159,21 @@ In scope: - Same `FunctionCallDefinition` schema as scenario TOML (incl. `access_list`, `gas_limit`, `signature`, `args`, `value`, `from`, `from_pool`, `kind`). - Pool funding from `--priv-key` before spam begins. -- Receipt tracking + DB persistence via the existing tx_actor flush loop. +- Receipt tracking + DB persistence via the existing tx_actor flush loop, + recorded under a real `run_id` so dumped receipts are queryable. - Graceful CTRL-C and stream EOF handling (drain pending receipts before exit). Out of scope (future work): - Bundle support (`[[spam.bundle]]` analogue in the stream). - Blob transactions (EIP-4844) and authorization transactions (EIP-7702) — - the `FunctionCallDefinition` fields are deserialized but not exercised in - prototype tests. + specs carrying `blob_data` or `authorization_address` are rejected up front + rather than silently producing an invalid tx. - Fuzzing in stream mode — fuzz happens upstream of the stream. - Gas-bump / nonce-shift retry logic from the regular spammer's `handle_tx_outcome`. The stream loop currently logs send errors and moves on; the upstream is expected to resubmit if it cares. - `--rpc-batch-size`, `--send-raw-tx-sync` integration. -- Recording the run in the `spam_runs` table — stream runs use `run_id = 0` - and rely on the tx_actor's cache only. - A generic `SpamSource` trait so `TimedSpammer` can consume a stream too. ## Validation @@ -202,7 +206,8 @@ to fund the executor pool. "Structured output" above. 3. ~~How should errors propagate back to the upstream producer?~~ **Resolved:** `spam-stream` emits a structured `tx_result` event per spec on stdout - (including send errors), in addition to the DB + logs. + (including send errors), plus `backpressure` and a terminal `summary` event, + in addition to the DB + logs. 4. Should `--tps 0` (drain-as-fast) bound concurrency by pool size, or is "one in flight at a time" acceptable for the relayer case? *(Deferred: parallel sends judged not worth the effort for the prototype.)* From dc2dfc9ec829e5f2c4e6de8da6881f68b68ed47a Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Wed, 3 Jun 2026 21:58:20 -0400 Subject: [PATCH 4/9] fix(spam-stream): enable nonce sync so sends don't fail with NonceMissing TestScenario::sync_nonces() is gated on should_sync_nonces (= the sync_nonces_after_batch param). spam-stream set it false, making its two explicit sync_nonces() calls silent no-ops, so pool accounts' nonces were never loaded and prepare_tx_request failed every send with NonceMissing (surfaced as 'core error'). Stream mode sends one tx at a time and never hits the post-batch sync path, so enabling this only makes the initial pool-nonce sync run. --- crates/cli/src/commands/spam_stream.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index 0368a73d..de2b392f 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -497,7 +497,13 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { bundle_type: BundleType::default(), pending_tx_timeout: Duration::from_secs(60), extra_msg_handles: None, - sync_nonces_after_batch: false, + // Must be true: gates TestScenario::sync_nonces(). With it false the + // explicit sync_nonces() calls below are silent no-ops, leaving the + // pool accounts' nonces unset, so prepare_tx_request fails every send + // with NonceMissing ("core error"). Stream mode sends one tx at a time + // and never hits the post-batch sync path, so enabling this only makes + // the initial pool-nonce sync actually run. + sync_nonces_after_batch: true, rpc_batch_size: 0, gas_price: None, scenario_label: Some(format!("stream-{}", args.from_pool)), From a75c9196443158f0458b7116c97e3b2a448a7e93 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Thu, 4 Jun 2026 14:10:33 -0400 Subject: [PATCH 5/9] fix(spam-stream): reclaim nonce on failed send to avoid nonce gaps prepare_tx_request advances an account's local nonce before the send. When the send is rejected (e.g. an interop access-list filter rejecting a not-yet-valid or forged executing message), the tx never enters the mempool but the local nonce stays advanced, leaving a gap that stalls every later tx from that account behind it. Roll the nonce back by one on a failed send. The stream sends serially, so no concurrent send touched the account in between. --- crates/cli/src/commands/spam_stream.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index de2b392f..f3dfa9b8 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -422,6 +422,16 @@ where }; let submitted = error.is_none(); + if !submitted { + // prepare_tx_request already advanced this account's local nonce, but + // the tx never entered the mempool (e.g. rejected by an interop access + // -list filter). Reclaim the nonce so the next send from this account + // doesn't leave a gap that stalls every later tx behind it. The stream + // sends serially, so nothing else has touched this account meanwhile. + if let Some(n) = scenario.nonces.get_mut(&strict.from) { + *n = n.saturating_sub(1); + } + } // 5. Emit a structured result line to stdout (mirrors the input stream so // reactive callers can correlate sends with their specs). From c8657adec5c463564c4fe13efcc9d7ccfd2e5548 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 5 Jun 2026 15:25:10 -0400 Subject: [PATCH 6/9] docs(spam-stream): clarify --tps long_help (review #589) --tps paces how fast specs are pulled off the input stream; each spec is sent exactly once, so a one-line input sends a single tx regardless of the value. Add long_help explaining this, plus tests asserting the help text is present and the clap arg config is valid (debug_assert). --- crates/cli/src/commands/spam_stream.rs | 36 ++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index f3dfa9b8..5bde281c 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -157,6 +157,11 @@ pub struct SpamStreamCliArgs { /// can be parsed". #[arg( long, + long_help = "Target transactions per second. This paces how fast specs are pulled \ + off the input stream, NOT how many txs are duplicated: each input spec is sent \ + exactly once. With `0` (the default) specs are sent as fast as they arrive. If \ + the stream supplies fewer specs per second than `--tps`, the rate is bounded by \ + the stream, so a one-line input sends a single tx regardless of the value.", default_value_t = 0, help_heading = HELP_HEADING_RUNTIME, )] @@ -637,6 +642,37 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { #[cfg(test)] mod tests { use super::*; + use clap::Command; + + /// `SpamStreamCliArgs` derives `Args` (not `Parser`), so build a throwaway + /// `Command` from it to introspect the argument configuration in tests. + fn args_command() -> Command { + SpamStreamCliArgs::augment_args(Command::new("spam-stream")) + } + + #[test] + fn args_config_is_valid() { + // Catches clap misconfiguration (conflicting attrs, bad value_parser, etc.). + args_command().debug_assert(); + } + + #[test] + fn tps_flag_has_long_help() { + // The review asked for long_help clarifying that --tps paces the stream + // rather than duplicating txs. Assert it's wired up and mentions the + // single-spec behavior so the explanation can't silently regress. + let cmd = args_command(); + let tps = cmd + .get_arguments() + .find(|a| a.get_id() == "tps") + .expect("tps arg exists"); + let long_help = tps + .get_long_help() + .expect("tps has long_help") + .to_string(); + assert!(long_help.contains("once"), "long_help: {long_help}"); + assert!(long_help.contains("stream"), "long_help: {long_help}"); + } #[test] fn tx_result_envelope_is_versioned_and_tagged() { From 584057b51ec6b9cf6c8c3aa77104ff9bb903f659 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 5 Jun 2026 15:26:31 -0400 Subject: [PATCH 7/9] feat(spam-stream): accept unit strings for --min-balance (review #589) --min-balance now parses unit-value strings ("10 eth", "0.5 ether", "100 gwei") via util::parse_value, matching the other CLI balance/value flags; a plain number is still wei. Default expressed as "0.01 ether" (unchanged 1e16 wei). Tests cover unit parsing, the wei fallback, and the default round-trip through the value_parser. --- crates/cli/src/commands/spam_stream.rs | 37 +++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index 5bde281c..e6d98c1a 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -27,7 +27,7 @@ use contender_core::{ db::{DbOps, SpamDuration, SpamRunRequest}, generator::{ agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, - FunctionCallDefinition, Generator, PlanConfig, RandSeed, + util::parse_value, FunctionCallDefinition, Generator, PlanConfig, RandSeed, }, spammer::tx_actor::{ActorContext, CacheTx}, test_scenario::{TestScenario, TestScenarioParams}, @@ -167,10 +167,13 @@ pub struct SpamStreamCliArgs { )] pub tps: u64, - /// Minimum balance to keep in each pool account, in wei. + /// Minimum balance to keep in each pool account. #[arg( long, - default_value_t = U256::from(10_000_000_000_000_000u128), + long_help = "The minimum balance to keep in each pool account, with units \ + (e.g. \"10 eth\", \"0.5 ether\", \"100 gwei\"). A plain number is parsed as wei.", + default_value = "0.01 ether", + value_parser = parse_value, help_heading = HELP_HEADING_RUNTIME, )] pub min_balance: U256, @@ -642,7 +645,7 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { #[cfg(test)] mod tests { use super::*; - use clap::Command; + use clap::{Command, FromArgMatches}; /// `SpamStreamCliArgs` derives `Args` (not `Parser`), so build a throwaway /// `Command` from it to introspect the argument configuration in tests. @@ -674,6 +677,32 @@ mod tests { assert!(long_help.contains("stream"), "long_help: {long_help}"); } + /// Parse CLI args from a token list, the way clap would at runtime, so we + /// can assert how value strings are coerced into `SpamStreamCliArgs`. + fn parse_args(tokens: &[&str]) -> SpamStreamCliArgs { + let matches = args_command().get_matches_from( + std::iter::once("spam-stream").chain(tokens.iter().copied()), + ); + SpamStreamCliArgs::from_arg_matches(&matches).expect("from_arg_matches") + } + + #[test] + fn min_balance_accepts_unit_strings() { + // The review asked --min-balance to accept unit-value strings via parse_value. + let eth = parse_args(&["--min-balance", "10 eth"]); + assert_eq!(eth.min_balance, U256::from(10_000_000_000_000_000_000u128)); // 10e18 + // A plain number is still wei (parse_value's fallback). + let wei = parse_args(&["--min-balance", "12345"]); + assert_eq!(wei.min_balance, U256::from(12345u64)); + } + + #[test] + fn min_balance_default_is_point_zero_one_ether() { + // Default must round-trip through value_parser to 0.01 ETH = 1e16 wei. + let args = parse_args(&[]); + assert_eq!(args.min_balance, U256::from(10_000_000_000_000_000u128)); + } + #[test] fn tx_result_envelope_is_versioned_and_tagged() { let event = StreamEvent { From c2b3d029790fe282e40c45ab9dec64b45d8aa2ce Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 5 Jun 2026 15:33:53 -0400 Subject: [PATCH 8/9] fix(spam-stream): persist seed so --skip-funding works across runs (review #589) When --seed was unset, spam-stream generated a fresh random RandSeed each invocation, so the executor pool's addresses differed every run. Funding the pool with --min-balance in one run then re-running with --skip-funding hit "insufficient funds" because the second run derived a different (unfunded) pool. Fall back to the persisted seedfile (data_dir/seed) when --seed is unset, matching spam/setup/campaign. Threads data_dir into spam_stream(). Pool addresses are now stable across invocations for a given data-dir. Test: build_pool_agent_store is deterministic per seed (same seed -> same addresses, different seed -> different). Manually verified against anvil: run 1 funds the pool + writes the seedfile; run 2 with --skip-funding and no funder key sends successfully (previously failed with insufficient funds). --- crates/cli/src/commands/spam_stream.rs | 36 ++++++++++++++++++++------ crates/cli/src/main.rs | 2 +- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index e6d98c1a..e912cf08 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -11,7 +11,7 @@ use crate::{ Result, }, error::CliError, - util::fund_accounts, + util::{fund_accounts, load_seedfile}, LATENCY_HIST as HIST, PROM, }; use alloy::{ @@ -38,7 +38,7 @@ use contender_testfile::TestConfig; use serde::Serialize; use std::{ collections::HashMap, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -467,12 +467,15 @@ where } /// Top-level entry point invoked from `main.rs`. -pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { - let seed = if let Some(s) = &args.seed { - RandSeed::seed_from_str(s) - } else { - RandSeed::new() - }; +pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs, data_dir: &Path) -> Result<()> { + // Fall back to the persisted seedfile (not a fresh random seed) when --seed + // is unset, matching `spam`/`setup`/`campaign`. This keeps the executor + // pool's addresses stable across invocations, so a pool funded in one run + // is still funded for a later `--skip-funding` run. + let seed = RandSeed::seed_from_str(&match &args.seed { + Some(s) => s.clone(), + None => load_seedfile(data_dir)?, + }); // Provision the executor pool directly. Stream mode never runs pre-defined // create/setup/spam steps, so the scenario starts from an empty config and @@ -703,6 +706,23 @@ mod tests { assert_eq!(args.min_balance, U256::from(10_000_000_000_000_000u128)); } + #[test] + fn pool_addresses_are_deterministic_for_a_seed() { + // The --skip-funding bug was that a fresh random seed each run produced + // different pool addresses, so a pool funded in run 1 looked unfunded in + // run 2. Pin the invariant the fix relies on: a given seed always yields + // the same pool addresses, and different seeds yield different ones. + let seed_a = RandSeed::seed_from_str("0xabc"); + let s1 = build_pool_agent_store("executors", 5, &seed_a); + let s2 = build_pool_agent_store("executors", 5, &seed_a); + assert_eq!(s1.all_signer_addresses(), s2.all_signer_addresses()); + assert_eq!(s1.all_signer_addresses().len(), 5); + + let seed_b = RandSeed::seed_from_str("0xdef"); + let s3 = build_pool_agent_store("executors", 5, &seed_b); + assert_ne!(s1.all_signer_addresses(), s3.all_signer_addresses()); + } + #[test] fn tx_result_envelope_is_versioned_and_tagged() { let event = StreamEvent { diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index f98b519f..74faf0ec 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -141,7 +141,7 @@ async fn run() -> Result<(), contender_cli::Error> { } ContenderSubcommand::SpamStream { args } => { - contender_cli::commands::spam_stream::spam_stream(&db, *args).await?; + contender_cli::commands::spam_stream::spam_stream(&db, *args, &data_dir).await?; } ContenderSubcommand::Server => { From bc7bd54bbf89ab88e6c97bf6ca1c173983fab5e5 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 5 Jun 2026 16:31:38 -0400 Subject: [PATCH 9/9] style(spam-stream): cargo fmt test module (CI fmt fix) --- crates/cli/src/commands/spam_stream.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index e912cf08..b1bf958e 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -672,10 +672,7 @@ mod tests { .get_arguments() .find(|a| a.get_id() == "tps") .expect("tps arg exists"); - let long_help = tps - .get_long_help() - .expect("tps has long_help") - .to_string(); + let long_help = tps.get_long_help().expect("tps has long_help").to_string(); assert!(long_help.contains("once"), "long_help: {long_help}"); assert!(long_help.contains("stream"), "long_help: {long_help}"); } @@ -683,17 +680,17 @@ mod tests { /// Parse CLI args from a token list, the way clap would at runtime, so we /// can assert how value strings are coerced into `SpamStreamCliArgs`. fn parse_args(tokens: &[&str]) -> SpamStreamCliArgs { - let matches = args_command().get_matches_from( - std::iter::once("spam-stream").chain(tokens.iter().copied()), - ); + let matches = args_command() + .get_matches_from(std::iter::once("spam-stream").chain(tokens.iter().copied())); SpamStreamCliArgs::from_arg_matches(&matches).expect("from_arg_matches") } #[test] fn min_balance_accepts_unit_strings() { // The review asked --min-balance to accept unit-value strings via parse_value. + // 10 eth == 10e18 wei. let eth = parse_args(&["--min-balance", "10 eth"]); - assert_eq!(eth.min_balance, U256::from(10_000_000_000_000_000_000u128)); // 10e18 + assert_eq!(eth.min_balance, U256::from(10_000_000_000_000_000_000u128)); // A plain number is still wei (parse_value's fallback). let wei = parse_args(&["--min-balance", "12345"]); assert_eq!(wei.min_balance, U256::from(12345u64));