19 changes: 17 additions & 2 deletions consensus/src/pipeline/buffer_manager.rs
@@ -210,9 +210,10 @@ impl BufferManager {
consensus_publisher: Option<Arc<ConsensusPublisher>>,
max_pending_rounds_in_commit_vote_cache: u64,
new_pipeline_enabled: bool,
channel_id: u64,
) -> Self {
let buffer = Buffer::<BufferItem>::new();

warn!("bowu buffer manager start {:?}, channel_id {}", epoch_state, channel_id);
let rb_backoff_policy = ExponentialBackoff::from_millis(2)
.factor(50)
.max_delay(Duration::from_secs(5));
@@ -947,10 +948,14 @@ impl BufferManager {
fn need_back_pressure(&self) -> bool {
const MAX_BACKLOG: Round = 20;

self.back_pressure_enabled && self.highest_committed_round + MAX_BACKLOG < self.latest_round
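// Back-pressure kicks in once the latest ordered round runs more than MAX_BACKLOG rounds ahead of the highest committed round.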
let res = self.back_pressure_enabled && self.highest_committed_round + MAX_BACKLOG < self.latest_round;
warn!("bowu_buffer_manager_backpressure {:?}", res);
res
}

pub async fn start(mut self) {
let channel_id = format!("{:p}", &self.block_rx as *const _);
warn!("bowu Buffer manager starts with block_rx channel_id at {:?}", channel_id);
info!("Buffer manager starts.");
let (verified_commit_msg_tx, mut verified_commit_msg_rx) = create_channel();
let mut interval = tokio::time::interval(Duration::from_millis(LOOP_INTERVAL_MS));
@@ -975,8 +980,10 @@
});
while !self.stop {
// advancing the root will trigger sending requests to the pipeline
warn!("bowu_select_loop: iteration start, stop={}", self.stop);
::tokio::select! {
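// While need_back_pressure() returns true this branch is disabled, so ordered blocks simply queue up in block_rx.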
Some(blocks) = self.block_rx.next(), if !self.need_back_pressure() => {
warn!("bowu_SELECT_BRANCH: block_rx RECEIVED: round={}, num_blocks={}", blocks.latest_round(), blocks.ordered_blocks.len());
self.latest_round = blocks.latest_round();
monitor!("buffer_manager_process_ordered", {
self.process_ordered_blocks(blocks).await;
@@ -985,14 +992,17 @@
}});
},
Some(reset_event) = self.reset_rx.next() => {
warn!("bowu_SELECT_BRANCH: reset_rx");
monitor!("buffer_manager_process_reset",
self.process_reset_request(reset_event).await);
},
Some(response) = self.execution_schedule_phase_rx.next() => {
warn!("bowu_SELECT_BRANCH: execution_schedule_phase_rx");
monitor!("buffer_manager_process_execution_schedule_response", {
self.process_execution_schedule_response(response).await;
})},
Some(response) = self.execution_wait_phase_rx.next() => {
warn!("bowu_SELECT_BRANCH: execution_wait_phase_rx");
monitor!("buffer_manager_process_execution_wait_response", {
let response_block_id = response.block_id;
self.process_execution_response(response).await;
@@ -1012,24 +1022,28 @@
}});
},
_ = self.execution_schedule_retry_rx.next() => {
warn!("bowu_SELECT_BRANCH: execution_schedule_retry_rx");
if !self.new_pipeline_enabled {
monitor!("buffer_manager_process_execution_schedule_retry",
self.retry_schedule_phase().await);
}
},
Some(response) = self.signing_phase_rx.next() => {
warn!("bowu_SELECT_BRANCH: signing_phase_rx");
monitor!("buffer_manager_process_signing_response", {
self.process_signing_response(response).await;
self.advance_signing_root().await
})
},
Some(Ok(round)) = self.persisting_phase_rx.next() => {
warn!("bowu_SELECT_BRANCH: persisting_phase_rx, round={}", round);
// see where `need_back_pressure()` is called.
self.pending_commit_votes = self.pending_commit_votes.split_off(&(round + 1));
self.highest_committed_round = round;
self.pending_commit_blocks = self.pending_commit_blocks.split_off(&(round + 1));
},
Some(rpc_request) = verified_commit_msg_rx.next() => {
warn!("bowu_SELECT_BRANCH: verified_commit_msg_rx");
monitor!("buffer_manager_process_commit_message",
if let Some(aggregated_block_id) = self.process_commit_message(rpc_request) {
self.advance_head(aggregated_block_id).await;
@@ -1042,6 +1056,7 @@
});
}
_ = interval.tick().fuse() => {
warn!("bowu_SELECT_BRANCH: interval_tick");
monitor!("buffer_manager_process_interval_tick", {
self.update_buffer_manager_metrics();
self.rebroadcast_commit_votes_if_needed().await
2 changes: 2 additions & 0 deletions consensus/src/pipeline/decoupled_execution_utils.rs
@@ -46,6 +46,7 @@ pub fn prepare_phases_and_buffer_manager(
consensus_publisher: Option<Arc<ConsensusPublisher>>,
max_pending_rounds_in_commit_vote_cache: u64,
new_pipeline_enabled: bool,
channel_id: u64,
) -> (
PipelinePhase<ExecutionSchedulePhase>,
PipelinePhase<ExecutionWaitPhase>,
@@ -139,6 +140,7 @@ pub fn prepare_phases_and_buffer_manager(
consensus_publisher,
max_pending_rounds_in_commit_vote_cache,
new_pipeline_enabled,
channel_id,
),
)
}
48 changes: 33 additions & 15 deletions consensus/src/pipeline/execution_client.rs
@@ -53,7 +53,9 @@ use futures::{
};
use futures_channel::mpsc::unbounded;
use move_core_types::account_address::AccountAddress;
use std::{sync::Arc, time::Duration};
use std::{sync::{atomic::{AtomicU64, Ordering}, Arc}, time::Duration};

static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

#[async_trait::async_trait]
pub trait TExecutionClient: Send + Sync {
@@ -115,6 +117,7 @@ pub trait TExecutionClient: Send + Sync {

struct BufferManagerHandle {
pub execute_tx: Option<UnboundedSender<OrderedBlocks>>,
pub channel_id: Option<u64>,
pub commit_tx:
Option<aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>>,
pub reset_tx_to_buffer_manager: Option<UnboundedSender<ResetRequest>>,
@@ -125,6 +128,7 @@ impl BufferManagerHandle {
pub fn new() -> Self {
Self {
execute_tx: None,
channel_id: None,
commit_tx: None,
reset_tx_to_buffer_manager: None,
reset_tx_to_rand_manager: None,
@@ -138,7 +142,9 @@
reset_tx_to_buffer_manager: UnboundedSender<ResetRequest>,
reset_tx_to_rand_manager: Option<UnboundedSender<ResetRequest>>,
) {
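// Mint a process-unique ID for this handle's channels; used only to correlate log lines across epochs.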
let channel_id = CHANNEL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
self.execute_tx = Some(execute_tx);
self.channel_id = Some(channel_id);
self.commit_tx = Some(commit_tx);
self.reset_tx_to_buffer_manager = Some(reset_tx_to_buffer_manager);
self.reset_tx_to_rand_manager = reset_tx_to_rand_manager;
@@ -153,6 +159,7 @@ impl BufferManagerHandle {
let reset_tx_to_rand_manager = self.reset_tx_to_rand_manager.take();
let reset_tx_to_buffer_manager = self.reset_tx_to_buffer_manager.take();
self.execute_tx = None;
self.channel_id = None;
self.commit_tx = None;
(reset_tx_to_rand_manager, reset_tx_to_buffer_manager)
}
@@ -231,6 +238,7 @@ impl ExecutionProxyClient {

let (execution_ready_block_tx, execution_ready_block_rx, maybe_reset_tx_to_rand_manager) =
if let Some(rand_config) = rand_config {
warn!("bowu_rand_config: {:?}", rand_config);
let (ordered_block_tx, ordered_block_rx) = unbounded::<OrderedBlocks>();
let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::<OrderedBlocks>();

@@ -264,16 +272,18 @@
Some(reset_tx_to_rand_manager),
)
} else {
warn!("bowu_randomness disabled");
let (ordered_block_tx, ordered_block_rx) = unbounded();
(ordered_block_tx, ordered_block_rx, None)
};

self.handle.write().init(
execution_ready_block_tx,
commit_msg_tx,
reset_buffer_manager_tx,
maybe_reset_tx_to_rand_manager,
);
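// init() above just stored a channel_id, so the unwrap is safe here.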
let channel_id = self.handle.read().channel_id.unwrap();
warn!("bowu_epoch state {:?}, setting the channel with ID {}", epoch_state, channel_id);

let (
execution_schedule_phase,
@@ -300,6 +310,7 @@
self.consensus_config
.max_pending_rounds_in_commit_vote_cache,
new_pipeline_enabled,
channel_id,
);

tokio::spawn(execution_schedule_phase.start());
@@ -374,12 +385,15 @@ impl TExecutionClient for ExecutionProxyClient {
callback: StateComputerCommitCallBackType,
) -> ExecutorResult<()> {
assert!(!blocks.is_empty());
let mut execute_tx = match self.handle.read().execute_tx.clone() {
Some(tx) => tx,
None => {
debug!("Failed to send to buffer manager, maybe epoch ends");
return Ok(());
},
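// Take the sender and its channel ID under a single read-lock acquisition so the pair stays consistent.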
let (mut execute_tx, channel_id) = {
let handle = self.handle.read();
match (handle.execute_tx.clone(), handle.channel_id) {
(Some(tx), Some(id)) => (tx, id),
_ => {
debug!("Failed to send to buffer manager, maybe epoch ends");
return Ok(());
},
}
};

for block in &blocks {
@@ -390,17 +404,21 @@
.map(|tx| tx.send(ordered_proof.clone()));
}
}

if execute_tx
let result = execute_tx
.send(OrderedBlocks {
ordered_blocks: blocks,
ordered_blocks: blocks.clone(),
ordered_proof: ordered_proof.ledger_info().clone(),
callback,
})
.await
.is_err()
{
debug!("Failed to send to buffer manager, maybe epoch ends");
.await;

match result {
Ok(_) => {
warn!("bowu_finalize_order channel_id {}, SUCCESSFULLY sent {} blocks", channel_id, blocks.len());
}
Err(e) => {
warn!("bowu_finalize_order channel_id {}, FAILED to send: {:?}", channel_id, e);
}
}
Ok(())
}
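Side note: the channel-ID mechanism introduced above reduces to the small pattern sketched below. This is an illustrative standalone snippet; NEXT_CHANNEL_ID and mint_channel_id are made-up names, not part of this PR.

use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter. fetch_add is a single atomic read-modify-write,
// so concurrent callers always receive distinct IDs.
static NEXT_CHANNEL_ID: AtomicU64 = AtomicU64::new(0);

fn mint_channel_id() -> u64 {
    NEXT_CHANNEL_ID.fetch_add(1, Ordering::SeqCst)
}

fn main() {
    // Successive calls yield 0, 1, 2, ...: unique within the process.
    println!("{} {}", mint_channel_id(), mint_channel_id());
}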
1 change: 1 addition & 0 deletions consensus/src/pipeline/tests/buffer_manager_tests.rs
@@ -163,6 +163,7 @@ pub fn prepare_buffer_manager(
None,
100,
true,
0, // channel_id for testing
);

(
6 changes: 4 additions & 2 deletions consensus/src/rand/rand_gen/aug_data_store.rs
@@ -10,7 +10,7 @@ use crate::rand::rand_gen::{
};
use anyhow::ensure;
use aptos_consensus_types::common::Author;
use aptos_logger::error;
use aptos_logger::{error, warn};
use aptos_types::validator_signer::ValidatorSigner;
use std::{collections::HashMap, sync::Arc};

@@ -96,7 +96,9 @@ impl<D: TAugmentedData> AugDataStore<D> {
}

pub fn my_certified_aug_data_exists(&self) -> bool {
self.certified_data.contains_key(&self.config.author())
let res = self.certified_data.contains_key(&self.config.author());
warn!("bowu_rand_gen: {}", res);
res
}

pub fn add_aug_data(&mut self, data: AugData<D>) -> anyhow::Result<AugDataSignature> {
3 changes: 3 additions & 0 deletions consensus/src/rand/rand_gen/block_queue.rs
@@ -6,6 +6,7 @@ use crate::{
pipeline::buffer_manager::OrderedBlocks,
};
use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock};
use aptos_logger::warn;
use aptos_reliable_broadcast::DropGuard;
use aptos_types::randomness::{FullRandMetadata, Randomness};
use std::{
@@ -118,6 +119,7 @@ impl BlockQueue {
pub fn dequeue_rand_ready_prefix(&mut self) -> Vec<OrderedBlocks> {
let mut rand_ready_prefix = vec![];
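// Walk the queue front-to-back, popping items whose randomness is fully decided; stop at the first item that still has undecided blocks.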
while let Some((_starting_round, item)) = self.queue.first_key_value() {
warn!("bowu_rand_gen_dequeue: {}, {:?}", _starting_round, item.num_undecided());
if item.num_undecided() == 0 {
let (_, item) = self.queue.pop_first().unwrap();
for block in item.blocks() {
Expand All @@ -133,6 +135,7 @@ impl BlockQueue {
break;
}
}

rand_ready_prefix
}

1 change: 1 addition & 0 deletions consensus/src/rand/rand_gen/rand_manager.rs
@@ -381,6 +381,7 @@ impl<S: TShare, D: TAugmentedData> RandManager<S, D> {
tokio::select! {
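// This branch stays disabled until our own certified aug data exists, so ordered blocks queue up in the channel meanwhile.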
Some(blocks) = incoming_blocks.next(), if self.aug_data_store.my_certified_aug_data_exists() => {
self.process_incoming_blocks(blocks);
warn!("bowu_rand_manager_finish_blocks");
}
Some(reset) = reset_rx.next() => {
while matches!(incoming_blocks.try_next(), Ok(Some(_))) {}
1 change: 1 addition & 0 deletions consensus/src/rand/rand_gen/rand_store.rs
@@ -44,6 +44,7 @@ impl<S: TShare> ShareAggregator<S> {
rand_metadata: FullRandMetadata,
decision_tx: Sender<Randomness>,
) -> Either<Self, RandShare<S>> {
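// Keep aggregating (return Left) until the accumulated share weight reaches the randomness threshold.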
warn!("bowu_rand_store: {}, {}", self.total_weight, rand_config.threshold());
if self.total_weight < rand_config.threshold() {
return Either::Left(self);
}
3 changes: 3 additions & 0 deletions tools/l1-migration/local-node-configs/validator_node.yaml
@@ -3,7 +3,10 @@ base:
role: validator
waypoint:
from_file: /Users/bowu/data/.maptos/waypoint.txt # update to your path
randomness_override_seq_num: 999999999 # disable randomness - must be at top level, not under consensus
consensus:
sync_only: false
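# deliberately large so consensus vote back-pressure never kicks in during local testing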
vote_back_pressure_limit: 200000
safety_rules:
service:
type: local