diff --git a/aptos-core/consensus/consensus-types/src/pipelined_block.rs b/aptos-core/consensus/consensus-types/src/pipelined_block.rs index 1bd7b5a35..14121af02 100644 --- a/aptos-core/consensus/consensus-types/src/pipelined_block.rs +++ b/aptos-core/consensus/consensus-types/src/pipelined_block.rs @@ -249,7 +249,18 @@ impl PipelinedBlock { } pub fn set_randomness(&self, randomness: Randomness) { - assert!(self.randomness.set(randomness.clone()).is_ok()); + if self.randomness.set(randomness.clone()).is_err() { + if let Some(existing) = self.randomness.get() { + if *existing != randomness { + warn!( + "set_randomness called again with a different value on block {}: existing={:?}, new={:?}", + self.id(), + existing, + randomness, + ); + } + } + } } pub fn set_insertion_time(&self) { diff --git a/aptos-core/consensus/src/consensusdb/include/reader.rs b/aptos-core/consensus/src/consensusdb/include/reader.rs index 7f9472f48..58f385e26 100644 --- a/aptos-core/consensus/src/consensusdb/include/reader.rs +++ b/aptos-core/consensus/src/consensusdb/include/reader.rs @@ -40,11 +40,11 @@ impl ConsensusDB { // TODO(gravity_byteyue): this is a temporary solution to enable quorum store // We should get the value from the storage instead of using env variable fn enable_quorum_store() -> bool { - std::env::var("ENABLE_QUORUM_STORE").map(|s| s.parse().unwrap()).unwrap_or(true) + std::env::var("ENABLE_QUORUM_STORE").ok().and_then(|s| s.parse().ok()).unwrap_or(true) } fn fixed_proposer() -> bool { - std::env::var("FIXED_PROPOSER").map(|s| s.parse().unwrap()).unwrap_or(true) + std::env::var("FIXED_PROPOSER").ok().and_then(|s| s.parse().ok()).unwrap_or(true) } impl DbReader for ConsensusDB { diff --git a/aptos-core/consensus/src/epoch_manager.rs b/aptos-core/consensus/src/epoch_manager.rs index 0db2d81c8..fb6a5098e 100644 --- a/aptos-core/consensus/src/epoch_manager.rs +++ b/aptos-core/consensus/src/epoch_manager.rs @@ -1476,7 +1476,7 @@ impl EpochManager

{ fn enable_quorum_store(&mut self, onchain_config: &OnChainConsensusConfig) -> bool { fail_point!("consensus::start_new_epoch::disable_qs", |_| false); // TODO(gravity_byteyue): Use onchain config in the future - std::env::var("ENABLE_QUORUM_STORE").map(|s| s.parse().unwrap()).unwrap_or(true) + std::env::var("ENABLE_QUORUM_STORE").ok().and_then(|s| s.parse().ok()).unwrap_or(true) // onchain_config.quorum_store_enabled() } diff --git a/aptos-core/consensus/src/gravity_state_computer.rs b/aptos-core/consensus/src/gravity_state_computer.rs index 5d86f0134..7d1e4988f 100644 --- a/aptos-core/consensus/src/gravity_state_computer.rs +++ b/aptos-core/consensus/src/gravity_state_computer.rs @@ -120,7 +120,7 @@ impl BlockExecutorTrait for GravityBlockExecutor { epoch, ) .await - .unwrap_or_else(|e| panic!("Failed to push commit blocks {}", e)); + .expect("Failed to set commit blocks in BlockBufferManager"); for notifier in persist_notifiers.iter_mut() { let _ = notifier.recv().await; } @@ -181,7 +181,7 @@ impl BlockExecutorTrait for GravityBlockExecutor { epoch, ) .await - .unwrap(); + .expect("Failed to set commit blocks in BlockBufferManager"); for notifier in persist_notifiers.iter_mut() { let _ = notifier.recv().await; } diff --git a/aptos-core/consensus/src/liveness/leader_reputation.rs b/aptos-core/consensus/src/liveness/leader_reputation.rs index ccad91cc8..9ffd3cf39 100644 --- a/aptos-core/consensus/src/liveness/leader_reputation.rs +++ b/aptos-core/consensus/src/liveness/leader_reputation.rs @@ -140,7 +140,7 @@ impl AptosDBBackend { } if result.is_empty() { - warn!("No events in the requested window could be found"); + error!("[leader reputation] No events in the requested window could be found, leader election may degrade"); (result, HashValue::zero()) } else { let root_hash = self @@ -170,8 +170,8 @@ impl MetadataBackend for AptosDBBackend { // lazy init db_result if locked.is_none() { if let Err(e) = self.refresh_db_result(&mut locked, latest_db_version) { - warn!( - error = ?e, "[leader reputation] Fail to initialize db result", + error!( + error = ?e, "[leader reputation] Fail to initialize db result, leader election may degrade", ); return (vec![], HashValue::zero()); } @@ -195,8 +195,8 @@ impl MetadataBackend for AptosDBBackend { } Err(e) => { // fails if requested events were pruned / or we never backfil them. - warn!( - error = ?e, "[leader reputation] Fail to refresh window", + error!( + error = ?e, "[leader reputation] Fail to refresh window, leader election may degrade", ); (vec![], HashValue::zero()) } @@ -353,8 +353,8 @@ impl NewBlockEventAggregation { ) { Ok(voters) => { for &voter in voters { - let count = map.entry(voter).or_insert(0); - *count += 1; + let count = map.entry(voter).or_insert(0u32); + *count = count.saturating_add(1); } } Err(msg) => { @@ -393,8 +393,8 @@ impl NewBlockEventAggregation { Self::history_iter(history, epoch_to_candidates, window_size, from_stale_end).fold( HashMap::new(), |mut map, meta| { - let count = map.entry(meta.proposer()).or_insert(0); - *count += 1; + let count = map.entry(meta.proposer()).or_insert(0u32); + *count = count.saturating_add(1); map }, ) @@ -418,8 +418,8 @@ impl NewBlockEventAggregation { ) { Ok(failed_proposers) => { for &failed_proposer in failed_proposers { - let count = map.entry(failed_proposer).or_insert(0); - *count += 1; + let count = map.entry(failed_proposer).or_insert(0u32); + *count = count.saturating_add(1); } } Err(msg) => { @@ -515,8 +515,9 @@ impl ReputationHeuristic for ProposerAndVoterHeuristic { let cur_proposals = *proposals.get(author).unwrap_or(&0); let cur_failed_proposals = *failed_proposals.get(author).unwrap_or(&0); - if cur_failed_proposals * 100 > - (cur_proposals + cur_failed_proposals) * self.failure_threshold_percent + if (cur_failed_proposals as u64) * 100 > + (cur_proposals as u64 + cur_failed_proposals as u64) * + self.failure_threshold_percent as u64 { self.failed_weight } else if cur_proposals > 0 || cur_votes > 0 { diff --git a/bin/gravity_node/src/main.rs b/bin/gravity_node/src/main.rs index c8d78f948..9a7cfad52 100644 --- a/bin/gravity_node/src/main.rs +++ b/bin/gravity_node/src/main.rs @@ -110,14 +110,22 @@ fn run_reth( handle.node.pool.pending_transactions_listener(); let engine_cli = handle.node.auth_server_handle().clone(); let provider = handle.node.provider; - let recover_block_number = provider.recover_block_number().unwrap(); + let recover_block_number = provider + .recover_block_number() + .expect("Failed to recover block number from DB"); info!("The latest_block_number is {}", recover_block_number); - let latest_block_hash = - provider.block_hash(recover_block_number).unwrap().unwrap(); + let latest_block_hash = provider + .block_hash(recover_block_number) + .expect("Failed to read block hash from DB") + .unwrap_or_else(|| { + panic!("Block hash not found for block number {recover_block_number}") + }); let latest_block = provider .block(BlockHashOrNumber::Number(recover_block_number)) - .unwrap() - .unwrap(); + .expect("Failed to read block from DB") + .unwrap_or_else(|| { + panic!("Block not found for block number {recover_block_number}") + }); let pool = handle.node.pool; let storage = BlockViewStorage::new(provider.clone()); diff --git a/bin/gravity_node/src/reth_coordinator/mod.rs b/bin/gravity_node/src/reth_coordinator/mod.rs index 8a4f03fc9..97433654b 100644 --- a/bin/gravity_node/src/reth_coordinator/mod.rs +++ b/bin/gravity_node/src/reth_coordinator/mod.rs @@ -33,7 +33,9 @@ impl RethCoordinator { .collect(); info!("send_execution_args block_number_to_block_id: {:?}", block_number_to_block_id); let execution_args = ExecutionArgs { block_number_to_block_id }; - execution_args_tx.send(execution_args).unwrap(); + execution_args_tx + .send(execution_args) + .expect("Failed to send execution args: reth receiver may have been dropped"); } } diff --git a/crates/api/src/config_storage.rs b/crates/api/src/config_storage.rs index 2a2d436ae..f568e0212 100644 --- a/crates/api/src/config_storage.rs +++ b/crates/api/src/config_storage.rs @@ -20,8 +20,6 @@ impl ConfigStorage for ConfigStorageWrapper { config_name: OnChainConfig, block_number: BlockNumber, ) -> Option { - println!("fetch_config_bytes: {config_name:?}, block_number: {block_number:?}"); - info!("fetch_config_bytes: {:?}, block_number: {:?}", config_name, block_number); match config_name { OnChainConfig::Epoch | diff --git a/crates/api/src/consensus_mempool_handler.rs b/crates/api/src/consensus_mempool_handler.rs index 7648d6fe6..776ef717d 100644 --- a/crates/api/src/consensus_mempool_handler.rs +++ b/crates/api/src/consensus_mempool_handler.rs @@ -104,10 +104,12 @@ impl ConsensusToMempoolHandler { sync_notification.get_target().ledger_info().block_number(), ) .unwrap(); - let _ = self + if let Err(e) = self .consensus_notification_listener .respond_to_sync_target_notification(sync_notification, Ok(())) - .map_err(|e| anyhow::anyhow!(e)); + { + warn!("Failed to respond to sync target notification: {:?}", e); + } Ok(()) } ConsensusNotification::SyncForDuration(_consensus_sync_duration_notification) => { diff --git a/crates/block-buffer-manager/src/block_buffer_manager.rs b/crates/block-buffer-manager/src/block_buffer_manager.rs index 4f1771a5b..3602925ca 100644 --- a/crates/block-buffer-manager/src/block_buffer_manager.rs +++ b/crates/block-buffer-manager/src/block_buffer_manager.rs @@ -1010,11 +1010,12 @@ impl BlockBufferManager { latest_epoch_change_block_number, block_state_machine.current_epoch ); + self.buffer_state.store(BufferState::EpochChange as u8, Ordering::SeqCst); + block_state_machine .blocks .retain(|key, _| key.block_number <= latest_epoch_change_block_number); - self.buffer_state.store(BufferState::EpochChange as u8, Ordering::SeqCst); block_state_machine .profile .retain(|key, _| key.block_number <= latest_epoch_change_block_number);