From 8248af3702fb2f5ec60407d03a8c494848b96a46 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Fri, 20 Feb 2026 18:47:04 +0900 Subject: [PATCH] perf: fix TPS bottleneck in worker pipeline --- crates/data-chain/src/worker/batch_maker.rs | 14 ++++ crates/data-chain/src/worker/config.rs | 4 +- crates/data-chain/src/worker/core.rs | 80 +++++++++++++++++---- crates/node/Cargo.toml | 1 + crates/node/src/config.rs | 6 +- crates/node/src/main.rs | 21 ++++-- crates/node/src/node.rs | 12 ++-- crates/rpc/src/adapters.rs | 8 +-- 8 files changed, 114 insertions(+), 32 deletions(-) diff --git a/crates/data-chain/src/worker/batch_maker.rs b/crates/data-chain/src/worker/batch_maker.rs index c86c79b7..7eafc5c8 100644 --- a/crates/data-chain/src/worker/batch_maker.rs +++ b/crates/data-chain/src/worker/batch_maker.rs @@ -54,6 +54,20 @@ impl BatchMaker { None } + /// Add multiple transactions at once, returning any completed batches. + /// + /// Convenience method that collects all resulting batches when adding + /// a pre-drained set of transactions in bulk. 
+ pub fn add_transactions(&mut self, txs: Vec<Transaction>) -> Vec<Batch> { + let mut batches = Vec::new(); + for tx in txs { + if let Some(batch) = self.add_transaction(tx) { + batches.push(batch); + } + } + batches + } + /// Check if batch should be flushed fn should_flush(&self) -> bool { self.pending_size >= self.max_bytes || self.pending_txs.len() >= self.max_txs diff --git a/crates/data-chain/src/worker/config.rs b/crates/data-chain/src/worker/config.rs index 98608fe8..4e44b699 100644 --- a/crates/data-chain/src/worker/config.rs +++ b/crates/data-chain/src/worker/config.rs @@ -22,14 +22,14 @@ impl WorkerConfig { /// Create a new configuration with defaults /// /// Default batch thresholds are tuned for responsive transaction processing: - /// - `max_batch_txs`: 100 transactions triggers immediate batch flush + /// - `max_batch_txs`: 500 transactions triggers immediate batch flush /// - `flush_interval`: 50ms ensures batches don't wait too long pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self { Self { validator_id, worker_id, max_batch_bytes: 1024 * 1024, // 1MB - max_batch_txs: 100, // Flush after 100 txs for responsive batching + max_batch_txs: 500, // Flush after 500 txs for higher throughput flush_interval: Duration::from_millis(50), // Faster time-based flush } } diff --git a/crates/data-chain/src/worker/core.rs b/crates/data-chain/src/worker/core.rs index ff00a94e..78eca655 100644 --- a/crates/data-chain/src/worker/core.rs +++ b/crates/data-chain/src/worker/core.rs @@ -382,10 +382,18 @@ impl Worker { self.check_time_flush().await; } - // Handle incoming transactions + // Handle incoming transactions - drain all ready TXs at once tx = self.tx_receiver.recv() => { if let Some(tx) = tx { - self.handle_transaction(tx).await; + // Drain additional ready transactions to amortize select!
overhead + let mut txs = vec![tx]; + while let Ok(t) = self.tx_receiver.try_recv() { + txs.push(t); + if txs.len() >= self.config.max_batch_txs { + break; + } + } + self.handle_transactions_batch(txs).await; } else { warn!(worker_id = self.config.worker_id, "tx_receiver closed"); self.shutdown = true; @@ -420,9 +428,10 @@ impl Worker { info!(worker_id = self.config.worker_id, "Worker shutting down"); } - /// Handle incoming transaction + /// Handle incoming transaction (used by tests only; production uses handle_transactions_batch) + #[cfg(test)] async fn handle_transaction(&mut self, tx: Transaction) { - info!( + trace!( worker_id = self.config.worker_id, tx_size = tx.len(), "Worker received transaction from channel" @@ -457,7 +466,7 @@ impl Worker { ); self.process_batch(batch).await; } else { - info!( + trace!( worker_id = self.config.worker_id, pending_txs = self.batch_maker.pending_count(), "Transaction added to batch maker, waiting for more or flush" @@ -465,6 +474,52 @@ impl Worker { } } + /// Handle a batch of incoming transactions drained from the channel. + /// + /// Validates each transaction, adds all valid ones to the batch maker, + /// and processes any resulting batches. This amortizes the per-transaction + /// overhead of the select! loop and logging. 
+ async fn handle_transactions_batch(&mut self, txs: Vec<Transaction>) { + let total = txs.len(); + debug!( + worker_id = self.config.worker_id, + tx_count = total, + "Processing drained transaction batch" + ); + + // Validate and collect valid transactions + let mut valid_txs = Vec::with_capacity(total); + for tx in txs { + if let Some(ref validator) = self.validator { + match validator.validate_transaction(&tx).await { + Ok(()) => valid_txs.push(tx), + Err(e) => { + warn!( + worker_id = self.config.worker_id, + error = %e, + "Transaction validation failed, rejecting" + ); + } + } + } else { + valid_txs.push(tx); + } + } + + // Add all valid transactions and collect any completed batches + let batches = self.batch_maker.add_transactions(valid_txs); + + // Process all completed batches + for batch in batches { + info!( + worker_id = self.config.worker_id, + tx_count = batch.transactions.len(), + "Batch ready, processing" + ); + self.process_batch(batch).await; + } + } + /// Handle message from Primary async fn handle_primary_message(&mut self, msg: PrimaryToWorker) { match msg { @@ -694,9 +749,8 @@ impl Worker { let has_pending = self.batch_maker.has_pending(); let elapsed = self.batch_maker.time_since_batch_start(); - // Log every call so we can see the tick is working if has_pending { - info!( + trace!( worker_id = self.config.worker_id, should_flush, has_pending, @@ -706,7 +760,7 @@ } if should_flush && has_pending { - info!( + debug!( worker_id = self.config.worker_id, pending_txs = self.batch_maker.pending_count(), "Time flush triggered, creating batch" @@ -783,7 +837,7 @@ if let Some(ref storage) = self.storage { match storage.put_batch(batch.clone()).await { Ok(_) => { - info!( + debug!( worker_id = self.config.worker_id, digest = %digest.digest, tx_count = batch.transactions.len(), @@ -812,20 +866,20 @@ self.state.store_batch(batch.clone()); // Broadcast to peer Workers - info!( + debug!( worker_id = self.config.worker_id,
digest = %digest.digest, "Broadcasting batch to peer Workers..." ); self.network.broadcast_batch(&batch).await; - info!( + debug!( worker_id = self.config.worker_id, digest = %digest.digest, "Broadcast complete" ); // Report to Primary - info!( + debug!( worker_id = self.config.worker_id, digest = %digest.digest, "Sending BatchDigest to Primary" @@ -846,7 +900,7 @@ impl Worker { "Failed to send BatchDigest to Primary - channel closed" ); } else { - info!( + debug!( worker_id = self.config.worker_id, "BatchDigest sent to Primary successfully" ); diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 336418b7..74398137 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -51,6 +51,7 @@ toml = { workspace = true } # Logging tracing = { workspace = true } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] } +tracing-appender = "0.2" # CLI clap = { version = "4.4", features = ["derive"] } diff --git a/crates/node/src/config.rs b/crates/node/src/config.rs index ff35df3a..c45993ed 100644 --- a/crates/node/src/config.rs +++ b/crates/node/src/config.rs @@ -247,7 +247,7 @@ impl NodeConfig { data_dir: home_dir.join("data"), genesis_path: None, // Uses default: {home_dir}/config/genesis.json car_interval_ms: 100, - max_batch_txs: 100, + max_batch_txs: 500, max_batch_bytes: 1024 * 1024, // 1MB rpc_enabled: false, rpc_http_port: DEFAULT_RPC_HTTP_PORT + (index as u16), @@ -473,7 +473,7 @@ mod tests { "num_workers": 1, "data_dir": "/tmp/cipherd-0", "car_interval_ms": 100, - "max_batch_txs": 100, + "max_batch_txs": 500, "max_batch_bytes": 1048576 }"#; @@ -556,7 +556,7 @@ mod tests { "num_workers": 1, "data_dir": "/tmp/cipherd-0", "car_interval_ms": 100, - "max_batch_txs": 100, + "max_batch_txs": 500, "max_batch_bytes": 1048576 }"#; diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 29896113..be17a172 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -364,7 +364,8 @@ async fn main() -> 
Result<()> { let cli = Cli::parse(); // Initialize tracing based on global flags - init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color); + // Hold the guard to keep the non-blocking writer alive for the process lifetime + let _tracing_guard = init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color); let result = match cli.command { Commands::Init { @@ -424,7 +425,11 @@ async fn main() -> Result<()> { Ok(()) } -fn init_tracing(log_level: &str, log_format: &str, no_color: bool) { +fn init_tracing( + log_level: &str, + log_format: &str, + no_color: bool, +) -> tracing_appender::non_blocking::WorkerGuard { let level = match log_level.to_lowercase().as_str() { "trace" => Level::TRACE, "debug" => Level::DEBUG, @@ -437,16 +442,22 @@ fn init_tracing(log_level: &str, log_format: &str, no_color: bool) { let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level.to_string())); + // Use non-blocking writer to avoid blocking the async runtime on log I/O + let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout()); + let subscriber = tracing_subscriber::fmt() .with_env_filter(filter) .with_target(true) .with_thread_ids(true) - .with_ansi(!no_color); + .with_ansi(!no_color) + .with_writer(non_blocking); match log_format { "json" => subscriber.json().init(), _ => subscriber.init(), } + + guard } // ============================================================================= @@ -554,7 +565,7 @@ fn cmd_init( data_dir: data_dir.clone(), genesis_path: Some(genesis_path.clone()), car_interval_ms: 100, - max_batch_txs: 100, + max_batch_txs: 500, max_batch_bytes: 1024 * 1024, rpc_enabled: true, rpc_http_port: cipherd::DEFAULT_RPC_HTTP_PORT, @@ -968,7 +979,7 @@ fn cmd_testnet_init_files( data_dir: data_dir.clone(), genesis_path: Some(genesis_path.clone()), car_interval_ms: 100, - max_batch_txs: 100, + max_batch_txs: 500, max_batch_bytes: 1024 * 1024, rpc_enabled: true, // Each validator gets HTTP and WS ports spaced by 10 
to avoid conflicts diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 6c6c4ffd..f3d07700 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -58,7 +58,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use cipherbft_metrics; @@ -703,7 +703,9 @@ impl Node { // to ensure batches are flushed before Cars are created. Without this, there's a race // condition where Primary creates empty Cars before Worker flushes pending batches. let worker_config = WorkerConfig::new(self.validator_id, worker_id) - .with_flush_interval(std::time::Duration::from_millis(50)); + .with_flush_interval(std::time::Duration::from_millis(50)) + .with_max_batch_txs(self.config.max_batch_txs) + .with_max_batch_bytes(self.config.max_batch_bytes); let mut worker_handle = Worker::spawn_with_storage( worker_config, Box::new(worker_network), @@ -754,12 +756,12 @@ impl Node { } } => { if let Some(tx_bytes) = tx { - info!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len()); + trace!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len()); if worker_handle.submit_transaction(tx_bytes).await.is_err() { warn!("Worker {} submit_transaction failed", worker_id); // Don't break - continue processing other messages } else { - info!("Worker {} forwarded transaction to batch maker", worker_id); + trace!("Worker {} forwarded transaction to batch maker", worker_id); } } } @@ -784,7 +786,7 @@ impl Node { msg = worker_handle.recv_from_worker() => { match msg { Some(m) => { - info!("Worker {} bridge forwarding {:?} to Primary", worker_id, m); + debug!("Worker {} bridge forwarding {:?} to Primary", worker_id, m); if primary_worker_sender.send(m).await.is_err() { warn!("Worker {} send to primary failed", worker_id); break; diff --git a/crates/rpc/src/adapters.rs 
b/crates/rpc/src/adapters.rs index bbe0f6e8..70fcfa41 100644 --- a/crates/rpc/src/adapters.rs +++ b/crates/rpc/src/adapters.rs @@ -39,7 +39,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; use cipherbft_execution::precompiles::{CipherBftPrecompileProvider, StakingPrecompile}; use cipherbft_execution::AccountProof; @@ -1573,7 +1573,7 @@ impl ChannelMempoolApi { #[async_trait] impl MempoolApi for ChannelMempoolApi { async fn submit_transaction(&self, tx_bytes: Bytes) -> RpcResult { - info!( + trace!( "ChannelMempoolApi::submit_transaction received {} bytes (chain_id={})", tx_bytes.len(), self.chain_id @@ -1616,7 +1616,7 @@ impl MempoolApi for ChannelMempoolApi { } // Forward to worker via channel - info!( + trace!( "Sending transaction {} to worker channel (capacity: {})", tx_hash, self.tx_sender.capacity() @@ -1626,7 +1626,7 @@ impl MempoolApi for ChannelMempoolApi { RpcError::Execution("Transaction submission failed: worker channel closed".to_string()) })?; - info!( + trace!( "Transaction {} sent to worker channel ({} bytes)", tx_hash, tx_bytes.len()