diff --git a/.ethexe.example.local.toml b/.ethexe.example.local.toml index b48a81f72c1..3434ae0fb1e 100644 --- a/.ethexe.example.local.toml +++ b/.ethexe.example.local.toml @@ -184,6 +184,26 @@ block-time = 1 # (optional, default: false). # no-rpc = false +# Flag to enable RocksDB snapshot download RPC API. +# (optional, default: false). +# snapshot = false + +# Bearer token used by `snapshot_download`. +# Must be provided when `snapshot = true`. +# snapshot-token = "replace-with-strong-random-token" + +# Snapshot stream chunk size in bytes. +# (optional, default: 1048576). +# snapshot-chunk-bytes = 1048576 + +# Snapshot artifact retention in seconds. +# (optional, default: 600). +# snapshot-retention-secs = 600 + +# Maximum concurrent snapshot downloads. +# (optional, default: 1). +# snapshot-max-concurrent = 1 + ########################################################################################## ### Prometheus (metrics) service parameters. diff --git a/.ethexe.example.toml b/.ethexe.example.toml index f39a6d6f1ba..d376b961bce 100644 --- a/.ethexe.example.toml +++ b/.ethexe.example.toml @@ -184,6 +184,26 @@ # (optional, default: false). # no-rpc = false +# Flag to enable RocksDB snapshot download RPC API. +# (optional, default: false). +# snapshot = false + +# Bearer token used by `snapshot_download`. +# Must be provided when `snapshot = true`. +# snapshot-token = "replace-with-strong-random-token" + +# Snapshot stream chunk size in bytes. +# (optional, default: 1048576). +# snapshot-chunk-bytes = 1048576 + +# Snapshot artifact retention in seconds. +# (optional, default: 600). +# snapshot-retention-secs = 600 + +# Maximum concurrent snapshot downloads. +# (optional, default: 1). +# snapshot-max-concurrent = 1 + ########################################################################################## ### Prometheus (metrics) service parameters. diff --git a/Cargo.lock b/Cargo.lock index 10d793331b9..c117aa19dd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,9 +135,9 @@ dependencies = [ [[package]] name = "alloy-chains" -version = "0.2.30" +version = "0.2.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90f374d3c6d729268bbe2d0e0ff992bb97898b2df756691a62ee1d5f0506bc39" +checksum = "3842d8c52fcd3378039f4703dba392dca8b546b1c8ed6183048f8dab95b2be78" dependencies = [ "alloy-primitives", "num_enum 0.7.5", @@ -421,9 +421,9 @@ dependencies = [ [[package]] name = "alloy-primitives" -version = "1.5.7" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3b431b4e72cd8bd0ec7a50b4be18e73dab74de0dba180eef171055e5d5926e" +checksum = "f6a0fb18dd5fb43ec5f0f6a20be1ce0287c79825827de5744afaa6c957737c33" dependencies = [ "alloy-rlp", "bytes", @@ -444,6 +444,7 @@ dependencies = [ "rustc-hash 2.1.1", "serde", "sha3", + "tiny-keccak", ] [[package]] @@ -5354,6 +5355,7 @@ name = "ethexe-rpc" version = "1.10.0" dependencies = [ "anyhow", + "clap 4.5.54", "dashmap 5.5.3", "ethexe-common", "ethexe-db", @@ -5363,6 +5365,7 @@ dependencies = [ "gear-core", "gear-workspace-hack", "gprimitives", + "hex", "hyper 1.8.1", "jsonrpsee", "metrics", @@ -5370,12 +5373,16 @@ dependencies = [ "ntest", "parity-scale-codec", "serde", + "sha2 0.10.9", "sp-core", + "tar", + "tempfile", "tokio", "tower 0.4.13", "tower-http 0.5.2", "tracing", "tracing-subscriber", + "zstd 0.13.3", ] [[package]] @@ -7412,13 +7419,12 @@ dependencies = [ "indexmap 2.13.0", "ipnet", "itertools 0.10.5", - "itertools 0.11.0", + "itertools 0.13.0", "js-sys", "jsonrpsee", "jsonrpsee-client-transport", "jsonrpsee-core", "k256", - "keccak", "libc", "libp2p 0.52.4", "libp2p-identity", @@ -7541,7 +7547,6 @@ dependencies = [ "signature", "slice-group-by", "smallvec", - "socket2 0.4.10", "soketto", "sp-allocator", "sp-api", @@ -7629,6 +7634,7 @@ dependencies = [ "wasmtime-runtime", "winnow", "zeroize", + "zstd-sys", ] [[package]] @@ -9413,9 +9419,9 @@ dependencies = [ [[package]] name = "keccak-asm" -version = "0.1.5" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b646a74e746cd25045aa0fd42f4f7f78aa6d119380182c7e63a5593c4ab8df6f" +checksum = "505d1856a39b200489082f90d897c3f07c455563880bc5952e38eabf731c83b6" dependencies = [ "digest 0.10.7", "sha3-asm", @@ -16859,9 +16865,9 @@ dependencies = [ [[package]] name = "sha3-asm" -version = "0.1.5" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b31139435f327c93c6038ed350ae4588e2c70a13d50599509fee6349967ba35a" +checksum = "c28efc5e327c837aa837c59eae585fc250715ef939ac32881bcc11677cd02d46" dependencies = [ "cc", "cfg-if", @@ -21579,6 +21585,15 @@ dependencies = [ "zstd-safe 6.0.6", ] +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe 7.2.4", +] + [[package]] name = "zstd-safe" version = "5.0.2+zstd.1.5.2" @@ -21599,6 +21614,15 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + [[package]] name = "zstd-sys" version = "2.0.16+zstd.1.5.7" diff --git a/ethexe/cli/src/params/mod.rs b/ethexe/cli/src/params/mod.rs index 2f2e7ea444a..7682c07d26b 100644 --- a/ethexe/cli/src/params/mod.rs +++ b/ethexe/cli/src/params/mod.rs @@ -93,7 +93,10 @@ impl Params { .transpose() }) .transpose()?; - let rpc = rpc.and_then(|p| p.into_config(&node)); + let rpc = match rpc { + Some(params) => params.into_config(&node)?, + None => None, + }; let prometheus = prometheus.and_then(|p| p.into_config()); Ok(Config { node, diff --git a/ethexe/cli/src/params/rpc.rs b/ethexe/cli/src/params/rpc.rs index a2c36978d4e..17b088e0336 100644 --- a/ethexe/cli/src/params/rpc.rs +++ b/ethexe/cli/src/params/rpc.rs @@ -17,8 +17,9 @@ // along with this program. If not, see . use super::MergeParams; +use anyhow::{Result, anyhow}; use clap::Parser; -use ethexe_rpc::{DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER, RpcConfig}; +use ethexe_rpc::{DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER, RpcConfig, SnapshotRpcConfig}; use ethexe_service::config::NodeConfig; use serde::Deserialize; use std::{ @@ -52,6 +53,31 @@ pub struct RpcParams { #[arg(long)] pub gas_limit_multiplier: Option, + + /// Flag to enable snapshot download RPC API. + #[arg(long)] + #[serde(default)] + pub snapshot: bool, + + /// Bearer token for snapshot download RPC authorization. + #[arg(long)] + #[serde(rename = "snapshot-token")] + pub snapshot_token: Option, + + /// Snapshot chunk size in bytes. + #[arg(long)] + #[serde(rename = "snapshot-chunk-bytes")] + pub snapshot_chunk_bytes: Option, + + /// Snapshot retention period in seconds. + #[arg(long)] + #[serde(rename = "snapshot-retention-secs")] + pub snapshot_retention_secs: Option, + + /// Max amount of concurrent snapshot downloads. + #[arg(long)] + #[serde(rename = "snapshot-max-concurrent")] + pub snapshot_max_concurrent: Option, } impl RpcParams { @@ -59,9 +85,9 @@ impl RpcParams { pub const DEFAULT_RPC_PORT: u16 = 9944; /// Convert self into a proper `RpcConfig` object, if RPC service is enabled. - pub fn into_config(self, node_config: &NodeConfig) -> Option { + pub fn into_config(self, node_config: &NodeConfig) -> Result> { if self.no_rpc { - return None; + return Ok(None); } let ipv4_addr = if self.rpc_external { @@ -91,14 +117,49 @@ impl RpcParams { .gas_limit_multiplier .unwrap_or(DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER); - Some(RpcConfig { + let snapshot = if self.snapshot { + let auth_bearer_token = self.snapshot_token.ok_or_else(|| { + anyhow!("`snapshot-token` must be provided when `snapshot` rpc is enabled") + })?; + if auth_bearer_token.is_empty() { + return Err(anyhow!( + "`snapshot-token` must be non-empty when `snapshot` rpc is enabled" + )); + } + Some(SnapshotRpcConfig { + auth_bearer_token, + chunk_size_bytes: self + .snapshot_chunk_bytes + .unwrap_or(SnapshotRpcConfig::DEFAULT_CHUNK_SIZE_BYTES) + .max(1), + retention_secs: self + .snapshot_retention_secs + .unwrap_or(SnapshotRpcConfig::DEFAULT_RETENTION_SECS), + max_concurrent_downloads: self + .snapshot_max_concurrent + .unwrap_or(SnapshotRpcConfig::DEFAULT_MAX_CONCURRENT_DOWNLOADS) + .max(1), + }) + } else { + None + }; + + let gas_allowance = gas_limit_multiplier + .checked_mul(node_config.block_gas_limit) + .ok_or_else(|| { + anyhow!( + "rpc gas allowance overflow: gas_limit_multiplier={gas_limit_multiplier}, block_gas_limit={}", + node_config.block_gas_limit + ) + })?; + + Ok(Some(RpcConfig { listen_addr, cors, - gas_allowance: gas_limit_multiplier - .checked_mul(node_config.block_gas_limit) - .expect("RPC gas allowance overflow"), + gas_allowance, chunk_size: node_config.chunk_processing_threads, - }) + snapshot, + })) } } @@ -110,6 +171,15 @@ impl MergeParams for RpcParams { rpc_cors: self.rpc_cors.or(with.rpc_cors), no_rpc: self.no_rpc || with.no_rpc, gas_limit_multiplier: self.gas_limit_multiplier.or(with.gas_limit_multiplier), + snapshot: self.snapshot || with.snapshot, + snapshot_token: self.snapshot_token.or(with.snapshot_token), + snapshot_chunk_bytes: self.snapshot_chunk_bytes.or(with.snapshot_chunk_bytes), + snapshot_retention_secs: self + .snapshot_retention_secs + .or(with.snapshot_retention_secs), + snapshot_max_concurrent: self + .snapshot_max_concurrent + .or(with.snapshot_max_concurrent), } } } @@ -180,3 +250,65 @@ impl<'de> Deserialize<'de> for Cors { } } } + +#[cfg(test)] +mod tests { + use super::*; + use ethexe_service::config::ConfigPublicKey; + use tempfile::tempdir; + + fn node_config(block_gas_limit: u64) -> NodeConfig { + let database_dir = tempdir().expect("temporary directory should be created"); + let key_dir = tempdir().expect("temporary directory should be created"); + + NodeConfig { + database_path: database_dir.path().to_path_buf(), + key_path: key_dir.path().to_path_buf(), + validator: ConfigPublicKey::Disabled, + validator_session: ConfigPublicKey::Disabled, + eth_max_sync_depth: 0, + worker_threads: None, + blocking_threads: None, + chunk_processing_threads: 2, + block_gas_limit, + canonical_quarantine: 0, + dev: false, + pre_funded_accounts: 0, + fast_sync: false, + chain_deepness_threshold: 0, + } + } + + #[test] + fn rejects_empty_snapshot_token() { + let params = RpcParams { + snapshot: true, + snapshot_token: Some(String::new()), + ..Default::default() + }; + + let err = params + .into_config(&node_config(1)) + .expect_err("empty snapshot token should be rejected"); + assert!( + err.to_string().contains("must be non-empty"), + "unexpected error: {err:#}" + ); + } + + #[test] + fn rejects_gas_allowance_overflow() { + let params = RpcParams { + gas_limit_multiplier: Some(u64::MAX), + ..Default::default() + }; + + let err = params + .into_config(&node_config(2)) + .expect_err("gas allowance overflow should be rejected"); + assert!( + err.to_string().contains("rpc gas allowance overflow"), + "unexpected error: {err:#}" + ); + } +} diff --git a/ethexe/db/src/rocks.rs b/ethexe/db/src/rocks.rs index 227030dd4b6..4470f6193dc 100644 --- a/ethexe/db/src/rocks.rs +++ b/ethexe/db/src/rocks.rs @@ -17,10 +17,13 @@ // along with this program. If not, see . use crate::{CASDatabase, KVDatabase}; -use anyhow::Result; +use anyhow::{Context as _, Result}; use gprimitives::H256; -use rocksdb::{DB, DBIteratorWithThreadMode, Options}; -use std::{path::PathBuf, sync::Arc}; +use rocksdb::{DB, DBIteratorWithThreadMode, Options, checkpoint::Checkpoint}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; /// Database for storing states and codes in memory. #[derive(Debug, Clone)] @@ -36,6 +39,15 @@ impl RocksDatabase { inner: Arc::new(db), }) } + + /// Create a physical RocksDB checkpoint at the provided destination path. + pub fn create_checkpoint(&self, path: impl AsRef) -> Result<()> { + let checkpoint = + Checkpoint::new(self.inner.as_ref()).context("failed to create rocksdb checkpoint")?; + checkpoint + .create_checkpoint(path) + .context("failed to materialize rocksdb checkpoint") + } } impl CASDatabase for RocksDatabase { @@ -227,4 +239,23 @@ mod tests { tests::kv_multi_thread(db); }); } + + #[test] + fn create_checkpoint_and_reopen() { + with_database(|db| { + let key = b"key"; + let value = b"value".to_vec(); + db.put(key, value.clone()); + + let checkpoint_dir = + tempfile::tempdir().expect("Failed to create temporary directory for checkpoint"); + let checkpoint_path = checkpoint_dir.path().join("checkpoint"); + db.create_checkpoint(&checkpoint_path) + .expect("Failed to create RocksDB checkpoint"); + + let checkpoint_db = RocksDatabase::open(checkpoint_path) + .expect("Failed to open RocksDB checkpoint for read"); + assert_eq!(checkpoint_db.get(key), Some(value)); + }); + } } diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 9801aa2df98..32439b3d9db 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -7,6 +7,10 @@ license.workspace = true homepage.workspace = true repository.workspace = true +[[example]] +name = "snapshot_download_verify" +required-features = ["client"] + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -30,12 +34,20 @@ tracing.workspace = true dashmap.workspace = true metrics.workspace = true metrics-derive.workspace = true +sha2.workspace = true +hex.workspace = true +tar = "0.4" +zstd = "0.13" gear-workspace-hack.workspace = true [dev-dependencies] +clap = { workspace = true, features = ["derive"] } jsonrpsee = { workspace = true, features = ["client"] } ethexe-common = { workspace = true, features = ["std", "mock"] } +ethexe-db = { workspace = true, features = ["mock"] } ntest.workspace = true +tempfile.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } tracing-subscriber.workspace = true [features] diff --git a/ethexe/rpc/examples/snapshot_download_verify.rs b/ethexe/rpc/examples/snapshot_download_verify.rs new file mode 100644 index 00000000000..940b25cd856 --- /dev/null +++ b/ethexe/rpc/examples/snapshot_download_verify.rs @@ -0,0 +1,643 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use anyhow::{Context, Result, anyhow, bail, ensure}; +use clap::Parser; +use ethexe_common::{ + Announce, HashOf, ProtocolTimelines, SimpleBlockData, + db::{DBConfig, DBGlobals, GlobalsStorageRO}, + gear::MAX_BLOCK_GAS_LIMIT, +}; +use ethexe_db::{CASDatabase, Database, RawDatabase, RocksDatabase, VERSION}; +use ethexe_rpc::{ + RpcConfig, RpcServer, RpcService, SnapshotClient, SnapshotRpcConfig, SnapshotStreamItem, +}; +use gprimitives::H256; +use jsonrpsee::{ + server::ServerHandle, + ws_client::{HeaderMap, HeaderValue, WsClient, WsClientBuilder}, +}; +use sha2::{Digest as _, Sha256}; +use std::{ + fs::{self, File}, + io::{Cursor, Write as _}, + net::{Ipv4Addr, SocketAddr}, + path::{Path, PathBuf}, + str::FromStr, +}; +use tempfile::TempDir; + +#[derive(Debug, Parser)] +struct Cli { + /// Existing snapshot-enabled RPC endpoint to verify instead of spawning a local fixture server. + #[arg(long)] + ws_url: Option, + + /// Snapshot cases in the form `entry_count x entry_size_bytes`. + #[arg(long = "case", value_name = "ENTRY_COUNTxENTRY_SIZE_BYTES")] + cases: Vec, + + /// Bearer token used to authorize snapshot downloads. + #[arg(long, default_value = "snapshot-token")] + token: String, + + /// Chunk size configured on the snapshot RPC server. + #[arg(long, default_value_t = 32 * 1024)] + chunk_bytes: usize, + + /// Snapshot retention configured on the snapshot RPC server. + #[arg(long, default_value_t = 600)] + retention_secs: u64, + + /// Maximum concurrent downloads configured on the snapshot RPC server. + #[arg(long, default_value_t = 1)] + max_concurrent: u32, + + /// Optional directory to persist downloaded archives and extracted checkpoints. + #[arg(long)] + output_dir: Option, +} + +#[derive(Clone, Debug)] +struct SnapshotCase { + entry_count: usize, + entry_size: usize, +} + +impl SnapshotCase { + fn label(&self) -> String { + format!("{}x{}", self.entry_count, self.entry_size) + } +} + +impl FromStr for SnapshotCase { + type Err = anyhow::Error; + + fn from_str(value: &str) -> Result { + let (entry_count, entry_size) = value.split_once('x').ok_or_else(|| { + anyhow!("invalid case `{value}`, expected format ENTRY_COUNTxENTRY_SIZE_BYTES") + })?; + + let entry_count = entry_count + .parse() + .with_context(|| format!("invalid entry count in case `{value}`"))?; + let entry_size = entry_size + .parse() + .with_context(|| format!("invalid entry size in case `{value}`"))?; + + Ok(Self { + entry_count, + entry_size, + }) + } +} + +#[derive(Debug)] +struct SnapshotFixture { + _temp_dir: TempDir, + rocks_db: RocksDatabase, + db: Database, + expected_block_hash: H256, + sample_hash: H256, + sample_payload: Vec, +} + +#[derive(Debug)] +struct DownloadedSnapshot { + snapshot_id: String, + block_hash: H256, + total_bytes: u64, + chunk_size: u64, + total_chunks: u64, + sha256_hex: String, + compression: String, + archive_bytes: Vec, +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + if let Some(output_dir) = &cli.output_dir { + fs::create_dir_all(output_dir) + .with_context(|| format!("failed to create output dir {}", output_dir.display()))?; + } + + if cli.ws_url.is_some() { + ensure!( + cli.cases.is_empty(), + "`--case` is only supported when the verifier spawns local fixture servers" + ); + return run_external_snapshot_check(&cli).await; + } + + let cases = parse_cases(&cli)?; + for case in cases { + run_case(&cli, &case).await?; + } + + Ok(()) +} + +fn parse_cases(cli: &Cli) -> Result> { + if cli.cases.is_empty() { + return Ok(vec![ + SnapshotCase { + entry_count: 8, + entry_size: 1024, + }, + SnapshotCase { + entry_count: 64, + entry_size: 32 * 1024, + }, + SnapshotCase { + entry_count: 128, + entry_size: 128 * 1024, + }, + ]); + } + + cli.cases.iter().map(|case| case.parse()).collect() +} + +async fn run_case(cli: &Cli, case: &SnapshotCase) -> Result<()> { + let fixture = SnapshotFixture::new(case)?; + let listen_addr = unused_local_addr()?; + let snapshot_config = SnapshotRpcConfig { + auth_bearer_token: cli.token.clone(), + chunk_size_bytes: cli.chunk_bytes.max(1), + retention_secs: cli.retention_secs, + max_concurrent_downloads: cli.max_concurrent.max(1), + }; + let (handle, _rpc) = start_snapshot_server(listen_addr, &fixture, snapshot_config).await?; + + let result = async { + let client = snapshot_client(format!("ws://{listen_addr}"), &cli.token).await?; + let downloaded = download_snapshot(&client).await?; + verify_downloaded_snapshot(&downloaded, &fixture, cli.output_dir.as_deref(), case)?; + + println!( + "verified case {}: bytes={}, chunks={}, sha256={}", + case.label(), + downloaded.total_bytes, + downloaded.total_chunks, + downloaded.sha256_hex + ); + + Ok(()) + } + .await; + + handle + .stop() + .map_err(|err| anyhow!("failed to stop snapshot rpc server: {err}"))?; + handle.stopped().await; + + result +} + +async fn run_external_snapshot_check(cli: &Cli) -> Result<()> { + let ws_url = cli + .ws_url + .as_ref() + .expect("checked above that external mode is enabled"); + let client = snapshot_client(ws_url.clone(), &cli.token).await?; + let downloaded = download_snapshot(&client).await?; + verify_external_snapshot(&downloaded, cli.output_dir.as_deref())?; + + println!( + "verified external snapshot: bytes={}, chunks={}, sha256={}, block_hash={}", + downloaded.total_bytes, + downloaded.total_chunks, + downloaded.sha256_hex, + downloaded.block_hash + ); + + Ok(()) +} + +impl SnapshotFixture { + fn new(case: &SnapshotCase) -> Result { + let temp_dir = tempfile::tempdir().context("failed to create temporary directory")?; + let rocks_db = RocksDatabase::open(temp_dir.path().to_path_buf()) + .context("SnapshotFixture: failed to open rocks database")?; + let db_raw = RawDatabase::from_one(&rocks_db); + + db_raw.kv.set_config(DBConfig { + version: VERSION, + chain_id: 0, + router_address: Default::default(), + timelines: ProtocolTimelines::default(), + genesis_block_hash: H256::from_low_u64_be(1), + genesis_announce_hash: HashOf::::zero(), + }); + + let expected_block_hash = H256::from_low_u64_be(42); + db_raw.kv.set_globals(DBGlobals { + start_block_hash: H256::from_low_u64_be(1), + start_announce_hash: HashOf::::zero(), + latest_synced_block: SimpleBlockData { + hash: expected_block_hash, + header: Default::default(), + }, + latest_prepared_block_hash: expected_block_hash, + latest_computed_announce_hash: HashOf::::zero(), + }); + + let db = Database::try_from_raw(db_raw) + .context("SnapshotFixture: failed to construct Database from RawDatabase")?; + + let mut sample = None; + for index in 0..case.entry_count { + let payload = pseudo_random_payload(index as u64 + 1, case.entry_size); + let hash = db.cas().write(&payload); + if sample.is_none() { + sample = Some((hash, payload)); + } + } + + let (sample_hash, sample_payload) = + sample.ok_or_else(|| anyhow!("snapshot case must contain at least one entry"))?; + + Ok(Self { + _temp_dir: temp_dir, + rocks_db, + db, + expected_block_hash, + sample_hash, + sample_payload, + }) + } +} + +fn pseudo_random_payload(seed: u64, len: usize) -> Vec { + let mut state = seed; + let mut payload = vec![0u8; len]; + + for byte in &mut payload { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + *byte = state as u8; + } + + payload +} + +async fn start_snapshot_server( + listen_addr: SocketAddr, + fixture: &SnapshotFixture, + snapshot_config: SnapshotRpcConfig, +) -> Result<(ServerHandle, RpcService)> { + let rpc_config = RpcConfig { + listen_addr, + cors: None, + gas_allowance: MAX_BLOCK_GAS_LIMIT, + chunk_size: 2, + snapshot: Some(snapshot_config), + }; + + RpcServer::new(rpc_config, fixture.db.clone()) + .with_snapshot_source(fixture.rocks_db.clone()) + .run_server() + .await + .context("failed to start snapshot rpc server") +} + +async fn snapshot_client(ws_url: String, token: &str) -> Result { + let mut headers = HeaderMap::new(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {token}")) + .context("failed to construct authorization header")?, + ); + + WsClientBuilder::new() + .set_headers(headers) + .build(ws_url) + .await + .context("failed to create snapshot ws client") +} + +async fn download_snapshot(client: &WsClient) -> Result { + let mut subscription = client + .download() + .await + .context("failed to create snapshot subscription")?; + + let manifest = subscription + .next() + .await + .ok_or_else(|| anyhow!("snapshot stream ended before manifest"))? + .context("failed to receive manifest item")?; + + let (snapshot_id, block_hash, total_bytes, chunk_size, total_chunks, sha256_hex, compression) = + match manifest { + SnapshotStreamItem::Manifest { + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + .. + } => ( + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + ), + other => bail!("expected manifest item, got {other:?}"), + }; + + let mut archive_bytes = Vec::with_capacity(total_bytes as usize); + let mut received_chunks = 0u64; + + for expected_index in 0..total_chunks { + let item = subscription + .next() + .await + .ok_or_else(|| anyhow!("snapshot stream ended before chunk {expected_index}"))? + .context("failed to receive chunk item")?; + match item { + SnapshotStreamItem::Chunk { index, data } => { + ensure!( + index == expected_index, + "unexpected chunk index: expected {expected_index}, got {index}" + ); + archive_bytes.extend_from_slice(&data.0); + received_chunks += 1; + } + other => bail!("expected chunk item, got {other:?}"), + } + } + + let completed = subscription + .next() + .await + .ok_or_else(|| anyhow!("snapshot stream ended before completion"))? + .context("failed to receive completion item")?; + + match completed { + SnapshotStreamItem::Completed { + total_chunks: completed_chunks, + total_bytes: completed_bytes, + } => { + ensure!( + completed_chunks == total_chunks, + "completed chunk count mismatch: expected {total_chunks}, got {completed_chunks}" + ); + ensure!( + completed_bytes == total_bytes, + "completed byte count mismatch: expected {total_bytes}, got {completed_bytes}" + ); + } + other => bail!("expected completed item, got {other:?}"), + } + + ensure!( + received_chunks == total_chunks, + "received chunk count mismatch: expected {total_chunks}, got {received_chunks}" + ); + + Ok(DownloadedSnapshot { + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + archive_bytes, + }) +} + +fn verify_downloaded_snapshot( + downloaded: &DownloadedSnapshot, + fixture: &SnapshotFixture, + output_dir: Option<&Path>, + case: &SnapshotCase, +) -> Result<()> { + ensure!( + downloaded.compression == "tar.zst", + "unexpected compression: {}", + downloaded.compression + ); + ensure!( + downloaded.block_hash == fixture.expected_block_hash, + "unexpected manifest block hash: expected {}, got {}", + fixture.expected_block_hash, + downloaded.block_hash + ); + ensure!( + downloaded.total_bytes == downloaded.archive_bytes.len() as u64, + "archive byte count mismatch: manifest={}, actual={}", + downloaded.total_bytes, + downloaded.archive_bytes.len() + ); + + let expected_chunk_count = if downloaded.total_bytes == 0 { + 0 + } else { + downloaded.total_bytes.div_ceil(downloaded.chunk_size) + }; + ensure!( + downloaded.total_chunks == expected_chunk_count, + "chunk count mismatch: manifest={}, expected={expected_chunk_count}", + downloaded.total_chunks + ); + + let mut hasher = Sha256::new(); + hasher.update(&downloaded.archive_bytes); + ensure!( + downloaded.sha256_hex == hex::encode(hasher.finalize()), + "sha256 mismatch for downloaded archive" + ); + + let artifact_dir_guard = match output_dir { + Some(output_dir) => ArtifactDir::persistent(output_dir.join(case.label()))?, + None => ArtifactDir::temporary()?, + }; + let archive_path = artifact_dir_guard.path().join("snapshot.tar.zst"); + File::create(&archive_path) + .with_context(|| format!("failed to create archive {}", archive_path.display()))? + .write_all(&downloaded.archive_bytes) + .with_context(|| format!("failed to write archive {}", archive_path.display()))?; + + let extracted_dir = artifact_dir_guard.path().join("extracted"); + if extracted_dir.exists() { + fs::remove_dir_all(&extracted_dir) + .with_context(|| format!("failed to clean extract dir {}", extracted_dir.display()))?; + } + fs::create_dir_all(&extracted_dir) + .with_context(|| format!("failed to create extract dir {}", extracted_dir.display()))?; + extract_snapshot_archive(&downloaded.archive_bytes, &extracted_dir)?; + + let reopened_db = RocksDatabase::open(extracted_dir.join("rocksdb")) + .context("failed to reopen extracted rocksdb checkpoint")?; + let reopened_database = RawDatabase::from_one(&reopened_db); + let reopened_database = Database::try_from_raw(reopened_database) + .context("failed to construct Database from RawDatabase")?; + let block_hash = { + let globals_db = reopened_database.globals(); + + globals_db.latest_synced_block.hash + }; + ensure!( + block_hash == fixture.expected_block_hash, + "unexpected synced block hash after extraction" + ); + ensure!( + reopened_db.read(fixture.sample_hash) == Some(fixture.sample_payload.clone()), + "sample payload missing from reopened checkpoint" + ); + + println!( + " artifact={} snapshot_id={}", + archive_path.display(), + downloaded.snapshot_id + ); + + Ok(()) +} + +fn verify_external_snapshot( + downloaded: &DownloadedSnapshot, + output_dir: Option<&Path>, +) -> Result<()> { + ensure!( + downloaded.compression == "tar.zst", + "unexpected compression: {}", + downloaded.compression + ); + ensure!( + downloaded.total_bytes == downloaded.archive_bytes.len() as u64, + "archive byte count mismatch: manifest={}, actual={}", + downloaded.total_bytes, + downloaded.archive_bytes.len() + ); + + let expected_chunk_count = if downloaded.total_bytes == 0 { + 0 + } else { + downloaded.total_bytes.div_ceil(downloaded.chunk_size) + }; + ensure!( + downloaded.total_chunks == expected_chunk_count, + "chunk count mismatch: manifest={}, expected={expected_chunk_count}", + downloaded.total_chunks + ); + + let mut hasher = Sha256::new(); + hasher.update(&downloaded.archive_bytes); + ensure!( + downloaded.sha256_hex == hex::encode(hasher.finalize()), + "sha256 mismatch for downloaded archive" + ); + + let artifact_dir_guard = match output_dir { + Some(output_dir) => ArtifactDir::persistent(output_dir.join("external"))?, + None => ArtifactDir::temporary()?, + }; + let archive_path = artifact_dir_guard.path().join("snapshot.tar.zst"); + File::create(&archive_path) + .with_context(|| format!("failed to create archive {}", archive_path.display()))? + .write_all(&downloaded.archive_bytes) + .with_context(|| format!("failed to write archive {}", archive_path.display()))?; + + let extracted_dir = artifact_dir_guard.path().join("extracted"); + if extracted_dir.exists() { + fs::remove_dir_all(&extracted_dir) + .with_context(|| format!("failed to clean extract dir {}", extracted_dir.display()))?; + } + fs::create_dir_all(&extracted_dir) + .with_context(|| format!("failed to create extract dir {}", extracted_dir.display()))?; + extract_snapshot_archive(&downloaded.archive_bytes, &extracted_dir)?; + + let reopened_db = RocksDatabase::open(extracted_dir.join("rocksdb")) + .context("failed to reopen extracted rocksdb checkpoint")?; + let reopened_database = RawDatabase::from_one(&reopened_db); + let reopened_database = Database::try_from_raw(reopened_database) + .context("failed to construct Database from RawDatabase")?; + let block_hash = { + let globals_db = reopened_database.globals(); + + globals_db.latest_synced_block.hash + }; + ensure!( + block_hash == downloaded.block_hash, + "manifest block hash does not match extracted checkpoint latest data" + ); + + println!( + " artifact={} snapshot_id={}", + archive_path.display(), + downloaded.snapshot_id + ); + + Ok(()) +} + +fn extract_snapshot_archive(archive_bytes: &[u8], extract_dir: &Path) -> Result<()> { + let decoder = zstd::Decoder::new(Cursor::new(archive_bytes)) + .context("failed to create zstd decoder for snapshot archive")?; + let mut archive = tar::Archive::new(decoder); + archive + .unpack(extract_dir) + .with_context(|| format!("failed to unpack archive into {}", extract_dir.display())) +} + +fn unused_local_addr() -> Result { + let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)) + .context("failed to bind ephemeral localhost port")?; + let addr = listener + .local_addr() + .context("failed to read ephemeral localhost address")?; + drop(listener); + Ok(addr) +} + +enum ArtifactDir { + Persistent(PathBuf), + Temporary(TempDir), +} + +impl ArtifactDir { + fn persistent(path: PathBuf) -> Result { + fs::create_dir_all(&path) + .with_context(|| format!("failed to create artifact dir {}", path.display()))?; + Ok(Self::Persistent(path)) + } + + fn temporary() -> Result { + tempfile::tempdir() + .context("failed to create temporary artifact dir") + .map(Self::Temporary) + } + + fn path(&self) -> &Path { + match self { + Self::Persistent(path) => path, + Self::Temporary(dir) => dir.path(), + } + } +} diff --git a/ethexe/rpc/src/apis/mod.rs b/ethexe/rpc/src/apis/mod.rs index 8ed642f4ca1..b009f2ce9b6 100644 --- a/ethexe/rpc/src/apis/mod.rs +++ b/ethexe/rpc/src/apis/mod.rs @@ -20,13 +20,16 @@ mod block; mod code; mod injected; mod program; +mod snapshot; pub use block::{BlockApi, BlockServer}; pub use code::{CodeApi, CodeServer}; pub use injected::{InjectedApi, InjectedServer}; pub use program::{FullProgramState, ProgramApi, ProgramServer}; +pub use snapshot::{SnapshotApi, SnapshotServer, SnapshotStreamItem}; #[cfg(feature = "client")] pub use crate::apis::{ block::BlockClient, code::CodeClient, injected::InjectedClient, program::ProgramClient, + snapshot::SnapshotClient, }; diff --git a/ethexe/rpc/src/apis/snapshot.rs b/ethexe/rpc/src/apis/snapshot.rs new file mode 100644 index 00000000000..7e248aec3b4 --- /dev/null +++ b/ethexe/rpc/src/apis/snapshot.rs @@ -0,0 +1,500 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{SnapshotRpcConfig, errors}; +use anyhow::{Context as _, Result, anyhow}; +use dashmap::DashSet; +use ethexe_common::db::GlobalsStorageRO; +use ethexe_db::{Database, RocksDatabase}; +use gprimitives::H256; +use jsonrpsee::{ + PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, + core::{SubscriptionResult, async_trait}, + proc_macros::rpc, +}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest as _, Sha256}; +use sp_core::Bytes; +use std::{ + fs::{self, File}, + io::Read as _, + path::{Path, PathBuf}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc}; + +const SNAPSHOT_ARCHIVE_NAME: &str = "snapshot.tar.zst"; +const SNAPSHOT_CHECKPOINT_DIR_NAME: &str = "checkpoint"; +const SNAPSHOT_COMPRESSION: &str = "tar.zst"; +// 1KiB +const SNAPSHOT_MIN_CHUNK_IN_BYTES: usize = 1_024; + +static SNAPSHOT_SERVICE_COUNTER: AtomicU64 = AtomicU64::new(0); + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +#[non_exhaustive] +pub enum SnapshotStreamItem { + Manifest { + snapshot_id: String, + block_hash: H256, + total_bytes: u64, + chunk_size: u64, + total_chunks: u64, + sha256_hex: String, + compression: String, + }, + Chunk { + index: u64, + data: Bytes, + }, + Completed { + total_chunks: u64, + total_bytes: u64, + }, +} + +#[cfg_attr(not(feature = "client"), rpc(server, namespace = "snapshot"))] +#[cfg_attr(feature = "client", rpc(server, client, namespace = "snapshot"))] +pub trait Snapshot { + #[subscription(name = "download", unsubscribe = "downloadUnsubscribe", item = SnapshotStreamItem)] + async fn download(&self) -> SubscriptionResult; +} + +#[derive(Clone)] +pub struct SnapshotApi { + service: Arc, +} + +impl SnapshotApi { + pub fn new(db: Database, rocks_db: RocksDatabase, cfg: SnapshotRpcConfig) -> Self { + Self { + service: Arc::new(SnapshotService::new(db, rocks_db, cfg)), + } + } +} + +#[async_trait] +impl SnapshotServer for SnapshotApi { + async fn download(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { + let permit = self + .service + .concurrency_limiter + .clone() + .try_acquire_owned() + .map_err(|_| errors::unavailable("too many concurrent snapshot downloads"))?; + + let service = self.service.clone(); + let snapshot = tokio::task::spawn_blocking(move || service.prepare_snapshot()) + .await + .map_err(|err| errors::internal_with(format!("snapshot worker panicked: {err}")))? + .map_err(|err| errors::internal_with(format!("failed to prepare snapshot: {err:#}")))?; + + let sink = match pending.accept().await { + Ok(sink) => sink, + Err(err) => { + self.service + .clone() + .cleanup_snapshot_async(snapshot.work_dir) + .await; + return Err(err.into()); + } + }; + + self.service + .clone() + .spawn_streaming_task(sink, snapshot, permit); + + Ok(()) + } +} + +#[derive(Debug)] +struct SnapshotService { + db: Database, + rocks_db: RocksDatabase, + chunk_size_bytes: usize, + retention: Duration, + service_prefix: String, + work_root: PathBuf, + concurrency_limiter: Arc, + active_work_dirs: DashSet, + id_counter: AtomicU64, +} + +#[derive(Debug)] +struct PreparedSnapshot { + snapshot_id: String, + block_hash: H256, + archive_path: PathBuf, + work_dir: PathBuf, + total_bytes: u64, + chunk_size_bytes: usize, + total_chunks: u64, + sha256_hex: String, +} + +impl SnapshotService { + fn new(db: Database, rocks_db: RocksDatabase, cfg: SnapshotRpcConfig) -> Self { + let chunk_size_bytes = cfg.chunk_size_bytes.max(SNAPSHOT_MIN_CHUNK_IN_BYTES); + let max_concurrent_downloads = cfg.max_concurrent_downloads.max(1); + let service_prefix = format!( + "svc{:x}", + SNAPSHOT_SERVICE_COUNTER.fetch_add(1, Ordering::Relaxed) + ); + + Self { + db, + rocks_db, + chunk_size_bytes, + retention: Duration::from_secs(cfg.retention_secs), + work_root: std::env::temp_dir() + .join("ethexe-rpc-snapshots") + .join(&service_prefix), + service_prefix, + concurrency_limiter: Arc::new(Semaphore::new(max_concurrent_downloads as usize)), + active_work_dirs: DashSet::default(), + id_counter: AtomicU64::new(0), + } + } + + fn prepare_snapshot(&self) -> Result { + if let Err(err) = self.cleanup_stale_snapshots() { + tracing::warn!("failed to cleanup stale snapshot artifacts: {err:#}"); + } + + fs::create_dir_all(&self.work_root).with_context(|| { + format!( + "failed to create snapshot workspace {}", + self.work_root.display() + ) + })?; + + let snapshot_id = self.next_snapshot_id(); + let work_dir = self.work_root.join(&snapshot_id); + let checkpoint_dir = work_dir.join(SNAPSHOT_CHECKPOINT_DIR_NAME); + let archive_path = work_dir.join(SNAPSHOT_ARCHIVE_NAME); + + fs::create_dir_all(&work_dir) + .with_context(|| format!("failed to create work dir {}", work_dir.display()))?; + + self.active_work_dirs.insert(work_dir.clone()); + + let prepared = (|| { + let work_dir_for_result = work_dir.clone(); + let block_hash = { + let globals_db = self.db.globals(); + + globals_db.latest_synced_block.hash + }; + + self.rocks_db + .create_checkpoint(&checkpoint_dir) + .with_context(|| { + format!( + "failed to create rocksdb checkpoint at {}", + checkpoint_dir.display() + ) + })?; + + Self::pack_checkpoint_archive(&checkpoint_dir, &archive_path)?; + + let (total_bytes, sha256_hex) = Self::compute_file_metadata(&archive_path)?; + let total_chunks = if total_bytes == 0 { + 0 + } else { + total_bytes.div_ceil(self.chunk_size_bytes as u64) + }; + + Ok(PreparedSnapshot { + snapshot_id, + block_hash, + archive_path, + work_dir: work_dir_for_result, + total_bytes, + chunk_size_bytes: self.chunk_size_bytes, + total_chunks, + sha256_hex, + }) + })(); + + if prepared.is_err() { + self.cleanup_snapshot(work_dir); + } + + prepared + } + + fn pack_checkpoint_archive(checkpoint_dir: &Path, archive_path: &Path) -> Result<()> { + let archive = File::create(archive_path) + .with_context(|| format!("failed to create {}", archive_path.display()))?; + + let mut encoder = + zstd::Encoder::new(archive, 3).context("failed to initialize zstd encoder")?; + { + let mut tar_builder = tar::Builder::new(&mut encoder); + tar_builder + .append_dir_all("rocksdb", checkpoint_dir) + .with_context(|| { + format!( + "failed to append checkpoint directory {} to tar", + checkpoint_dir.display() + ) + })?; + tar_builder + .finish() + .context("failed to finish tar archive")?; + } + + encoder.finish().context("failed to finish zstd stream")?; + + Ok(()) + } + + fn compute_file_metadata(path: &Path) -> Result<(u64, String)> { + let mut file = File::open(path) + .with_context(|| format!("failed to open snapshot archive {}", path.display()))?; + + let mut hasher = Sha256::new(); + let mut total_bytes = 0u64; + let mut buffer = [0u8; 64 * 1024]; + + loop { + let read = file + .read(&mut buffer) + .with_context(|| format!("failed to read archive {}", path.display()))?; + + if read == 0 { + break; + } + + hasher.update(&buffer[..read]); + total_bytes += read as u64; + } + + Ok((total_bytes, hex::encode(hasher.finalize()))) + } + + fn next_snapshot_id(&self) -> String { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let id = self.id_counter.fetch_add(1, Ordering::Relaxed); + + format!("{}-{timestamp:x}-{id:x}", self.service_prefix) + } + + fn cleanup_stale_snapshots(&self) -> Result<()> { + let read_dir = match fs::read_dir(&self.work_root) { + Ok(read_dir) => read_dir, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()), + Err(err) => { + return Err(err).with_context(|| { + format!("failed to list snapshot dir {}", self.work_root.display()) + }); + } + }; + + for entry in read_dir { + let entry = entry.context("failed to read entry from snapshot root")?; + let path = entry.path(); + if !path.is_dir() || self.active_work_dirs.contains(&path) { + continue; + } + + let modified = entry + .metadata() + .with_context(|| format!("failed to read metadata for {}", path.display()))? + .modified() + .with_context(|| format!("failed to read mtime for {}", path.display()))?; + + let age = modified.elapsed().unwrap_or_default(); + if age >= self.retention { + fs::remove_dir_all(&path).with_context(|| { + format!("failed to remove stale snapshot {}", path.display()) + })?; + } + } + + Ok(()) + } + + fn cleanup_snapshot(&self, work_dir: PathBuf) { + self.active_work_dirs.remove(&work_dir); + if let Err(err) = fs::remove_dir_all(&work_dir) + && err.kind() != std::io::ErrorKind::NotFound + { + tracing::warn!( + "failed to remove snapshot artifact {}: {err}", + work_dir.display() + ); + } + } + + fn spawn_streaming_task( + self: Arc, + sink: SubscriptionSink, + snapshot: PreparedSnapshot, + _permit: OwnedSemaphorePermit, + ) { + tokio::spawn(async move { + let work_dir = snapshot.work_dir.clone(); + + let res = self.stream_snapshot(&sink, &snapshot).await; + if let Err(err) = res { + tracing::warn!( + "failed to stream snapshot {}: {err:#}", + snapshot.snapshot_id + ); + } + + self.cleanup_after_streaming_async(work_dir).await; + }); + } + + async fn cleanup_after_streaming_async(self: Arc, work_dir: PathBuf) { + let service = self; + if let Err(err) = tokio::task::spawn_blocking(move || { + service.cleanup_snapshot(work_dir); + if let Err(err) = service.cleanup_stale_snapshots() { + tracing::warn!("failed to cleanup stale snapshots after streaming: {err:#}"); + } + }) + .await + { + tracing::warn!("snapshot cleanup worker panicked: {err}"); + } + } + + async fn cleanup_snapshot_async(self: Arc, work_dir: PathBuf) { + let service = self; + if let Err(err) = + tokio::task::spawn_blocking(move || service.cleanup_snapshot(work_dir)).await + { + tracing::warn!("snapshot cleanup worker panicked: {err}"); + } + } + + async fn stream_snapshot( + &self, + sink: &SubscriptionSink, + snapshot: &PreparedSnapshot, + ) -> Result<()> { + Self::send_item( + sink, + SnapshotStreamItem::Manifest { + snapshot_id: snapshot.snapshot_id.clone(), + block_hash: snapshot.block_hash, + total_bytes: snapshot.total_bytes, + chunk_size: snapshot.chunk_size_bytes as u64, + total_chunks: snapshot.total_chunks, + sha256_hex: snapshot.sha256_hex.clone(), + compression: SNAPSHOT_COMPRESSION.to_owned(), + }, + ) + .await?; + + self.stream_archive_chunks(sink, snapshot).await?; + + Self::send_item( + sink, + SnapshotStreamItem::Completed { + total_chunks: snapshot.total_chunks, + total_bytes: snapshot.total_bytes, + }, + ) + .await + } + + async fn stream_archive_chunks( + &self, + sink: &SubscriptionSink, + snapshot: &PreparedSnapshot, + ) -> Result<()> { + let (sender, mut receiver) = mpsc::channel::), String>>(4); + let archive_path = snapshot.archive_path.clone(); + let chunk_size_bytes = snapshot.chunk_size_bytes; + + tokio::task::spawn_blocking(move || { + if let Err(err) = Self::read_archive_chunks(archive_path, chunk_size_bytes, &sender) { + let _ = sender.blocking_send(Err(err.to_string())); + } + }); + + while let Some(next) = receiver.recv().await { + let (index, chunk) = match next { + Ok(ok) => ok, + Err(err) => return Err(anyhow!(err)), + }; + + Self::send_item( + sink, + SnapshotStreamItem::Chunk { + index, + data: chunk.into(), + }, + ) + .await?; + } + + Ok(()) + } + + fn read_archive_chunks( + archive_path: PathBuf, + chunk_size_bytes: usize, + sender: &mpsc::Sender), String>>, + ) -> Result<()> { + let mut file = File::open(&archive_path) + .with_context(|| format!("failed to open archive {}", archive_path.display()))?; + + let mut index = 0u64; + loop { + let mut chunk = vec![0u8; chunk_size_bytes]; + let read = file + .read(&mut chunk) + .with_context(|| format!("failed to read archive {}", archive_path.display()))?; + + if read == 0 { + break; + } + + chunk.truncate(read); + if sender.blocking_send(Ok((index, chunk))).is_err() { + break; + } + index += 1; + } + + Ok(()) + } + + async fn send_item(sink: &SubscriptionSink, item: SnapshotStreamItem) -> Result<()> { + let message = SubscriptionMessage::from_json(&item) + .context("failed to serialize snapshot stream item")?; + + sink.send(message) + .await + .context("failed to send snapshot stream item") + } +} diff --git a/ethexe/rpc/src/errors.rs b/ethexe/rpc/src/errors.rs index e36010a1587..f3235b63d3e 100644 --- a/ethexe/rpc/src/errors.rs +++ b/ethexe/rpc/src/errors.rs @@ -35,3 +35,15 @@ pub fn bad_request(err: impl ToString) -> ErrorObject<'static> { pub fn internal() -> ErrorObject<'static> { ErrorObject::owned(8000, "Internal error", None::<&str>) } + +pub fn internal_with(err: impl ToString) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Internal error", Some(err.to_string())) +} + +pub fn unauthorized(err: impl ToString) -> ErrorObject<'static> { + ErrorObject::owned(8001, "Unauthorized", Some(err.to_string())) +} + +pub fn unavailable(err: impl ToString) -> ErrorObject<'static> { + ErrorObject::owned(8002, "Service unavailable", Some(err.to_string())) +} diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 4a142649b76..45be59612cd 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -16,24 +16,32 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +pub use crate::apis::SnapshotStreamItem; #[cfg(feature = "client")] -pub use crate::apis::{BlockClient, CodeClient, FullProgramState, InjectedClient, ProgramClient}; +pub use crate::apis::{ + BlockClient, CodeClient, FullProgramState, InjectedClient, ProgramClient, SnapshotClient, +}; use anyhow::Result; use apis::{ BlockApi, BlockServer, CodeApi, CodeServer, InjectedApi, InjectedServer, ProgramApi, - ProgramServer, + ProgramServer, SnapshotApi, SnapshotServer, }; use ethexe_common::injected::{ AddressedInjectedTransaction, InjectedTransactionAcceptance, SignedPromise, }; -use ethexe_db::Database; +use ethexe_db::{Database, RocksDatabase}; use ethexe_processor::{Processor, ProcessorConfig}; -use futures::{Stream, stream::FusedStream}; -use hyper::header::HeaderValue; +use futures::{FutureExt, Stream, future::BoxFuture, stream::FusedStream}; +use hyper::header::{AUTHORIZATION, HeaderValue}; use jsonrpsee::{ RpcModule as JsonrpcModule, - server::{PingConfig, Server, ServerHandle}, + core::server::MethodResponse, + server::{ + PingConfig, Server, ServerHandle, + middleware::rpc::{RpcServiceBuilder, RpcServiceT}, + }, + types::Request, }; use std::{ net::SocketAddr, @@ -72,16 +80,47 @@ pub struct RpcConfig { pub gas_allowance: u64, /// Amount of processing threads for queue processing. pub chunk_size: usize, + /// Configuration for snapshot RPC API. + pub snapshot: Option, +} + +/// Configuration of RocksDB snapshot download RPC. +#[derive(Debug, Clone)] +pub struct SnapshotRpcConfig { + /// Static bearer token used to authorize snapshot methods. + pub auth_bearer_token: String, + /// Size of one streamed chunk in bytes. + pub chunk_size_bytes: usize, + /// Snapshot retention period in seconds. + pub retention_secs: u64, + /// Max number of concurrent snapshot downloads. + pub max_concurrent_downloads: u32, +} + +impl SnapshotRpcConfig { + pub const DEFAULT_CHUNK_SIZE_BYTES: usize = 1024 * 1024; + pub const DEFAULT_RETENTION_SECS: u64 = 600; + pub const DEFAULT_MAX_CONCURRENT_DOWNLOADS: u32 = 1; } pub struct RpcServer { config: RpcConfig, db: Database, + snapshot_db: Option, } impl RpcServer { pub fn new(config: RpcConfig, db: Database) -> Self { - Self { config, db } + Self { + config, + db, + snapshot_db: None, + } + } + + pub fn with_snapshot_source(mut self, snapshot_db: RocksDatabase) -> Self { + self.snapshot_db = Some(snapshot_db); + self } pub const fn port(&self) -> u16 { @@ -92,10 +131,31 @@ impl RpcServer { let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel(); let cors_layer = self.cors_layer()?; - let http_middleware = tower::ServiceBuilder::new().layer(cors_layer); + let http_middleware = tower::ServiceBuilder::new().layer(cors_layer).map_request( + |mut req: jsonrpsee::server::HttpRequest<_>| { + let token = parse_bearer_token( + req.headers() + .get(AUTHORIZATION) + .and_then(|value| value.to_str().ok()), + ); + req.extensions_mut().insert(RpcBearerToken(token)); + req + }, + ); + let expected_bearer_token = self + .config + .snapshot + .as_ref() + .map(|cfg| cfg.auth_bearer_token.clone()); + let rpc_middleware = + RpcServiceBuilder::new().layer_fn(move |service| SnapshotAuthRpcMiddleware { + service, + expected_bearer_token: expected_bearer_token.clone(), + }); let server = Server::builder() .set_http_middleware(http_middleware) + .set_rpc_middleware(rpc_middleware) // Setup WebSocket pings to detect dead connections. // Now it is set to default: ping_interval = 30s, inactive_limit = 40s .enable_ws_ping(PingConfig::default()) @@ -110,11 +170,25 @@ impl RpcServer { )? .overlaid(); + let snapshot = if let Some(snapshot_config) = self.config.snapshot.clone() { + self.snapshot_db + .clone() + .map(|snapshot_db| SnapshotApi::new(self.db.clone(), snapshot_db, snapshot_config)) + } else { + None + }; + if self.config.snapshot.is_some() && snapshot.is_none() { + tracing::warn!( + "snapshot rpc is configured, but no rocksdb source was provided; snapshot methods are disabled" + ); + } + let server_apis = RpcServerApis { code: CodeApi::new(self.db.clone()), block: BlockApi::new(self.db.clone()), program: ProgramApi::new(self.db.clone(), processor, self.config.gas_allowance), injected: InjectedApi::new(rpc_sender), + snapshot, }; let injected_api = server_apis.injected.clone(); @@ -184,6 +258,7 @@ struct RpcServerApis { pub code: CodeApi, pub injected: InjectedApi, pub program: ProgramApi, + pub snapshot: Option, } impl RpcServerApis { @@ -202,7 +277,65 @@ impl RpcServerApis { module .merge(ProgramServer::into_rpc(self.program)) .expect("No conflicts"); + if let Some(snapshot) = self.snapshot { + module + .merge(SnapshotServer::into_rpc(snapshot)) + .expect("No conflicts"); + } module } } + +#[derive(Clone, Debug)] +struct RpcBearerToken(Option); + +#[derive(Clone, Debug)] +struct SnapshotAuthRpcMiddleware { + service: S, + expected_bearer_token: Option, +} + +impl<'a, S> RpcServiceT<'a> for SnapshotAuthRpcMiddleware +where + S: RpcServiceT<'a> + 'a, + S::Future: Send + 'a, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, request: Request<'a>) -> Self::Future { + if request.method_name().starts_with("snapshot_") + && self.expected_bearer_token.is_some() + && !self.is_authorized(&request) + { + let id = request.id().into_owned(); + return async move { + MethodResponse::error(id, errors::unauthorized("invalid or missing bearer token")) + } + .boxed(); + } + + self.service.call(request).boxed() + } +} + +impl SnapshotAuthRpcMiddleware { + fn is_authorized(&self, request: &Request<'_>) -> bool { + let expected = self.expected_bearer_token.as_deref(); + let actual = request + .extensions() + .get::() + .and_then(|token| token.0.as_deref()); + actual == expected + } +} + +fn parse_bearer_token(header: Option<&str>) -> Option { + let header = header?; + let (scheme, token) = header.split_once(' ')?; + if !scheme.eq_ignore_ascii_case("bearer") || token.is_empty() { + return None; + } + + Some(token.to_owned()) +} diff --git a/ethexe/rpc/src/tests.rs b/ethexe/rpc/src/tests.rs index 8c319f4e705..bdf4d8e80b4 100644 --- a/ethexe/rpc/src/tests.rs +++ b/ethexe/rpc/src/tests.rs @@ -18,23 +18,36 @@ use crate::{ InjectedApi, InjectedClient, InjectedTransactionAcceptance, RpcConfig, RpcEvent, RpcServer, - RpcService, + RpcService, SnapshotClient, SnapshotRpcConfig, SnapshotStreamItem, }; - use ethexe_common::{ + Announce, HashOf, ProtocolTimelines, SimpleBlockData, + db::{DBConfig, DBGlobals, GlobalsStorageRO}, ecdsa::PrivateKey, gear::MAX_BLOCK_GAS_LIMIT, injected::{AddressedInjectedTransaction, Promise, SignedPromise}, mock::Mock, }; -use ethexe_db::Database; +use ethexe_db::{CASDatabase, Database, RawDatabase, RocksDatabase, VERSION}; use futures::StreamExt; use gear_core::{ message::{ReplyCode, SuccessReplyReason}, rpc::ReplyInfo, }; -use jsonrpsee::{server::ServerHandle, ws_client::WsClientBuilder}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use gprimitives::H256; +use jsonrpsee::{ + server::ServerHandle, + ws_client::{HeaderMap, HeaderValue, WsClient, WsClientBuilder}, +}; +use sha2::{Digest as _, Sha256}; +use std::{ + fs::{self, File}, + io::Write as _, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, +}; +use tempfile::TempDir; use tokio::task::{JoinHandle, JoinSet}; /// [`MockService`] simulates main `ethexe_service::Service` behavior. @@ -104,6 +117,7 @@ async fn start_new_server(listen_addr: SocketAddr) -> (ServerHandle, RpcService) cors: None, gas_allowance: MAX_BLOCK_GAS_LIMIT, chunk_size: 2, + snapshot: None, }; RpcServer::new(rpc_config, Database::memory()) .run_server() @@ -255,3 +269,663 @@ async fn test_concurrent_multiple_clients() { let _ = tasks.join_all().await; wait_for_closed_subscriptions(injected_api).await; } + +struct SnapshotFixture { + _temp_dir: TempDir, + rocks_db: RocksDatabase, + db: Database, + synced_block_hash: H256, + sample_hash: H256, + sample_payload: Vec, +} + +impl SnapshotFixture { + fn new(entry_count: usize, entry_size: usize) -> Self { + Self::new_with_payload_mode(entry_count, entry_size, false) + } + + fn new_high_entropy(entry_count: usize, entry_size: usize) -> Self { + Self::new_with_payload_mode(entry_count, entry_size, true) + } + + fn new_with_payload_mode(entry_count: usize, entry_size: usize, high_entropy: bool) -> Self { + let temp_dir = tempfile::tempdir().expect("Failed to create temporary directory"); + let db_rocks = RocksDatabase::open(temp_dir.path().to_path_buf()) + .expect("Failed to open rocks database"); + let db_raw = RawDatabase::from_one(&db_rocks); + + db_raw.kv.set_config(DBConfig { + version: VERSION, + chain_id: 0, + router_address: Default::default(), + timelines: ProtocolTimelines::default(), + genesis_block_hash: H256::from_low_u64_be(1), + genesis_announce_hash: HashOf::::zero(), + }); + + let synced_block_hash = H256::from_low_u64_be(42); + db_raw.kv.set_globals(DBGlobals { + start_block_hash: H256::from_low_u64_be(1), + start_announce_hash: HashOf::::zero(), + latest_synced_block: SimpleBlockData { + hash: synced_block_hash, + header: Default::default(), + }, + latest_prepared_block_hash: synced_block_hash, + latest_computed_announce_hash: HashOf::::zero(), + }); + + let db_reopened = + Database::try_from_raw(db_raw).expect("Constructs Database from RawDatabase"); + + let mut sample = None; + for index in 0..entry_count { + let payload = if high_entropy { + pseudo_random_payload(index as u64 + 1, entry_size) + } else { + let mut payload = vec![0u8; entry_size]; + payload.fill((index % 255) as u8); + payload + }; + + let hash = db_reopened.cas().write(&payload); + if sample.is_none() { + sample = Some((hash, payload.clone())); + } + } + + let (sample_hash, sample_payload) = sample.expect("snapshot fixture should have data"); + + Self { + _temp_dir: temp_dir, + rocks_db: db_rocks, + db: db_reopened, + synced_block_hash, + sample_hash, + sample_payload, + } + } +} + +fn pseudo_random_payload(seed: u64, len: usize) -> Vec { + let mut state = seed; + let mut payload = vec![0u8; len]; + + for byte in &mut payload { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + *byte = state as u8; + } + + payload +} + +fn snapshot_artifacts_root() -> PathBuf { + std::env::temp_dir().join("ethexe-rpc-snapshots") +} + +fn unused_local_addr() -> SocketAddr { + let listener = + std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("should bind localhost"); + let addr = listener + .local_addr() + .expect("should obtain local socket address"); + drop(listener); + addr +} + +fn snapshot_rpc_config( + token: &str, + retention_secs: u64, + max_concurrent_downloads: u32, +) -> SnapshotRpcConfig { + SnapshotRpcConfig { + auth_bearer_token: token.to_string(), + chunk_size_bytes: 32 * 1024, + retention_secs, + max_concurrent_downloads, + } +} + +async fn start_snapshot_server( + listen_addr: SocketAddr, + fixture: &SnapshotFixture, + snapshot_cfg: Option, +) -> (ServerHandle, RpcService) { + let rpc_config = RpcConfig { + listen_addr, + cors: None, + gas_allowance: MAX_BLOCK_GAS_LIMIT, + chunk_size: 2, + snapshot: snapshot_cfg, + }; + RpcServer::new(rpc_config, fixture.db.clone()) + .with_snapshot_source(fixture.rocks_db.clone()) + .run_server() + .await + .expect("RPC Server will start successfully") +} + +async fn snapshot_client(listen_addr: SocketAddr, token: Option<&str>) -> WsClient { + let mut builder = WsClientBuilder::new(); + + if let Some(token) = token { + let mut headers = HeaderMap::new(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {token}")) + .expect("Authorization header must be valid"), + ); + builder = builder.set_headers(headers); + } + + builder + .build(format!("ws://{listen_addr}")) + .await + .expect("WS client will be created") +} + +struct DownloadedSnapshot { + snapshot_id: String, + block_hash: H256, + total_bytes: u64, + chunk_size: u64, + total_chunks: u64, + sha256_hex: String, + chunks: Vec<(u64, Vec)>, +} + +struct StreamedDownloadedSnapshot { + snapshot_id: String, + block_hash: H256, + total_bytes: u64, + chunk_size: u64, + total_chunks: u64, + sha256_hex: String, +} + +async fn download_snapshot(client: &WsClient) -> DownloadedSnapshot { + let mut subscription = client + .download() + .await + .expect("Snapshot subscription must be created"); + + let manifest = subscription + .next() + .await + .expect("Manifest item should be present") + .expect("Manifest item should not contain subscription error"); + + let (snapshot_id, block_hash, total_bytes, chunk_size, total_chunks, sha256_hex, _compression) = + match manifest { + SnapshotStreamItem::Manifest { + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + } => ( + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + ), + other => panic!("Expected manifest item, got: {other:?}"), + }; + + let mut chunks = Vec::with_capacity(total_chunks as usize); + for _ in 0..total_chunks { + let item = subscription + .next() + .await + .expect("Chunk item should be present") + .expect("Chunk item should not contain subscription error"); + match item { + SnapshotStreamItem::Chunk { index, data } => chunks.push((index, data.0)), + other => panic!("Expected chunk item, got: {other:?}"), + } + } + + let completed = subscription + .next() + .await + .expect("Completed item should be present") + .expect("Completed item should not contain subscription error"); + + match completed { + SnapshotStreamItem::Completed { + total_chunks: done_chunks, + total_bytes: done_bytes, + } => { + assert_eq!(done_chunks, total_chunks); + assert_eq!(done_bytes, total_bytes); + } + other => panic!("Expected completed item, got: {other:?}"), + } + + DownloadedSnapshot { + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + chunks, + } +} + +async fn download_snapshot_to_file( + client: &WsClient, + archive_path: PathBuf, +) -> StreamedDownloadedSnapshot { + let mut subscription = client + .download() + .await + .expect("Snapshot subscription must be created"); + + let manifest = subscription + .next() + .await + .expect("Manifest item should be present") + .expect("Manifest item should not contain subscription error"); + + let (snapshot_id, block_hash, total_bytes, chunk_size, total_chunks, sha256_hex, _compression) = + match manifest { + SnapshotStreamItem::Manifest { + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + } => ( + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + compression, + ), + other => panic!("Expected manifest item, got: {other:?}"), + }; + + let mut archive = File::create(&archive_path).expect("Archive file should be created"); + let mut hasher = Sha256::new(); + let mut bytes_written = 0u64; + let mut chunks_written = 0u64; + + for expected_index in 0..total_chunks { + let item = subscription + .next() + .await + .expect("Chunk item should be present") + .expect("Chunk item should not contain subscription error"); + match item { + SnapshotStreamItem::Chunk { index, data } => { + assert_eq!(index, expected_index, "chunk index should be sequential"); + archive + .write_all(&data.0) + .expect("Chunk data should be written to archive file"); + hasher.update(&data.0); + bytes_written += data.0.len() as u64; + chunks_written += 1; + } + other => panic!("Expected chunk item, got: {other:?}"), + } + } + + let completed = subscription + .next() + .await + .expect("Completed item should be present") + .expect("Completed item should not contain subscription error"); + + match completed { + SnapshotStreamItem::Completed { + total_chunks: done_chunks, + total_bytes: done_bytes, + } => { + assert_eq!(done_chunks, total_chunks); + assert_eq!(done_bytes, total_bytes); + } + other => panic!("Expected completed item, got: {other:?}"), + } + + assert_eq!(bytes_written, total_bytes); + assert_eq!(chunks_written, total_chunks); + assert_eq!(sha256_hex, hex::encode(hasher.finalize())); + + StreamedDownloadedSnapshot { + snapshot_id, + block_hash, + total_bytes, + chunk_size, + total_chunks, + sha256_hex, + } +} + +fn extract_snapshot_archive(archive_path: &PathBuf, extract_dir: &PathBuf) { + let archive = File::open(archive_path).expect("Archive file should be opened"); + let decoder = zstd::Decoder::new(archive).expect("Archive zstd decoder should be created"); + let mut tar = tar::Archive::new(decoder); + tar.unpack(extract_dir) + .expect("Archive should be unpacked successfully"); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_download_success_with_valid_token() { + let fixture = SnapshotFixture::new(32, 16 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let snapshot = download_snapshot(&client).await; + + assert_eq!(snapshot.block_hash, fixture.synced_block_hash); + assert!(!snapshot.snapshot_id.is_empty()); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_download_rejects_missing_token() { + let fixture = SnapshotFixture::new(16, 8 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, None).await; + let err = client + .download() + .await + .expect_err("Download must fail without auth token"); + assert!( + err.to_string().contains("Unauthorized"), + "unexpected error: {err}" + ); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_download_rejects_invalid_token() { + let fixture = SnapshotFixture::new(16, 8 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, Some("wrong-token")).await; + let err = client + .download() + .await + .expect_err("Download must fail for wrong token"); + assert!( + err.to_string().contains("Unauthorized"), + "unexpected error: {err}" + ); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_methods_not_registered_when_disabled() { + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_new_server(listen_addr).await; + + let client = snapshot_client(listen_addr, None).await; + let err = client + .download() + .await + .expect_err("Download must fail when snapshot API is disabled"); + assert!( + err.to_string().contains("Method not found"), + "unexpected error: {err}" + ); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_methods_not_registered_without_snapshot_source() { + let listen_addr = unused_local_addr(); + let rpc_config = RpcConfig { + listen_addr, + cors: None, + gas_allowance: MAX_BLOCK_GAS_LIMIT, + chunk_size: 2, + snapshot: Some(snapshot_rpc_config("snapshot-token", 600, 1)), + }; + let (_handle, _rpc) = RpcServer::new(rpc_config, Database::memory()) + .run_server() + .await + .expect("RPC Server will start successfully"); + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let err = client + .download() + .await + .expect_err("Download must fail when snapshot source is unavailable"); + assert!( + err.to_string().contains("Method not found"), + "unexpected error: {err}" + ); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_stream_manifest_then_chunks_then_completed() { + let fixture = SnapshotFixture::new(32, 8 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let snapshot = download_snapshot(&client).await; + let indexes: Vec<_> = snapshot.chunks.iter().map(|(i, _)| *i).collect(); + assert_eq!(indexes, (0..snapshot.total_chunks).collect::>()); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_chunk_count_and_total_bytes_match_manifest() { + let fixture = SnapshotFixture::new(32, 8 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let snapshot = download_snapshot(&client).await; + let bytes_downloaded: usize = snapshot.chunks.iter().map(|(_, chunk)| chunk.len()).sum(); + + assert_eq!(snapshot.total_chunks as usize, snapshot.chunks.len()); + assert_eq!(snapshot.total_bytes, bytes_downloaded as u64); + assert_eq!(snapshot.chunk_size, 32 * 1024); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_sha256_matches_streamed_archive() { + let fixture = SnapshotFixture::new(32, 8 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let snapshot = download_snapshot(&client).await; + + let mut hasher = Sha256::new(); + for (_, chunk) in &snapshot.chunks { + hasher.update(chunk); + } + assert_eq!(snapshot.sha256_hex, hex::encode(hasher.finalize())); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_cleanup_removes_request_artifacts_and_ttl_gc_removes_stale() { + let fixture = SnapshotFixture::new(32, 8 * 1024); + let listen_addr = unused_local_addr(); + + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 0, 1)), + ) + .await; + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let first_snapshot = download_snapshot(&client).await; + let service_prefix = first_snapshot + .snapshot_id + .split_once('-') + .expect("snapshot id should include service prefix") + .0 + .to_string(); + let stale_dir = snapshot_artifacts_root() + .join(service_prefix) + .join("stale-artifact-for-test"); + std::fs::create_dir_all(&stale_dir).expect("stale dir should be created"); + + let second_snapshot = download_snapshot(&client).await; + + tokio::time::sleep(Duration::from_millis(50)).await; + + let snapshot_dir = snapshot_artifacts_root() + .join( + second_snapshot + .snapshot_id + .split_once('-') + .expect("snapshot id should include service prefix") + .0, + ) + .join(second_snapshot.snapshot_id); + assert!( + !snapshot_dir.exists(), + "snapshot directory should be removed" + ); + assert!(!stale_dir.exists(), "stale artifact should be removed"); +} + +#[tokio::test] +#[ntest::timeout(120_000)] +async fn snapshot_concurrency_limit_enforced() { + let fixture = SnapshotFixture::new(64, 512 * 1024); + let listen_addr = unused_local_addr(); + let (_handle, _rpc) = start_snapshot_server( + listen_addr, + &fixture, + Some(snapshot_rpc_config("snapshot-token", 600, 1)), + ) + .await; + + let first_client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let second_client = snapshot_client(listen_addr, Some("snapshot-token")).await; + + let first_task = tokio::spawn(async move { first_client.download().await }); + tokio::time::sleep(Duration::from_millis(10)).await; + + let err = second_client + .download() + .await + .expect_err("Second concurrent download must fail"); + let err_text = err.to_string(); + assert!( + err_text.contains("Service unavailable") || err_text.contains("Internal error"), + "unexpected error: {err}" + ); + + let first_subscription = first_task + .await + .expect("first download task should finish") + .expect("first download should succeed"); + first_subscription + .unsubscribe() + .await + .expect("unsubscribe should succeed"); +} + +#[tokio::test] +#[ignore = "manual large snapshot test; generates and downloads a 2+ GiB archive over WS"] +#[ntest::timeout(7_200_000)] +async fn snapshot_download_large_real_archive_2gib() { + const ENTRY_COUNT: usize = 544; + const ENTRY_SIZE: usize = 4 * 1024 * 1024; + const CHUNK_SIZE_BYTES: usize = 1_048_576; + + let fixture = SnapshotFixture::new_high_entropy(ENTRY_COUNT, ENTRY_SIZE); + let listen_addr = unused_local_addr(); + let snapshot_cfg = SnapshotRpcConfig { + auth_bearer_token: "snapshot-token".to_string(), + chunk_size_bytes: CHUNK_SIZE_BYTES, + retention_secs: 600, + max_concurrent_downloads: 1, + }; + let (_handle, _rpc) = start_snapshot_server(listen_addr, &fixture, Some(snapshot_cfg)).await; + + let output_dir = tempfile::tempdir().expect("temporary directory should be created"); + let archive_path = output_dir.path().join("snapshot.tar.zst"); + let extract_dir = output_dir.path().join("extract"); + + let client = snapshot_client(listen_addr, Some("snapshot-token")).await; + let snapshot = download_snapshot_to_file(&client, archive_path.clone()).await; + + assert_eq!(snapshot.block_hash, fixture.synced_block_hash); + assert_eq!(snapshot.chunk_size, CHUNK_SIZE_BYTES as u64); + assert!(snapshot.total_bytes > 2 * 1024 * 1024 * 1024); + assert_eq!( + snapshot.total_chunks, + snapshot.total_bytes.div_ceil(snapshot.chunk_size) + ); + assert!(!snapshot.snapshot_id.is_empty()); + assert!(!snapshot.sha256_hex.is_empty()); + + fs::create_dir_all(&extract_dir).expect("extract directory should be created"); + extract_snapshot_archive(&archive_path, &extract_dir); + + let reopened_db = RocksDatabase::open(extract_dir.join("rocksdb")) + .expect("Extracted RocksDB checkpoint should reopen successfully"); + let reopened_database = RawDatabase::from_one(&reopened_db); + let reopened_database = Database::try_from_raw(reopened_database) + .expect("Database should be constructed from RawDatabase successfully"); + let block_hash = { + let globals_db = reopened_database.globals(); + + globals_db.latest_synced_block.hash + }; + assert_eq!(block_hash, fixture.synced_block_hash); + assert_eq!( + reopened_db.read(fixture.sample_hash), + Some(fixture.sample_payload.clone()) + ); +} diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 0e988efdffd..8020d3d72fe 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -381,10 +381,9 @@ impl Service { None }; - let rpc = config - .rpc - .as_ref() - .map(|config| RpcServer::new(config.clone(), db.clone())); + let rpc = config.rpc.as_ref().map(|config| { + RpcServer::new(config.clone(), db.clone()).with_snapshot_source(rocks_db.clone()) + }); let compute_config = ComputeConfig::new(config.node.canonical_quarantine); let processor_config = ProcessorConfig { diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index f0e1d867462..2181ab37b80 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -137,6 +137,7 @@ async fn basics() { .checked_mul(config.node.block_gas_limit) .unwrap(), chunk_size: config.node.chunk_processing_threads, + snapshot: None, }); config.prometheus = Some(PrometheusConfig { diff --git a/ethexe/service/src/tests/utils/env.rs b/ethexe/service/src/tests/utils/env.rs index 625a5e2aea6..cffbf8f5a6d 100644 --- a/ethexe/service/src/tests/utils/env.rs +++ b/ethexe/service/src/tests/utils/env.rs @@ -816,6 +816,7 @@ impl NodeConfig { cors: None, gas_allowance: DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER * DEFAULT_BLOCK_GAS_LIMIT, chunk_size: DEFAULT_CHUNK_SIZE.get(), + snapshot: None, }; self.rpc = Some(service_rpc_config); diff --git a/utils/gear-workspace-hack/Cargo.toml b/utils/gear-workspace-hack/Cargo.toml index 147d61f8e92..83a42cb4092 100644 --- a/utils/gear-workspace-hack/Cargo.toml +++ b/utils/gear-workspace-hack/Cargo.toml @@ -17,204 +17,204 @@ publish = false # they have features like `runtime-benchmarks` and `try-runtime` # so we cannot, for example, run `cargo build -p gear-cli` without `--all-features` -[dependencies.frame-executive] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.frame-executive] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.frame-support] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.frame-support] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" features = ["experimental"] -[dependencies.frame-try-runtime] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.frame-try-runtime] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-authority-discovery] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-authority-discovery] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-authorship] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-authorship] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-babe] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-babe] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-bags-list] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-bags-list] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-balances] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-balances] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-bounties] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-bounties] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-child-bounties] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-child-bounties] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-conviction-voting] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-conviction-voting] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-election-provider-multi-phase] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-election-provider-multi-phase] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-grandpa] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-grandpa] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-identity] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-identity] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-im-online] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-im-online] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-multisig] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-multisig] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-nomination-pools] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-nomination-pools] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-offences] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-offences] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-preimage] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-preimage] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-proxy] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-proxy] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-ranked-collective] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-ranked-collective] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-referenda] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-referenda] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-scheduler] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-scheduler] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-session] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-session] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["historical", "std"] -[dependencies.pallet-sudo] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-sudo] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-timestamp] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-timestamp] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-transaction-payment] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-transaction-payment] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-treasury] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-treasury] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-utility] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-utility] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-vesting] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-vesting] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.pallet-whitelist] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.pallet-whitelist] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["std"] -[dependencies.sc-client-db] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.sc-client-db] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" default-features = false features = ["rocksdb", "test-helpers"] -[dependencies.sc-service] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.sc-service] git = "https://github.com/gear-tech/polkadot-sdk.git" branch = "gear-polkadot-stable2409-wasm32v1-none" features = ["test-helpers"] ### BEGIN HAKARI SECTION -[dependencies] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] aes = { version = "0.8", default-features = false, features = ["zeroize"] } ahash = { version = "0.8" } alloy = { git = "https://github.com/gear-tech/alloy.git", branch = "fix-blob-gas-filler", features = ["kzg", "node-bindings", "provider-anvil-api", "provider-ws", "rpc-types-beacon", "rpc-types-eth", "signer-mnemonic"] } @@ -326,7 +326,6 @@ jsonrpsee = { version = "0.24", default-features = false, features = ["client", jsonrpsee-client-transport = { version = "0.24", default-features = false, features = ["tls-rustls-platform-verifier", "web", "ws"] } jsonrpsee-core = { version = "0.24", features = ["async-client", "async-wasm-client", "http-helpers", "server"] } k256 = { version = "0.13", features = ["expose-field", "hash2curve", "serde"] } -keccak = { version = "0.1", default-features = false, features = ["asm"] } libc = { version = "0.2" } libp2p = { version = "0.52", default-features = false, features = ["dns", "ed25519", "identify", "kad", "macros", "mdns", "noise", "ping", "request-response", "tcp", "tokio", "wasm-ext", "websocket", "yamux"] } libp2p-identity = { version = "0.2", default-features = false, features = ["ed25519", "peerid", "rand", "secp256k1", "serde"] } @@ -399,11 +398,10 @@ serde_json = { version = "1", features = ["alloc", "arbitrary_precision", "raw_v serdect = { version = "0.2" } sha1 = { version = "0.10" } sha2 = { version = "0.10" } -sha3 = { version = "0.10", features = ["asm"] } +sha3 = { version = "0.10" } signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] } slice-group-by = { version = "0.3" } smallvec = { version = "1", default-features = false, features = ["const_new", "serde", "union"] } -socket2 = { version = "0.4", default-features = false, features = ["all"] } soketto = { version = "0.8", features = ["http"] } sp-allocator = { git = "https://github.com/gear-tech/polkadot-sdk.git", branch = "gear-polkadot-stable2409-wasm32v1-none" } sp-api = { git = "https://github.com/gear-tech/polkadot-sdk.git", branch = "gear-polkadot-stable2409-wasm32v1-none", features = ["frame-metadata"] } @@ -486,8 +484,9 @@ wasmtime-jit-debug = { version = "8", default-features = false, features = ["gdb wasmtime-runtime = { version = "8", default-features = false, features = ["async", "pooling-allocator"] } winnow = { version = "0.7" } zeroize = { version = "1", features = ["derive", "std"] } +zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] } -[build-dependencies] +[target.'cfg(not(target_arch = "wasm32"))'.build-dependencies] aes = { version = "0.8", default-features = false, features = ["zeroize"] } ahash = { version = "0.8" } alloy = { git = "https://github.com/gear-tech/alloy.git", branch = "fix-blob-gas-filler", features = ["kzg", "node-bindings", "provider-anvil-api", "provider-ws", "rpc-types-beacon", "rpc-types-eth", "signer-mnemonic"] } @@ -609,7 +608,6 @@ jsonrpsee = { version = "0.24", default-features = false, features = ["client", jsonrpsee-client-transport = { version = "0.24", default-features = false, features = ["tls-rustls-platform-verifier", "web", "ws"] } jsonrpsee-core = { version = "0.24", features = ["async-client", "async-wasm-client", "http-helpers", "server"] } k256 = { version = "0.13", features = ["expose-field", "hash2curve", "serde"] } -keccak = { version = "0.1", default-features = false, features = ["asm"] } libc = { version = "0.2" } libp2p = { version = "0.52", default-features = false, features = ["dns", "ed25519", "identify", "kad", "macros", "mdns", "noise", "ping", "request-response", "tcp", "tokio", "wasm-ext", "websocket", "yamux"] } libp2p-identity = { version = "0.2", default-features = false, features = ["ed25519", "peerid", "rand", "secp256k1", "serde"] } @@ -684,11 +682,10 @@ serde_json = { version = "1", features = ["alloc", "arbitrary_precision", "raw_v serdect = { version = "0.2" } sha1 = { version = "0.10" } sha2 = { version = "0.10" } -sha3 = { version = "0.10", features = ["asm"] } +sha3 = { version = "0.10" } signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] } slice-group-by = { version = "0.3" } smallvec = { version = "1", default-features = false, features = ["const_new", "serde", "union"] } -socket2 = { version = "0.4", default-features = false, features = ["all"] } soketto = { version = "0.8", features = ["http"] } sp-allocator = { git = "https://github.com/gear-tech/polkadot-sdk.git", branch = "gear-polkadot-stable2409-wasm32v1-none" } sp-api = { git = "https://github.com/gear-tech/polkadot-sdk.git", branch = "gear-polkadot-stable2409-wasm32v1-none", features = ["frame-metadata"] } @@ -774,6 +771,7 @@ wasmtime-jit-debug = { version = "8", default-features = false, features = ["gdb wasmtime-runtime = { version = "8", default-features = false, features = ["async", "pooling-allocator"] } winnow = { version = "0.7" } zeroize = { version = "1", features = ["derive", "std"] } +zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] } [target.x86_64-unknown-linux-gnu.dependencies] crossbeam-epoch = { version = "0.9" } @@ -781,7 +779,7 @@ errno = { version = "0.3" } gimli = { version = "0.28" } hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "logging", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1", default-features = false, features = ["client-proxy"] } -itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } +itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false, features = ["extra_traits"] } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } mio = { version = "1", features = ["net", "os-ext"] } @@ -806,7 +804,7 @@ errno = { version = "0.3" } gimli = { version = "0.28" } hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "logging", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1", default-features = false, features = ["client-proxy"] } -itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } +itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false, features = ["extra_traits"] } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } mio = { version = "1", features = ["net", "os-ext"] } @@ -832,7 +830,7 @@ errno = { version = "0.3" } gimli = { version = "0.28" } hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "logging", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1", default-features = false, features = ["client-proxy"] } -itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } +itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false, features = ["extra_traits"] } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } mio = { version = "1", features = ["net", "os-ext"] } @@ -856,7 +854,7 @@ errno = { version = "0.3" } gimli = { version = "0.28" } hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "logging", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1", default-features = false, features = ["client-proxy"] } -itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } +itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false, features = ["extra_traits"] } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } mio = { version = "1", features = ["net", "os-ext"] } @@ -881,7 +879,7 @@ errno = { version = "0.3" } gimli = { version = "0.28" } hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "logging", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1", default-features = false, features = ["client-proxy"] } -itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } +itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false, features = ["extra_traits"] } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } nom = { version = "7" } @@ -903,7 +901,7 @@ errno = { version = "0.3" } gimli = { version = "0.28" } hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "logging", "ring", "tls12", "webpki-tokio"] } hyper-util = { version = "0.1", default-features = false, features = ["client-proxy"] } -itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } +itertools-594e8ee84c453af0 = { package = "itertools", version = "0.13", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false, features = ["extra_traits"] } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } nom = { version = "7" }