Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const TOKEN_METADATA: TableDefinition<&str, (&str, u64, &str)> =
const ERC20_DEPOSITS: TableDefinition<&str, (&str, &str, &str, &str, &str)> =
TableDefinition::new("erc20_deposits"); // tx_hash:log_index -> (account_id, amount, token_address, token_symbol, status)
const SWEEP_META: TableDefinition<&str, (&str, u64)> = TableDefinition::new("sweep_meta"); // deposit_key -> (sweep_tx_hash, zero_balance_retry_count)
const SWEEP_FAILURES: TableDefinition<&str, u64> = TableDefinition::new("sweep_failures"); // deposit_key -> consecutive_failure_count

#[derive(Clone, Debug)]
pub struct Erc20Deposit {
Expand Down Expand Up @@ -40,6 +41,7 @@ impl Db {
let _ = write_txn.open_table(TOKEN_METADATA)?;
let _ = write_txn.open_table(ERC20_DEPOSITS)?;
let _ = write_txn.open_table(SWEEP_META)?;
let _ = write_txn.open_table(SWEEP_FAILURES)?;
}
write_txn.commit()?;

Expand Down Expand Up @@ -409,4 +411,105 @@ impl Db {
(val.0.to_string(), val.1)
}))
}

// ========== Sweep Failure Tracking ==========

/// Increment the sweep failure count for a deposit. Returns the new count.
pub fn increment_sweep_failure_count(&self, key: &str) -> Result<u64> {
let write_txn = self.db.begin_write()?;
let new_count = {
let mut failures = write_txn.open_table(SWEEP_FAILURES)?;
let count = match failures.get(key)? {
Some(v) => v.value(),
None => 0,
};
let new_count = count + 1;
failures.insert(key, new_count)?;
new_count
};
write_txn.commit()?;
Ok(new_count)
}

/// Mark a single ERC20 deposit as permanently failed.
pub fn mark_erc20_deposit_failed(&self, key: &str) -> Result<()> {
let write_txn = self.db.begin_write()?;
{
let mut deposits = write_txn.open_table(ERC20_DEPOSITS)?;
let (account_id, amount, token_address, token_symbol) = {
let current_val = deposits.get(key)?;
if let Some(v) = current_val {
let val = v.value();
(
val.0.to_string(),
val.1.to_string(),
val.2.to_string(),
val.3.to_string(),
)
} else {
return Ok(());
}
};

deposits.insert(
key,
(
account_id.as_str(),
amount.as_str(),
token_address.as_str(),
token_symbol.as_str(),
"failed",
),
)?;
}
write_txn.commit()?;
Ok(())
}

/// Mark all detected ERC20 deposits for a given (account_id, token_address) as permanently failed.
/// Returns the list of deposit keys that were marked.
pub fn mark_erc20_deposits_failed_for_account_token(
&self,
account_id: &str,
token_address: &str,
) -> Result<Vec<String>> {
let write_txn = self.db.begin_write()?;
let mut marked_keys = Vec::new();
{
let mut deposits = write_txn.open_table(ERC20_DEPOSITS)?;

let keys_to_update: Vec<(String, String, String, String)> = {
let mut to_update = Vec::new();
for item in deposits.iter()? {
let (key, value) = item?;
let (acc_id, amount, tok_addr, tok_symbol, status) = value.value();
if status == "detected" && acc_id == account_id && tok_addr == token_address {
to_update.push((
key.value().to_string(),
amount.to_string(),
tok_symbol.to_string(),
acc_id.to_string(),
));
}
}
to_update
};

for (key, amount, tok_symbol, acc_id) in &keys_to_update {
deposits.insert(
key.as_str(),
(
acc_id.as_str(),
amount.as_str(),
token_address,
tok_symbol.as_str(),
"failed",
),
)?;
marked_keys.push(key.clone());
}
}
write_txn.commit()?;
Ok(marked_keys)
}
}
25 changes: 23 additions & 2 deletions src/sweeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ where
/// swept as part of a consolidated sweep and is marked as swept to avoid infinite retries.
const MAX_ZERO_BALANCE_RETRIES: u64 = 10;

/// After this many consecutive sweep failures (e.g. "buffer overrun while deserializing"),
/// a deposit is marked as permanently failed to stop wasting RPC credits on deterministic errors.
const MAX_SWEEP_RETRIES: u64 = 5;

impl<T> Sweeper<alloy::providers::RootProvider<T>>
where
T: alloy::transports::Transport + Clone,
Expand Down Expand Up @@ -190,8 +194,25 @@ where
}
Err(e) => {
error!("Failed to sweep ERC20 deposit {}: {:?}", deposit.key, e);
// Don't return error - continue processing other deposits
// This deposit will be retried in the next sweep cycle
if let Ok(failures) = self.db.increment_sweep_failure_count(&deposit.key) {
if failures >= MAX_SWEEP_RETRIES {
let registration_id = &deposit.account_id;
match self.db.mark_erc20_deposits_failed_for_account_token(
registration_id,
&deposit.token_address,
) {
Ok(failed_keys) => {
error!(
"Permanently marked {} deposit(s) as failed for account={}, token={} after {} attempts: {:?}",
failed_keys.len(), registration_id, deposit.token_symbol, failures, e
);
}
Err(db_err) => {
error!("Failed to mark deposits as failed: {:?}", db_err);
}
}
}
}
}
}
}
Expand Down
46 changes: 46 additions & 0 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,3 +862,49 @@ async fn test_verify_transfer_reverted_transaction() {
}
}
}

// ========== Sweep Failure Tracking Tests ==========

#[test]
fn test_erc20_deposit_failure_tracking() {
let db_file = NamedTempFile::new().unwrap();
let db = Db::new(db_file.path().to_str().unwrap()).unwrap();

db.record_erc20_deposit("0xabc", 1, "user_1", "1000000", "0xtoken", "USDC")
.unwrap();

assert_eq!(db.get_detected_erc20_deposits().unwrap().len(), 1);

for i in 1..=5 {
let count = db.increment_sweep_failure_count("0xabc:1").unwrap();
assert_eq!(count, i);
}

db.mark_erc20_deposit_failed("0xabc:1").unwrap();

assert_eq!(db.get_detected_erc20_deposits().unwrap().len(), 0);
}

#[test]
fn test_erc20_bulk_mark_failed_for_account_token() {
let db_file = NamedTempFile::new().unwrap();
let db = Db::new(db_file.path().to_str().unwrap()).unwrap();

db.record_erc20_deposit("0xaaa", 1, "user_1", "1000000", "0xtoken_a", "USDC")
.unwrap();
db.record_erc20_deposit("0xbbb", 2, "user_1", "2000000", "0xtoken_a", "USDC")
.unwrap();
db.record_erc20_deposit("0xccc", 3, "user_1", "3000000", "0xtoken_b", "USDT")
.unwrap();

assert_eq!(db.get_detected_erc20_deposits().unwrap().len(), 3);

let failed = db
.mark_erc20_deposits_failed_for_account_token("user_1", "0xtoken_a")
.unwrap();
assert_eq!(failed.len(), 2);

let remaining = db.get_detected_erc20_deposits().unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].token_symbol, "USDT");
}
Loading