diff --git a/Cargo.lock b/Cargo.lock index cb3cd084..5bf9c532 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1492,14 +1492,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http", "http-body", "http-body-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -1512,6 +1512,39 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +dependencies = [ + "axum-core 0.5.6", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -1532,6 +1565,25 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "az" version = "1.2.1" @@ -2245,6 +2297,33 @@ dependencies = [ "webbrowser", ] +[[package]] +name = "contender_server" +version = "0.9.0" +dependencies = [ + "async-trait", + "axum 0.8.8", + "base64 0.22.1", + "contender_cli", + "contender_core", + "contender_sqlite", + "contender_testfile", + "jsonrpsee 0.24.10", + "quote", + "rand 0.8.5", + "serde", + "serde_json", + "syn 2.0.108", + "thiserror 2.0.17", + "tokio", + "tokio-stream", + "tokio-util", + "tower 0.4.13", + "tower-http 0.5.2", + "tracing", + "tracing-subscriber 0.3.20", +] + [[package]] name = "contender_sqlite" version = "0.9.0" @@ -2597,7 +2676,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.108", ] [[package]] @@ -4162,24 +4241,62 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpsee" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e281ae70cc3b98dac15fced3366a880949e65fc66e345ce857a5682d152f3e62" +dependencies = [ + "jsonrpsee-core 0.24.10", + "jsonrpsee-proc-macros 0.24.10", + "jsonrpsee-server 0.24.10", + "jsonrpsee-types 0.24.10", + "jsonrpsee-ws-client 0.24.10", + "tokio", + "tracing", +] + [[package]] name = "jsonrpsee" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f3f48dc3e6b8bd21e15436c1ddd0bc22a6a54e8ec46fedd6adf3425f396ec6a" dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", + "jsonrpsee-client-transport 0.26.0", + "jsonrpsee-core 0.26.0", "jsonrpsee-http-client", - "jsonrpsee-proc-macros", - "jsonrpsee-server", - "jsonrpsee-types", + "jsonrpsee-proc-macros 0.26.0", + "jsonrpsee-server 0.26.0", + "jsonrpsee-types 0.26.0", "jsonrpsee-wasm-client", - "jsonrpsee-ws-client", + "jsonrpsee-ws-client 0.26.0", "tokio", "tracing", ] +[[package]] +name = "jsonrpsee-client-transport" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc4280b709ac3bb5e16cf3bad5056a0ec8df55fa89edfe996361219aadc2c7ea" +dependencies = [ + "base64 0.22.1", + "futures-util", + "http", + "jsonrpsee-core 0.24.10", + "pin-project", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", + "url", +] + [[package]] name = "jsonrpsee-client-transport" version = "0.26.0" @@ -4191,7 +4308,7 @@ dependencies = [ "futures-util", "gloo-net", "http", - "jsonrpsee-core", + "jsonrpsee-core 0.26.0", "pin-project", "rustls", "rustls-pki-types", @@ -4205,6 +4322,32 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-core" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348ee569eaed52926b5e740aae20863762b16596476e943c9e415a6479021622" +dependencies = [ + "async-trait", + "bytes", + "futures-timer", + "futures-util", + "http", + "http-body", + "http-body-util", + "jsonrpsee-types 0.24.10", + "parking_lot", + "pin-project", + "rand 0.8.5", + "rustc-hash", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "jsonrpsee-core" version = "0.26.0" @@ -4218,7 +4361,7 @@ dependencies = [ "http", "http-body", "http-body-util", - "jsonrpsee-types", + "jsonrpsee-types 0.26.0", "parking_lot", "pin-project", "rand 0.9.2", @@ -4244,8 +4387,8 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "rustls", "rustls-platform-verifier", "serde", @@ -4256,6 +4399,19 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7398cddf5013cca4702862a2692b66c48a3bd6cf6ec681a47453c93d63cf8de5" +dependencies = [ + "heck", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "jsonrpsee-proc-macros" version = "0.26.0" @@ -4269,6 +4425,33 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "jsonrpsee-server" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21429bcdda37dcf2d43b68621b994adede0e28061f816b038b0f18c70c143d51" +dependencies = [ + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core 0.24.10", + "jsonrpsee-types 0.24.10", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tokio-util", + "tower 0.4.13", + "tracing", +] + [[package]] name = "jsonrpsee-server" version = "0.26.0" @@ -4281,8 +4464,8 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "pin-project", "route-recognizer", "serde", @@ -4296,6 +4479,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "jsonrpsee-types" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f05e0028e55b15dbd2107163b3c744cd3bb4474f193f95d9708acbf5677e44" +dependencies = [ + "http", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "jsonrpsee-types" version = "0.26.0" @@ -4314,12 +4509,25 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7902885de4779f711a95d82c8da2d7e5f9f3a7c7cfa44d51c067fd1c29d72a3c" dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-client-transport 0.26.0", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "tower 0.5.2", ] +[[package]] +name = "jsonrpsee-ws-client" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78fc744f17e7926d57f478cf9ca6e1ee5d8332bf0514860b1a3cdf1742e614cc" +dependencies = [ + "http", + "jsonrpsee-client-transport 0.24.10", + "jsonrpsee-core 0.24.10", + "jsonrpsee-types 0.24.10", + "url", +] + [[package]] name = "jsonrpsee-ws-client" version = "0.26.0" @@ -4327,9 +4535,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6fceceeb05301cc4c065ab3bd2fa990d41ff4eb44e4ca1b30fa99c057c3e79" dependencies = [ "http", - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-client-transport 0.26.0", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "tower 0.5.2", "url", ] @@ -4677,6 +4885,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.7.6" @@ -5258,7 +5472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8eb878fc5ea95adb5abe55fb97475b3eb0dcc77dfcd6f61bd626a68ae0bdba1" dependencies = [ "alloy-primitives", - "jsonrpsee", + "jsonrpsee 0.26.0", ] [[package]] @@ -6225,7 +6439,7 @@ dependencies = [ "tokio-rustls", "tokio-util", "tower 0.5.2", - "tower-http", + "tower-http 0.6.6", "tower-service", "url", "wasm-bindgen", @@ -7123,7 +7337,7 @@ dependencies = [ "alloy-rpc-types-debug", "eyre", "futures", - "jsonrpsee", + "jsonrpsee 0.26.0", "pretty_assertions", "reth-engine-primitives", "reth-evm", @@ -7148,7 +7362,7 @@ dependencies = [ "futures", "futures-util", "interprocess", - "jsonrpsee", + "jsonrpsee 0.26.0", "pin-project", "serde_json", "thiserror 2.0.17", @@ -7405,7 +7619,7 @@ dependencies = [ "eyre", "fdlimit", "futures", - "jsonrpsee", + "jsonrpsee 0.26.0", "rayon", "reth-basic-payload-builder", "reth-chain-state", @@ -7565,7 +7779,7 @@ source = "git+https://github.com/paradigmxyz/reth?tag=v1.8.2#9c30bf7af5e0d45deaf dependencies = [ "eyre", "http", - "jsonrpsee-server", + "jsonrpsee-server 0.26.0", "metrics", "metrics-exporter-prometheus", "metrics-process", @@ -7838,9 +8052,9 @@ dependencies = [ "derive_more", "eyre", "futures", - "jsonrpsee", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee 0.26.0", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "metrics", "op-alloy-consensus", "op-alloy-network", @@ -8160,8 +8374,8 @@ dependencies = [ "http-body", "hyper", "itertools 0.14.0", - "jsonrpsee", - "jsonrpsee-types", + "jsonrpsee 0.26.0", + "jsonrpsee-types 0.26.0", "jsonwebtoken", "parking_lot", "pin-project", @@ -8225,7 +8439,7 @@ dependencies = [ "alloy-rpc-types-trace", "alloy-rpc-types-txpool", "alloy-serde", - "jsonrpsee", + "jsonrpsee 0.26.0", "reth-chain-state", "reth-engine-primitives", "reth-network-peers", @@ -8242,7 +8456,7 @@ dependencies = [ "alloy-provider", "dyn-clone", "http", - "jsonrpsee", + "jsonrpsee 0.26.0", "metrics", "pin-project", "reth-chain-state", @@ -8268,7 +8482,7 @@ dependencies = [ "tokio", "tokio-util", "tower 0.5.2", - "tower-http", + "tower-http 0.6.6", "tracing", ] @@ -8285,7 +8499,7 @@ dependencies = [ "alloy-signer", "auto_impl", "dyn-clone", - "jsonrpsee-types", + "jsonrpsee-types 0.26.0", "op-alloy-consensus", "op-alloy-network", "op-alloy-rpc-types", @@ -8308,8 +8522,8 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "async-trait", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "metrics", "parking_lot", "reth-chainspec", @@ -8349,8 +8563,8 @@ dependencies = [ "auto_impl", "dyn-clone", "futures", - "jsonrpsee", - "jsonrpsee-types", + "jsonrpsee 0.26.0", + "jsonrpsee-types 0.26.0", "parking_lot", "reth-chain-state", "reth-chainspec", @@ -8390,8 +8604,8 @@ dependencies = [ "derive_more", "futures", "itertools 0.14.0", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "metrics", "rand 0.9.2", "reqwest", @@ -8430,7 +8644,7 @@ dependencies = [ "jsonrpsee-http-client", "pin-project", "tower 0.5.2", - "tower-http", + "tower-http 0.6.6", "tracing", ] @@ -8442,8 +8656,8 @@ dependencies = [ "alloy-eips", "alloy-primitives", "alloy-rpc-types-engine", - "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-core 0.26.0", + "jsonrpsee-types 0.26.0", "reth-errors", "reth-network-api", "serde", @@ -9598,6 +9812,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -10347,7 +10572,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64 0.22.1", "bytes", "h2", @@ -10409,6 +10634,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.10.0", + "bytes", + "http", + "http-body", + "http-body-util", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.6.6" diff --git a/Cargo.toml b/Cargo.toml index c38a486e..46aa5a84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/core/", "crates/engine_provider", "crates/report", + "crates/server", "crates/sqlite_db/", "crates/testfile/", ] @@ -21,12 +22,14 @@ homepage = "https://github.com/flashbots/contender" repository = "https://github.com/flashbots/contender" [workspace.dependencies] +contender_cli = { path = "crates/cli" } contender_core = { path = "crates/core/" } contender_sqlite = { path = "crates/sqlite_db/" } contender_testfile = { path = "crates/testfile/" } contender_bundle_provider = { path = "crates/bundle_provider/" } contender_engine_provider = { path = "crates/engine_provider/" } contender_report = { path = "crates/report/" } +contender_server = { path = "crates/server/" } tokio = { version = "1.40.0" } tokio-tungstenite = { version = "0.26", features = ["native-tls"] } @@ -49,6 +52,11 @@ csv = "1.3.0" miette = { version = "7.6.0" } url = "2.5.7" uuid = "1.19.0" +base64 = "0.22" + +## server +axum = "0.8" +tokio-stream = "0.1" ## core futures = "0.3.30" @@ -57,6 +65,7 @@ jsonrpsee = { version = "0.24" } alloy-serde = "0.5.4" serde_json = "1.0.132" tower = "0.5.2" +tower-http = { version = "0.6", features = ["cors"] } alloy-rpc-types-engine = { version = "1.0.22", default-features = false } alloy-json-rpc = { version = "1.0.22", default-features = false } alloy-chains = { version = "0.2.5", default-features = false } diff --git a/crates/cli/src/commands/campaign.rs b/crates/cli/src/commands/campaign.rs index 153d3471..ba159006 100644 --- a/crates/cli/src/commands/campaign.rs +++ b/crates/cli/src/commands/campaign.rs @@ -5,14 +5,16 @@ use crate::commands::{ common::{ScenarioSendTxsCliArgs, SendTxsCliArgsInner}, SpamCliArgs, }; +use crate::default_scenarios::fill_block::SpamRate; +use crate::default_scenarios::{BuiltinOptions, BuiltinScenarioCli}; use crate::error::CliError; use crate::util::load_testconfig; use crate::util::{load_seedfile, parse_duration}; -use crate::BuiltinScenarioCli; use alloy::primitives::{keccak256, U256}; use clap::Args; use contender_core::db::DbOps; use contender_core::error::RuntimeParamErrorKind; +use contender_core::generator::RandSeed; use contender_testfile::{CampaignConfig, CampaignMode, ResolvedMixEntry, ResolvedStage}; use std::path::Path; use std::time::Duration; @@ -445,10 +447,21 @@ async fn prepare_scenario( skip_setup, ); + let rand_seed = RandSeed::seed_from_str(&scenario_seed); let spam_scenario = if let Some(builtin_cli) = parse_builtin_reference(&mix.scenario) { let provider = args.eth_json_rpc_args.new_rpc_provider()?; let builtin = builtin_cli - .to_builtin_scenario(&provider, &spam_cli_args, ctx.data_dir) + .to_builtin_scenario( + &provider, + BuiltinOptions { + accounts_per_agent: ctx.args.eth_json_rpc_args.accounts_per_agent, + seed: rand_seed, + spam_rate: Some(match ctx.campaign.spam.mode { + CampaignMode::Tps => SpamRate::TxsPerSecond(mix.rate), + CampaignMode::Tpb => SpamRate::TxsPerBlock(mix.rate), + }), + }, + ) .await?; SpamScenario::Builtin(builtin) } else { diff --git a/crates/cli/src/commands/common.rs b/crates/cli/src/commands/common.rs index 1e3e87c2..2eb3b7d9 100644 --- a/crates/cli/src/commands/common.rs +++ b/crates/cli/src/commands/common.rs @@ -3,6 +3,7 @@ use super::EngineArgs; use crate::commands::error::ArgsError; use crate::commands::SpamScenario; +use crate::default_scenarios::fill_block::SpamRate; use crate::error::CliError; use crate::util::get_signers_with_defaults; use alloy::consensus::TxType; @@ -336,6 +337,17 @@ Requires --priv-key to be set for each 'from' address in the given testfile.", pub run_forever: bool, } +impl SendSpamCliArgs { + pub fn spam_rate(&self) -> Result { + match (self.txs_per_second, self.txs_per_block) { + (Some(_), Some(_)) => Err(ArgsError::SpamRateNotFound), + (None, None) => Err(ArgsError::SpamRateNotFound), + (Some(tps), None) => Ok(SpamRate::TxsPerSecond(tps)), + (None, Some(tpb)) => Ok(SpamRate::TxsPerBlock(tpb)), + } + } +} + #[derive(Copy, Debug, Clone, clap::ValueEnum)] pub enum TxTypeCli { /// Legacy transaction (type `0x0`) diff --git a/crates/cli/src/commands/spam.rs b/crates/cli/src/commands/spam.rs index faa88770..d9f4b0f7 100644 --- a/crates/cli/src/commands/spam.rs +++ b/crates/cli/src/commands/spam.rs @@ -5,7 +5,7 @@ use crate::{ error::ArgsError, Result, }, - default_scenarios::BuiltinScenario, + default_scenarios::{BuiltinOptions, BuiltinScenario}, error::CliError, util::{ bold, check_private_keys, fund_accounts, load_seedfile, load_testconfig, parse_duration, @@ -151,6 +151,24 @@ pub struct SpamCliArgs { )] pub flashblocks_ws_url: Option, } + +impl SpamCliArgs { + pub fn builtin_options(&self, data_dir: &PathBuf) -> Result { + let seed = self + .eth_json_rpc_args + .rpc_args + .seed + .clone() + .unwrap_or(load_seedfile(data_dir)?); + let seed = RandSeed::seed_from_str(&seed); + Ok(BuiltinOptions { + accounts_per_agent: self.eth_json_rpc_args.rpc_args.accounts_per_agent, + seed, + spam_rate: Some(self.spam_args.spam_rate()?), + }) + } +} + #[derive(Clone)] pub enum SpamScenario { Testfile(String), diff --git a/crates/cli/src/default_scenarios/blobs.rs b/crates/cli/src/default_scenarios/blobs.rs index e42b164e..bb7a88d9 100644 --- a/crates/cli/src/default_scenarios/blobs.rs +++ b/crates/cli/src/default_scenarios/blobs.rs @@ -1,10 +1,11 @@ use clap::Parser; use contender_core::generator::{types::SpamRequest, FunctionCallDefinition}; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use crate::default_scenarios::builtin::ToTestConfig; -#[derive(Parser, Clone, Debug)] +#[derive(Parser, Clone, Debug, Deserialize, Serialize)] /// Send blob transactions. Note: the tx type will always be overridden to eip4844. pub struct BlobsCliArgs { #[arg( diff --git a/crates/cli/src/default_scenarios/builtin.rs b/crates/cli/src/default_scenarios/builtin.rs index aec91422..dff571e4 100644 --- a/crates/cli/src/default_scenarios/builtin.rs +++ b/crates/cli/src/default_scenarios/builtin.rs @@ -1,13 +1,11 @@ -use std::path::Path; - use super::fill_block::{fill_block, FillBlockArgs, FillBlockCliArgs}; use crate::{ - commands::SpamCliArgs, default_scenarios::{ blobs::BlobsCliArgs, custom_contract::{CustomContractArgs, CustomContractCliArgs}, erc20::{Erc20Args, Erc20CliArgs}, eth_functions::{opcodes::EthereumOpcode, EthFunctionsArgs, EthFunctionsCliArgs}, + fill_block::SpamRate, revert::RevertCliArgs, setcode::{SetCodeArgs, SetCodeCliArgs, SetCodeSubCommand}, storage::{StorageStressArgs, StorageStressCliArgs}, @@ -16,7 +14,7 @@ use crate::{ uni_v2::{UniV2Args, UniV2CliArgs}, }, error::CliError, - util::{bold, load_seedfile}, + util::bold, }; use alloy::primitives::U256; use clap::Subcommand; @@ -26,10 +24,12 @@ use contender_core::{ generator::{constants::setcode_placeholder, types::AnyProvider, RandSeed}, }; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use strum::IntoEnumIterator; use tracing::warn; -#[derive(Clone, Debug, Subcommand)] +#[derive(Clone, Debug, Subcommand, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] pub enum BuiltinScenarioCli { /// Send EIP-4844 blob transactions. Blobs(BlobsCliArgs), @@ -76,12 +76,18 @@ pub trait ToTestConfig { fn to_testconfig(&self) -> TestConfig; } +#[derive(Default)] +pub struct BuiltinOptions { + pub accounts_per_agent: Option, + pub seed: RandSeed, + pub spam_rate: Option, +} + impl BuiltinScenarioCli { pub async fn to_builtin_scenario( &self, provider: &AnyProvider, - spam_args: &SpamCliArgs, - data_dir: &Path, + options: BuiltinOptions, ) -> Result { match self.to_owned() { BuiltinScenarioCli::Blobs(args) => Ok(BuiltinScenario::Blobs(args)), @@ -91,21 +97,11 @@ impl BuiltinScenarioCli { )), BuiltinScenarioCli::Erc20(args) => { - let seed = spam_args - .eth_json_rpc_args - .rpc_args - .seed - .to_owned() - .unwrap_or(load_seedfile(data_dir)?); - let seed = RandSeed::seed_from_str(&seed); let mut agents = AgentStore::new(); agents.init( &["spammers"], - spam_args - .eth_json_rpc_args - .rpc_args - .accounts_per_agent_or(10) as usize, - &seed, + options.accounts_per_agent.unwrap_or(10) as usize, + &options.seed, ); let spammers = agents .get_agent("spammers") @@ -118,7 +114,7 @@ impl BuiltinScenarioCli { } BuiltinScenarioCli::FillBlock(args) => { - fill_block(provider, &spam_args.spam_args, &args).await + fill_block(provider, options.spam_rate.unwrap_or_default(), &args).await } BuiltinScenarioCli::EthFunctions(args) => { diff --git a/crates/cli/src/default_scenarios/custom_contract.rs b/crates/cli/src/default_scenarios/custom_contract.rs index d947919b..734823bd 100644 --- a/crates/cli/src/default_scenarios/custom_contract.rs +++ b/crates/cli/src/default_scenarios/custom_contract.rs @@ -6,13 +6,14 @@ use contender_core::generator::types::SpamRequest; use contender_core::generator::util::encode_calldata; use contender_core::generator::{CompiledContract, CreateDefinition, FunctionCallDefinition}; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use std::process::Command; use thiserror::Error; use tracing::debug; const ARTIFACTS_PATH: &str = "/tmp/contender-contracts"; -#[derive(Clone, Debug, clap::Parser)] +#[derive(Clone, Debug, clap::Parser, Deserialize, Serialize)] pub struct CustomContractCliArgs { /// Path to smart contract source. Format: : contract_path: std::path::PathBuf, diff --git a/crates/cli/src/default_scenarios/erc20.rs b/crates/cli/src/default_scenarios/erc20.rs index 2493c15b..2f1f691d 100644 --- a/crates/cli/src/default_scenarios/erc20.rs +++ b/crates/cli/src/default_scenarios/erc20.rs @@ -1,8 +1,10 @@ use alloy::primitives::{Address, U256}; use contender_core::generator::{ - types::SpamRequest, util::parse_value, CreateDefinition, FunctionCallDefinition, FuzzParam, + types::SpamRequest, util::parse_value, util::deserialize_value, CreateDefinition, + FunctionCallDefinition, FuzzParam, }; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use std::str::FromStr; use crate::default_scenarios::{builtin::ToTestConfig, contracts::test_token}; @@ -10,7 +12,7 @@ use crate::default_scenarios::{builtin::ToTestConfig, contracts::test_token}; pub static DEFAULT_TOKENS_SENT: &str = "0.00001 ether"; pub static DEFAULT_TOKENS_FUNDED: &str = "1000000 ether"; -#[derive(Clone, Debug, clap::Parser)] +#[derive(Clone, Debug, clap::Parser, Deserialize, Serialize)] pub struct Erc20CliArgs { #[arg( short, @@ -19,6 +21,7 @@ pub struct Erc20CliArgs { default_value = DEFAULT_TOKENS_SENT, value_parser = parse_value, )] + #[serde(deserialize_with = "deserialize_value")] pub send_amount: U256, #[arg( @@ -28,6 +31,7 @@ pub struct Erc20CliArgs { default_value = DEFAULT_TOKENS_FUNDED, value_parser = parse_value, )] + #[serde(deserialize_with = "deserialize_value")] pub fund_amount: U256, #[arg( diff --git a/crates/cli/src/default_scenarios/eth_functions/command.rs b/crates/cli/src/default_scenarios/eth_functions/command.rs index 5cac9660..38c3381e 100644 --- a/crates/cli/src/default_scenarios/eth_functions/command.rs +++ b/crates/cli/src/default_scenarios/eth_functions/command.rs @@ -9,8 +9,9 @@ use crate::default_scenarios::{ use clap::Parser; use contender_core::generator::CreateDefinition; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; -#[derive(Parser, Clone, Debug)] +#[derive(Parser, Clone, Debug, Deserialize, Serialize)] pub struct EthFunctionsCliArgs { #[arg( short, diff --git a/crates/cli/src/default_scenarios/eth_functions/opcodes.rs b/crates/cli/src/default_scenarios/eth_functions/opcodes.rs index 7dab15ce..99c0cdef 100644 --- a/crates/cli/src/default_scenarios/eth_functions/opcodes.rs +++ b/crates/cli/src/default_scenarios/eth_functions/opcodes.rs @@ -1,9 +1,12 @@ use crate::default_scenarios::contracts::SPAM_ME; use clap::ValueEnum; use contender_core::generator::{types::SpamRequest, FunctionCallDefinition}; +use serde::{Deserialize, Serialize}; use strum::EnumIter; -#[derive(ValueEnum, Clone, Debug, strum::Display, EnumIter, PartialEq, Eq)] +#[derive( + ValueEnum, Clone, Debug, strum::Display, EnumIter, PartialEq, Eq, Deserialize, Serialize, +)] pub enum EthereumOpcode { Stop, Add, diff --git a/crates/cli/src/default_scenarios/eth_functions/precompiles.rs b/crates/cli/src/default_scenarios/eth_functions/precompiles.rs index e5662378..a06cfae1 100644 --- a/crates/cli/src/default_scenarios/eth_functions/precompiles.rs +++ b/crates/cli/src/default_scenarios/eth_functions/precompiles.rs @@ -1,9 +1,12 @@ use crate::default_scenarios::contracts::SPAM_ME; use clap::ValueEnum; use contender_core::generator::{types::SpamRequest, FunctionCallDefinition}; +use serde::{Deserialize, Serialize}; use strum::EnumIter; -#[derive(ValueEnum, Clone, Debug, strum::Display, EnumIter, PartialEq, Eq)] +#[derive( + ValueEnum, Clone, Debug, strum::Display, EnumIter, PartialEq, Eq, Deserialize, Serialize, +)] // TODO: add missing precompiles to SpamMe contract & here. pub enum EthereumPrecompile { #[clap(aliases = ["sha256"])] diff --git a/crates/cli/src/default_scenarios/fill_block.rs b/crates/cli/src/default_scenarios/fill_block.rs index 0713d0f4..309d9a3c 100644 --- a/crates/cli/src/default_scenarios/fill_block.rs +++ b/crates/cli/src/default_scenarios/fill_block.rs @@ -1,5 +1,4 @@ use crate::{ - commands::common::SendSpamCliArgs, default_scenarios::{builtin::ToTestConfig, contracts, BuiltinScenario}, error::CliError, }; @@ -10,9 +9,10 @@ use contender_core::generator::{ CreateDefinition, FunctionCallDefinition, }; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use tracing::{info, warn}; -#[derive(Parser, Clone, Debug)] +#[derive(Parser, Clone, Debug, Deserialize, Serialize)] /// Taken from the CLI, this is used to fill a block with transactions. pub struct FillBlockCliArgs { #[arg(short = 'g', long, long_help = "Override gas used per block. By default, the block limit is used.", visible_aliases = ["gas"])] @@ -26,17 +26,31 @@ pub struct FillBlockArgs { pub num_txs: u64, } +pub enum SpamRate { + TxsPerBlock(u64), + TxsPerSecond(u64), +} + +impl Default for SpamRate { + fn default() -> Self { + SpamRate::TxsPerSecond(10) + } +} + +impl SpamRate { + /// Get the number of transactions to send based on the spam rate. + pub fn num_txs(&self) -> u64 { + match self { + SpamRate::TxsPerBlock(n) | SpamRate::TxsPerSecond(n) => *n, + } + } +} + pub async fn fill_block( provider: &AnyProvider, - spam_args: &SendSpamCliArgs, + spam_rate: SpamRate, args: &FillBlockCliArgs, ) -> Result { - let SendSpamCliArgs { - txs_per_block, - txs_per_second, - .. - } = spam_args.to_owned(); - // determine gas limit let gas_limit = if let Some(max_gas) = args.max_gas_per_block { max_gas @@ -51,20 +65,21 @@ pub async fn fill_block( block_gas_limit.unwrap_or(30_000_000) }; - let num_txs = match (txs_per_block, txs_per_second) { - (Some(0), _) | (_, Some(0)) => { - return Err(CliError::Args( - crate::commands::error::ArgsError::SpamRateNotFound, - )); - } - (Some(n), _) => n, - (_, Some(n)) => n, - (None, None) => { - return Err(CliError::Args( - crate::commands::error::ArgsError::SpamRateNotFound, - )); - } - }; + // let num_txs = match (txs_per_block, txs_per_second) { + // (Some(0), _) | (_, Some(0)) => { + // return Err(CliError::Args( + // crate::commands::error::ArgsError::SpamRateNotFound, + // )); + // } + // (Some(n), _) => n, + // (_, Some(n)) => n, + // (None, None) => { + // return Err(CliError::Args( + // crate::commands::error::ArgsError::SpamRateNotFound, + // )); + // } + // }; + let num_txs = spam_rate.num_txs(); let gas_per_tx = gas_limit / num_txs; info!("Attempting to fill blocks with {gas_limit} gas; sending {num_txs} txs, each with gas limit {gas_per_tx}."); diff --git a/crates/cli/src/default_scenarios/mod.rs b/crates/cli/src/default_scenarios/mod.rs index 639d5f37..5cba4eab 100644 --- a/crates/cli/src/default_scenarios/mod.rs +++ b/crates/cli/src/default_scenarios/mod.rs @@ -12,4 +12,4 @@ pub mod stress; pub mod transfers; pub mod uni_v2; -pub use builtin::{BuiltinScenario, BuiltinScenarioCli}; +pub use builtin::{BuiltinOptions, BuiltinScenario, BuiltinScenarioCli, ToTestConfig}; diff --git a/crates/cli/src/default_scenarios/revert.rs b/crates/cli/src/default_scenarios/revert.rs index 13e1e00f..09dcafac 100644 --- a/crates/cli/src/default_scenarios/revert.rs +++ b/crates/cli/src/default_scenarios/revert.rs @@ -1,9 +1,10 @@ use contender_core::generator::{types::SpamRequest, CreateDefinition, FunctionCallDefinition}; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use crate::default_scenarios::{builtin::ToTestConfig, contracts::SPAM_ME_6}; -#[derive(Clone, Debug, clap::Parser)] +#[derive(Clone, Debug, clap::Parser, Deserialize, Serialize)] pub struct RevertCliArgs { /// Amount of gas to use before reverting. #[arg( diff --git a/crates/cli/src/default_scenarios/setcode/base.rs b/crates/cli/src/default_scenarios/setcode/base.rs index e04f22b7..85839899 100644 --- a/crates/cli/src/default_scenarios/setcode/base.rs +++ b/crates/cli/src/default_scenarios/setcode/base.rs @@ -12,9 +12,10 @@ use crate::{ use clap::Parser; use contender_core::generator::{types::SpamRequest, CreateDefinition, FunctionCallDefinition}; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use tracing::warn; -#[derive(Clone, Debug, Parser)] +#[derive(Clone, Debug, Parser, Deserialize, Serialize)] pub struct SetCodeCliArgs { #[command(subcommand)] pub command: Option, diff --git a/crates/cli/src/default_scenarios/setcode/execute.rs b/crates/cli/src/default_scenarios/setcode/execute.rs index 8c20d1e2..a156dbea 100644 --- a/crates/cli/src/default_scenarios/setcode/execute.rs +++ b/crates/cli/src/default_scenarios/setcode/execute.rs @@ -5,11 +5,12 @@ use contender_core::generator::{ error::GeneratorError, util::{encode_calldata, parse_value}, }; +use serde::{Deserialize, Serialize}; pub const DEFAULT_SIG: &str = "execute((address,uint256,bytes)[])"; pub const DEFAULT_ARGS: &str = "[(0x{Counter},0,0xd09de08a)]"; -#[derive(Clone, Debug, Parser)] +#[derive(Clone, Debug, Parser, Deserialize, Serialize)] pub struct SetCodeExecuteCliArgs { /// The address to call via the smart-wallet's execute function. #[arg( @@ -54,7 +55,7 @@ Example: pub value: Option, } -#[derive(clap::Subcommand, Clone, Debug)] +#[derive(clap::Subcommand, Clone, Debug, Deserialize, Serialize)] pub enum SetCodeSubCommand { /// Helper function to delegate function calls via `execute(Call[])` on a smart-wallet contract. Execute(SetCodeExecuteCliArgs), diff --git a/crates/cli/src/default_scenarios/storage.rs b/crates/cli/src/default_scenarios/storage.rs index 3d3f5f69..d186fe56 100644 --- a/crates/cli/src/default_scenarios/storage.rs +++ b/crates/cli/src/default_scenarios/storage.rs @@ -1,8 +1,9 @@ use crate::default_scenarios::{builtin::ToTestConfig, contracts}; use contender_core::generator::{types::SpamRequest, CreateDefinition, FunctionCallDefinition}; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, clap::Parser)] +#[derive(Debug, Clone, clap::Parser, Deserialize, Serialize)] pub struct StorageStressCliArgs { #[arg( short = 's', diff --git a/crates/cli/src/default_scenarios/stress.rs b/crates/cli/src/default_scenarios/stress.rs index 726e02af..475dc755 100644 --- a/crates/cli/src/default_scenarios/stress.rs +++ b/crates/cli/src/default_scenarios/stress.rs @@ -1,6 +1,7 @@ use clap::Parser; use contender_core::generator::{types::SpamRequest, CreateDefinition, FunctionCallDefinition}; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use strum::IntoEnumIterator; use crate::default_scenarios::{ @@ -14,7 +15,7 @@ use crate::default_scenarios::{ transfers::{TransferStressArgs, TransferStressCliArgs}, }; -#[derive(Debug, Clone, Parser)] +#[derive(Debug, Clone, Parser, Deserialize, Serialize)] pub struct StressCliArgs { #[arg( long, diff --git a/crates/cli/src/default_scenarios/transfers.rs b/crates/cli/src/default_scenarios/transfers.rs index 8dd5b3e9..dd6eacaa 100644 --- a/crates/cli/src/default_scenarios/transfers.rs +++ b/crates/cli/src/default_scenarios/transfers.rs @@ -1,9 +1,14 @@ use crate::default_scenarios::builtin::ToTestConfig; use alloy::primitives::{Address, U256}; use clap::Parser; -use contender_core::generator::{types::SpamRequest, util::parse_value, FunctionCallDefinition}; +use contender_core::generator::{ + types::SpamRequest, + util::{deserialize_value, parse_value}, + FunctionCallDefinition, +}; +use serde::{Deserialize, Serialize}; -#[derive(Parser, Clone, Debug)] +#[derive(Parser, Clone, Debug, Deserialize, Serialize)] pub struct TransferStressCliArgs { #[arg( short = 'a', @@ -13,13 +18,14 @@ pub struct TransferStressCliArgs { value_parser = parse_value, help = "Amount of tokens to transfer in each transaction." )] + #[serde(deserialize_with = "deserialize_value")] pub amount: U256, + #[arg( short, long = "transfer.recipient", visible_aliases = ["tr", "recipient"], help = "Address to receive ether sent from spammers.", - value_parser = |s: &str| s.parse::
().map_err(|_| "Invalid address format".to_string()) )] pub recipient: Option
, } diff --git a/crates/cli/src/default_scenarios/uni_v2.rs b/crates/cli/src/default_scenarios/uni_v2.rs index 2dc3c1ef..3332c17d 100644 --- a/crates/cli/src/default_scenarios/uni_v2.rs +++ b/crates/cli/src/default_scenarios/uni_v2.rs @@ -3,14 +3,15 @@ use std::str::FromStr; use crate::default_scenarios::{builtin::ToTestConfig, contracts::test_token}; use alloy::primitives::U256; use clap::Parser; -use contender_core::generator::util::parse_value; +use contender_core::generator::util::{deserialize_value, deserialize_value_opt, parse_value}; use contender_core::generator::{ types::SpamRequest, CompiledContract, CreateDefinition, FunctionCallDefinition, }; use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; use thiserror::Error; -#[derive(Debug, Clone, Parser)] +#[derive(Debug, Clone, Parser, Deserialize, Serialize)] pub struct UniV2CliArgs { #[arg( short, @@ -31,6 +32,7 @@ pub struct UniV2CliArgs { value_parser = parse_value, visible_aliases = ["weth"] )] + #[serde(deserialize_with = "deserialize_value")] pub weth_per_token: U256, #[arg( @@ -42,6 +44,7 @@ pub struct UniV2CliArgs { visible_aliases = ["mint"], value_name = "TOKEN_AMOUNT" )] + #[serde(deserialize_with = "deserialize_value")] pub initial_token_supply: U256, #[arg( @@ -51,6 +54,7 @@ pub struct UniV2CliArgs { value_name = "WETH_AMOUNT", visible_aliases = ["trade-weth"] )] + #[serde(deserialize_with = "deserialize_value_opt")] pub weth_trade_amount: Option, #[arg( @@ -60,6 +64,7 @@ pub struct UniV2CliArgs { value_name = "TOKEN_AMOUNT", visible_aliases = ["trade-token"] )] + #[serde(deserialize_with = "deserialize_value_opt")] pub token_trade_amount: Option, } diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs new file mode 100644 index 00000000..1ef4b6ec --- /dev/null +++ b/crates/cli/src/lib.rs @@ -0,0 +1,11 @@ +pub mod commands; +pub mod default_scenarios; +pub mod error; +pub mod util; + +pub use error::CliError as Error; + +// prometheus +use tokio::sync::OnceCell; +pub static PROM: OnceCell = OnceCell::const_new(); +pub static LATENCY_HIST: OnceCell = OnceCell::const_new(); diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index edaa7923..6e6e7f6c 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -1,43 +1,36 @@ -mod commands; -mod default_scenarios; -mod error; -mod util; - -use crate::commands::{error::ArgsError, ReportFormat, SpamCampaignContext}; use alloy::{ network::AnyNetwork, providers::{DynProvider, ProviderBuilder}, rpc::client::ClientBuilder, }; -use commands::{ - admin::handle_admin_command, - common::ScenarioSendTxsCliArgs, - db::{drop_db, export_db, import_db, reset_db}, - replay::ReplayArgs, - ContenderCli, ContenderSubcommand, DbCommand, SetupCommandArgs, SpamCliArgs, SpamCommandArgs, - SpamScenario, +use contender_cli::commands; +use contender_cli::{ + commands::{ + admin::handle_admin_command, + common::ScenarioSendTxsCliArgs, + db::{drop_db, export_db, import_db, reset_db}, + error::ArgsError, + replay::ReplayArgs, + ContenderCli, ContenderSubcommand, DbCommand, ReportFormat, SetupCommandArgs, + SpamCampaignContext, SpamCliArgs, SpamCommandArgs, SpamScenario, + }, + default_scenarios::{fill_block::FillBlockCliArgs, BuiltinScenarioCli}, + util::{db_file_in, init_reports_dir, resolve_data_dir}, + Error, }; use contender_core::{db::DbOps, util::TracingOptions}; use contender_sqlite::{SqliteDb, DB_VERSION}; -use default_scenarios::{fill_block::FillBlockCliArgs, BuiltinScenarioCli}; -use error::CliError; use regex::Regex; use std::str::FromStr; -use tokio::sync::OnceCell; use tracing::{debug, info, warn}; use tracing_subscriber::EnvFilter; -use util::{db_file_in, init_reports_dir, resolve_data_dir}; - -// prometheus -static PROM: OnceCell = OnceCell::const_new(); -static LATENCY_HIST: OnceCell = OnceCell::const_new(); #[tokio::main(flavor = "multi_thread")] async fn main() -> miette::Result<()> { run().await.map_err(|e| e.into()) } -async fn run() -> Result<(), CliError> { +async fn run() -> Result<(), contender_cli::Error> { init_tracing(); let args = ContenderCli::parse_args(); @@ -103,7 +96,7 @@ async fn run() -> Result<(), CliError> { } else if let Some(config) = builtin_scenario_config { SpamScenario::Builtin( config - .to_builtin_scenario(&provider, &args, &data_dir) + .to_builtin_scenario(&provider, args.builtin_options(&data_dir)?) .await?, ) } else { @@ -112,7 +105,7 @@ async fn run() -> Result<(), CliError> { BuiltinScenarioCli::FillBlock(FillBlockCliArgs { max_gas_per_block: None, }) - .to_builtin_scenario(&provider, &args, &data_dir) + .to_builtin_scenario(&provider, args.builtin_options(&data_dir)?) .await?, ) }; @@ -135,13 +128,11 @@ async fn run() -> Result<(), CliError> { } => { if let Some(campaign_id) = campaign_id { let resolved_campaign_id = if campaign_id == "__LATEST_CAMPAIGN__" { - db.latest_campaign_id() - .map_err(CliError::Db)? - .ok_or_else(|| { - CliError::Report(contender_report::Error::CampaignNotFound( - "latest".to_string(), - )) - })? + db.latest_campaign_id().map_err(Error::Db)?.ok_or_else(|| { + Error::Report(contender_report::Error::CampaignNotFound( + "latest".to_string(), + )) + })? } else { campaign_id }; @@ -155,7 +146,7 @@ async fn run() -> Result<(), CliError> { skip_tx_traces, ) .await - .map_err(CliError::Report)?; + .map_err(Error::Report)?; } else { let use_json = matches!(format, ReportFormat::Json); contender_report::command::report( @@ -167,7 +158,7 @@ async fn run() -> Result<(), CliError> { skip_tx_traces, ) .await - .map_err(CliError::Report)?; + .map_err(Error::Report)?; } } @@ -195,8 +186,8 @@ async fn run() -> Result<(), CliError> { } /// Check DB version, throw error if version is incompatible with currently-running version of contender. -fn init_db(command: &ContenderSubcommand, db: &SqliteDb) -> Result<(), CliError> { - if db.table_exists("run_txs").map_err(CliError::Db)? { +fn init_db(command: &ContenderSubcommand, db: &SqliteDb) -> Result<(), Error> { + if db.table_exists("run_txs").map_err(Error::Db)? { // check version and exit if DB version is incompatible let quit_early = db.version() != DB_VERSION && !matches!( @@ -221,31 +212,37 @@ fn init_db(command: &ContenderSubcommand, db: &SqliteDb) -> Result<(), CliError> DB_VERSION ); warn!("{recommendation}"); - return Err(CliError::DbVersion); + return Err(Error::DbVersion); } } else { info!("no DB found, creating new DB"); - db.create_tables().map_err(CliError::Db)?; + db.create_tables().map_err(Error::Db)?; } Ok(()) } -fn init_tracing() { - let filter = EnvFilter::try_from_default_env().ok(); // fallback if RUST_LOG is unset - - let mut opts = TracingOptions::default(); +/// Reads the RUST_LOG environment variable and extracts log levels. +pub fn read_rust_log() -> Vec { let rustlog = std::env::var("RUST_LOG").unwrap_or_default().to_lowercase(); // interpret log levels from words matching `=[a-zA-Z]+` let level_regex = Regex::new(r"=[a-zA-Z]+").unwrap(); - let matches: Vec = level_regex + level_regex .find_iter(&rustlog) .map(|m| m.as_str().trim_start_matches('=')) .map(|m| tracing::Level::from_str(m).unwrap_or(tracing::Level::INFO)) - .collect(); + .collect() +} + +fn init_tracing() { + let filter = EnvFilter::try_from_default_env().ok(); // fallback if RUST_LOG is unset + let mut opts = TracingOptions::default(); // if user provides any log level > info, print line num & source file in logs - if matches.iter().any(|lvl| *lvl > tracing::Level::INFO) { + if read_rust_log() + .iter() + .any(|lvl| *lvl > tracing::Level::INFO) + { opts = opts.with_line_number(true).with_target(true); } diff --git a/crates/cli/src/util/utils.rs b/crates/cli/src/util/utils.rs index 1f3cc1bf..efb6a407 100644 --- a/crates/cli/src/util/utils.rs +++ b/crates/cli/src/util/utils.rs @@ -50,7 +50,7 @@ const DEFAULT_SCENARIOS_URL: &str = /// If the testfile starts with `scenario:`, it is treated as a builtin scenario. /// Otherwise, it is treated as a file path. /// Built-in scenarios are fetched relative to the default URL: [`DEFAULT_SCENARIOS_URL`](crate::util::DEFAULT_SCENARIOS_URL). -pub async fn load_testconfig(testfile: &str) -> Result { +pub async fn load_testconfig(testfile: &str) -> Result { Ok(if testfile.starts_with("scenario:") { let remote_url = format!( "{DEFAULT_SCENARIOS_URL}/{}", diff --git a/crates/core/src/generator/create_def.rs b/crates/core/src/generator/create_def.rs index 036ef285..25abbfed 100644 --- a/crates/core/src/generator/create_def.rs +++ b/crates/core/src/generator/create_def.rs @@ -66,12 +66,16 @@ pub struct CreateDefinition { #[serde(flatten)] pub contract: CompiledContract, /// Constructor signature. Formats supported: "constructor(type1,type2,...)" or "(type1,type2,...)". + #[serde(skip_serializing_if = "Option::is_none")] pub signature: Option, /// Constructor arguments. May include placeholders. + #[serde(skip_serializing_if = "Option::is_none")] pub args: Option>, /// Address of the tx sender. + #[serde(skip_serializing_if = "Option::is_none")] pub from: Option, /// Get a `from` address from the pool of signers specified here. + #[serde(skip_serializing_if = "Option::is_none")] pub from_pool: Option, } diff --git a/crates/core/src/generator/function_def.rs b/crates/core/src/generator/function_def.rs index fdb5391d..3cc307cd 100644 --- a/crates/core/src/generator/function_def.rs +++ b/crates/core/src/generator/function_def.rs @@ -13,28 +13,38 @@ pub struct FunctionCallDefinition { /// Address of the contract to call. pub to: String, /// Address of the tx sender. + #[serde(skip_serializing_if = "Option::is_none")] pub from: Option, /// Get a `from` address from the pool of signers specified here. + #[serde(skip_serializing_if = "Option::is_none")] pub from_pool: Option, /// Name of the function to call. + #[serde(skip_serializing_if = "Option::is_none")] pub signature: Option, /// Parameters to pass to the function. + #[serde(skip_serializing_if = "Option::is_none")] pub args: Option>, /// Value in wei to send with the tx. + #[serde(skip_serializing_if = "Option::is_none")] pub value: Option, /// Parameters to fuzz during the test. + #[serde(skip_serializing_if = "Option::is_none")] pub fuzz: Option>, /// Optional type of the spam transaction for categorization. + #[serde(skip_serializing_if = "Option::is_none")] pub kind: Option, /// Optional gas limit, which will skip gas estimation. This allows reverting txs to be sent. + #[serde(skip_serializing_if = "Option::is_none")] pub gas_limit: Option, /// Optional blob data; tx type must be set to EIP4844 by spammer + #[serde(skip_serializing_if = "Option::is_none")] pub blob_data: Option, /// Optional setCode data; tx type must be set to EIP7702 by spammer + #[serde(skip_serializing_if = "Option::is_none")] pub authorization_address: Option, /// If true and `from_pool` is set, run this setup transaction for all accounts in the pool. /// Defaults to false (only runs for the first account). - #[serde(default)] + #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub for_all_accounts: bool, } diff --git a/crates/core/src/generator/trait.rs b/crates/core/src/generator/trait.rs index 99435c05..5b5c647d 100644 --- a/crates/core/src/generator/trait.rs +++ b/crates/core/src/generator/trait.rs @@ -478,7 +478,7 @@ where if let Some(handle) = handle { // Wait for sender's previous task, then run this one let prev_handle = pending_per_sender.remove(&from); - let chained = tokio::task::spawn(async move { + let chained = crate::spawn_with_session(async move { if let Some(prev) = prev_handle { // Ignore errors from previous task - they'll be reported separately let _ = prev.await; diff --git a/crates/core/src/generator/util.rs b/crates/core/src/generator/util.rs index f2d2b9ea..a8fa54a3 100644 --- a/crates/core/src/generator/util.rs +++ b/crates/core/src/generator/util.rs @@ -173,6 +173,28 @@ pub fn generate_setcode_signer(seed: &impl Seeder) -> (PrivateKeySigner, [u8; 32 ) } +/// Serde deserializer that parses a `U256` using [`parse_value`], +/// supporting both raw numbers and human-readable strings like `"0.00001 ether"`. +pub fn deserialize_value<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let s: String = serde::Deserialize::deserialize(deserializer)?; + parse_value(&s).map_err(serde::de::Error::custom) +} + +/// Like [`deserialize_value`] but for `Option`. Returns `None` if the field is absent or null. +pub fn deserialize_value_opt<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let s: Option = serde::Deserialize::deserialize(deserializer)?; + match s { + Some(s) => parse_value(&s).map(Some).map_err(serde::de::Error::custom), + None => Ok(None), + } +} + /// Parses a string like "1eth" or "20 gwei" into a U256. pub fn parse_value(input: &str) -> Result { let input = input.trim().to_lowercase(); diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 833d3920..1bddc103 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -19,3 +19,35 @@ pub use contender_bundle_provider::bundle::BundleType; pub use orchestrator::{Contender, ContenderCtx, RunOpts}; pub use tokio::task as tokio_task; pub use tokio_util::sync::CancellationToken; + +tokio::task_local! { + /// The session ID for the current task, used by the server's log routing layer + /// to route tracing events to the correct per-session broadcast channel. + pub static CURRENT_SESSION_ID: usize; +} + +/// Spawn a future that inherits the current `CURRENT_SESSION_ID` task-local (if set) +/// and instruments it with a `session` tracing span so the fmt layer shows the session ID. +/// If already inside a `session*` span, the existing span is used via `follows_from`. +pub fn spawn_with_session(future: F) -> tokio::task::JoinHandle +where + F: std::future::Future + Send + 'static, + F::Output: Send + 'static, +{ + match CURRENT_SESSION_ID.try_with(|id| *id) { + Ok(id) => { + let current = tracing::Span::current(); + let has_session_span = current + .metadata() + .is_some_and(|m| m.name().starts_with("session")); + let future = CURRENT_SESSION_ID.scope(id, future); + if has_session_span { + tokio::task::spawn(tracing::Instrument::instrument(future, current)) + } else { + let span = tracing::info_span!("session", id = id); + tokio::task::spawn(tracing::Instrument::instrument(future, span)) + } + } + Err(_) => tokio::task::spawn(future), + } +} diff --git a/crates/core/src/orchestrator.rs b/crates/core/src/orchestrator.rs index 04af46ae..0ec17ba9 100644 --- a/crates/core/src/orchestrator.rs +++ b/crates/core/src/orchestrator.rs @@ -9,6 +9,7 @@ use crate::{ agent_pools::AgentSpec, seeder::{rand_seed::SeedGenerator, Seeder}, templater::Templater, + types::AnyProvider, PlanConfig, RandSeed, }, spammer::{tx_actor::TxActorHandle, OnBatchSent, OnTxSent, Spammer}, @@ -17,12 +18,18 @@ use crate::{ Result, }; use alloy::{ - consensus::TxType, node_bindings::WEI_IN_ETHER, primitives::U256, - signers::local::PrivateKeySigner, transports::http::reqwest::Url, + consensus::TxType, + network::AnyNetwork, + node_bindings::WEI_IN_ETHER, + primitives::U256, + providers::{DynProvider, Provider}, + signers::local::PrivateKeySigner, + transports::http::reqwest::Url, }; use contender_bundle_provider::bundle::BundleType; use contender_engine_provider::ControlChain; use std::sync::LazyLock; +use tokio_util::sync::CancellationToken; static SMOL_AMOUNT: LazyLock = LazyLock::new(|| WEI_IN_ETHER / U256::from(100)); @@ -498,11 +505,19 @@ where /// let callback = NilCallback; /// // initialize opts; slightly tweaking the defaults /// let opts = RunOpts::new().txs_per_period(50).periods(10); + /// // create a cancellation token that can be used to stop the spam run from outside the `Contender` (optional) + /// let cancel_token = tokio_util::sync::CancellationToken::new(); /// /// // run spammer - /// contender.spam(spammer, callback.into(), opts).await.unwrap(); + /// contender.spam(spammer, callback.into(), opts, Some(cancel_token.clone())).await.unwrap(); /// ``` - pub async fn spam(&mut self, spammer: SP, callback: Arc, opts: RunOpts) -> Result<()> + pub async fn spam( + &mut self, + spammer: SP, + callback: Arc, + opts: RunOpts, + cancel_token: Option, + ) -> Result<()> where F: OnTxSent + OnBatchSent + Send + Sync + 'static, SP: Spammer, @@ -523,15 +538,66 @@ where ); let run_id = scenario.db.insert_run(&run_req).map_err(|e| e.into())?; - // send spam - spammer - .spam_rpc( - &mut scenario, - opts.txs_per_period, - opts.periods, - Some(run_id), - callback, - ) - .await + // Initialize TxActor contexts so flush_loop can match receipts. + let current_block = scenario.rpc_client.get_block_number().await?; + let actor_ctx = crate::spammer::tx_actor::ActorContext::new(current_block, run_id) + .with_pending_tx_timeout(Duration::from_secs(self.ctx.pending_tx_timeout_secs)); + for handle in scenario.msg_handles.values() { + handle.init_ctx(actor_ctx.clone()).await?; + } + + // send spam; if an external cancel token was provided, select on it + // so we can abort mid-run (the cursor.next() inside spam_rpc won't + // check cancellation on its own). + let result = if let Some(external) = cancel_token { + tokio::select! { + res = spammer.spam_rpc( + &mut scenario, + opts.txs_per_period, + opts.periods, + Some(run_id), + callback, + ) => res, + _ = external.cancelled() => { + scenario.shutdown().await; + Ok(()) + } + } + } else { + spammer + .spam_rpc( + &mut scenario, + opts.txs_per_period, + opts.periods, + Some(run_id), + callback, + ) + .await + }; + + // Signal the flush loop that sending is done so it can shut down + // once all receipts are processed (or after the stale block timeout). + scenario.ctx.cancel_token.cancel(); + + // Wait for all flush loops to finish collecting receipts. + for handle in scenario.msg_handles.values() { + handle.await_flush().await; + } + + result + } + + /// Materialize a fresh `TestScenario` using the context which was used to create this `Contender` instance. + pub async fn build_scenario(&self) -> Result> { + self.ctx.build_scenario().await + } + + /// Produce a web3 provider connected to the current instance's RPC URL. + pub fn provider(&self) -> AnyProvider { + DynProvider::new( + alloy::providers::ProviderBuilder::new() + .network::() + .connect_http(self.ctx.rpc_url.clone()), + ) } } diff --git a/crates/core/src/spammer/spammer_trait.rs b/crates/core/src/spammer/spammer_trait.rs index 941c6055..c82f5d2e 100644 --- a/crates/core/src/spammer/spammer_trait.rs +++ b/crates/core/src/spammer/spammer_trait.rs @@ -71,31 +71,33 @@ where let auth_provider = scenario.auth_provider.clone(); // run loop in background to call fcu when spamming is done - let fcu_handle: tokio::task::JoinHandle> = tokio::task::spawn(async move { - if let Some(auth_client) = &auth_provider { - loop { - let fcu_done = is_fcu_done.load(std::sync::atomic::Ordering::SeqCst); - let sending_done = - is_sending_done.load(std::sync::atomic::Ordering::SeqCst); - if fcu_done { - info!("FCU is done, stopping block production..."); - break; - } - if sending_done { - auth_client - .advance_chain(DEFAULT_BLOCK_TIME) - .await - .map_err(|e| { - is_fcu_done.store(true, std::sync::atomic::Ordering::SeqCst); - CallbackError::AuthProvider(e) - })?; - } else { - tokio::time::sleep(Duration::from_secs(1)).await; + let fcu_handle: tokio::task::JoinHandle> = + crate::spawn_with_session(async move { + if let Some(auth_client) = &auth_provider { + loop { + let fcu_done = is_fcu_done.load(std::sync::atomic::Ordering::SeqCst); + let sending_done = + is_sending_done.load(std::sync::atomic::Ordering::SeqCst); + if fcu_done { + info!("FCU is done, stopping block production..."); + break; + } + if sending_done { + auth_client + .advance_chain(DEFAULT_BLOCK_TIME) + .await + .map_err(|e| { + is_fcu_done + .store(true, std::sync::atomic::Ordering::SeqCst); + CallbackError::AuthProvider(e) + })?; + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + } } } - } - Ok(()) - }); + Ok(()) + }); let tx_req_chunks = scenario .get_spam_tx_chunks(txs_per_period, num_periods) diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index e33f121c..c3d08873 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -367,8 +367,13 @@ async fn flush_loop( flush_sender: mpsc::Sender, db: Arc, rpc: Arc, + cancel_token: CancellationToken, ) { let mut interval = tokio::time::interval(Duration::from_secs(1)); + /// Number of consecutive blocks with no change in pending count before giving up. + const STALE_BLOCK_LIMIT: u64 = 6; + let mut stale_blocks: u64 = 0; + let mut last_pending_count: usize = usize::MAX; loop { interval.tick().await; @@ -394,6 +399,12 @@ async fn flush_loop( continue; }; + // If cancel_token is set (sending is done) and cache is empty, we're done. + if cancel_token.is_cancelled() && cache_snapshot.is_empty() { + info!("all receipts processed, shutting down receipt collection."); + break; + } + // Get current block number let new_block = match rpc.get_block_number().await { Ok(n) => n, @@ -439,12 +450,34 @@ async fn flush_loop( } if cache_snapshot.is_empty() { + if cancel_token.is_cancelled() { + info!("pending tx cache is empty, shutting down receipt collection."); + return; + } break; } } Err(e) => warn!("flush_cache error for block {}: {:?}", bn, e), } } + + // Track stale blocks: if sending is done and pending count hasn't changed, increment. + if cancel_token.is_cancelled() && !cache_snapshot.is_empty() { + let current_count = cache_snapshot.len(); + if current_count == last_pending_count { + stale_blocks += new_block.saturating_sub(ctx.target_block).max(1); + } else { + stale_blocks = 0; + last_pending_count = current_count; + } + if stale_blocks >= STALE_BLOCK_LIMIT { + warn!( + "pending receipt count unchanged ({}) for {} blocks, shutting down receipt collection.", + current_count, stale_blocks + ); + break; + } + } } } @@ -564,6 +597,7 @@ fn get_tx_error( #[derive(Debug)] pub struct TxActorHandle { sender: mpsc::Sender, + flush_complete: CancellationToken, } #[derive(Debug)] @@ -596,27 +630,39 @@ impl TxActorHandle { let mut actor = TxActor::new(receiver, flush_receiver, fb_receiver, db.clone()); // Spawn the message handler task (owns the cache) - tokio::task::spawn(async move { + crate::spawn_with_session(async move { if let Err(e) = actor.run().await { error!("TxActor message handler terminated with error: {:?}", e); } }); // Spawn the independent flush task (communicates via channels) - tokio::task::spawn(async move { - flush_loop(flush_sender, db, rpc).await; + let flush_cancel = cancel_token.clone(); + let flush_complete = CancellationToken::new(); + let flush_done = flush_complete.clone(); + crate::spawn_with_session(async move { + flush_loop(flush_sender, db, rpc, flush_cancel).await; + flush_done.cancel(); }); // Spawn the flashblocks listener task if URL is provided if let Some(ws_url) = flashblocks_ws_url { - tokio::task::spawn(async move { + crate::spawn_with_session(async move { if let Err(e) = FlashblocksClient::listen(&ws_url, fb_sender, cancel_token).await { error!("{}", e); } }); } - Ok(Self { sender }) + Ok(Self { + sender, + flush_complete, + }) + } + + /// Waits until the flush loop has finished processing all receipts. + pub async fn await_flush(&self) { + self.flush_complete.cancelled().await; } /// Adds a new tx to the cache. diff --git a/crates/core/src/spammer/tx_callback.rs b/crates/core/src/spammer/tx_callback.rs index fc077272..de5a2cb7 100644 --- a/crates/core/src/spammer/tx_callback.rs +++ b/crates/core/src/spammer/tx_callback.rs @@ -136,7 +136,7 @@ impl OnTxSent for LogCallback { tx_actors: Option>>, ) -> Option>> { let cancel_token = self.cancel_token.clone(); - let handle = tokio::task::spawn(async move { + let handle = crate::spawn_with_session(async move { if let Some(tx_actors) = tx_actors { let tx_actor = tx_actors["default"].clone(); let tx = CacheTx { @@ -165,7 +165,7 @@ impl OnBatchSent for LogCallback { } if let Some(provider) = &self.auth_provider { let provider = provider.clone(); - return Some(tokio::task::spawn(async move { + return Some(crate::spawn_with_session(async move { provider .advance_chain(DEFAULT_BLOCK_TIME) .await diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs index e866e96d..c8445f66 100644 --- a/crates/core/src/test_scenario.rs +++ b/crates/core/src/test_scenario.rs @@ -558,7 +558,7 @@ where let http_client = self.http_client.clone(); let scenario_label = self.scenario_label.clone(); - let handle = tokio::task::spawn(async move { + let handle = crate::spawn_with_session(async move { Self::deploy_contract(DeployContractParams { db: &db, tx_req: &tx_req, @@ -717,7 +717,7 @@ where let http_client = self.http_client.clone(); let sem = semaphore.clone(); - let handle = tokio::task::spawn(async move { + let handle = crate::spawn_with_session(async move { let _permit = sem.acquire().await.expect("semaphore closed"); let transport = Http::with_client(http_client, rpc_url.to_owned()); let rpc_client = ClientBuilder::default().transport(transport, false); @@ -1030,7 +1030,7 @@ where let cancel_token = self.ctx.cancel_token.clone(); let error_sender = error_sender.clone(); - tasks.push(tokio::task::spawn(async move { + tasks.push(crate::spawn_with_session(async move { let extra = RuntimeTxInfo::now(); let handles = match payload { ExecutionPayload::SignedTx(signed_tx, req) => { @@ -1190,7 +1190,7 @@ where .collect(); let hist = self.prometheus.hist.get(); - tasks.push(tokio::task::spawn(async move { + tasks.push(crate::spawn_with_session(async move { // Build json-rpc batch payload with multiple eth_sendRawTransaction requests let mut requests = Vec::with_capacity(signed_chunk.len()); for (i, (signed_tx, _)) in signed_chunk.iter().enumerate() { @@ -1772,7 +1772,7 @@ async fn sync_nonces( for addr in all_addrs { let send = sender.clone(); let rpc_client = Arc::new(rpc_client.clone()); - tasks.push(tokio::task::spawn(async move { + tasks.push(crate::spawn_with_session(async move { let nonce = rpc_client.get_transaction_count(addr).await?; send.send((addr, nonce)) .await diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml new file mode 100644 index 00000000..db18dd25 --- /dev/null +++ b/crates/server/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "contender_server" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Contender server" + +[[bin]] +name = "contender-server" +path = "src/main.rs" + +[[bin]] +name = "contender-log-client" +path = "src/log_client.rs" + +[dependencies] +base64.workspace = true +contender_core.workspace = true +contender_testfile.workspace = true +contender_sqlite.workspace = true +contender_cli.workspace = true + +async-trait = { workspace = true } +jsonrpsee = { workspace = true, features = ["server", "macros", "ws-client"] } +thiserror.workspace = true +tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } +serde.workspace = true +serde_json.workspace = true +rand.workspace = true +axum = { workspace = true } +tower = "0.4" +tower-http = { version = "0.5", features = ["cors"] } +tokio-stream.workspace = true + +[build-dependencies] +syn = { version = "2", features = ["full"] } +quote = "1" diff --git a/crates/server/build.rs b/crates/server/build.rs new file mode 100644 index 00000000..dfa1c99c --- /dev/null +++ b/crates/server/build.rs @@ -0,0 +1,350 @@ +//! Build script that generates `static/builtin_scenarios.js` — a JavaScript +//! constant describing the parameter schema for every builtin scenario. +//! +//! Uses `syn` to parse the actual Rust token stream from the CLI crate, +//! extracting field names, types, defaults, and help text from `#[arg(...)]` +//! attributes and doc comments. + +use std::collections::HashMap; +use std::env; +use std::fs; +use std::path::{Path, PathBuf}; + +use quote::ToTokens; +use syn::{Attribute, Expr, Fields, Item, Lit, Meta, Type}; + +fn main() { + let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()); + let cli_dir = manifest_dir + .parent() + .unwrap() + .join("cli/src/default_scenarios"); + + // (UI name, source file relative to cli_dir, struct name) + let scenarios: &[(&str, &str, &str)] = &[ + ("Blobs", "blobs.rs", "BlobsCliArgs"), + ("Contract", "custom_contract.rs", "CustomContractCliArgs"), + ( + "EthFunctions", + "eth_functions/command.rs", + "EthFunctionsCliArgs", + ), + ("Erc20", "erc20.rs", "Erc20CliArgs"), + ("FillBlock", "fill_block.rs", "FillBlockCliArgs"), + ("Revert", "revert.rs", "RevertCliArgs"), + ("SetCode", "setcode/base.rs", "SetCodeCliArgs"), + ("Storage", "storage.rs", "StorageStressCliArgs"), + ("Stress", "stress.rs", "StressCliArgs"), + ("Transfers", "transfers.rs", "TransferStressCliArgs"), + ("UniV2", "uni_v2.rs", "UniV2CliArgs"), + ]; + + let enum_files: &[(&str, &str)] = &[ + ("EthereumOpcode", "eth_functions/opcodes.rs"), + ("EthereumPrecompile", "eth_functions/precompiles.rs"), + ]; + + let enum_variants = parse_enum_files(&cli_dir, enum_files); + + let mut js = + String::from("// AUTO-GENERATED by build.rs — do not edit\nconst BUILTIN_SCENARIOS = {\n"); + + for (name, file, struct_name) in scenarios { + let path = cli_dir.join(file); + println!("cargo:rerun-if-changed={}", path.display()); + + let source = fs::read_to_string(&path).unwrap_or_else(|e| { + panic!("Failed to read {}: {e}", path.display()); + }); + let ast = syn::parse_file(&source).unwrap_or_else(|e| { + panic!("Failed to parse {}: {e}", path.display()); + }); + + let fields = extract_fields(&ast, struct_name, &enum_variants); + + let key = to_kebab_case(name); + js.push_str(&format!(" \"{key}\": [\n")); + for f in &fields { + js.push_str(&format!( + " {{ name: {}, type: {}, default: {}, help: {}, optional: {} }},\n", + js_str(&f.name), + js_str(&f.field_type), + js_opt(&f.default), + js_opt(&f.help), + if f.optional { "true" } else { "false" }, + )); + } + js.push_str(" ],\n"); + } + js.push_str("};\n"); + + let static_dir = manifest_dir.join("static"); + fs::create_dir_all(&static_dir).unwrap(); + fs::write(static_dir.join("builtin_scenarios.js"), &js).unwrap(); + + println!("cargo:rerun-if-changed=build.rs"); +} + +// ── types ──────────────────────────────────────────────────────────── + +#[derive(Debug)] +struct FieldInfo { + name: String, + field_type: String, + default: Option, + help: Option, + optional: bool, +} + +// ── JS helpers ─────────────────────────────────────────────────────── + +/// Convert PascalCase to kebab-case to match `#[serde(rename_all = "kebab-case")]`. +fn to_kebab_case(s: &str) -> String { + let mut result = String::new(); + let chars: Vec = s.chars().collect(); + for (i, &c) in chars.iter().enumerate() { + if c.is_uppercase() && i > 0 { + let prev = chars[i - 1]; + if prev.is_lowercase() || prev.is_ascii_digit() { + result.push('-'); + } else if i + 1 < chars.len() && chars[i + 1].is_lowercase() { + result.push('-'); + } + } + result.push(c.to_ascii_lowercase()); + } + result +} + +fn js_str(s: &str) -> String { + format!( + "\"{}\"", + s.replace('\\', "\\\\") + .replace('"', "\\\"") + .replace('\n', "\\n") + .replace('\r', "\\r") + ) +} + +fn js_opt(v: &Option) -> String { + match v { + Some(s) => js_str(s), + None => "null".into(), + } +} + +// ── enum parsing ───────────────────────────────────────────────────── + +fn parse_enum_files(cli_dir: &Path, enums: &[(&str, &str)]) -> HashMap> { + let mut map = HashMap::new(); + for (enum_name, file) in enums { + let path = cli_dir.join(file); + println!("cargo:rerun-if-changed={}", path.display()); + let source = fs::read_to_string(&path).unwrap_or_default(); + let ast = syn::parse_file(&source).unwrap(); + for item in &ast.items { + if let Item::Enum(e) = item { + if e.ident == enum_name { + let variants: Vec = + e.variants.iter().map(|v| v.ident.to_string()).collect(); + map.insert(enum_name.to_string(), variants); + } + } + } + } + map +} + +// ── struct field extraction ────────────────────────────────────────── + +fn extract_fields( + ast: &syn::File, + struct_name: &str, + enum_variants: &HashMap>, +) -> Vec { + for item in &ast.items { + if let Item::Struct(s) = item { + if s.ident == struct_name { + return fields_from_struct(s, enum_variants); + } + } + } + vec![] +} + +fn fields_from_struct( + s: &syn::ItemStruct, + enum_variants: &HashMap>, +) -> Vec { + let Fields::Named(ref named) = s.fields else { + return vec![]; + }; + + let mut out = Vec::new(); + + for field in &named.named { + // Skip #[command(flatten)] and #[command(subcommand)] fields. + if has_command_attr(field, "flatten") || has_command_attr(field, "subcommand") { + continue; + } + + let name = field + .ident + .as_ref() + .map(|i| i.to_string()) + .unwrap_or_default(); + + let raw_ty = type_to_string(&field.ty); + let optional = raw_ty.starts_with("Option <") || raw_ty.starts_with("Option<"); + let field_type = rust_type_to_js(&raw_ty, enum_variants); + + let arg_attrs = collect_arg_kv(&field.attrs); + let default = arg_attrs + .get("default_value") + .cloned() + .or_else(|| arg_attrs.get("default_value_t").cloned()); + let help = arg_attrs + .get("long_help") + .cloned() + .or_else(|| arg_attrs.get("help").cloned()) + .or_else(|| doc_comment(&field.attrs)); + + out.push(FieldInfo { + name, + field_type, + default, + help, + optional, + }); + } + out +} + +/// Check if a field has `#[command(subcommand)]` or `#[command(flatten)]`. +fn has_command_attr(field: &syn::Field, keyword: &str) -> bool { + field.attrs.iter().any(|attr| { + if !attr.path().is_ident("command") { + return false; + } + let tokens = attr.meta.to_token_stream().to_string(); + tokens.contains(keyword) + }) +} + +/// Collect key-value pairs from `#[arg(...)]` attributes. +fn collect_arg_kv(attrs: &[Attribute]) -> HashMap { + let mut map = HashMap::new(); + for attr in attrs { + if !attr.path().is_ident("arg") { + continue; + } + if let Meta::List(list) = &attr.meta { + let tokens = list.tokens.to_string(); + for segment in split_top_level(&tokens) { + let segment = segment.trim(); + if let Some((k, v)) = segment.split_once('=') { + let k = k.trim(); + let v = v.trim(); + // Parse quoted values through syn::LitStr to properly unescape. + let value = if v.starts_with('"') { + syn::parse_str::(v) + .map(|lit| lit.value()) + .unwrap_or_else(|_| v.to_string()) + } else { + v.to_string() + }; + map.insert(k.to_string(), value); + } + } + } + } + map +} + +/// Split a token stream string on top-level commas (not inside parens/brackets/quotes). +fn split_top_level(s: &str) -> Vec { + let mut parts = Vec::new(); + let mut current = String::new(); + let mut depth = 0u32; + let mut in_string = false; + let mut prev = '\0'; + for ch in s.chars() { + if ch == '"' && prev != '\\' { + in_string = !in_string; + } + if !in_string { + match ch { + '(' | '[' => depth += 1, + ')' | ']' => depth = depth.saturating_sub(1), + ',' if depth == 0 => { + parts.push(std::mem::take(&mut current)); + prev = ch; + continue; + } + _ => {} + } + } + current.push(ch); + prev = ch; + } + if !current.trim().is_empty() { + parts.push(current); + } + parts +} + +/// Extract `/// doc comments` concatenated into a single string. +fn doc_comment(attrs: &[Attribute]) -> Option { + let docs: Vec = attrs + .iter() + .filter_map(|attr| { + if !attr.path().is_ident("doc") { + return None; + } + if let Meta::NameValue(nv) = &attr.meta { + if let Expr::Lit(lit) = &nv.value { + if let Lit::Str(s) = &lit.lit { + return Some(s.value().trim().to_string()); + } + } + } + None + }) + .filter(|s| !s.is_empty()) + .collect(); + if docs.is_empty() { + None + } else { + Some(docs.join(" ")) + } +} + +fn type_to_string(ty: &Type) -> String { + ty.to_token_stream().to_string() +} + +fn rust_type_to_js(raw: &str, enum_variants: &HashMap>) -> String { + let inner = raw + .replace("Option <", "Option<") + .replace("Vec <", "Vec<") + .replace("Option<", "") + .replace("Vec<", "") + .replace('>', "") + .replace(' ', ""); + + for (enum_name, variants) in enum_variants { + if inner.contains(enum_name.as_str()) { + let opts = variants.join(","); + if raw.contains("Vec") { + return format!("multi-select:{opts}"); + } + return format!("select:{opts}"); + } + } + + match inner.as_str() { + "bool" => "bool".into(), + "u32" | "u64" | "i64" | "usize" => "number".into(), + _ => "text".into(), + } +} diff --git a/crates/server/src/config.rs b/crates/server/src/config.rs new file mode 100644 index 00000000..015ec89f --- /dev/null +++ b/crates/server/src/config.rs @@ -0,0 +1,35 @@ +use crate::log_layer::{new_log_sinks, SessionLogRouter, SessionLogSinks}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +pub struct ServerConfig { + pub rpc_addr: String, + pub sse_addr: String, +} + +/// Load server configuration from environment variables, with defaults. +pub fn load_server_config() -> ServerConfig { + let rpc_addr = std::env::var("RPC_HOST").unwrap_or("127.0.0.1:3000".to_string()); + let sse_addr = std::env::var("SSE_HOST").unwrap_or("127.0.0.1:3001".to_string()); + ServerConfig { rpc_addr, sse_addr } +} + +/// Initialize tracing with a custom layer for routing logs to session-specific sinks. +pub fn init_tracing() -> SessionLogSinks { + let log_sinks = new_log_sinks(); + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + let fmt_layer = tracing_subscriber::fmt::layer() + .with_ansi(true) + .with_target(true) + .with_line_number(true); + + let session_layer = SessionLogRouter::new(log_sinks.clone()); + + tracing_subscriber::registry() + .with(filter) + .with(fmt_layer) + .with(session_layer) + .init(); + + log_sinks +} diff --git a/crates/server/src/error.rs b/crates/server/src/error.rs new file mode 100644 index 00000000..f6daf4b3 --- /dev/null +++ b/crates/server/src/error.rs @@ -0,0 +1,109 @@ +use base64::DecodeError; +use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; +use thiserror::Error; + +use crate::sessions::ContenderSessionInfo; + +#[derive(Debug, Error)] +pub enum ContenderRpcError { + #[error("Failed to initialize contender session: {0}")] + SessionInitializationFailed(contender_core::Error), + + #[error("Session not found: {0}")] + SessionNotFound(usize), + + #[error("Session {} is not initialized", _0.id)] + SessionNotInitialized(ContenderSessionInfo), + + #[error("Session {} failed: {error}", info.id)] + SessionFailed { + info: ContenderSessionInfo, + error: String, + }, + + #[error("Session {} is currently busy: {:?}", _0.id, _0.status)] + SessionBusy(ContenderSessionInfo), + + #[error("Session {0} is not currently spamming")] + SessionNotBusy(usize), + + #[error("Invalid test config: {0}")] + InvalidTestConfig(#[from] contender_testfile::Error), + + #[error("Invalid arguments: {0}")] + InvalidArguments(String), + + #[error("Invalid base64: {0}")] + InvalidBase64(#[from] DecodeError), + + #[error("Invalid UTF-8 in decoded config: {0}")] + InvalidUtf8(std::string::FromUtf8Error), +} + +impl From for ErrorObjectOwned { + fn from(err: ContenderRpcError) -> Self { + match err { + /* TODO + standardize error codes and messages, + and decide what info to include in the data field + (e.g. stack traces for internal errors, but not for user errors) + */ + ContenderRpcError::SessionInitializationFailed(e) => ErrorObject::owned( + 1, + "Failed to initialize contender session".to_string(), + Some(e.to_string()), + ), + + ContenderRpcError::InvalidTestConfig(e) => { + ErrorObject::owned(2, "Invalid test config".to_string(), Some(e.to_string())) + } + + ContenderRpcError::InvalidBase64(e) => ErrorObject::owned( + 3, + "Invalid base64 encoding".to_string(), + Some(e.to_string()), + ), + + ContenderRpcError::InvalidUtf8(e) => ErrorObject::owned( + 4, + "Invalid UTF-8 in config".to_string(), + Some(e.to_string()), + ), + + ContenderRpcError::SessionNotFound(id) => { + ErrorObject::owned(5, format!("Session {id} not found"), Option::::None) + } + + ContenderRpcError::SessionNotInitialized(info) => ErrorObject::owned( + 6, + format!( + "Session {} not ready (status: {}); must be initialized before spamming", + info.id, info.status + ), + Option::::None, + ), + + ContenderRpcError::SessionBusy(info) => ErrorObject::owned( + 7, + format!("Session {} is currently busy: {}", info.id, info.status), + Option::::None, + ), + + ContenderRpcError::SessionNotBusy(id) => ErrorObject::owned( + 9, + format!("Session {id} is not currently spamming"), + Option::::None, + ), + + ContenderRpcError::SessionFailed { info, error } => ErrorObject::owned( + 8, + format!("Session {} failed with error: {error}", info.id), + Option::::None, + ), + + ContenderRpcError::InvalidArguments(msg) => { + ErrorObject::owned(400, "Invalid arguments".to_string(), Some(msg)) + } + } + } +} diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs new file mode 100644 index 00000000..f6d4fca9 --- /dev/null +++ b/crates/server/src/lib.rs @@ -0,0 +1,6 @@ +pub mod config; +pub mod error; +pub mod log_layer; +pub mod rpc_server; +pub mod sessions; +pub mod sse; diff --git a/crates/server/src/log_client.rs b/crates/server/src/log_client.rs new file mode 100644 index 00000000..cefdc7fd --- /dev/null +++ b/crates/server/src/log_client.rs @@ -0,0 +1,52 @@ +//! Simple test client that subscribes to session logs via JSON-RPC over websocket. +//! +//! Usage: +//! contender-log-client [ws_url] +//! +//! Examples: +//! contender-log-client 0 +//! contender-log-client 2 ws://127.0.0.1:3000 + +use jsonrpsee::core::client::SubscriptionClientT; +use jsonrpsee::rpc_params; +use jsonrpsee::ws_client::WsClientBuilder; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = std::env::args().collect(); + let session_id: usize = args + .get(1) + .expect("Usage: contender-log-client [ws_url]") + .parse() + .expect("session_id must be a number"); + let url = args + .get(2) + .map(|s| s.as_str()) + .unwrap_or("ws://127.0.0.1:3000"); + + eprintln!("Connecting to {url}, subscribing to session {session_id}..."); + + let client = WsClientBuilder::default().build(url).await?; + + let mut sub = client + .subscribe::( + "subscribe_logs", + rpc_params![session_id], + "unsubscribe_logs", + ) + .await?; + + eprintln!("Subscribed. Waiting for logs...\n"); + + while let Some(msg) = sub.next().await { + match msg { + Ok(line) => println!("{line}"), + Err(e) => { + eprintln!("Subscription error: {e}"); + break; + } + } + } + + Ok(()) +} diff --git a/crates/server/src/log_layer.rs b/crates/server/src/log_layer.rs new file mode 100644 index 00000000..d9495dfc --- /dev/null +++ b/crates/server/src/log_layer.rs @@ -0,0 +1,141 @@ +use std::{collections::HashMap, fmt, sync::Arc}; + +use tokio::sync::{broadcast, RwLock}; +use tracing::{field::Visit, Event, Subscriber}; +use tracing_subscriber::{fmt::time::FormatTime, layer::Context, registry::LookupSpan, Layer}; + +/// A shared registry mapping session IDs to broadcast senders. +/// The `SessionLogRouter` layer uses this to route log events to the correct session stream. +pub type SessionLogSinks = Arc>>>; + +/// Creates a new empty sink map. +pub fn new_log_sinks() -> SessionLogSinks { + Arc::new(RwLock::new(HashMap::new())) +} + +/// A `tracing` layer that inspects span context for a `session` span with an `id` field, +/// and routes formatted log events to the corresponding broadcast channel. +pub struct SessionLogRouter { + sinks: SessionLogSinks, +} + +impl SessionLogRouter { + pub fn new(sinks: SessionLogSinks) -> Self { + Self { sinks } + } +} + +impl Layer for SessionLogRouter +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + // Walk the span scope to find a span named "session" or "session_init" with an "id" field. + let session_id = ctx + .event_scope(event) + .and_then(|scope| { + for span in scope { + let name = span.name(); + if name.starts_with("session") { + let extensions = span.extensions(); + if let Some(fields) = extensions.get::() { + return Some(fields.id); + } + } + } + None + }) + .or_else(|| { + // Fall back to the task-local session ID (set by spawn_with_session). + contender_core::CURRENT_SESSION_ID.try_with(|id| *id).ok() + }); + + let Some(session_id) = session_id else { + return; + }; + + // Format the event. + let formatted = format_event(event, session_id); + + // Try to send non-blocking (don't await the RwLock — use try_read). + if let Ok(sinks) = self.sinks.try_read() { + if let Some(tx) = sinks.get(&session_id) { + let _ = tx.send(formatted); + } + } + } + + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + ctx: Context<'_, S>, + ) { + // When a span named "session*" is created, extract the `id` field and store it. + let span = ctx.span(id).expect("span not found"); + if span.name().starts_with("session") { + let mut fields = SessionSpanFields { id: 0 }; + attrs.record(&mut fields); + span.extensions_mut().insert(fields); + } + } +} + +/// Stored in span extensions to carry the session ID. +struct SessionSpanFields { + id: usize, +} + +impl Visit for SessionSpanFields { + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + if field.name() == "id" { + self.id = value as usize; + } + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + if field.name() == "id" { + self.id = value as usize; + } + } + + fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn fmt::Debug) {} +} + +fn format_event(event: &Event<'_>, session_id: usize) -> String { + let metadata = event.metadata(); + let mut visitor = MessageVisitor { + message: String::new(), + }; + event.record(&mut visitor); + + let mut timestamp = String::new(); + let _ = tracing_subscriber::fmt::time::SystemTime.format_time( + &mut tracing_subscriber::fmt::format::Writer::new(&mut timestamp), + ); + + format!( + "{} {} session[{}]: {}", + timestamp, + metadata.level(), + session_id, + visitor.message + ) +} + +struct MessageVisitor { + message: String, +} + +impl Visit for MessageVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn fmt::Debug) { + if field.name() == "message" { + self.message = format!("{:?}", value); + } else if !self.message.is_empty() { + self.message + .push_str(&format!(" {}={:?}", field.name(), value)); + } else { + self.message = format!("{}={:?}", field.name(), value); + } + } +} diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs new file mode 100644 index 00000000..81072e6c --- /dev/null +++ b/crates/server/src/main.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use contender_server::config::{init_tracing, load_server_config}; +use contender_server::rpc_server::{ContenderRpcServer as _, ContenderServer}; +use contender_server::sessions::ContenderSessionCache; +use contender_server::sse::sse_router; +use jsonrpsee::server::{Server, ServerHandle}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tower_http::cors::{Any, CorsLayer}; +use tracing::info; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // initialize logging w/ a custom layer that pipes logs to session-specific broadcast channels + let log_sinks = init_tracing(); + + // load server config + let config = load_server_config(); + + // shared session cache + let sessions = Arc::new(RwLock::new(ContenderSessionCache::new(log_sinks))); + + // RPC server for session management and log subscription + let handle = start_rpc_server(sessions.clone(), &config.rpc_addr).await?; + + // SSE endpoint for log streaming + let sse_handle = start_sse_server(sessions, &config.sse_addr).await?; + + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("Received Ctrl+C, shutting down..."); + } + _ = handle.stopped() => { + info!("RPC server stopped"); + } + res = sse_handle => { + info!("SSE server stopped: {:?}", res); + } + } + + Ok(()) +} + +/// Starts a JSON-RPC HTTP server for managing contender sessions, +/// which includes a websocket server for subscribing to session logs. +/// +/// Returns a handle to the RPC server; awaiting `.stopped()` on this handle will wait until the server shuts down. +async fn start_rpc_server( + sessions: Arc>, + addr: &str, +) -> std::io::Result { + let cors = CorsLayer::new() + .allow_origin(Any) + .allow_methods(Any) + .allow_headers(Any); + + let server = Server::builder() + .set_http_middleware(tower::ServiceBuilder::new().layer(cors)) + .build(addr) + .await?; + let module = ContenderServer::new(sessions).into_rpc(); + let handle = server.start(module); + + info!("JSON-RPC server listening on {addr}"); + Ok(handle) +} + +/// Starts a simple SSE server that serves session logs at `/logs/:session_id`. +/// +/// Returns a handle to the server task; awaiting this handle will wait until the server shuts down. +async fn start_sse_server( + sessions: Arc>, + addr: &str, +) -> std::io::Result>> { + let sse_app = sse_router(sessions); + let sse_listener = tokio::net::TcpListener::bind(addr).await?; + info!("SSE server listening on {addr}"); + let sse_handle = tokio::spawn(async move { axum::serve(sse_listener, sse_app).await }); + Ok(sse_handle) +} diff --git a/crates/server/src/rpc_server/mod.rs b/crates/server/src/rpc_server/mod.rs new file mode 100644 index 00000000..5d93d65a --- /dev/null +++ b/crates/server/src/rpc_server/mod.rs @@ -0,0 +1,5 @@ +mod server; +mod types; + +pub use server::{ContenderRpcServer, ContenderServer}; +pub use types::*; diff --git a/crates/server/src/rpc_server/server.rs b/crates/server/src/rpc_server/server.rs new file mode 100644 index 00000000..b666036b --- /dev/null +++ b/crates/server/src/rpc_server/server.rs @@ -0,0 +1,327 @@ +use contender_core::generator::RandSeed; +use contender_core::spammer::{BlockwiseSpammer, LogCallback, NilCallback, TimedSpammer}; +use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; +use tracing::{info, Instrument}; + +use crate::rpc_server::ServerStatus; +use crate::{ + error::ContenderRpcError, + rpc_server::types::{AddSessionParams, SpamParams, SpammerType}, + sessions::{ContenderSessionCache, ContenderSessionInfo, SessionStatus}, +}; + +#[rpc(server)] +pub trait ContenderRpc { + // ================ RPC Methods ================ + + #[method(name = "status")] + async fn status(&self) -> jsonrpsee::core::RpcResult; + + #[method(name = "add_session")] + async fn add_session( + &self, + name: AddSessionParams, + ) -> jsonrpsee::core::RpcResult; + + #[method(name = "get_session")] + async fn get_session( + &self, + id: usize, + ) -> jsonrpsee::core::RpcResult>; + + #[method(name = "get_all_sessions")] + async fn get_all_sessions(&self) -> jsonrpsee::core::RpcResult>; + + #[method(name = "remove_session")] + async fn remove_session(&self, id: usize) -> jsonrpsee::core::RpcResult<()>; + + #[method(name = "spam")] + async fn spam(&self, params: SpamParams) -> jsonrpsee::core::RpcResult; + + #[method(name = "stop")] + async fn stop(&self, session_id: usize) -> jsonrpsee::core::RpcResult; + + // ================ WS Methods ================ + + #[subscription(name = "subscribe_logs" => "session_log", item = String)] + async fn subscribe_logs(&self, session_id: usize) -> jsonrpsee::core::SubscriptionResult; +} + +pub struct ContenderServer { + pub sessions: Arc>, +} + +impl ContenderServer { + pub fn new(sessions: Arc>) -> Self { + Self { sessions } + } +} + +#[async_trait::async_trait] +impl ContenderRpcServer for ContenderServer { + async fn status(&self) -> jsonrpsee::core::RpcResult { + let sessions = self.sessions.read().await; + Ok(ServerStatus { + num_sessions: sessions.num_sessions(), + }) + } + + async fn add_session( + &self, + params: AddSessionParams, + ) -> jsonrpsee::core::RpcResult { + let session_seed; + let info; + { + let mut sessions = self.sessions.write().await; + session_seed = RandSeed::seed_from_bytes(&sessions.num_sessions().to_be_bytes()); + let session = sessions.add_session(params.to_new_session_params(session_seed).await?); + info = session.info.clone(); + } + + let session_id = info.id; + let sessions = Arc::clone(&self.sessions); + + info!( + "Spawning initialization for session {} with RPC URL {}", + info.name, info.rpc_url + ); + + let span = tracing::info_span!("session_init", id = session_id); + tokio::spawn( + contender_core::CURRENT_SESSION_ID.scope( + session_id, + async move { + // Take the contender out so we can initialize without holding the lock. + let contender = { + let mut lock = sessions.write().await; + lock.take_contender(session_id) + }; + + let Some(mut contender) = contender else { + return; + }; + + let result = contender.initialize().await; + + // Put the contender back and update status. + let mut lock = sessions.write().await; + lock.put_contender(session_id, contender); + if let Some(session) = lock.get_session_mut(session_id) { + match result { + Ok(()) => { + session.info.status = SessionStatus::Ready; + info!("Session {} initialized successfully", session_id); + } + Err(e) => { + let msg = e.to_string(); + session.info.status = SessionStatus::Failed(msg.clone()); + tracing::error!( + "Session {} initialization failed: {}", + session_id, + msg + ); + } + } + } + } + .instrument(span), + ), + ); + + Ok(info) + } + + async fn get_session( + &self, + id: usize, + ) -> jsonrpsee::core::RpcResult> { + let sessions = self.sessions.read().await; + Ok(sessions.get_session(id).map(|s| s.info.clone())) + } + + async fn get_all_sessions(&self) -> jsonrpsee::core::RpcResult> { + let sessions = self.sessions.read().await; + Ok(sessions.all_sessions()) + } + + async fn remove_session(&self, id: usize) -> jsonrpsee::core::RpcResult<()> { + let mut sessions = self.sessions.write().await; + sessions.remove_session(id); + Ok(()) + } + + async fn subscribe_logs( + &self, + pending: PendingSubscriptionSink, + session_id: usize, + ) -> jsonrpsee::core::SubscriptionResult { + let sessions = self.sessions.read().await; // TODO: replace self.sessions calls with wrappers to avoid accidental improper locking patterns + let Some(session) = sessions.get_session(session_id) else { + pending + .reject(jsonrpsee::types::ErrorObject::owned( + 5, + format!("Session {session_id} not found"), + None::<()>, + )) + .await; + return Ok(()); + }; + let mut rx = session.log_channel.subscribe(); + let cancel = session.cancel.clone(); + drop(sessions); + + let sink = pending.accept().await?; + + tokio::spawn(async move { + loop { + tokio::select! { + result = rx.recv() => { + let Ok(msg) = result else { break }; + let sub_msg = + SubscriptionMessage::from_json(&msg).expect("failed to serialize log message"); + if sink.send(sub_msg).await.is_err() { + break; + } + } + _ = cancel.cancelled() => break, + } + } + }); + + Ok(()) + } + + async fn spam(&self, params: SpamParams) -> jsonrpsee::core::RpcResult { + let session_id = params.session_id; + let sessions = self.sessions.read().await; + let Some(session) = sessions.get_session(session_id) else { + return Err(ContenderRpcError::SessionNotFound(session_id).into()); + }; + error_if_session_not_ready(&session.info)?; + let save_receipts = params.save_receipts.unwrap_or(false); + drop(sessions); + + // Take the contender out so we can spam without holding the lock. + let spam_cancel = CancellationToken::new(); + let contender = { + let mut lock = self.sessions.write().await; + if let Some(session) = lock.get_session_mut(session_id) { + session.info.status = SessionStatus::Spamming(params.clone()); + session.spam_cancel = Some(spam_cancel.clone()); + } + lock.take_contender(session_id) + }; + + let Some(contender) = contender else { + return Err(ContenderRpcError::SessionNotFound(session_id).into()); + }; + + let sessions = Arc::clone(&self.sessions); + let span = tracing::info_span!("session_spam", id = session_id); + tokio::spawn( + contender_core::CURRENT_SESSION_ID.scope( + session_id, + async move { + let mut contender = contender; + let opts = params.to_run_opts(); + let spammer_type = params.spammer.unwrap_or_default(); + + macro_rules! run_spam { + ($callback:expr) => { + match spammer_type { + SpammerType::Timed => { + let spammer = TimedSpammer::new(Duration::from_secs(1)); + contender + .spam(spammer, Arc::new($callback), opts, Some(spam_cancel)) + .await + } + SpammerType::Blockwise => { + let spammer = BlockwiseSpammer::new(); + contender + .spam(spammer, Arc::new($callback), opts, Some(spam_cancel)) + .await + } + } + }; + } + + let result = if save_receipts { + let provider = contender.provider(); + run_spam!(LogCallback::new(Arc::new(provider))) + } else { + run_spam!(NilCallback) + }; + + // Put the contender back and log outcome. + let mut lock = sessions.write().await; + lock.put_contender(session_id, contender); + if let Some(session) = lock.get_session_mut(session_id) { + session.spam_cancel = None; + } + match result { + Ok(()) => { + if let Some(session) = lock.get_session_mut(session_id) { + session.info.status = SessionStatus::Ready; + } + info!("Session {} spam completed successfully", session_id); + } + Err(e) => { + if let Some(session) = lock.get_session_mut(session_id) { + session.info.status = + SessionStatus::Failed(format!("spam failed: {e}")); + } + tracing::error!("Session {} spam failed: {}", session_id, e); + } + } + } + .instrument(span), + ), + ); + + Ok(format!("Spamming session {session_id}")) + } + + async fn stop(&self, session_id: usize) -> jsonrpsee::core::RpcResult { + let span = tracing::info_span!("session_stop", id = session_id); + let sessions = self.sessions.read().await; + let Some(session) = sessions.get_session(session_id) else { + return Err(ContenderRpcError::SessionNotFound(session_id).into()); + }; + let Some(ref token) = session.spam_cancel else { + return Err(ContenderRpcError::SessionNotBusy(session_id).into()); + }; + token.cancel(); + drop(sessions); + { + let _enter = span.enter(); + info!("Sent stop signal to session {session_id}"); + } + Ok(format!("Stopping session {session_id}")) + } +} + +/// Helper function to check if a session is ready to spam, +/// returning an appropriate RPC error if not. +fn error_if_session_not_ready( + session_info: &ContenderSessionInfo, +) -> jsonrpsee::core::RpcResult<()> { + Ok(match &session_info.status { + SessionStatus::Failed(msg) => { + return Err(ContenderRpcError::SessionFailed { + info: session_info.clone(), + error: msg.to_owned(), + } + .into()) + } + SessionStatus::Spamming(_) => { + return Err(ContenderRpcError::SessionBusy(session_info.clone()).into()) + } + SessionStatus::Ready => (), + _ => return Err(ContenderRpcError::SessionNotInitialized(session_info.clone()).into()), + }) +} diff --git a/crates/server/src/rpc_server/types.rs b/crates/server/src/rpc_server/types.rs new file mode 100644 index 00000000..1fd71589 --- /dev/null +++ b/crates/server/src/rpc_server/types.rs @@ -0,0 +1,195 @@ +use crate::{error::ContenderRpcError, sessions::NewSessionParams}; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; +use contender_cli::default_scenarios::{BuiltinOptions, BuiltinScenarioCli}; +use contender_core::{ + alloy::{ + network::AnyNetwork, + providers::{DynProvider, ProviderBuilder}, + }, + generator::RandSeed, + test_scenario::Url, + RunOpts, +}; +use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; +use std::str::FromStr; +use tracing::debug; + +/// Data returned from the `status` endpoint, containing general info about the server. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ServerStatus { + pub num_sessions: usize, +} + +/// RPC parameters for adding a new contender session. +#[derive(Clone, Debug, Deserialize)] +pub struct AddSessionParams { + pub name: String, + pub rpc_url: Url, + pub test_config: Option, +} + +impl AddSessionParams { + pub async fn to_new_session_params( + self, + seed: RandSeed, + ) -> Result { + let test_config = if let Some(config) = self.test_config { + let provider = DynProvider::new( + ProviderBuilder::new() + .network::() + .connect_http(self.rpc_url.clone()), + ); + config + .to_testconfig( + Some(BuiltinOptions { + accounts_per_agent: None, + seed, + spam_rate: None, + }), + &provider, + ) + .await? + } else { + TestConfig::from_str(include_str!("../../../../scenarios/uniV2.toml")) + .expect("default config should be valid") + }; + + Ok(NewSessionParams { + name: self.name.clone(), + rpc_url: self.rpc_url.clone(), + test_config, + }) + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum TestConfigSource { + TomlBase64(String), + Json(TestConfig), + Builtin(BuiltinScenarioCli), +} + +impl TestConfigSource { + pub async fn to_testconfig( + self, + builtin_options: Option, + provider: &DynProvider, + ) -> Result { + match self { + TestConfigSource::TomlBase64(b64) => { + let bytes = BASE64.decode(b64)?; + debug!( + "Decoded test config from base64, length {} bytes", + bytes.len() + ); + let config_str = + String::from_utf8(bytes).map_err(ContenderRpcError::InvalidUtf8)?; + TestConfig::from_str(&config_str).map_err(ContenderRpcError::InvalidTestConfig) + } + + TestConfigSource::Json(config) => Ok(config), + + TestConfigSource::Builtin(builtin) => { + let scenario = builtin + .to_builtin_scenario(provider, builtin_options.unwrap_or_default()) + .await + .unwrap() + .into(); + Ok(scenario) + } + } + } +} + +/// RPC parameters for the `spam` method. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct SpamParams { + pub session_id: usize, + /// Number of transactions per period. Defaults to 10. + pub txs_per_period: Option, + /// Number of periods (seconds or blocks). Defaults to 10. + pub duration: Option, + /// Which spammer to use. Defaults to `Timed`. + pub spammer: Option, + /// Human-readable name for this spam run. + pub name: Option, + /// Whether to look for receipts while spamming; enables onchain metrics collection. + pub save_receipts: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize, Default, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum SpammerType { + /// Send a batch of txs at a fixed time interval (1 second). + #[default] + Timed, + /// Send a batch of txs every new block. + Blockwise, +} + +impl SpamParams { + pub fn to_run_opts(&self) -> RunOpts { + let mut opts = RunOpts::new(); + if let Some(n) = self.txs_per_period { + opts = opts.txs_per_period(n); + } + if let Some(n) = self.duration { + opts = opts.periods(n); + } + if let Some(name) = &self.name { + opts = opts.name(name); + } + opts + } +} + +#[cfg(test)] +mod tests { + use super::*; + use base64::engine::general_purpose::STANDARD as BASE64; + use contender_cli::default_scenarios::transfers::TransferStressCliArgs; + use contender_core::alloy::{ + consensus::constants::ETH_TO_WEI, + primitives::{Address, U256}, + }; + + #[test] + fn test_toml_base64_variant() { + let toml_content = include_str!("../../../../scenarios/uniV2.toml"); + let b64 = BASE64.encode(toml_content); + let json = serde_json::json!({ "TomlBase64": b64 }); + // println!( + // "TomlBase64:\n{}\n", + // serde_json::to_string_pretty(&json).unwrap() + // ); + + let source: TestConfigSource = serde_json::from_value(json).unwrap(); + assert!(matches!(source, TestConfigSource::TomlBase64(_))); + } + + #[test] + fn test_json_variant() { + let config = + TestConfig::from_str(include_str!("../../../../scenarios/uniV2.toml")).unwrap(); + let json = serde_json::json!({ "Json": config }); + // println!("Json:\n{}\n", serde_json::to_string_pretty(&json).unwrap()); + + let source: TestConfigSource = serde_json::from_value(json).unwrap(); + assert!(matches!(source, TestConfigSource::Json(_))); + } + + #[tokio::test] + async fn test_builtin_variant() { + let builtin = + TestConfigSource::Builtin(BuiltinScenarioCli::Transfers(TransferStressCliArgs { + amount: U256::from(ETH_TO_WEI), + recipient: Some(Address::ZERO), + })); + let json = serde_json::json!(builtin); + // println!("{}", serde_json::to_string_pretty(&json).unwrap()); + + let source: TestConfigSource = serde_json::from_value(json).unwrap(); + assert!(matches!(source, TestConfigSource::Builtin(_))); + } +} diff --git a/crates/server/src/sessions.rs b/crates/server/src/sessions.rs new file mode 100644 index 00000000..8e651ffa --- /dev/null +++ b/crates/server/src/sessions.rs @@ -0,0 +1,220 @@ +use contender_core::{generator::RandSeed, test_scenario::Url, Contender}; +use contender_sqlite::SqliteDb; +use contender_testfile::TestConfig; +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; + +use crate::{ + log_layer::SessionLogSinks, + rpc_server::{SpamParams, SpammerType}, +}; + +type SessionId = usize; + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub enum SessionStatus { + Initializing, + Ready, + Spamming(SpamParams), + Failed(String), +} + +impl std::fmt::Display for SessionStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SessionStatus::Initializing => write!(f, "Initializing"), + SessionStatus::Ready => write!(f, "Ready"), + SessionStatus::Spamming(params) => { + let res = params.to_run_opts(); + let spammer_type = params.spammer.clone().unwrap_or_default(); + let units = match spammer_type { + SpammerType::Timed => ("tps", "seconds"), + SpammerType::Blockwise => ("tpb", "blocks"), + }; + write!( + f, + "Spamming ({} {} for {} {})", + res.txs_per_period, units.0, res.periods, units.1 + ) + } + SessionStatus::Failed(err) => write!(f, "Failed: {err}"), + } + } +} + +pub struct ContenderSession { + /// Metadata about this session (id, name, rpc_url, status). + pub info: ContenderSessionInfo, + /// The contender instance for this session. `None` while it is taken out for + /// initialization or spamming (to avoid holding the lock during long operations). + pub contender: Option>, + /// Broadcast channel for per-session log lines. The tracing layer sends formatted + /// events here; WS and SSE subscribers receive from it. + pub log_channel: broadcast::Sender, + /// Session-lifetime token. Cancelled when the session is removed, which terminates + /// all WS/SSE log subscriber tasks. Once cancelled the session cannot be reused. + pub cancel: CancellationToken, + /// Per-spam-run token. Created fresh each time `spam` is called, cancelled by `stop` + /// (or `remove`). After cancellation the session returns to `Ready` and can spam again. + pub spam_cancel: Option, +} + +pub struct NewSessionParams { + pub name: String, + pub rpc_url: Url, + pub test_config: TestConfig, +} + +impl ContenderSession { + /// Should only be called by ContenderSessionCache when adding a new session, + /// since the session ID is determined by the cache + fn new(id: SessionId, params: NewSessionParams) -> Self { + let info = ContenderSessionInfo { + id, + name: params.name, + rpc_url: params.rpc_url, + status: SessionStatus::Initializing, + }; + + let contender = info.create_contender(params.test_config); + let (log_channel, _) = broadcast::channel(4096); + Self { + info, + contender: Some(contender), + log_channel, + cancel: CancellationToken::new(), + spam_cancel: None, + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ContenderSessionInfo { + pub id: SessionId, + pub name: String, + pub rpc_url: Url, + pub status: SessionStatus, +} + +impl ContenderSessionInfo { + pub fn create_contender( + &self, + testconfig: TestConfig, + ) -> Contender { + // using in-memory SQLite for now; will switch to file-based if we need persistence across server restarts + let db = contender_sqlite::SqliteDb::new_memory(); + let seeder = contender_core::generator::RandSeed::seed_from_bytes(&self.id.to_be_bytes()); + let contender_ctx = + contender_core::ContenderCtx::builder(testconfig, db, seeder, self.rpc_url.clone()) + .build(); + + Contender::new(contender_ctx) + } +} + +pub struct ContenderSessionCache { + sessions: Vec, + log_sinks: SessionLogSinks, +} + +impl ContenderSessionCache { + pub fn new(log_sinks: SessionLogSinks) -> Self { + Self { + sessions: Vec::new(), + log_sinks, + } + } + + /// Generate a new session ID. This is currently just a random usize, but could be changed to something else (e.g. UUID) if we want to support more sessions or have better guarantees against collisions. + pub fn next_session_id(&self) -> SessionId { + self.gen_id( + self.sessions + .iter() + .map(|s| s.info.id) + .collect::>() + .as_slice(), + ) + } + + /// Generate a random session ID that is not currently in use. This is simpler than tracking used IDs and reusing them, and the ID space is large enough that collisions should be extremely rare. + /// In case of collision, we simply try again with a new random ID by recursing. + fn gen_id(&self, session_ids: &[SessionId]) -> SessionId { + let id = rand::random::() as SessionId; + if self.sessions.iter().all(|s| s.info.id != id) { + id + } else { + self.gen_id(session_ids) + } + } + + /// Add a new session to the cache. The ID is simply the index of the session in the vector. + /// The session is not initialized yet, the caller is responsible for calling initialize on the session's contender before it's returned by the RPC provider. + /// + /// Returns a mutable reference to the newly added session, + /// which can be used to call initialize on it before it's returned by the RPC provider. + pub fn add_session(&mut self, params: NewSessionParams) -> &mut ContenderSession { + let session = ContenderSession::new(self.next_session_id(), params); + let info = session.info.clone(); + let log_channel = session.log_channel.clone(); + + // Register the broadcast sender in the log sinks so the tracing layer can route to it. + if let Ok(mut sinks) = self.log_sinks.try_write() { + sinks.insert(info.id, log_channel); + } + + self.sessions.push(session); + self.sessions.last_mut().expect("just pushed, should exist") + } + + pub fn get_session(&self, id: SessionId) -> Option<&ContenderSession> { + self.sessions.iter().find(|s| s.info.id == id) + } + + pub fn get_session_mut(&mut self, id: SessionId) -> Option<&mut ContenderSession> { + self.sessions.iter_mut().find(|s| s.info.id == id) + } + + /// Take the Contender out of a session so it can be used outside the lock. + pub fn take_contender( + &mut self, + id: SessionId, + ) -> Option> { + self.get_session_mut(id).and_then(|s| s.contender.take()) + } + + /// Put the Contender back into a session after initialization. + pub fn put_contender( + &mut self, + id: SessionId, + contender: Contender, + ) { + if let Some(session) = self.get_session_mut(id) { + session.contender = Some(contender); + } + } + + pub fn remove_session(&mut self, id: SessionId) { + if let Some(session) = self.get_session(id) { + // Stop any running spam before tearing down. + if let Some(ref token) = session.spam_cancel { + token.cancel(); + } + // Cancel subscriber streams before dropping the session. + session.cancel.cancel(); + } + // Deregister the log sink. + if let Ok(mut sinks) = self.log_sinks.try_write() { + sinks.remove(&id); + } + self.sessions.retain(|s| s.info.id != id); + } + + pub fn all_sessions(&self) -> Vec { + self.sessions.iter().map(|s| s.info.clone()).collect() + } + + pub fn num_sessions(&self) -> usize { + self.sessions.len() + } +} diff --git a/crates/server/src/sse.rs b/crates/server/src/sse.rs new file mode 100644 index 00000000..7a15b4d9 --- /dev/null +++ b/crates/server/src/sse.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use axum::{ + extract::{Path, State}, + response::sse::{Event, Sse}, + routing::get, + Router, +}; +use tokio::sync::RwLock; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +use crate::sessions::ContenderSessionCache; + +pub type SharedSessions = Arc>; + +/// Build an axum router that serves SSE log streams. +/// +/// `GET /logs/:session_id` — returns an SSE stream of log lines for the given session. +pub fn sse_router(sessions: SharedSessions) -> Router { + Router::new() + .route("/logs/{session_id}", get(logs_handler)) + .with_state(sessions) +} + +async fn logs_handler( + Path(session_id): Path, + State(sessions): State, +) -> Result< + Sse>>, + (axum::http::StatusCode, String), +> { + let sessions = sessions.read().await; + let session = sessions.get_session(session_id).ok_or_else(|| { + ( + axum::http::StatusCode::NOT_FOUND, + format!("Session {session_id} not found"), + ) + })?; + let rx = session.log_channel.subscribe(); + let cancel = session.cancel.clone(); + drop(sessions); + + let stream = cancel_on_remove(rx, cancel); + + Ok(Sse::new(stream)) +} + +/// Wraps a broadcast receiver into a stream that terminates when the cancel token fires. +fn cancel_on_remove( + mut rx: tokio::sync::broadcast::Receiver, + cancel: CancellationToken, +) -> impl Stream> { + let (tx, mpsc_rx) = tokio::sync::mpsc::channel::>(256); + tokio::spawn(async move { + loop { + tokio::select! { + result = rx.recv() => { + match result { + Ok(msg) => { + if tx.send(Ok(Event::default().data(msg))).await.is_err() { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!("SSE broadcast lag: skipped {n} messages"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + _ = cancel.cancelled() => break, + } + } + }); + ReceiverStream::new(mpsc_rx) +} diff --git a/crates/server/static/builtin_scenarios.js b/crates/server/static/builtin_scenarios.js new file mode 100644 index 00000000..ea666746 --- /dev/null +++ b/crates/server/static/builtin_scenarios.js @@ -0,0 +1,59 @@ +// AUTO-GENERATED by build.rs — do not edit +const BUILTIN_SCENARIOS = { + "blobs": [ + { name: "blob_data", type: "text", default: "0xdeadbeef", help: "Blob data. Values can be hexidecimal or UTF-8 strings.", optional: false }, + { name: "recipient", type: "text", default: null, help: "The recipient of the blob transactions. Defaults to sender's address. May be a contract placeholder from a previous contender setup.", optional: true }, + ], + "contract": [ + { name: "contract_path", type: "text", default: null, help: "Path to smart contract source. Format: :", optional: false }, + { name: "constructor_args", type: "text", default: null, help: "Comma-separated constructor arguments. Format: \"arg1, arg2, ...\" ", optional: true }, + { name: "setup_calls", type: "text", default: null, help: "Setup function calls that run once before spamming. May be specified multiple times. Example: `--spam \"setNumber(123456)\"`", optional: false }, + { name: "spam_calls", type: "text", default: null, help: "Spam function calls. May be specified multiple times. Example: `--spam \"setNumber(123456)\"`", optional: false }, + ], + "eth-functions": [ + { name: "opcodes", type: "multi-select:Stop,Add,Mul,Sub,Div,Sdiv,Mod,Smod,Addmod,Mulmod,Exp,Signextend,Lt,Gt,Slt,Sgt,Eq,Iszero,And,Or,Xor,Not,Byte,Shl,Shr,Sar,Sha3,Keccak256,Address,Balance,Origin,Caller,Callvalue,Calldataload,Calldatasize,Calldatacopy,Codesize,Codecopy,Gasprice,Extcodesize,Extcodecopy,Returndatasize,Returndatacopy,Extcodehash,Blockhash,Coinbase,Timestamp,Number,Prevrandao,Gaslimit,Chainid,Selfbalance,Basefee,Pop,Mload,Mstore,Mstore8,Sload,Sstore,Msize,Gas,Log0,Log1,Log2,Log3,Log4,Create,Call,Callcode,Return,Delegatecall,Create2,Staticcall,Revert,Invalid,Selfdestruct", default: null, help: "Comma-separated list of opcodes to call in spam transactions.", optional: false }, + { name: "precompiles", type: "multi-select:HashSha256,HashRipemd160,Identity,ModExp,EcAdd,EcMul,EcPairing,Blake2f", default: null, help: "Comma-separated list of precompiles to call in spam transactions.", optional: false }, + { name: "num_iterations", type: "number", default: "10", help: "Number of times to call an opcode/precompile in a single transaction.", optional: false }, + ], + "erc20": [ + { name: "send_amount", type: "text", default: "DEFAULT_TOKENS_SENT", help: "The amount to send in each spam tx.", optional: false }, + { name: "fund_amount", type: "text", default: "DEFAULT_TOKENS_FUNDED", help: "The amount of tokens to give each spammer account before spamming starts.", optional: false }, + { name: "token_recipient", type: "text", default: null, help: "The address to receive tokens sent by spam txs. By default, address(0) receives the tokens.", optional: true }, + ], + "fill-block": [ + { name: "max_gas_per_block", type: "number", default: null, help: "Override gas used per block. By default, the block limit is used.", optional: true }, + ], + "revert": [ + { name: "gas_use", type: "number", default: "30_000", help: "Amount of gas to use before reverting. This number + 35k gas is added to each tx's gas limit.", optional: false }, + ], + "set-code": [ + { name: "contract_address", type: "text", default: null, help: "The contract address containing the bytecode to copy into the sender's EOA. May be a placeholder. If not set, a test contract will be deployed.", optional: true }, + { name: "signature", type: "text", default: null, help: "The solidity signature of the function to call after setCode changes the account's bytecode.\nExample (smart wallet):\n--sig \"execute((address to, uint256 value, bytes data)[])\"", optional: true }, + { name: "args", type: "text", default: null, help: "Comma-separated arguments to the function being called on the EOA after the setCode transaction executes.\nExample (smart wallet):\n--args \"0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266,0,0xd09de08a\"", optional: true }, + ], + "storage": [ + { name: "num_slots", type: "number", default: "500", help: "Number of storage slots to fill with random data.", optional: false }, + { name: "num_iterations", type: "number", default: "1", help: "Number of times to write over each storage slot.", optional: false }, + ], + "stress": [ + { name: "disable_storage", type: "bool", default: "false", help: "Remove storage stress txs from the scenario.", optional: false }, + { name: "disable_transfers", type: "bool", default: "false", help: "Remove transfer stress txs from the scenario.", optional: false }, + { name: "disable_opcodes", type: "multi-select:Stop,Add,Mul,Sub,Div,Sdiv,Mod,Smod,Addmod,Mulmod,Exp,Signextend,Lt,Gt,Slt,Sgt,Eq,Iszero,And,Or,Xor,Not,Byte,Shl,Shr,Sar,Sha3,Keccak256,Address,Balance,Origin,Caller,Callvalue,Calldataload,Calldatasize,Calldatacopy,Codesize,Codecopy,Gasprice,Extcodesize,Extcodecopy,Returndatasize,Returndatacopy,Extcodehash,Blockhash,Coinbase,Timestamp,Number,Prevrandao,Gaslimit,Chainid,Selfbalance,Basefee,Pop,Mload,Mstore,Mstore8,Sload,Sstore,Msize,Gas,Log0,Log1,Log2,Log3,Log4,Create,Call,Callcode,Return,Delegatecall,Create2,Staticcall,Revert,Invalid,Selfdestruct", default: null, help: "Comma-separated list of opcodes to be ignored in the scenario.", optional: true }, + { name: "disable_precompiles", type: "multi-select:HashSha256,HashRipemd160,Identity,ModExp,EcAdd,EcMul,EcPairing,Blake2f", default: null, help: "Comma-separated list of precompiles to be ignored in the scenario.", optional: true }, + { name: "disable_all_precompiles", type: "bool", default: "false", help: "Disable all precompiles in the scenario.", optional: false }, + { name: "disable_all_opcodes", type: "bool", default: "false", help: "Disable all opcodes in the scenario.", optional: false }, + { name: "opcode_iterations", type: "number", default: "10", help: "Number of times to call an opcode in a single tx.", optional: false }, + { name: "with_fails", type: "bool", default: "false", help: "Enables all precompiles & opcodes. By default, the ones that typically fail are disabled.", optional: false }, + ], + "transfers": [ + { name: "amount", type: "text", default: "0.001 eth", help: "Amount of tokens to transfer in each transaction.", optional: false }, + { name: "recipient", type: "text", default: null, help: "Address to receive ether sent from spammers.", optional: true }, + ], + "uni-v2": [ + { name: "num_tokens", type: "number", default: "2", help: "The number of tokens to create in the scenario. Each token will be paired with WETH and each other token.", optional: false }, + { name: "weth_per_token", type: "text", default: "1 eth", help: "The amount of ETH to deposit into each TOKEN pool. One additional multiple of this is also minted for trading.", optional: false }, + { name: "initial_token_supply", type: "text", default: "5000000 eth", help: "The initial amount minted for each token. 50% of this will be deposited among trading pools. Units must be provided, e.g. '1 eth' to mint 1 token with 1e18 decimal precision.", optional: false }, + { name: "weth_trade_amount", type: "text", default: null, help: "The amount of WETH to trade in the scenario. If not provided, 0.01% of the pool's initial WETH will be traded for each token. Units must be provided, e.g. '0.1 eth'.", optional: true }, + { name: "token_trade_amount", type: "text", default: null, help: "The amount of tokens to trade in the scenario. If not provided, 0.01% of the initial supply will be traded for each token.", optional: true }, + ], +}; diff --git a/crates/server/static/index.html b/crates/server/static/index.html new file mode 100644 index 00000000..8e80819c --- /dev/null +++ b/crates/server/static/index.html @@ -0,0 +1,734 @@ + + + + + +Contender Server + + + + +

Contender Server

+
+ + + + disconnected +
+ +
+
+

Controls

+
+
+

Sessions

+
No sessions
+
+
+ + + + + + diff --git a/crates/testfile/src/lib.rs b/crates/testfile/src/lib.rs index b4c41753..92988e44 100644 --- a/crates/testfile/src/lib.rs +++ b/crates/testfile/src/lib.rs @@ -607,7 +607,7 @@ mod more_tests { let spammer = TimedSpammer::new(Duration::from_secs(1)); let callback = NilCallback; let opts = RunOpts::new().txs_per_period(100).periods(3); - contender.spam(spammer, callback.into(), opts).await?; + contender.spam(spammer, callback.into(), opts, None).await?; Ok(()) }