Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion aptos-core/consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,18 @@ impl PipelinedBlock {
}

pub fn set_randomness(&self, randomness: Randomness) {
assert!(self.randomness.set(randomness.clone()).is_ok());
if self.randomness.set(randomness.clone()).is_err() {
if let Some(existing) = self.randomness.get() {
if *existing != randomness {
warn!(
"set_randomness called again with a different value on block {}: existing={:?}, new={:?}",
self.id(),
existing,
randomness,
);
}
}
}
}

pub fn set_insertion_time(&self) {
Expand Down
4 changes: 2 additions & 2 deletions aptos-core/consensus/src/consensusdb/include/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ impl ConsensusDB {
// TODO(gravity_byteyue): this is a temporary solution to enable quorum store
// We should get the value from the storage instead of using env variable
fn enable_quorum_store() -> bool {
std::env::var("ENABLE_QUORUM_STORE").map(|s| s.parse().unwrap()).unwrap_or(true)
std::env::var("ENABLE_QUORUM_STORE").ok().and_then(|s| s.parse().ok()).unwrap_or(true)
}

fn fixed_proposer() -> bool {
std::env::var("FIXED_PROPOSER").map(|s| s.parse().unwrap()).unwrap_or(true)
std::env::var("FIXED_PROPOSER").ok().and_then(|s| s.parse().ok()).unwrap_or(true)
}

impl DbReader for ConsensusDB {
Expand Down
2 changes: 1 addition & 1 deletion aptos-core/consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
fn enable_quorum_store(&mut self, onchain_config: &OnChainConsensusConfig) -> bool {
fail_point!("consensus::start_new_epoch::disable_qs", |_| false);
// TODO(gravity_byteyue): Use onchain config in the future
std::env::var("ENABLE_QUORUM_STORE").map(|s| s.parse().unwrap()).unwrap_or(true)
std::env::var("ENABLE_QUORUM_STORE").ok().and_then(|s| s.parse().ok()).unwrap_or(true)
// onchain_config.quorum_store_enabled()
}

Expand Down
4 changes: 2 additions & 2 deletions aptos-core/consensus/src/gravity_state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl BlockExecutorTrait for GravityBlockExecutor {
epoch,
)
.await
.unwrap_or_else(|e| panic!("Failed to push commit blocks {}", e));
.expect("Failed to set commit blocks in BlockBufferManager");
for notifier in persist_notifiers.iter_mut() {
let _ = notifier.recv().await;
}
Expand Down Expand Up @@ -181,7 +181,7 @@ impl BlockExecutorTrait for GravityBlockExecutor {
epoch,
)
.await
.unwrap();
.expect("Failed to set commit blocks in BlockBufferManager");
for notifier in persist_notifiers.iter_mut() {
let _ = notifier.recv().await;
}
Expand Down
27 changes: 14 additions & 13 deletions aptos-core/consensus/src/liveness/leader_reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl AptosDBBackend {
}

if result.is_empty() {
warn!("No events in the requested window could be found");
error!("[leader reputation] No events in the requested window could be found, leader election may degrade");
(result, HashValue::zero())
} else {
let root_hash = self
Expand Down Expand Up @@ -170,8 +170,8 @@ impl MetadataBackend for AptosDBBackend {
// lazy init db_result
if locked.is_none() {
if let Err(e) = self.refresh_db_result(&mut locked, latest_db_version) {
warn!(
error = ?e, "[leader reputation] Fail to initialize db result",
error!(
error = ?e, "[leader reputation] Fail to initialize db result, leader election may degrade",
);
return (vec![], HashValue::zero());
}
Expand All @@ -195,8 +195,8 @@ impl MetadataBackend for AptosDBBackend {
}
Err(e) => {
// fails if requested events were pruned / or we never backfil them.
warn!(
error = ?e, "[leader reputation] Fail to refresh window",
error!(
error = ?e, "[leader reputation] Fail to refresh window, leader election may degrade",
);
(vec![], HashValue::zero())
}
Expand Down Expand Up @@ -353,8 +353,8 @@ impl NewBlockEventAggregation {
) {
Ok(voters) => {
for &voter in voters {
let count = map.entry(voter).or_insert(0);
*count += 1;
let count = map.entry(voter).or_insert(0u32);
*count = count.saturating_add(1);
}
}
Err(msg) => {
Expand Down Expand Up @@ -393,8 +393,8 @@ impl NewBlockEventAggregation {
Self::history_iter(history, epoch_to_candidates, window_size, from_stale_end).fold(
HashMap::new(),
|mut map, meta| {
let count = map.entry(meta.proposer()).or_insert(0);
*count += 1;
let count = map.entry(meta.proposer()).or_insert(0u32);
*count = count.saturating_add(1);
map
},
)
Expand All @@ -418,8 +418,8 @@ impl NewBlockEventAggregation {
) {
Ok(failed_proposers) => {
for &failed_proposer in failed_proposers {
let count = map.entry(failed_proposer).or_insert(0);
*count += 1;
let count = map.entry(failed_proposer).or_insert(0u32);
*count = count.saturating_add(1);
}
}
Err(msg) => {
Expand Down Expand Up @@ -515,8 +515,9 @@ impl ReputationHeuristic for ProposerAndVoterHeuristic {
let cur_proposals = *proposals.get(author).unwrap_or(&0);
let cur_failed_proposals = *failed_proposals.get(author).unwrap_or(&0);

if cur_failed_proposals * 100 >
(cur_proposals + cur_failed_proposals) * self.failure_threshold_percent
if (cur_failed_proposals as u64) * 100 >
(cur_proposals as u64 + cur_failed_proposals as u64) *
self.failure_threshold_percent as u64
{
self.failed_weight
} else if cur_proposals > 0 || cur_votes > 0 {
Expand Down
18 changes: 13 additions & 5 deletions bin/gravity_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,22 @@ fn run_reth(
handle.node.pool.pending_transactions_listener();
let engine_cli = handle.node.auth_server_handle().clone();
let provider = handle.node.provider;
let recover_block_number = provider.recover_block_number().unwrap();
let recover_block_number = provider
.recover_block_number()
.expect("Failed to recover block number from DB");
info!("The latest_block_number is {}", recover_block_number);
let latest_block_hash =
provider.block_hash(recover_block_number).unwrap().unwrap();
let latest_block_hash = provider
.block_hash(recover_block_number)
.expect("Failed to read block hash from DB")
.unwrap_or_else(|| {
panic!("Block hash not found for block number {recover_block_number}")
});
let latest_block = provider
.block(BlockHashOrNumber::Number(recover_block_number))
.unwrap()
.unwrap();
.expect("Failed to read block from DB")
.unwrap_or_else(|| {
panic!("Block not found for block number {recover_block_number}")
});
let pool = handle.node.pool;

let storage = BlockViewStorage::new(provider.clone());
Expand Down
4 changes: 3 additions & 1 deletion bin/gravity_node/src/reth_coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ impl<EthApi: RethEthCall> RethCoordinator<EthApi> {
.collect();
info!("send_execution_args block_number_to_block_id: {:?}", block_number_to_block_id);
let execution_args = ExecutionArgs { block_number_to_block_id };
execution_args_tx.send(execution_args).unwrap();
execution_args_tx
.send(execution_args)
.expect("Failed to send execution args: reth receiver may have been dropped");
}
}

Expand Down
2 changes: 0 additions & 2 deletions crates/api/src/config_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ impl ConfigStorage for ConfigStorageWrapper {
config_name: OnChainConfig,
block_number: BlockNumber,
) -> Option<OnChainConfigResType> {
println!("fetch_config_bytes: {config_name:?}, block_number: {block_number:?}");

info!("fetch_config_bytes: {:?}, block_number: {:?}", config_name, block_number);
match config_name {
OnChainConfig::Epoch |
Expand Down
6 changes: 4 additions & 2 deletions crates/api/src/consensus_mempool_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ impl<M: MempoolNotificationSender> ConsensusToMempoolHandler<M> {
sync_notification.get_target().ledger_info().block_number(),
)
.unwrap();
let _ = self
if let Err(e) = self
.consensus_notification_listener
.respond_to_sync_target_notification(sync_notification, Ok(()))
.map_err(|e| anyhow::anyhow!(e));
{
warn!("Failed to respond to sync target notification: {:?}", e);
}
Ok(())
}
ConsensusNotification::SyncForDuration(_consensus_sync_duration_notification) => {
Expand Down
3 changes: 2 additions & 1 deletion crates/block-buffer-manager/src/block_buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,11 +1010,12 @@ impl BlockBufferManager {
latest_epoch_change_block_number, block_state_machine.current_epoch
);

self.buffer_state.store(BufferState::EpochChange as u8, Ordering::SeqCst);

block_state_machine
.blocks
.retain(|key, _| key.block_number <= latest_epoch_change_block_number);

self.buffer_state.store(BufferState::EpochChange as u8, Ordering::SeqCst);
block_state_machine
.profile
.retain(|key, _| key.block_number <= latest_epoch_change_block_number);
Expand Down
Loading