diff --git a/src/db.rs b/src/db.rs index 3374fe7..34ecaed 100644 --- a/src/db.rs +++ b/src/db.rs @@ -11,6 +11,7 @@ const TOKEN_METADATA: TableDefinition<&str, (&str, u64, &str)> = const ERC20_DEPOSITS: TableDefinition<&str, (&str, &str, &str, &str, &str)> = TableDefinition::new("erc20_deposits"); // tx_hash:log_index -> (account_id, amount, token_address, token_symbol, status) const SWEEP_META: TableDefinition<&str, (&str, u64)> = TableDefinition::new("sweep_meta"); // deposit_key -> (sweep_tx_hash, zero_balance_retry_count) +const SWEEP_FAILURES: TableDefinition<&str, u64> = TableDefinition::new("sweep_failures"); // deposit_key -> consecutive_failure_count #[derive(Clone, Debug)] pub struct Erc20Deposit { @@ -40,6 +41,7 @@ impl Db { let _ = write_txn.open_table(TOKEN_METADATA)?; let _ = write_txn.open_table(ERC20_DEPOSITS)?; let _ = write_txn.open_table(SWEEP_META)?; + let _ = write_txn.open_table(SWEEP_FAILURES)?; } write_txn.commit()?; @@ -409,4 +411,105 @@ impl Db { (val.0.to_string(), val.1) })) } + + // ========== Sweep Failure Tracking ========== + + /// Increment the sweep failure count for a deposit. Returns the new count. + pub fn increment_sweep_failure_count(&self, key: &str) -> Result { + let write_txn = self.db.begin_write()?; + let new_count = { + let mut failures = write_txn.open_table(SWEEP_FAILURES)?; + let count = match failures.get(key)? { + Some(v) => v.value(), + None => 0, + }; + let new_count = count + 1; + failures.insert(key, new_count)?; + new_count + }; + write_txn.commit()?; + Ok(new_count) + } + + /// Mark a single ERC20 deposit as permanently failed. + pub fn mark_erc20_deposit_failed(&self, key: &str) -> Result<()> { + let write_txn = self.db.begin_write()?; + { + let mut deposits = write_txn.open_table(ERC20_DEPOSITS)?; + let (account_id, amount, token_address, token_symbol) = { + let current_val = deposits.get(key)?; + if let Some(v) = current_val { + let val = v.value(); + ( + val.0.to_string(), + val.1.to_string(), + val.2.to_string(), + val.3.to_string(), + ) + } else { + return Ok(()); + } + }; + + deposits.insert( + key, + ( + account_id.as_str(), + amount.as_str(), + token_address.as_str(), + token_symbol.as_str(), + "failed", + ), + )?; + } + write_txn.commit()?; + Ok(()) + } + + /// Mark all detected ERC20 deposits for a given (account_id, token_address) as permanently failed. + /// Returns the list of deposit keys that were marked. + pub fn mark_erc20_deposits_failed_for_account_token( + &self, + account_id: &str, + token_address: &str, + ) -> Result> { + let write_txn = self.db.begin_write()?; + let mut marked_keys = Vec::new(); + { + let mut deposits = write_txn.open_table(ERC20_DEPOSITS)?; + + let keys_to_update: Vec<(String, String, String, String)> = { + let mut to_update = Vec::new(); + for item in deposits.iter()? { + let (key, value) = item?; + let (acc_id, amount, tok_addr, tok_symbol, status) = value.value(); + if status == "detected" && acc_id == account_id && tok_addr == token_address { + to_update.push(( + key.value().to_string(), + amount.to_string(), + tok_symbol.to_string(), + acc_id.to_string(), + )); + } + } + to_update + }; + + for (key, amount, tok_symbol, acc_id) in &keys_to_update { + deposits.insert( + key.as_str(), + ( + acc_id.as_str(), + amount.as_str(), + token_address, + tok_symbol.as_str(), + "failed", + ), + )?; + marked_keys.push(key.clone()); + } + } + write_txn.commit()?; + Ok(marked_keys) + } } diff --git a/src/sweeper.rs b/src/sweeper.rs index 9166f02..4cb3027 100644 --- a/src/sweeper.rs +++ b/src/sweeper.rs @@ -60,6 +60,10 @@ where /// swept as part of a consolidated sweep and is marked as swept to avoid infinite retries. const MAX_ZERO_BALANCE_RETRIES: u64 = 10; +/// After this many consecutive sweep failures (e.g. "buffer overrun while deserializing"), +/// a deposit is marked as permanently failed to stop wasting RPC credits on deterministic errors. +const MAX_SWEEP_RETRIES: u64 = 5; + impl Sweeper> where T: alloy::transports::Transport + Clone, @@ -190,8 +194,25 @@ where } Err(e) => { error!("Failed to sweep ERC20 deposit {}: {:?}", deposit.key, e); - // Don't return error - continue processing other deposits - // This deposit will be retried in the next sweep cycle + if let Ok(failures) = self.db.increment_sweep_failure_count(&deposit.key) { + if failures >= MAX_SWEEP_RETRIES { + let registration_id = &deposit.account_id; + match self.db.mark_erc20_deposits_failed_for_account_token( + registration_id, + &deposit.token_address, + ) { + Ok(failed_keys) => { + error!( + "Permanently marked {} deposit(s) as failed for account={}, token={} after {} attempts: {:?}", + failed_keys.len(), registration_id, deposit.token_symbol, failures, e + ); + } + Err(db_err) => { + error!("Failed to mark deposits as failed: {:?}", db_err); + } + } + } + } } } } diff --git a/src/tests.rs b/src/tests.rs index 5bcb842..59094f0 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -862,3 +862,49 @@ async fn test_verify_transfer_reverted_transaction() { } } } + +// ========== Sweep Failure Tracking Tests ========== + +#[test] +fn test_erc20_deposit_failure_tracking() { + let db_file = NamedTempFile::new().unwrap(); + let db = Db::new(db_file.path().to_str().unwrap()).unwrap(); + + db.record_erc20_deposit("0xabc", 1, "user_1", "1000000", "0xtoken", "USDC") + .unwrap(); + + assert_eq!(db.get_detected_erc20_deposits().unwrap().len(), 1); + + for i in 1..=5 { + let count = db.increment_sweep_failure_count("0xabc:1").unwrap(); + assert_eq!(count, i); + } + + db.mark_erc20_deposit_failed("0xabc:1").unwrap(); + + assert_eq!(db.get_detected_erc20_deposits().unwrap().len(), 0); +} + +#[test] +fn test_erc20_bulk_mark_failed_for_account_token() { + let db_file = NamedTempFile::new().unwrap(); + let db = Db::new(db_file.path().to_str().unwrap()).unwrap(); + + db.record_erc20_deposit("0xaaa", 1, "user_1", "1000000", "0xtoken_a", "USDC") + .unwrap(); + db.record_erc20_deposit("0xbbb", 2, "user_1", "2000000", "0xtoken_a", "USDC") + .unwrap(); + db.record_erc20_deposit("0xccc", 3, "user_1", "3000000", "0xtoken_b", "USDT") + .unwrap(); + + assert_eq!(db.get_detected_erc20_deposits().unwrap().len(), 3); + + let failed = db + .mark_erc20_deposits_failed_for_account_token("user_1", "0xtoken_a") + .unwrap(); + assert_eq!(failed.len(), 2); + + let remaining = db.get_detected_erc20_deposits().unwrap(); + assert_eq!(remaining.len(), 1); + assert_eq!(remaining[0].token_symbol, "USDT"); +}