diff --git a/crates/cli/src/commands/contender_subcommand.rs b/crates/cli/src/commands/contender_subcommand.rs index 92f7e25d..ec886b46 100644 --- a/crates/cli/src/commands/contender_subcommand.rs +++ b/crates/cli/src/commands/contender_subcommand.rs @@ -9,6 +9,7 @@ use crate::default_scenarios::BuiltinScenarioCli; use super::admin::AdminCommand; use super::spam::SpamCliArgs; +use super::spam_stream::SpamStreamCliArgs; use super::ReportFormat; #[derive(Debug, Subcommand)] @@ -34,6 +35,16 @@ pub enum ContenderSubcommand { builtin_scenario_config: Option, }, + #[command( + name = "spam-stream", + long_about = "Read newline-delimited JSON tx specs from stdin or a file and spam them. \ + Each line is a FunctionCallDefinition (same fields as scenario TOML `[[spam.tx]]`)." + )] + SpamStream { + #[command(flatten)] + args: Box, + }, + #[command( name = "setup", long_about = "Deploy contracts and execute one-time setup txs." diff --git a/crates/cli/src/commands/error.rs b/crates/cli/src/commands/error.rs index 9ab196f0..766febf6 100644 --- a/crates/cli/src/commands/error.rs +++ b/crates/cli/src/commands/error.rs @@ -78,6 +78,9 @@ pub enum ArgsError { #[error("failed to parse url")] UrlParse(#[from] url::ParseError), + + #[error("{0}")] + Custom(String), } #[derive(Debug, Error)] diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 6b834134..d3e6c834 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -8,6 +8,7 @@ pub mod replay; pub mod rpc; mod setup; mod spam; +pub mod spam_stream; use clap::{Parser, ValueEnum}; use contender_core::db::DbOps; diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs new file mode 100644 index 00000000..b1bf958e --- /dev/null +++ b/crates/cli/src/commands/spam_stream.rs @@ -0,0 +1,823 @@ +//! `spam-stream` subcommand: read newline-delimited JSON tx specs from stdin +//! or a file, and spam them through the contender pipeline. +//! +//! 
This is the entry point for the "stream mode" prototype. See +//! `docs/stream-mode.md` for the design note. + +use crate::{ + commands::{ + common::{HELP_HEADING_COMMON, HELP_HEADING_PAYLOAD, HELP_HEADING_RUNTIME}, + error::ArgsError, + Result, + }, + error::CliError, + util::{fund_accounts, load_seedfile}, + LATENCY_HIST as HIST, PROM, +}; +use alloy::{ + consensus::TxType, + network::{AnyTxEnvelope, Ethereum, NetworkTransactionBuilder}, + primitives::{utils::format_ether, U256}, + providers::Provider, + transports::http::reqwest::Url, +}; +use clap::Args; +use contender_core::{ + agent_controller::{AgentClass, AgentStore}, + db::{DbOps, SpamDuration, SpamRunRequest}, + generator::{ + agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, + util::parse_value, FunctionCallDefinition, Generator, PlanConfig, RandSeed, + }, + spammer::tx_actor::{ActorContext, CacheTx}, + test_scenario::{TestScenario, TestScenarioParams}, + BundleType, +}; +use contender_sqlite::SqliteDb; +use contender_testfile::TestConfig; +use serde::Serialize; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; +use tokio::{ + io::{AsyncBufRead, AsyncBufReadExt, BufReader}, + sync::mpsc, +}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; + +/// Default agent-pool name when the user does not specify one. +const DEFAULT_POOL: &str = "executors"; +/// Channel buffer between the reader task and the spam loop. +const STREAM_BUFFER: usize = 256; +/// Schema version of the structured stdout output. Bump when the envelope +/// shape changes in a backward-incompatible way. +const OUTPUT_VERSION: u32 = 1; +/// How often to refresh the cached gas price during the stream loop. Avoids an +/// RPC round-trip on every single tx. 
+const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(6); + +/// Versioned, tagged envelope written to stdout (one JSON line per event) so +/// downstream consumers can evolve with the schema. The `version` field pins +/// the schema and the `type` tag (via `payload`) discriminates event kinds. +#[derive(Debug, Serialize)] +struct StreamEvent { + version: u32, + #[serde(flatten)] + payload: StreamPayload, +} + +#[derive(Debug, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum StreamPayload { + /// Emitted once per input spec after the send attempt. + TxResult { + /// Zero-based index of the spec in the stream. + idx: usize, + /// Transaction hash (present even when the send RPC call failed). + tx_hash: String, + /// Unix-epoch milliseconds when the send was attempted. + start_timestamp_ms: u128, + /// Optional `kind` carried over from the input spec. + #[serde(skip_serializing_if = "Option::is_none")] + kind: Option, + /// Send-time error from the RPC, if any. + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + }, + /// Buffer saturated; the reader is blocking. Signals the producer to slow down. + Backpressure { queued: usize, capacity: usize }, + /// Emitted once when the stream finishes (EOF or cancellation). + Summary { sent: usize, failed: usize }, +} + +impl StreamEvent { + fn emit(payload: StreamPayload) { + let event = StreamEvent { + version: OUTPUT_VERSION, + payload, + }; + // Serialization of these fixed-shape payloads cannot fail. + if let Ok(line) = serde_json::to_string(&event) { + println!("{line}"); + } + } +} + +#[derive(Clone, Debug, Args)] +pub struct SpamStreamCliArgs { + /// RPC URL to send transactions to. + #[arg( + env = "RPC_URL", + short = 'r', + long, + default_value = "http://localhost:8545", + help_heading = HELP_HEADING_COMMON, + )] + pub rpc_url: Url, + + /// Funder private key. Used to fund the pool of executor accounts before spam begins. 
+ #[arg( + env = "CONTENDER_PRIVATE_KEY", + short = 'p', + long = "priv-key", + help_heading = HELP_HEADING_COMMON, + )] + pub private_key: Option, + + /// Source of the JSON-lines stream: either `stdin` or a file path. + #[arg( + long = "from", + default_value = "stdin", + help_heading = HELP_HEADING_COMMON, + )] + pub from: String, + + /// Pool name used to source signers for each tx. Stream specs that omit + /// `from`/`from_pool` will default to this pool. + #[arg( + long = "from-pool", + default_value = DEFAULT_POOL, + help_heading = HELP_HEADING_PAYLOAD, + )] + pub from_pool: String, + + /// Number of accounts to generate in the pool. + #[arg( + long = "pool-size", + default_value_t = 10, + help_heading = HELP_HEADING_PAYLOAD, + )] + pub pool_size: usize, + + /// Target transactions per second. `0` means "send as fast as the stream + /// can be parsed". + #[arg( + long, + long_help = "Target transactions per second. This paces how fast specs are pulled \ + off the input stream, NOT how many txs are duplicated: each input spec is sent \ + exactly once. With `0` (the default) specs are sent as fast as they arrive. If \ + the stream supplies fewer specs per second than `--tps`, the rate is bounded by \ + the stream, so a one-line input sends a single tx regardless of the value.", + default_value_t = 0, + help_heading = HELP_HEADING_RUNTIME, + )] + pub tps: u64, + + /// Minimum balance to keep in each pool account. + #[arg( + long, + long_help = "The minimum balance to keep in each pool account, with units \ + (e.g. \"10 eth\", \"0.5 ether\", \"100 gwei\"). A plain number is parsed as wei.", + default_value = "0.01 ether", + value_parser = parse_value, + help_heading = HELP_HEADING_RUNTIME, + )] + pub min_balance: U256, + + /// Seed for deterministic pool-signer generation. + #[arg( + env = "CONTENDER_SEED", + long, + help_heading = HELP_HEADING_RUNTIME, + )] + pub seed: Option, + + /// Skip funding the executor accounts. 
Useful when the pool was funded + /// out-of-band or every spec carries a `from` address with an existing balance. + #[arg(long, default_value_t = false, help_heading = HELP_HEADING_RUNTIME)] + pub skip_funding: bool, +} + +/// Asynchronously reads JSON lines from `from` (stdin or file path), parses each +/// into a `FunctionCallDefinition`, and forwards it to `tx`. The task exits when +/// EOF is reached or the receiver drops. +pub fn spawn_stream_reader( + from: &str, + tx: mpsc::Sender, +) -> Result> { + // Validate file existence eagerly so a bad path is a clean CLI error rather + // than a silent warning from inside the spawned task. + if from != "stdin" && !PathBuf::from(from).exists() { + return Err(CliError::Args(ArgsError::Custom(format!( + "stream source file not found: {from}" + )))); + } + + let from = from.to_owned(); + Ok(tokio::spawn(async move { + let reader: Box = if from == "stdin" { + Box::new(BufReader::new(tokio::io::stdin())) + } else { + match tokio::fs::File::open(&from).await { + Ok(f) => Box::new(BufReader::new(f)), + Err(e) => { + warn!("failed to open stream source {from}: {e}"); + return; + } + } + }; + forward_lines(reader, tx).await; + })) +} + +async fn forward_lines(reader: R, tx: mpsc::Sender) +where + R: tokio::io::AsyncBufRead + Unpin, +{ + let mut lines = reader.lines(); + let mut line_no: u64 = 0; + // Emit the backpressure event once per saturation episode, not per blocked send. 
+ let mut backpressured = false; + loop { + match lines.next_line().await { + Ok(Some(line)) => { + line_no += 1; + let trimmed = line.trim(); + if trimmed.is_empty() || trimmed.starts_with('#') { + continue; + } + match serde_json::from_str::(trimmed) { + Ok(spec) => match tx.try_send(spec) { + Ok(()) => backpressured = false, + Err(mpsc::error::TrySendError::Full(spec)) => { + if !backpressured { + backpressured = true; + let capacity = tx.max_capacity(); + StreamEvent::emit(StreamPayload::Backpressure { + queued: capacity.saturating_sub(tx.capacity()), + capacity, + }); + } + // Block until a slot frees, applying real backpressure. + if tx.send(spec).await.is_err() { + return; // receiver dropped + } + } + Err(mpsc::error::TrySendError::Closed(_)) => return, + }, + Err(e) => warn!("stream: skipping malformed line {line_no}: {e}"), + } + } + Ok(None) => return, // EOF + Err(e) => { + warn!("stream: read error: {e}"); + return; + } + } + } +} + +/// Build an `AgentStore` holding a single spam pool named `from_pool` with +/// `pool_size` signers, derived from `seed`. Stream mode provisions the pool +/// directly instead of round-tripping through a scenario `TestConfig`, since +/// it never executes any pre-defined spam steps. +fn build_pool_agent_store(from_pool: &str, pool_size: usize, seed: &RandSeed) -> AgentStore { + let mut store = AgentStore::new(); + if pool_size > 0 { + store.add_new_agent(from_pool, pool_size, seed, AgentClass::Spammer); + } + store +} + +/// Drive the stream loop: pull specs, build/sign/send txs, cache in the +/// tx_actor for receipt tracking. Returns when the stream channel closes. +async fn drive_stream( + scenario: &mut TestScenario, + mut rx: mpsc::Receiver, + fallback_pool: String, + tps: u64, + cancel: CancellationToken, +) -> Result<(usize, usize)> +where + S: SeedGenerator + Send + Sync + Clone, + P: PlanConfig + Templater + Send + Sync + Clone, +{ + // Rate limiter: only ticks when tps > 0. 
+ let mut ticker = if tps > 0 { + let period = Duration::from_secs_f64(1.0 / tps as f64); + let mut int = tokio::time::interval(period); + int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + Some(int) + } else { + None + }; + + // Cache the gas price instead of fetching it per tx; refreshed below. + let mut gas_price = scenario.rpc_client.get_gas_price().await?; + let mut last_gas_refresh = Instant::now(); + + let mut sent: usize = 0; + let mut failed: usize = 0; + let mut idx: usize = 0; + let placeholder_map = HashMap::::new(); + + loop { + tokio::select! { + _ = cancel.cancelled() => { + info!("stream cancelled; flushing."); + break; + } + spec = rx.recv() => { + let Some(mut spec) = spec else { + debug!("stream EOF received"); + break; + }; + + // Apply default pool if the spec doesn't pick one. + if spec.from.is_none() && spec.from_pool.is_none() { + spec.from_pool = Some(fallback_pool.clone()); + } + + // Rate limit (only when --tps > 0). + if let Some(int) = ticker.as_mut() { + int.tick().await; + } + + if last_gas_refresh.elapsed() >= GAS_REFRESH_INTERVAL { + if let Ok(gp) = scenario.rpc_client.get_gas_price().await { + gas_price = gp; + } + last_gas_refresh = Instant::now(); + } + + match send_one(scenario, &spec, idx, &placeholder_map, gas_price).await { + Ok(true) => sent += 1, + Ok(false) => failed += 1, + Err(e) => { + failed += 1; + warn!("stream: failed to send tx (idx {idx}): {e}"); + } + } + idx += 1; + } + } + } + + Ok((sent, failed)) +} + +/// Build a single transaction from a stream spec, sign it, send it, and cache +/// it in the tx_actor for receipt tracking. 
+async fn send_one( + scenario: &mut TestScenario, + spec: &FunctionCallDefinition, + idx: usize, + placeholder_map: &HashMap, + gas_price: u128, +) -> Result +where + S: SeedGenerator + Send + Sync + Clone, + P: PlanConfig + Templater + Send + Sync + Clone, +{ + // Stream mode only builds EIP-1559 txs (no blob gas price, no auth list), so + // reject blob (4844) / setCode (7702) specs up front instead of silently + // producing an invalid tx. + if spec.blob_data.is_some() || spec.authorization_address.is_some() { + warn!("stream tx[{idx}]: blob/7702 specs are unsupported in stream mode; skipping"); + return Ok(false); + } + + // 1. Resolve `from`/`from_pool` and access list against the scenario's + // agent store + templater. This produces a strict FunctionCallDefinition. + let strict = scenario + .make_strict_call(spec, idx) + .map_err(contender_core::Error::Generator)?; + + // 2. Render the strict definition into a TransactionRequest (encodes + // calldata, threads access_list, sets value/gas_limit). + let tx_req = scenario + .get_templater() + .template_function_call(&strict, placeholder_map) + .map_err(contender_core::Error::Templater)?; + + // 3. Assign nonce/gas-limit + sign using the cached gas price. + let (prepared, wallet) = scenario.prepare_tx_request(&tx_req, gas_price, 0).await?; + // Build & sign via the alloy Ethereum network. The op-alloy-network re-export + // can create trait-resolution ambiguity, so we fully-qualify the trait + // method and convert the error string-ly instead of relying on From. + let envelope = + >::build( + prepared, &wallet, + ) + .await + .map_err(|e| CliError::Args(ArgsError::Custom(format!("build envelope: {e}"))))?; + let tx_hash = envelope.tx_hash().to_owned(); + + // 4. Send via the same txs_client the regular spammer uses. 
+ let any_envelope = AnyTxEnvelope::Ethereum(envelope); + let start_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let res = scenario.txs_client.send_tx_envelope(any_envelope).await; + let error = match res { + Ok(_) => { + info!("stream tx[{idx}]: {tx_hash} sent"); + None + } + Err(e) => { + let msg = e + .as_error_resp() + .map(|err| err.message.to_string()) + .unwrap_or_else(|| format!("{e}")); + warn!("stream tx[{idx}]: {tx_hash} failed: {msg}"); + Some(msg) + } + }; + + let submitted = error.is_none(); + if !submitted { + // prepare_tx_request already advanced this account's local nonce, but + // the tx never entered the mempool (e.g. rejected by an interop access + // -list filter). Reclaim the nonce so the next send from this account + // doesn't leave a gap that stalls every later tx behind it. The stream + // sends serially, so nothing else has touched this account meanwhile. + if let Some(n) = scenario.nonces.get_mut(&strict.from) { + *n = n.saturating_sub(1); + } + } + + // 5. Emit a structured result line to stdout (mirrors the input stream so + // reactive callers can correlate sends with their specs). + StreamEvent::emit(StreamPayload::TxResult { + idx, + tx_hash: tx_hash.to_string(), + start_timestamp_ms: start_ms, + kind: spec.kind.clone(), + error: error.clone(), + }); + + // 6. Cache in the tx_actor so its flush loop polls for the receipt. + scenario + .tx_actor() + .cache_run_tx(CacheTx { + tx_hash, + start_timestamp_ms: start_ms, + end_timestamp_ms: None, + kind: spec.kind.clone(), + error, + }) + .await?; + + Ok(submitted) +} + +/// Top-level entry point invoked from `main.rs`. +pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs, data_dir: &Path) -> Result<()> { + // Fall back to the persisted seedfile (not a fresh random seed) when --seed + // is unset, matching `spam`/`setup`/`campaign`. 
This keeps the executor + // pool's addresses stable across invocations, so a pool funded in one run + // is still funded for a later `--skip-funding` run. + let seed = RandSeed::seed_from_str(&match &args.seed { + Some(s) => s.clone(), + None => load_seedfile(data_dir)?, + }); + + // Provision the executor pool directly. Stream mode never runs pre-defined + // create/setup/spam steps, so the scenario starts from an empty config and + // we inject the pool's signers below. + let config = TestConfig::new(); + let pool_store = build_pool_agent_store(&args.from_pool, args.pool_size, &seed); + let agent_spec = AgentSpec::default() + .create_accounts(0) + .setup_accounts(0) + .spam_accounts(args.pool_size); + + let funder = if let Some(key) = &args.private_key { + let s = key.trim().trim_start_matches("0x"); + Some( + alloy::signers::local::PrivateKeySigner::from_slice( + &alloy::hex::decode(s) + .map_err(|e| CliError::Args(ArgsError::Custom(format!("bad priv-key: {e}"))))?, + ) + .map_err(|e| CliError::Args(ArgsError::Custom(format!("bad priv-key: {e}"))))?, + ) + } else { + None + }; + // TestScenario needs at least one user signer for signer_map. Use a + // throwaway signer if none was provided (we only sign with pool accounts). + let user_signers = if let Some(s) = &funder { + vec![s.clone()] + } else { + vec![alloy::signers::local::PrivateKeySigner::random()] + }; + + let cancel = CancellationToken::new(); + let params = TestScenarioParams { + rpc_url: args.rpc_url.clone(), + builder_rpc_url: None, + txs_rpc_url: None, + signers: user_signers, + agent_spec, + tx_type: TxType::Eip1559, + bundle_type: BundleType::default(), + pending_tx_timeout: Duration::from_secs(60), + extra_msg_handles: None, + // Must be true: gates TestScenario::sync_nonces(). With it false the + // explicit sync_nonces() calls below are silent no-ops, leaving the + // pool accounts' nonces unset, so prepare_tx_request fails every send + // with NonceMissing ("core error"). 
Stream mode sends one tx at a time + // and never hits the post-batch sync path, so enabling this only makes + // the initial pool-nonce sync actually run. + sync_nonces_after_batch: true, + rpc_batch_size: 0, + gas_price: None, + scenario_label: Some(format!("stream-{}", args.from_pool)), + send_raw_tx_sync: false, + flashblocks_ws_url: None, + }; + + let mut scenario: TestScenario = TestScenario::new( + config, + Arc::new(db.clone()), + seed, + params, + None, + (&PROM, &HIST).into(), + &cancel, + ) + .await?; + + // Register the pool's signers with the scenario so it can sign with them + // and track their nonces, then sync nonces from the RPC. + for (_name, store) in pool_store.all_agents() { + for signer in &store.signers { + scenario.signer_map.insert(signer.address(), signer.clone()); + } + } + scenario.agent_store = pool_store; + scenario.sync_nonces().await?; + + // Fund the pool from the funder key if provided. + if !args.skip_funding { + if let Some(funder) = &funder { + let addrs = scenario.agent_store.all_signer_addresses(); + if !addrs.is_empty() { + info!( + "funding {} pool account(s) to {} ETH min from {}", + addrs.len(), + format_ether(args.min_balance), + funder.address() + ); + fund_accounts( + &addrs, + funder, + &scenario.rpc_client, + args.min_balance, + TxType::Legacy, + &Default::default(), + ) + .await?; + // Re-sync nonces for the freshly-funded accounts so the first + // sent tx uses the correct nonce. + scenario.sync_nonces().await?; + } + } else { + warn!("no funder key supplied; pool accounts must already be funded"); + } + } + + // Register a run row so receipts dump under a real run_id; run_txs has a FK + // into runs, so a bogus id (e.g. 0) would orphan them. 
+ let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as usize; + let run = SpamRunRequest { + timestamp, + tx_count: 0, // unbounded stream; the real count lands as txs flush + scenario_name: format!("stream-{}", args.from_pool), + campaign_id: None, + campaign_name: None, + stage_name: None, + rpc_url: args.rpc_url.to_string(), + txs_per_duration: args.tps, + duration: SpamDuration::Seconds(0), + pending_timeout: Duration::from_secs(60), + }; + let run_id = db + .insert_run(&run) + .map_err(|e| contender_core::Error::Db(e.into()))?; + info!(run_id, "created stream run"); + + // Set up tx_actor context so cached txs flush to the DB. + let start_block = scenario.rpc_client.get_block_number().await?; + let actor_ctx = + ActorContext::new(start_block, run_id).with_pending_tx_timeout(Duration::from_secs(60)); + scenario.tx_actor().init_ctx(actor_ctx).await?; + + // Spawn the reader and run the drive loop. + let (tx, rx) = mpsc::channel::(STREAM_BUFFER); + let _reader = spawn_stream_reader(&args.from, tx)?; + + // CTRL-C cancels the loop; drive_stream observes the token and returns its counts. + let ctrlc_cancel = cancel.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + ctrlc_cancel.cancel(); + }); + + let (sent, failed) = drive_stream( + &mut scenario, + rx, + args.from_pool.clone(), + args.tps, + cancel.clone(), + ) + .await?; + StreamEvent::emit(StreamPayload::Summary { sent, failed }); + + info!("stream complete: {sent} sent, {failed} failed; draining pending receipts..."); + + tokio::select! 
{ + _ = scenario.dump_tx_cache(run_id) => {} + _ = tokio::signal::ctrl_c() => { + info!("CTRL-C during drain; exiting"); + } + } + scenario.shutdown().await; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::{Command, FromArgMatches}; + + /// `SpamStreamCliArgs` derives `Args` (not `Parser`), so build a throwaway + /// `Command` from it to introspect the argument configuration in tests. + fn args_command() -> Command { + SpamStreamCliArgs::augment_args(Command::new("spam-stream")) + } + + #[test] + fn args_config_is_valid() { + // Catches clap misconfiguration (conflicting attrs, bad value_parser, etc.). + args_command().debug_assert(); + } + + #[test] + fn tps_flag_has_long_help() { + // The review asked for long_help clarifying that --tps paces the stream + // rather than duplicating txs. Assert it's wired up and mentions the + // single-spec behavior so the explanation can't silently regress. + let cmd = args_command(); + let tps = cmd + .get_arguments() + .find(|a| a.get_id() == "tps") + .expect("tps arg exists"); + let long_help = tps.get_long_help().expect("tps has long_help").to_string(); + assert!(long_help.contains("once"), "long_help: {long_help}"); + assert!(long_help.contains("stream"), "long_help: {long_help}"); + } + + /// Parse CLI args from a token list, the way clap would at runtime, so we + /// can assert how value strings are coerced into `SpamStreamCliArgs`. + fn parse_args(tokens: &[&str]) -> SpamStreamCliArgs { + let matches = args_command() + .get_matches_from(std::iter::once("spam-stream").chain(tokens.iter().copied())); + SpamStreamCliArgs::from_arg_matches(&matches).expect("from_arg_matches") + } + + #[test] + fn min_balance_accepts_unit_strings() { + // The review asked --min-balance to accept unit-value strings via parse_value. + // 10 eth == 10e18 wei. 
+ let eth = parse_args(&["--min-balance", "10 eth"]); + assert_eq!(eth.min_balance, U256::from(10_000_000_000_000_000_000u128)); + // A plain number is still wei (parse_value's fallback). + let wei = parse_args(&["--min-balance", "12345"]); + assert_eq!(wei.min_balance, U256::from(12345u64)); + } + + #[test] + fn min_balance_default_is_point_zero_one_ether() { + // Default must round-trip through value_parser to 0.01 ETH = 1e16 wei. + let args = parse_args(&[]); + assert_eq!(args.min_balance, U256::from(10_000_000_000_000_000u128)); + } + + #[test] + fn pool_addresses_are_deterministic_for_a_seed() { + // The --skip-funding bug was that a fresh random seed each run produced + // different pool addresses, so a pool funded in run 1 looked unfunded in + // run 2. Pin the invariant the fix relies on: a given seed always yields + // the same pool addresses, and different seeds yield different ones. + let seed_a = RandSeed::seed_from_str("0xabc"); + let s1 = build_pool_agent_store("executors", 5, &seed_a); + let s2 = build_pool_agent_store("executors", 5, &seed_a); + assert_eq!(s1.all_signer_addresses(), s2.all_signer_addresses()); + assert_eq!(s1.all_signer_addresses().len(), 5); + + let seed_b = RandSeed::seed_from_str("0xdef"); + let s3 = build_pool_agent_store("executors", 5, &seed_b); + assert_ne!(s1.all_signer_addresses(), s3.all_signer_addresses()); + } + + #[test] + fn tx_result_envelope_is_versioned_and_tagged() { + let event = StreamEvent { + version: OUTPUT_VERSION, + payload: StreamPayload::TxResult { + idx: 3, + tx_hash: "0xabc".to_string(), + start_timestamp_ms: 1_733_155_200_000, + kind: Some("validate".to_string()), + error: None, + }, + }; + let json: serde_json::Value = + serde_json::from_str(&serde_json::to_string(&event).unwrap()).unwrap(); + assert_eq!(json["version"], 1); + assert_eq!(json["type"], "tx_result"); + assert_eq!(json["idx"], 3); + assert_eq!(json["tx_hash"], "0xabc"); + assert_eq!(json["kind"], "validate"); + // `error` is omitted when 
None. + assert!(json.get("error").is_none()); + } + + #[test] + fn summary_and_backpressure_envelopes_are_tagged() { + let summary = serde_json::to_value(StreamEvent { + version: OUTPUT_VERSION, + payload: StreamPayload::Summary { sent: 9, failed: 1 }, + }) + .unwrap(); + assert_eq!(summary["version"], 1); + assert_eq!(summary["type"], "summary"); + assert_eq!(summary["sent"], 9); + assert_eq!(summary["failed"], 1); + + let backpressure = serde_json::to_value(StreamEvent { + version: OUTPUT_VERSION, + payload: StreamPayload::Backpressure { + queued: 256, + capacity: 256, + }, + }) + .unwrap(); + assert_eq!(backpressure["type"], "backpressure"); + assert_eq!(backpressure["queued"], 256); + assert_eq!(backpressure["capacity"], 256); + } + + #[test] + fn parses_minimal_spec() { + let line = + r#"{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit":21000}"#; + let spec: FunctionCallDefinition = serde_json::from_str(line).unwrap(); + assert_eq!(spec.to, "0xdeAD000000000000000000000000000000000000"); + assert_eq!(spec.value.as_deref(), Some("1")); + assert_eq!(spec.gas_limit, Some(21000)); + assert!(spec.from.is_none() && spec.from_pool.is_none()); + } + + #[test] + fn parses_spec_with_signature_and_args() { + let line = r#"{ + "to": "0x4200000000000000000000000000000000000022", + "signature": "validateMessage(bytes32)", + "args": ["0x0102030405060708091011121314151617181920212223242526272829303132"], + "gas_limit": 200000 + }"#; + let spec: FunctionCallDefinition = serde_json::from_str(line).unwrap(); + assert_eq!(spec.signature.as_deref(), Some("validateMessage(bytes32)")); + assert_eq!(spec.args.as_ref().unwrap().len(), 1); + assert_eq!(spec.gas_limit, Some(200000)); + } + + #[tokio::test] + async fn forward_lines_skips_blank_and_comments_and_emits_specs() { + let (tx, mut rx) = mpsc::channel::(8); + let input = b"\n# this is a 
comment\n{\"to\":\"0xdeAD000000000000000000000000000000000000\",\"value\":\"1\"}\n\n{\"to\":\"0xdeAD000000000000000000000000000000000001\",\"value\":\"2\"}\n"; + let reader = BufReader::new(&input[..]); + forward_lines(reader, tx).await; + let mut received = vec![]; + while let Ok(spec) = rx.try_recv() { + received.push(spec); + } + assert_eq!(received.len(), 2); + assert_eq!(received[0].value.as_deref(), Some("1")); + assert_eq!(received[1].value.as_deref(), Some("2")); + } + + #[tokio::test] + async fn forward_lines_skips_malformed_lines() { + let (tx, mut rx) = mpsc::channel::(8); + let input = b"not json at all\n{\"to\":\"0xdeAD000000000000000000000000000000000000\"}\n"; + let reader = BufReader::new(&input[..]); + forward_lines(reader, tx).await; + let mut received = vec![]; + while let Ok(spec) = rx.try_recv() { + received.push(spec); + } + assert_eq!(received.len(), 1); + } +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 4e0af9a5..74faf0ec 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -140,6 +140,10 @@ async fn run() -> Result<(), contender_cli::Error> { commands::spam(&db, &spam_args, SpamCampaignContext::default()).await?; } + ContenderSubcommand::SpamStream { args } => { + contender_cli::commands::spam_stream::spam_stream(&db, *args, &data_dir).await?; + } + ContenderSubcommand::Server => { contender_cli::server::run() .await diff --git a/docs/stream-mode.md b/docs/stream-mode.md new file mode 100644 index 00000000..27a1dc55 --- /dev/null +++ b/docs/stream-mode.md @@ -0,0 +1,213 @@ +# Stream Mode (`spam-stream`) + +> **Status:** prototype / draft. This document accompanies the +> initial implementation of `spam-stream`. Expect the CLI surface and the +> stream format to evolve based on review feedback. + +## Motivation + +Contender is a TPS spammer driven by static scenario TOML files. It cycles +through `[[spam]]` entries, fuzzes args, and sends at a configured rate. 
+ +Some use cases need to feed *dynamically discovered* tx specs into the spam +loop. For example, an interop relayer reads message-emitted events on chain A +and needs to execute the corresponding `validateMessage(...)` call on chain B +— with the right access list, calldata, and target. Today these workflows +end up reimplementing rate limiting, signer pools, nonce management, and +receipt tracking outside contender. + +Stream mode lets contender act as the "sender" half of those workflows: any +upstream process can pipe JSON tx specs into contender and reuse the existing +agent pools, rate limiter, `tx_actor` receipt tracking, gas-price caching, +and Prometheus latency metrics. + +## CLI + +```bash +contender spam-stream \ + -r https://chain-b \ + -p $FUNDING_KEY \ + --from stdin \ + --from-pool executors --pool-size 10 \ + --tps 5 +``` + +Key flags: + +| Flag | Default | Meaning | +|------|---------|---------| +| `-r, --rpc-url` | `http://localhost:8545` | Target RPC. | +| `-p, --priv-key` | none | Funder key (funds the pool before spam starts). | +| `--from` | `stdin` | `stdin` or a file path. | +| `--from-pool` | `executors` | Pool name. Specs that omit `from`/`from_pool` use this pool. | +| `--pool-size` | `10` | Accounts generated in the pool. | +| `--tps` | `0` | Paces how fast specs are pulled off the stream; `0` = send specs as fast as they arrive. Each spec is sent exactly once. | +| `--min-balance` | `0.01 ether` | Min pool-account balance during funding. Accepts unit strings (e.g. `10 eth`, `100 gwei`); a plain number is parsed as wei. | +| `--skip-funding` | `false` | Skip pre-spam funding. | +| `--seed` | persisted seedfile | Seed for deterministic pool-signer generation. When unset, the seedfile in the data dir is used, so pool addresses stay stable across runs. | + +## Stream format + +Newline-delimited JSON, one [`FunctionCallDefinition`](../crates/core/src/generator/function_def.rs) +per line. Same field names as scenario TOML. 
+ +Minimal: + +```json +{"to":"0xdeAD000000000000000000000000000000000000","value":"1 wei","gas_limit":21000} +``` + +Full (interop-style): + +```json +{ + "to": "0x4200000000000000000000000000000000000022", + "signature": "validateMessage(bytes32)", + "args": ["0x0102030405060708091011121314151617181920212223242526272829303132"], + "access_list": [ + { + "address": "0x4200000000000000000000000000000000000022", + "storageKeys": ["0x0100000000000000000000000000000000000000000000000000000000000000"] + } + ], + "gas_limit": 200000 +} +``` + +Empty lines and lines beginning with `#` are ignored. Malformed JSON lines +log a warning and the loop continues. + +## Architecture + +Stream mode does **not** introduce a new spammer trait or a new tx pipeline. +It reuses the existing `TestScenario` machinery and wires a JSON-line reader +to it through an mpsc channel. + +``` +stdin/file -> reader task -> mpsc + | + v + drive_stream loop + | + v + for each spec: + scenario.make_strict_call (Generator trait, resolves from_pool + access_list) + scenario.get_templater().template_function_call (Templater, builds TransactionRequest) + scenario.prepare_tx_request (assigns nonce and gas limit, returns the wallet to sign with) + build + sign the EIP-1559 envelope with that wallet + scenario.txs_client.send_tx_envelope + scenario.tx_actor().cache_run_tx (queues for receipt polling) +``` + +Code map: + +- [`crates/cli/src/commands/spam_stream.rs`](../crates/cli/src/commands/spam_stream.rs) + is the new subcommand. All logic lives there. +- The scenario starts from an empty `TestConfig`; the executor pool is + provisioned directly via `AgentStore::add_new_agent` and its signers are + registered with the scenario, then nonces are synced from the RPC. +- The reader task is a small wrapper around `tokio::io::BufReader::lines()` + that forwards parsed specs over an mpsc channel and exits on EOF, on a read + error, or when the receiver is dropped. +- The drive loop honors `--tps` via `tokio::time::interval`. 
+ +## Structured output

`spam-stream` writes a structured, newline-delimited JSON event stream to
**stdout** (human-readable logs go to stderr via `tracing`). Each event is a
versioned, tagged envelope so the schema can evolve without breaking
consumers:

```json
{"version":1,"type":"tx_result","idx":0,"tx_hash":"0x...","start_timestamp_ms":1733155200000,"kind":"validate","error":null}
```

- `version` pins the schema (bump on breaking changes).
- `type` discriminates the event kind: `tx_result`, `backpressure`, or `summary`.
- One `tx_result` is emitted per input spec after the send attempt; `error`
  carries a message only when the send RPC call failed (it is `null` in the
  example above).
- A `backpressure` event is emitted when the input buffer saturates and the
  reader blocks, so the producer can slow down:
  `{"version":1,"type":"backpressure","queued":256,"capacity":256}`
- A single `summary` event closes the stream (on EOF or CTRL-C):
  `{"version":1,"type":"summary","sent":42,"failed":3}`

## Reuse vs. new code

| Existing piece | Reused as-is |
|----------------|--------------|
| `TestScenario` constructor (signer map, nonce sync, txs_client) | yes |
| `Generator::make_strict_call` (resolves `from_pool`, access list, EIP-7702) | yes |
| `Templater::template_function_call` (calldata encoding, access list threading) | yes |
| `TestScenario::prepare_tx_request` (nonce, gas limit, complete_tx_request) | yes |
| Pool generation via `AgentPools::build_agent_store` | yes |
| `TxActorHandle::cache_run_tx` + flush loop (DB writes, receipt polling) | yes |
| `fund_accounts` helper | yes |
| Prometheus latency histograms via Tower middleware | yes (inherited from `TestScenario`) |
| The `Spammer` trait + `TimedSpammer`/`BlockwiseSpammer` | **not** reused |

The reason we skip `TimedSpammer` is that its `on_spam` loop drives ticks
from a pre-loaded `Vec<Vec<ExecutionRequest>>` returned by
`get_spam_tx_chunks`. Stream mode wants a stream-shaped tick: pull one spec,
send one tx.
Bolting the channel into `TimedSpammer` would require a generic +`SpamSource` abstraction across the existing spammers. Out of scope for the +prototype; a candidate for a follow-up if the prototype lands. + +## Scope of the prototype + +In scope: + +- `--from stdin|FILE`, `--from-pool`, `--pool-size`, `--tps`, `--priv-key`, `--rpc-url`, `--seed`, `--min-balance`, `--skip-funding`. +- Same `FunctionCallDefinition` schema as scenario TOML (incl. `access_list`, + `gas_limit`, `signature`, `args`, `value`, `from`, `from_pool`, `kind`). +- Pool funding from `--priv-key` before spam begins. +- Receipt tracking + DB persistence via the existing tx_actor flush loop, + recorded under a real `run_id` so dumped receipts are queryable. +- Graceful CTRL-C and stream EOF handling (drain pending receipts before exit). + +Out of scope (future work): + +- Bundle support (`[[spam.bundle]]` analogue in the stream). +- Blob transactions (EIP-4844) and authorization transactions (EIP-7702) — + specs carrying `blob_data` or `authorization_address` are rejected up front + rather than silently producing an invalid tx. +- Fuzzing in stream mode — fuzz happens upstream of the stream. +- Gas-bump / nonce-shift retry logic from the regular spammer's + `handle_tx_outcome`. The stream loop currently logs send errors and moves + on; the upstream is expected to resubmit if it cares. +- `--rpc-batch-size`, `--send-raw-tx-sync` integration. +- A generic `SpamSource` trait so `TimedSpammer` can consume a stream too. + +## Validation + +Unit tests live alongside the implementation: + +``` +cargo test -p contender_cli spam_stream +``` + +Smoke test: + +```bash +echo '{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit":21000}' | \ + contender spam-stream -r $RPC -p $FUNDING_KEY --from stdin --tps 1 +``` + +The tx should land on the target chain; the funder needs at least enough ETH +to fund the executor pool. + +## Open questions + +1. 
Should stream mode get its own `Spammer` impl in `contender_core` so
   campaigns can reuse it? Today the prototype lives entirely in `cli/`.
   *(Deferred to a follow-up: refactoring the `Spammer` trait is out of scope
   for the prototype.)*
2. ~~Is the JSON spec the right shape, or should we standardize on a tagged
   envelope so we can evolve it later?~~ **Resolved:** the stdout output is now
   a versioned, tagged envelope (`{"version":1,"type":"tx_result",...}`). See
   "Structured output" above.
3. ~~How should errors propagate back to the upstream producer?~~ **Resolved:**
   `spam-stream` emits a structured `tx_result` event per spec on stdout
   (including send errors), plus `backpressure` and a terminal `summary` event,
   in addition to the DB + logs.
4. Should `--tps 0` (drain as fast as possible) bound concurrency by pool size,
   or is "one in flight at a time" acceptable for the relayer case? *(Deferred:
   parallel sends judged not worth the effort for the prototype.)*