diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 5277e0ac740..4cf55c9847b 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -15,6 +15,7 @@ use std::collections::{HashMap, HashSet}; use std::io::Write; use std::net::SocketAddr; +use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -50,16 +51,23 @@ use tokio::net::TcpListener; use tracing::debug; use super::shutdown::NodeShutdownHandle; +use crate::test_utils::STANDALONE_NODE_NAME; -pub struct TestNodeConfig { +struct ClusterNode { + node_name: String, + config: NodeConfig, + shutdown_handle: NodeShutdownHandle, +} + +pub struct SandboxNodeConfig { + pub node_name: String, pub services: HashSet, pub enable_otlp: bool, } pub struct ClusterSandboxBuilder { temp_dir: TempDir, - node_configs: Vec, - use_legacy_ingest: bool, + node_configs: Vec, } impl Default for ClusterSandboxBuilder { @@ -67,14 +75,65 @@ impl Default for ClusterSandboxBuilder { Self { temp_dir: tempfile::tempdir().unwrap(), node_configs: Vec::new(), - use_legacy_ingest: false, } } } +struct SandboxCommonConfigs { + root_data_dir: PathBuf, + metastore_uri: QuickwitUri, + default_index_root_uri: QuickwitUri, + cluster_id: String, +} + +impl SandboxCommonConfigs { + fn new(tmp_dir: &TempDir) -> Self { + let unique_ram_dir_name = new_coolid("test-ram-dir"); + let metastore_uri = + QuickwitUri::from_str(&format!("ram:///{}/metastore", unique_ram_dir_name)).unwrap(); + let default_index_root_uri = + QuickwitUri::from_str(&format!("ram:///{}/indexes", unique_ram_dir_name)).unwrap(); + let cluster_id = new_coolid("test-cluster"); + Self { + root_data_dir: tmp_dir.path().to_path_buf(), + metastore_uri, + default_index_root_uri, + cluster_id, + } + } +} + +/// Creates a NodeConfig from sandbox configs. +/// +/// Peer seeds still need to be filled later. +fn assemble_node_config( + common_configs: &SandboxCommonConfigs, + test_node_config: SandboxNodeConfig, + rest_port: u16, + grpc_port: u16, +) -> NodeConfig { + let mut config = NodeConfig::for_test_from_ports(rest_port, grpc_port); + config.indexer_config.enable_otlp_endpoint = test_node_config.enable_otlp; + config + .enabled_services + .clone_from(&test_node_config.services); + config.jaeger_config.enable_endpoint = true; + config.cluster_id.clone_from(&common_configs.cluster_id); + config.node_id = NodeId::new(test_node_config.node_name.clone()); + config.data_dir_path = common_configs.root_data_dir.join(config.node_id.as_str()); + config.metastore_uri = common_configs.metastore_uri.clone(); + config.default_index_root_uri = common_configs.default_index_root_uri.clone(); + config +} + impl ClusterSandboxBuilder { - pub fn add_node(mut self, services: impl IntoIterator) -> Self { - self.node_configs.push(TestNodeConfig { + pub fn add_node( + mut self, + node_name: impl Into, + services: impl IntoIterator, + ) -> Self { + self.node_configs.push(SandboxNodeConfig { + node_name: node_name.into(), services: HashSet::from_iter(services), enable_otlp: false, }); @@ -83,63 +142,46 @@ impl ClusterSandboxBuilder { pub fn add_node_with_otlp( mut self, + node_name: impl Into, services: impl IntoIterator, ) -> Self { - self.node_configs.push(TestNodeConfig { + self.node_configs.push(SandboxNodeConfig { + node_name: node_name.into(), services: HashSet::from_iter(services), enable_otlp: true, }); self } - pub fn use_legacy_ingest(mut self) -> Self { - self.use_legacy_ingest = true; - self - } - /// Builds a list of of [`NodeConfig`] from the node definitions added to /// builder. For each node, a [`NodeConfig`] is built with the right /// parameters such that we will be able to run `quickwit_serve` on them and - /// form a Quickwit cluster. For each node, we set: - /// - `data_dir_path` defined by `root_data_dir/node_id`. - /// - `metastore_uri` defined by `root_data_dir/metastore`. - /// - `default_index_root_uri` defined by `root_data_dir/indexes`. - /// - `peers` defined by others nodes `gossip_advertise_addr`. + /// form a Quickwit cluster. pub async fn build_config(self) -> ResolvedClusterConfig { - let root_data_dir = self.temp_dir.path().to_path_buf(); - let cluster_id = new_coolid("test-cluster"); + let common_configs = SandboxCommonConfigs::new(&self.temp_dir); let mut resolved_node_configs = Vec::new(); let mut peers: Vec = Vec::new(); - let unique_dir_name = new_coolid("test-dir"); let tcp_listener_resolver = TestTcpListenerResolver::default(); - for (node_idx, node_builder) in self.node_configs.iter().enumerate() { + for node_builder in self.node_configs { let socket: SocketAddr = ([127, 0, 0, 1], 0u16).into(); let rest_tcp_listener = TcpListener::bind(socket).await.unwrap(); let grpc_tcp_listener = TcpListener::bind(socket).await.unwrap(); - let mut config = NodeConfig::for_test_from_ports( + let config = assemble_node_config( + &common_configs, + node_builder, rest_tcp_listener.local_addr().unwrap().port(), grpc_tcp_listener.local_addr().unwrap().port(), ); tcp_listener_resolver.add_listener(rest_tcp_listener).await; tcp_listener_resolver.add_listener(grpc_tcp_listener).await; - config.indexer_config.enable_otlp_endpoint = node_builder.enable_otlp; - config.enabled_services.clone_from(&node_builder.services); - config.jaeger_config.enable_endpoint = true; - config.cluster_id.clone_from(&cluster_id); - config.node_id = NodeId::new(format!("test-node-{node_idx}")); - config.data_dir_path = root_data_dir.join(config.node_id.as_str()); - config.metastore_uri = - QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/metastore")).unwrap(); - config.default_index_root_uri = - QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/indexes")).unwrap(); peers.push(config.gossip_advertise_addr.to_string()); - resolved_node_configs.push((config, node_builder.services.clone())); + resolved_node_configs.push(config); } for node_config in resolved_node_configs.iter_mut() { - node_config.0.peer_seeds = peers + node_config.peer_seeds = peers .clone() .into_iter() - .filter(|seed| *seed != node_config.0.gossip_advertise_addr.to_string()) + .filter(|seed| *seed != node_config.gossip_advertise_addr.to_string()) .collect_vec(); } ResolvedClusterConfig { @@ -156,7 +198,7 @@ impl ClusterSandboxBuilder { pub async fn build_and_start_standalone() -> ClusterSandbox { ClusterSandboxBuilder::default() - .add_node(QuickwitService::supported_services()) + .add_node(STANDALONE_NODE_NAME, QuickwitService::supported_services()) .build_config() .await .start() @@ -168,7 +210,7 @@ impl ClusterSandboxBuilder { /// been reserved and the configurations have been generated. pub struct ResolvedClusterConfig { temp_dir: TempDir, - pub node_configs: Vec<(NodeConfig, HashSet)>, + pub node_configs: Vec, tcp_listener_resolver: TestTcpListenerResolver, } @@ -176,17 +218,16 @@ impl ResolvedClusterConfig { /// Start a cluster using this config and waits for the nodes to be ready pub async fn start(self) -> ClusterSandbox { quickwit_cli::install_default_crypto_ring_provider(); - let mut node_shutdown_handles = Vec::new(); let runtimes_config = RuntimesConfig::light_for_tests(); let storage_resolver = StorageResolver::unconfigured(); let metastore_resolver = MetastoreResolver::unconfigured(); let cluster_size = self.node_configs.len(); - for node_config in self.node_configs.iter() { - let mut shutdown_handler = - NodeShutdownHandle::new(node_config.0.node_id.clone(), node_config.1.clone()); - let shutdown_signal = shutdown_handler.shutdown_signal(); + let mut nodes = Vec::with_capacity(self.node_configs.len()); + for node_config in self.node_configs { + let mut shutdown_handle = NodeShutdownHandle::new(); + let shutdown_signal = shutdown_handle.shutdown_signal(); let join_handle = tokio::spawn({ - let node_config = node_config.0.clone(); + let node_config = node_config.clone(); let node_id = node_config.node_id.clone(); let services = node_config.enabled_services.clone(); let metastore_resolver = metastore_resolver.clone(); @@ -208,14 +249,17 @@ impl ResolvedClusterConfig { Result::<_, anyhow::Error>::Ok(result) } }); - shutdown_handler.set_node_join_handle(join_handle); - node_shutdown_handles.push(shutdown_handler); + shutdown_handle.set_node_join_handle(join_handle); + nodes.push(ClusterNode { + node_name: node_config.node_id.as_str().to_string(), + config: node_config, + shutdown_handle, + }); } let sandbox = ClusterSandbox { - node_configs: self.node_configs, + nodes, _temp_dir: self.temp_dir, - node_shutdown_handles, }; sandbox .wait_for_cluster_num_ready_nodes(cluster_size) @@ -257,18 +301,22 @@ pub(crate) async fn ingest( /// A test environment where you can start a Quickwit cluster and use the gRPC /// or REST clients to test it. pub struct ClusterSandbox { - pub node_configs: Vec<(NodeConfig, HashSet)>, + nodes: Vec, _temp_dir: TempDir, - node_shutdown_handles: Vec, } impl ClusterSandbox { - fn find_node_for_service(&self, service: QuickwitService) -> NodeConfig { - self.node_configs + /// Returns the node configurations (useful for tests that need to access node settings) + pub fn node_configs(&self) -> impl Iterator { + self.nodes.iter().map(|node| &node.config) + } + + pub fn find_node_for_service(&self, service: QuickwitService) -> NodeConfig { + self.nodes .iter() - .find(|config| config.1.contains(&service)) + .find(|node| node.config.is_service_enabled(service)) .unwrap_or_else(|| panic!("No {service:?} node")) - .0 + .config .clone() } @@ -281,47 +329,14 @@ impl ClusterSandbox { } /// Returns a client to one of the nodes that runs the specified service - pub fn rest_client(&self, service: QuickwitService) -> QuickwitClient { - let node_config = self.find_node_for_service(service); - - let certificate = if let Some(tls_conf) = &node_config.rest_config.tls { - let cert_bytes = std::fs::read(&tls_conf.ca_path).unwrap(); - Some(reqwest::tls::Certificate::from_pem(&cert_bytes).unwrap()) - } else { - None - }; - - QuickwitClientBuilder::new(transport_url( - node_config.rest_config.listen_addr, - certificate.is_some(), - )) - .set_tls_ca(certificate) - .build() - } - - /// A client configured to ingest documents and return detailed parse failures. - pub fn detailed_ingest_client(&self) -> QuickwitClient { - let node_config = self.find_node_for_service(QuickwitService::Indexer); - - let certificate = if let Some(tls_conf) = &node_config.rest_config.tls { - let cert_bytes = std::fs::read(&tls_conf.ca_path).unwrap(); - Some(reqwest::tls::Certificate::from_pem(&cert_bytes).unwrap()) - } else { - None - }; - - QuickwitClientBuilder::new(transport_url( - node_config.rest_config.listen_addr, - certificate.is_some(), - )) - .set_tls_ca(certificate) - .detailed_response(true) - .build() - } - - // TODO(#5604) - pub fn rest_client_legacy_indexer(&self) -> QuickwitClient { - let node_config = self.find_node_for_service(QuickwitService::Indexer); + pub fn rest_client(&self, node_name: &str) -> QuickwitClient { + let node_config = self + .nodes + .iter() + .find(|node| node.node_name == node_name) + .unwrap_or_else(|| panic!("No node named {node_name}")) + .config + .clone(); let certificate = if let Some(tls_conf) = &node_config.rest_config.tls { let cert_bytes = std::fs::read(&tls_conf.ca_path).unwrap(); @@ -335,7 +350,6 @@ impl ClusterSandbox { certificate.is_some(), )) .set_tls_ca(certificate) - .use_legacy_ingest(true) .build() } @@ -355,10 +369,13 @@ impl ClusterSandbox { &self, expected_num_ready_nodes: usize, ) -> anyhow::Result<()> { + let metastore_node_id = self + .find_node_for_service(QuickwitService::Metastore) + .node_id; wait_until_predicate( - || async move { + || async { match self - .rest_client(QuickwitService::Metastore) + .rest_client(metastore_node_id.as_str()) .cluster() .snapshot() .await @@ -388,21 +405,23 @@ impl ClusterSandbox { Ok(()) } - /// Waits for the needed number of indexing pipeline to start. - /// - /// WARNING! does not work if multiple indexers are running + /// Waits for the needed number of indexing pipeline to start on the given indexer. pub async fn wait_for_indexing_pipelines( &self, + node_name: &str, required_pipeline_num: usize, ) -> anyhow::Result<()> { + let node_config = self + .node_configs() + .find(|conf| conf.node_id.as_str() == node_name) + .unwrap(); + assert!( + node_config.is_service_enabled(QuickwitService::Indexer), + "{node_name} is not an indexer" + ); wait_until_predicate( - || async move { - match self - .rest_client(QuickwitService::Indexer) - .node_stats() - .indexing() - .await - { + || async { + match self.rest_client(node_name).node_stats().indexing().await { Ok(result) => { if result.num_running_pipelines != required_pipeline_num { debug!( @@ -415,7 +434,7 @@ impl ClusterSandbox { } } Err(err) => { - debug!("wait_for_cluster_num_ready_nodes error {err}"); + debug!("wait_for_indexing_pipelines error {err}"); false } } @@ -434,15 +453,18 @@ impl ClusterSandbox { split_states_filter: Option>, required_splits_num: usize, ) -> anyhow::Result<()> { + let metastore_node_id = self + .find_node_for_service(QuickwitService::Metastore) + .node_id; wait_until_predicate( || { let splits_query_params = ListSplitsQueryParams { split_states: split_states_filter.clone(), ..Default::default() }; - async move { + async { match self - .rest_client(QuickwitService::Metastore) + .rest_client(metastore_node_id.as_str()) .splits(index_id) .list(splits_query_params) .await @@ -474,17 +496,17 @@ impl ClusterSandbox { } pub async fn local_ingest(&self, index_id: &str, json_data: &[Value]) -> anyhow::Result<()> { - let test_conf = self - .node_configs + let test_node = self + .nodes .iter() - .find(|config| config.1.contains(&QuickwitService::Indexer)) + .find(|node| node.config.is_service_enabled(QuickwitService::Indexer)) .ok_or(anyhow::anyhow!("No indexer node found"))?; // NodeConfig cannot be serialized, we write our own simplified config let mut tmp_config_file = tempfile::Builder::new().suffix(".yaml").tempfile().unwrap(); // we suffix data_dir with a random slug to save us from multiple local ingestion trying to // concurrently do something, and cleanup the directory to start a new ingestion. - let data_dir = test_conf - .0 + let data_dir = test_node + .config .data_dir_path .join(rand::random::().to_string()); tokio::fs::create_dir(&data_dir).await?; @@ -494,7 +516,7 @@ impl ClusterSandbox { metastore_uri: {} data_dir: {:?} "#, - test_conf.0.metastore_uri, data_dir + test_node.config.metastore_uri, data_dir ); tmp_config_file.write_all(node_config.as_bytes())?; tmp_config_file.flush()?; @@ -525,8 +547,11 @@ impl ClusterSandbox { } pub async fn assert_hit_count(&self, index_id: &str, query: &str, expected_num_hits: u64) { + let searcher_node_id = self + .find_node_for_service(QuickwitService::Searcher) + .node_id; let search_response = self - .rest_client(QuickwitService::Searcher) + .rest_client(searcher_node_id.as_str()) .search( index_id, SearchRequestQueryString { @@ -547,39 +572,42 @@ impl ClusterSandbox { ); } - /// Shutdown nodes that only provide the specified services - pub async fn shutdown_services( + /// Shutdown nodes by name + pub async fn shutdown_nodes( &mut self, - shutdown_services: impl IntoIterator, + node_names: impl IntoIterator>, ) -> Result>, anyhow::Error> { // We need to drop rest clients first because reqwest can hold connections open // preventing rest server's graceful shutdown. let mut indexer_shutdown_futures = Vec::new(); let mut other_shutdown_futures = Vec::new(); - let mut shutdown_nodes = HashMap::new(); + let mut shutdown_node_info = HashMap::new(); + let node_names_set: HashSet = node_names + .into_iter() + .map(|s| s.as_ref().to_string()) + .collect(); let mut i = 0; - let shutdown_services_map = HashSet::from_iter(shutdown_services); - while i < self.node_shutdown_handles.len() { - let handler_services = &self.node_shutdown_handles[i].node_services; - if !handler_services.is_subset(&shutdown_services_map) { + while i < self.nodes.len() { + let node_name = &self.nodes[i].node_name; + if !node_names_set.contains(node_name) { i += 1; continue; } - let handler_to_shutdown = self.node_shutdown_handles.remove(i); - shutdown_nodes.insert( - handler_to_shutdown.node_id.clone(), - handler_to_shutdown.node_services.clone(), + let node_to_shutdown = self.nodes.remove(i); + shutdown_node_info.insert( + node_to_shutdown.config.node_id.clone(), + node_to_shutdown.config.enabled_services.clone(), ); - if handler_to_shutdown - .node_services - .contains(&QuickwitService::Indexer) + if node_to_shutdown + .config + .is_service_enabled(QuickwitService::Indexer) { - indexer_shutdown_futures.push(handler_to_shutdown.shutdown()); + indexer_shutdown_futures.push(node_to_shutdown.shutdown_handle.shutdown()); } else { - other_shutdown_futures.push(handler_to_shutdown.shutdown()); + other_shutdown_futures.push(node_to_shutdown.shutdown_handle.shutdown()); } } - debug!("shutting down {:?}", shutdown_nodes); + debug!("shutting down {:?}", shutdown_node_info); // We must decommision the indexer nodes first and independently from the other nodes. let indexer_shutdown_results = future::join_all(indexer_shutdown_futures).await; let other_shutdown_results = future::join_all(other_shutdown_futures).await; @@ -593,7 +621,7 @@ impl ClusterSandbox { pub async fn shutdown( mut self, ) -> Result>, anyhow::Error> { - self.shutdown_services(QuickwitService::supported_services()) - .await + let all_node_names: Vec = self.nodes.iter().map(|n| n.node_name.clone()).collect(); + self.shutdown_nodes(all_node_names).await } } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs index 871b156abaf..eb239efd9b0 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs @@ -15,4 +15,6 @@ mod cluster_sandbox; mod shutdown; +pub(crate) const STANDALONE_NODE_NAME: &str = "standalone"; + pub(crate) use cluster_sandbox::{ClusterSandbox, ClusterSandboxBuilder, ingest}; diff --git a/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs b/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs index 2d51d3dfb8b..1bfdb2a02b1 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use quickwit_actors::ActorExitStatus; use quickwit_common::tower::BoxFutureInfaillible; -use quickwit_config::service::QuickwitService; -use quickwit_proto::types::NodeId; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::task::JoinHandle; @@ -26,19 +24,15 @@ type NodeJoinHandle = JoinHandle, anyhow pub(crate) struct NodeShutdownHandle { sender: Sender<()>, receiver: Receiver<()>, - pub node_services: HashSet, - pub node_id: NodeId, join_handle_opt: Option, } impl NodeShutdownHandle { - pub(crate) fn new(node_id: NodeId, node_services: HashSet) -> Self { + pub(crate) fn new() -> Self { let (sender, receiver) = watch::channel(()); Self { sender, receiver, - node_id, - node_services, join_handle_opt: None, } } diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index 91c6b0ebeb1..630878a60a6 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -19,18 +19,18 @@ use hyper_util::rt::TokioExecutor; use quickwit_config::service::QuickwitService; use quickwit_serve::SearchRequestQueryString; -use crate::test_utils::ClusterSandboxBuilder; +use crate::test_utils::{ClusterSandboxBuilder, STANDALONE_NODE_NAME}; #[tokio::test] async fn test_ui_redirect_on_get() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; - let node_config = sandbox.node_configs.first().unwrap(); + let node_config = sandbox.node_configs().next().unwrap(); let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) .pool_idle_timeout(Duration::from_secs(30)) .http2_only(true) .build_http(); - let root_uri = format!("http://{}/", node_config.0.rest_config.listen_addr) + let root_uri = format!("http://{}/", node_config.rest_config.listen_addr) .parse::() .unwrap(); let response = client.get(root_uri.clone()).await.unwrap(); @@ -52,7 +52,7 @@ async fn test_standalone_server() { { // The indexing service should be running. let counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_stats() .indexing() .await @@ -63,7 +63,7 @@ async fn test_standalone_server() { { // Create an dynamic index. sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create( r#" @@ -83,7 +83,7 @@ async fn test_standalone_server() { // Index should be searchable assert_eq!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .search( "my-new-index", SearchRequestQueryString { @@ -97,7 +97,10 @@ async fn test_standalone_server() { .num_hits, 0 ); - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines(STANDALONE_NODE_NAME, 1) + .await + .unwrap(); } sandbox.shutdown().await.unwrap(); } @@ -106,16 +109,16 @@ async fn test_standalone_server() { async fn test_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( r#" @@ -136,7 +139,7 @@ async fn test_multi_nodes_cluster() { assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_health() .is_live() .await @@ -144,11 +147,14 @@ async fn test_multi_nodes_cluster() { ); // Assert that at least 1 indexing pipelines is successfully started - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 1) + .await + .unwrap(); // Check that search is working let search_response_empty = sandbox - .rest_client(QuickwitService::Searcher) + .rest_client("searcher") .search( "my-new-multi-node-index", SearchRequestQueryString { diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v1_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v1_tests.rs index af28f5b21a1..e4eebb2cf80 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v1_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v1_tests.rs @@ -15,7 +15,7 @@ use quickwit_config::ConfigFormat; use quickwit_config::service::QuickwitService; use quickwit_metastore::SplitState; -use quickwit_rest_client::rest_client::CommitType; +use quickwit_rest_client::rest_client::{CommitType, QuickwitClientBuilder}; use serde_json::json; use crate::ingest_json; @@ -27,14 +27,16 @@ use crate::test_utils::{ClusterSandboxBuilder, ingest}; #[tokio::test] async fn test_ingest_v1_happy_path() { let sandbox = ClusterSandboxBuilder::default() - .use_legacy_ingest() - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::Searcher]) - .add_node([ - QuickwitService::ControlPlane, - QuickwitService::Janitor, - QuickwitService::Metastore, - ]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node( + "other", + [ + QuickwitService::ControlPlane, + QuickwitService::Janitor, + QuickwitService::Metastore, + ], + ) .build_and_start() .await; @@ -51,7 +53,18 @@ async fn test_ingest_v1_happy_path() { commit_timeout_secs: 1 "# ); - let indexer_client = sandbox.rest_client_legacy_indexer(); + let indexer_node_config = sandbox + .node_configs() + .find(|cfg| cfg.node_id.as_str() == "indexer") + .unwrap(); + let url = reqwest::Url::parse(&format!( + "http://{}", + indexer_node_config.rest_config.listen_addr + )) + .unwrap(); + let indexer_client = QuickwitClientBuilder::new(url) + .use_legacy_ingest(true) + .build(); indexer_client .indexes() .create(index_config, ConfigFormat::Yaml, false) diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs index 4a5e29c9565..55d9c41794e 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs @@ -24,12 +24,12 @@ use quickwit_metastore::SplitState; use quickwit_proto::ingest::ParseFailureReason; use quickwit_rest_client::error::{ApiError, Error}; use quickwit_rest_client::models::IngestSource; -use quickwit_rest_client::rest_client::CommitType; +use quickwit_rest_client::rest_client::{CommitType, QuickwitClientBuilder}; use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure}; use serde_json::json; use crate::ingest_json; -use crate::test_utils::{ClusterSandboxBuilder, ingest}; +use crate::test_utils::{ClusterSandboxBuilder, STANDALONE_NODE_NAME, ingest}; /// Ingesting on a freshly re-created index sometimes fails, see #5430 #[tokio::test] @@ -54,14 +54,14 @@ async fn test_ingest_recreated_index() { "# ); let current_index_metadata = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config.clone(), ConfigFormat::Yaml, false) .await .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "first record"}), CommitType::Force, @@ -75,7 +75,7 @@ async fn test_ingest_recreated_index() { .unwrap(); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .delete(index_id, false) .await @@ -84,7 +84,7 @@ async fn test_ingest_recreated_index() { // Recreate the index and start ingesting into it again let new_index_metadata = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config, ConfigFormat::Yaml, false) .await @@ -96,7 +96,7 @@ async fn test_ingest_recreated_index() { ); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "second record"}), CommitType::Force, @@ -110,7 +110,7 @@ async fn test_ingest_recreated_index() { .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "third record"}), CommitType::Force, @@ -124,7 +124,7 @@ async fn test_ingest_recreated_index() { .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "fourth record"}), CommitType::Force, @@ -152,7 +152,7 @@ async fn test_ingest_recreated_index() { // Delete the index to avoid potential hanging on shutdown #5068 sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .delete(index_id, false) .await @@ -184,14 +184,14 @@ async fn test_indexing_directory_cleanup() { "# ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config.clone(), ConfigFormat::Yaml, false) .await .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "first record"}), CommitType::Force, @@ -205,14 +205,14 @@ async fn test_indexing_directory_cleanup() { .unwrap(); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .delete(index_id, false) .await .unwrap(); // The index is deleted so the `indexing` directory should be cleaned up - let data_dir_path = &sandbox.node_configs.first().unwrap().0.data_dir_path; + let data_dir_path = &sandbox.node_configs().next().unwrap().data_dir_path; let indexing_dir_path = data_dir_path.join(INDEXING_DIR_NAME); wait_until_predicate( || async { @@ -232,17 +232,26 @@ async fn test_indexing_directory_cleanup() { #[tokio::test] async fn test_ingest_v2_index_not_found() { let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) - .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) - .add_node([ - QuickwitService::ControlPlane, - QuickwitService::Metastore, - QuickwitService::Searcher, - ]) + .add_node( + "indexitor-1", + [QuickwitService::Indexer, QuickwitService::Janitor], + ) + .add_node( + "indexitor-2", + [QuickwitService::Indexer, QuickwitService::Janitor], + ) + .add_node( + "other", + [ + QuickwitService::ControlPlane, + QuickwitService::Metastore, + QuickwitService::Searcher, + ], + ) .build_and_start() .await; let missing_index_err: Error = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexitor-1") .ingest( "missing_index", ingest_json!({"body": "doc1"}), @@ -265,13 +274,22 @@ async fn test_ingest_v2_index_not_found() { #[tokio::test] async fn test_ingest_v2_happy_path() { let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) - .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) - .add_node([ - QuickwitService::ControlPlane, - QuickwitService::Metastore, - QuickwitService::Searcher, - ]) + .add_node( + "indexitor-1", + [QuickwitService::Indexer, QuickwitService::Janitor], + ) + .add_node( + "indexitor-2", + [QuickwitService::Indexer, QuickwitService::Janitor], + ) + .add_node( + "other", + [ + QuickwitService::ControlPlane, + QuickwitService::Metastore, + QuickwitService::Searcher, + ], + ) .build_and_start() .await; let index_id = "test_happy_path"; @@ -288,14 +306,14 @@ async fn test_ingest_v2_happy_path() { "# ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexitor-1") .indexes() .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); let ingest_resp = ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client("indexitor-1"), index_id, ingest_json!({"body": "doc1"}), CommitType::Auto, @@ -321,7 +339,7 @@ async fn test_ingest_v2_happy_path() { // Delete the index to avoid potential hanging on shutdown #5068 sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexitor-1") .indexes() .delete(index_id, false) .await @@ -348,7 +366,7 @@ async fn test_commit_force() { ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config, ConfigFormat::Yaml, false) .await @@ -359,7 +377,7 @@ async fn test_commit_force() { let ingest_resp = tokio::time::timeout( Duration::from_secs(20), ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "force"}), CommitType::Force, @@ -382,7 +400,7 @@ async fn test_commit_force() { // Delete the index to avoid waiting for the commit timeout on shutdown #5068 sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .delete(index_id, false) .await @@ -410,7 +428,7 @@ async fn test_commit_wait_for() { // Create index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config, ConfigFormat::Yaml, false) .await @@ -418,7 +436,7 @@ async fn test_commit_wait_for() { // run 2 ingest requests at the same time on the same index // wait_for shouldn't force the commit so expect only 1 published split - let client = sandbox.rest_client(QuickwitService::Indexer); + let client = sandbox.rest_client(STANDALONE_NODE_NAME); let ingest_1_fut = client .ingest( index_id, @@ -474,7 +492,7 @@ async fn test_commit_wait_for() { ..Default::default() }; let published_splits = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .splits(index_id) .list(splits_query_params) .await @@ -502,14 +520,14 @@ async fn test_commit_auto() { ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); let ingest_resp = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, ingest_json!({"body": "auto"}), @@ -558,14 +576,25 @@ async fn test_detailed_ingest_response() { "# ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); + // Create a client configured to ingest documents and return detailed parse failures + let node_config = sandbox + .node_configs() + .find(|cfg| cfg.enabled_services.contains(&QuickwitService::Indexer)) + .unwrap(); + let url = + reqwest::Url::parse(&format!("http://{}", node_config.rest_config.listen_addr)).unwrap(); + let detailed_ingest_client = QuickwitClientBuilder::new(url) + .detailed_response(true) + .build(); + let ingest_resp = ingest( - &sandbox.detailed_ingest_client(), + &detailed_ingest_client, index_id, IngestSource::Str("{\"body\":\"hello\"}\naouch!".to_string()), CommitType::Auto, @@ -592,11 +621,11 @@ async fn test_detailed_ingest_response() { #[tokio::test] async fn test_very_large_index_name() { let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; @@ -608,7 +637,7 @@ async fn test_very_large_index_name() { let oversized_index_id = format!("{acceptable_index_id}1"); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( format!( @@ -630,7 +659,7 @@ async fn test_very_large_index_name() { .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client("indexer"), acceptable_index_id, ingest_json!({"body": "not too long"}), CommitType::Auto, @@ -649,14 +678,14 @@ async fn test_very_large_index_name() { // Delete the index to avoid potential hanging on shutdown #5068 sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .delete(acceptable_index_id, false) .await .unwrap(); let error = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( format!( @@ -691,7 +720,7 @@ async fn test_shutdown_single_node() { let index_id = "test_shutdown_single_node"; sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create( format!( @@ -713,7 +742,7 @@ async fn test_shutdown_single_node() { .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client(STANDALONE_NODE_NAME), index_id, ingest_json!({"body": "one"}), CommitType::Force, @@ -722,7 +751,7 @@ async fn test_shutdown_single_node() { .unwrap(); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, ingest_json!({"body": "two"}), @@ -742,20 +771,23 @@ async fn test_shutdown_single_node() { #[tokio::test] async fn test_shutdown_control_plane_first() { let mut sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Indexer]) - .add_node([ - QuickwitService::ControlPlane, - QuickwitService::Searcher, - QuickwitService::Metastore, - QuickwitService::Janitor, - ]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node( + "other", + [ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ], + ) .build_and_start() .await; let index_id = "test_shutdown_control_plane_first"; // Create index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( format!( @@ -777,7 +809,7 @@ async fn test_shutdown_control_plane_first() { .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client("indexer"), index_id, ingest_json!({"body": "one"}), CommitType::Force, @@ -785,15 +817,7 @@ async fn test_shutdown_control_plane_first() { .await .unwrap(); - sandbox - .shutdown_services([ - QuickwitService::ControlPlane, - QuickwitService::Searcher, - QuickwitService::Metastore, - QuickwitService::Janitor, - ]) - .await - .unwrap(); + sandbox.shutdown_nodes(["other"]).await.unwrap(); // The indexer hangs on shutdown because it cannot commit the shard EOF tokio::time::timeout(Duration::from_secs(5), sandbox.shutdown()) @@ -804,19 +828,22 @@ async fn test_shutdown_control_plane_first() { #[tokio::test] async fn test_shutdown_indexer_first() { let mut sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Indexer]) - .add_node([ - QuickwitService::ControlPlane, - QuickwitService::Searcher, - QuickwitService::Metastore, - QuickwitService::Janitor, - ]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node( + "other", + [ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ], + ) .build_and_start() .await; let index_id = "test_shutdown_indexer_first"; sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( format!( @@ -838,7 +865,7 @@ async fn test_shutdown_indexer_first() { .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client("indexer"), index_id, ingest_json!({"body": "one"}), CommitType::Force, @@ -846,10 +873,7 @@ async fn test_shutdown_indexer_first() { .await .unwrap(); - sandbox - .shutdown_services([QuickwitService::Indexer]) - .await - .unwrap(); + sandbox.shutdown_nodes(["indexer"]).await.unwrap(); tokio::time::timeout(Duration::from_secs(5), sandbox.shutdown()) .await diff --git a/quickwit/quickwit-integration-tests/src/tests/no_cp_tests.rs b/quickwit/quickwit-integration-tests/src/tests/no_cp_tests.rs index ccc8d2fa77f..646455ba97c 100644 --- a/quickwit/quickwit-integration-tests/src/tests/no_cp_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/no_cp_tests.rs @@ -34,9 +34,9 @@ fn initialize_tests() { async fn test_search_after_control_plane_shutdown() { initialize_tests(); let mut sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Searcher]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("searcher", [QuickwitService::Searcher]) .build_and_start() .await; let index_id = "test-search-after-control-plane-shutdown"; @@ -54,16 +54,13 @@ async fn test_search_after_control_plane_shutdown() { ); sandbox - .rest_client(QuickwitService::Metastore) + .rest_client("metastore") .indexes() .create(index_config.clone(), ConfigFormat::Yaml, false) .await .unwrap(); - sandbox - .shutdown_services([QuickwitService::ControlPlane]) - .await - .unwrap(); + sandbox.shutdown_nodes(["control-plane"]).await.unwrap(); sandbox.assert_hit_count(index_id, "", 0).await; @@ -74,15 +71,15 @@ async fn test_search_after_control_plane_shutdown() { async fn test_searcher_and_metastore_without_control_plane() { initialize_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("searcher", [QuickwitService::Searcher]) .build_and_start() .await; // we cannot create an actual index without control plane let search_error = sandbox - .rest_client(QuickwitService::Searcher) + .rest_client("searcher") .search( "does-not-exist", SearchRequestQueryString { @@ -112,8 +109,11 @@ async fn test_searcher_and_metastore_without_control_plane() { async fn test_indexer_fails_without_control_plane() { initialize_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer, QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node( + "indexer-searcher", + [QuickwitService::Indexer, QuickwitService::Searcher], + ) .build_and_start() .await; diff --git a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs index fa981c2bdf6..23c6364869a 100644 --- a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs @@ -47,15 +47,18 @@ fn initialize_tests() { async fn test_ingest_traces_with_otlp_grpc_api() { initialize_tests(); let mut sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node_with_otlp([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node_with_otlp("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; // Wait for the pipelines to start (one for logs and one for traces) - sandbox.wait_for_indexing_pipelines(2).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 2) + .await + .unwrap(); fn build_span(span_name: String) -> Vec { let scope_spans = vec![ScopeSpans { @@ -129,10 +132,7 @@ async fn test_ingest_traces_with_otlp_grpc_api() { assert_eq!(status.code(), tonic::Code::NotFound); } - sandbox - .shutdown_services([QuickwitService::Indexer]) - .await - .unwrap(); + sandbox.shutdown_nodes(["indexer"]).await.unwrap(); sandbox.shutdown().await.unwrap(); } @@ -140,15 +140,18 @@ async fn test_ingest_traces_with_otlp_grpc_api() { async fn test_ingest_logs_with_otlp_grpc_api() { initialize_tests(); let mut sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node_with_otlp([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node_with_otlp("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; // Wait fo the pipelines to start (one for logs and one for traces) - sandbox.wait_for_indexing_pipelines(2).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 2) + .await + .unwrap(); fn build_log(body: String) -> Vec { let log_record = LogRecord { @@ -203,10 +206,7 @@ async fn test_ingest_logs_with_otlp_grpc_api() { .await; } - sandbox - .shutdown_services([QuickwitService::Indexer]) - .await - .unwrap(); + sandbox.shutdown_nodes(["indexer"]).await.unwrap(); sandbox.shutdown().await.unwrap(); } @@ -214,15 +214,18 @@ async fn test_ingest_logs_with_otlp_grpc_api() { async fn test_jaeger_api() { initialize_tests(); let mut sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node_with_otlp([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node_with_otlp("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; // Wait fo the pipelines to start (one for logs and one for traces) - sandbox.wait_for_indexing_pipelines(2).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 2) + .await + .unwrap(); let export_trace_request = ExportTraceServiceRequest { resource_spans: make_resource_spans_for_test(), @@ -238,10 +241,7 @@ async fn test_jaeger_api() { .await .unwrap(); - sandbox - .shutdown_services([QuickwitService::Indexer]) - .await - .unwrap(); + sandbox.shutdown_nodes(["indexer"]).await.unwrap(); { // Test `GetServices` diff --git a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs index e9b01834791..b77fe487147 100644 --- a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs @@ -26,7 +26,7 @@ use quickwit_indexing::source::sqs_queue::test_helpers as sqs_test_helpers; use quickwit_metastore::SplitState; use tempfile::NamedTempFile; -use crate::test_utils::ClusterSandboxBuilder; +use crate::test_utils::{ClusterSandboxBuilder, STANDALONE_NODE_NAME}; fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { let mut temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -61,7 +61,7 @@ async fn test_sqs_with_duplicates() { let queue_url = sqs_test_helpers::create_queue(&sqs_client, "test-single-node-cluster").await; sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config.clone(), ConfigFormat::Yaml, false) .await @@ -85,7 +85,7 @@ async fn test_sqs_with_duplicates() { ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .sources(index_id) .create(source_config_input, ConfigFormat::Yaml) .await @@ -133,7 +133,7 @@ async fn test_sqs_with_duplicates() { .expect("number of in-flight messages didn't reach 2 within the timeout"); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .delete(index_id, false) .await @@ -164,7 +164,7 @@ async fn test_sqs_garbage_collect() { let queue_url = sqs_test_helpers::create_queue(&sqs_client, "test-single-node-cluster").await; sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create(index_config.clone(), ConfigFormat::Yaml, false) .await @@ -190,7 +190,7 @@ async fn test_sqs_garbage_collect() { ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .sources(index_id) .create(source_config_input, ConfigFormat::Yaml) .await @@ -213,7 +213,7 @@ async fn test_sqs_garbage_collect() { wait_until_predicate( || async { let shard_count = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .sources(index_id) .get_shards(source_id) .await @@ -229,7 +229,7 @@ async fn test_sqs_garbage_collect() { .expect("shards where not pruned within the timeout"); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .delete(index_id, false) .await @@ -248,11 +248,11 @@ async fn test_update_source_multi_node_cluster() { let queue_url = sqs_test_helpers::create_queue(&sqs_client, "test-update-source-cluster").await; let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; @@ -261,7 +261,7 @@ async fn test_update_source_multi_node_cluster() { // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_stats() .indexing() .await @@ -283,14 +283,17 @@ async fn test_update_source_multi_node_cluster() { "# ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); // Wait until indexing pipelines are started - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 1) + .await + .unwrap(); // create an SQS source with 1 pipeline let source_id: &str = "test-update-source-cluster"; @@ -312,14 +315,17 @@ async fn test_update_source_multi_node_cluster() { "# ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .sources(index_id) .create(source_config_input, ConfigFormat::Yaml) .await .unwrap(); // Wait until the SQS indexing pipeline is also started - sandbox.wait_for_indexing_pipelines(2).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 2) + .await + .unwrap(); // increase the number of pipelines to 3 let source_config_input = format!( @@ -340,14 +346,17 @@ async fn test_update_source_multi_node_cluster() { "# ); sandbox - .rest_client(QuickwitService::Metastore) + .rest_client("metastore") .sources(index_id) .update(source_id, source_config_input, ConfigFormat::Yaml, false) .await .unwrap(); // Wait until the SQS indexing pipeline is also started - sandbox.wait_for_indexing_pipelines(4).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 4) + .await + .unwrap(); sandbox.shutdown().await.unwrap(); } diff --git a/quickwit/quickwit-integration-tests/src/tests/tls_tests.rs b/quickwit/quickwit-integration-tests/src/tests/tls_tests.rs index 997860169ef..cbeb7aa9bc0 100644 --- a/quickwit/quickwit-integration-tests/src/tests/tls_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/tls_tests.rs @@ -24,10 +24,10 @@ use crate::test_utils::ClusterSandboxBuilder; async fn test_tls_rest() { quickwit_common::setup_logging_for_tests(); let mut sandbox_config = ClusterSandboxBuilder::default() - .add_node(QuickwitService::supported_services()) + .add_node("tls-standalone", QuickwitService::supported_services()) .build_config() .await; - sandbox_config.node_configs[0].0.rest_config.tls = Some(quickwit_config::TlsConfig { + sandbox_config.node_configs[0].rest_config.tls = Some(quickwit_config::TlsConfig { cert_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.crt").to_string(), key_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.key").to_string(), ca_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/ca.crt").to_string(), @@ -35,12 +35,12 @@ async fn test_tls_rest() { validate_client: false, }); let sandbox = sandbox_config.start().await; - let node_config = sandbox.node_configs.first().unwrap(); + let node_config = sandbox.node_configs().next().unwrap(); let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) .pool_idle_timeout(Duration::from_secs(30)) .http2_only(true) .build_http::(); - let root_uri = format!("http://{}/", node_config.0.rest_config.listen_addr) + let root_uri = format!("http://{}/", node_config.rest_config.listen_addr) .parse::() .unwrap(); client @@ -50,7 +50,7 @@ async fn test_tls_rest() { assert_eq!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("tls-standalone") .indexes() .list() .await @@ -66,16 +66,16 @@ async fn test_tls_rest() { async fn test_tls_grpc() { quickwit_common::setup_logging_for_tests(); let mut sandbox_config = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_config() .await; for node in &mut sandbox_config.node_configs { - node.0.rest_config.tls = Some(quickwit_config::TlsConfig { + node.rest_config.tls = Some(quickwit_config::TlsConfig { cert_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.crt").to_string(), key_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/server.key").to_string(), ca_path: concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/ca.crt").to_string(), @@ -89,7 +89,7 @@ async fn test_tls_grpc() { // TODO connect to grpc port and verify it refuses non-tls connection sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( r#" @@ -110,7 +110,7 @@ async fn test_tls_grpc() { assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_health() .is_live() .await @@ -118,11 +118,14 @@ async fn test_tls_grpc() { ); // Assert that at least 1 indexing pipelines is successfully started - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 1) + .await + .unwrap(); // Check that search is working let search_response_empty = sandbox - .rest_client(QuickwitService::Searcher) + .rest_client("searcher") .search( "my-new-multi-node-index", SearchRequestQueryString { diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/create_on_update.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/create_on_update.rs index b9b3bde4bef..1ff26506f96 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/create_on_update.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/create_on_update.rs @@ -26,11 +26,11 @@ use crate::test_utils::{ClusterSandboxBuilder, ingest}; async fn test_update_missing_no_create() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; @@ -39,7 +39,7 @@ async fn test_update_missing_no_create() { // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_stats() .indexing() .await @@ -49,7 +49,7 @@ async fn test_update_missing_no_create() { assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_health() .is_live() .await @@ -57,7 +57,7 @@ async fn test_update_missing_no_create() { ); let status_code = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .update( "my-updatable-index", @@ -91,11 +91,11 @@ async fn test_update_missing_no_create() { async fn test_update_missing_create() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; @@ -104,7 +104,7 @@ async fn test_update_missing_create() { // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_stats() .indexing() .await @@ -114,7 +114,7 @@ async fn test_update_missing_create() { assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_health() .is_live() .await @@ -122,7 +122,7 @@ async fn test_update_missing_create() { ); sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .update( "my-updatable-index", @@ -153,11 +153,11 @@ async fn test_update_missing_create() { async fn test_update_create_existing_doesnt_clear() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; @@ -166,7 +166,7 @@ async fn test_update_create_existing_doesnt_clear() { // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_stats() .indexing() .await @@ -176,7 +176,7 @@ async fn test_update_create_existing_doesnt_clear() { // Create an index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( r#" @@ -200,7 +200,7 @@ async fn test_update_create_existing_doesnt_clear() { .unwrap(); assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_health() .is_live() .await @@ -208,10 +208,13 @@ async fn test_update_create_existing_doesnt_clear() { ); // Wait until indexing pipelines are started - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 1) + .await + .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client("indexer"), "my-updatable-index", ingest_json!({"title": "first", "body": "first record"}), CommitType::Auto, @@ -228,7 +231,7 @@ async fn test_update_create_existing_doesnt_clear() { // Update the index to also search `body` by default, the same search should // now have 1 hit sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .update( "my-updatable-index", diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index cc806177b52..d14a4253884 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -15,13 +15,12 @@ use std::fmt::Write; use std::time::Duration; -use quickwit_config::service::QuickwitService; use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::CommitType; use serde_json::{Value, json}; use super::assert_hits_unordered; -use crate::test_utils::ClusterSandboxBuilder; +use crate::test_utils::{ClusterSandboxBuilder, STANDALONE_NODE_NAME}; /// Update the doc mapping between 2 calls to local-ingest (forces separate indexing pipelines) and /// assert the number of hits for the given query @@ -40,7 +39,7 @@ async fn validate_search_across_doc_mapping_updates( // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_stats() .indexing() .await @@ -50,7 +49,7 @@ async fn validate_search_across_doc_mapping_updates( // Create index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create( json!({ @@ -70,7 +69,7 @@ async fn validate_search_across_doc_mapping_updates( assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_health() .is_live() .await @@ -78,7 +77,10 @@ async fn validate_search_across_doc_mapping_updates( ); // Wait until indexing pipelines are started. - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines(STANDALONE_NODE_NAME, 1) + .await + .unwrap(); // We use local ingest to always pick up the latest doc mapping sandbox @@ -88,7 +90,7 @@ async fn validate_search_across_doc_mapping_updates( // Update index to also search "body" by default, search should now have 1 hit sandbox - .rest_client(QuickwitService::Searcher) + .rest_client(STANDALONE_NODE_NAME) .indexes() .update( index_id, @@ -591,23 +593,14 @@ async fn test_update_doc_mapping_add_field_on_strict() { async fn test_update_doc_validation() { quickwit_common::setup_logging_for_tests(); let index_id = "update-doc-validation"; - let sandbox = ClusterSandboxBuilder::default() - .add_node([ - QuickwitService::Searcher, - QuickwitService::Metastore, - QuickwitService::Indexer, - QuickwitService::ControlPlane, - QuickwitService::Janitor, - ]) - .build_and_start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; { // Wait for indexer to fully start. // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_stats() .indexing() .await @@ -617,7 +610,7 @@ async fn test_update_doc_validation() { // Create index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create( json!({ @@ -641,7 +634,7 @@ async fn test_update_doc_validation() { assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_health() .is_live() .await @@ -649,7 +642,10 @@ async fn test_update_doc_validation() { ); // Wait until indexing pipelines are started. - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines(STANDALONE_NODE_NAME, 1) + .await + .unwrap(); let unsigned_payload = (0..20).fold(String::new(), |mut buffer, id| { writeln!(&mut buffer, "{{\"body\": {id}}}").unwrap(); @@ -657,7 +653,7 @@ async fn test_update_doc_validation() { }); let unsigned_response = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, IngestSource::Str(unsigned_payload.clone()), @@ -671,7 +667,7 @@ async fn test_update_doc_validation() { assert_eq!(unsigned_response.num_rejected_docs.unwrap(), 0); sandbox - .rest_client(QuickwitService::Searcher) + .rest_client(STANDALONE_NODE_NAME) .indexes() .update( index_id, @@ -700,7 +696,7 @@ async fn test_update_doc_validation() { }); let signed_response = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, IngestSource::Str(signed_payload.clone()), diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs index ac9e24e517d..7c8eceeb59a 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs @@ -25,8 +25,11 @@ async fn assert_hits_unordered( query: &str, expected_result: Result<&[Value], ()>, ) { + let searcher_node_id = sandbox + .find_node_for_service(QuickwitService::Searcher) + .node_id; let search_res = sandbox - .rest_client(QuickwitService::Searcher) + .rest_client(searcher_node_id.as_str()) .search( index_id, SearchRequestQueryString { diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs index a431295aaf7..b8abdf75bc5 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs @@ -15,7 +15,6 @@ use std::fmt::Write; use std::time::Duration; -use quickwit_config::service::QuickwitService; use quickwit_metastore::SplitState; use quickwit_proto::types::DocMappingUid; use quickwit_rest_client::models::IngestSource; @@ -23,29 +22,20 @@ use quickwit_rest_client::rest_client::CommitType; use quickwit_serve::ListSplitsQueryParams; use serde_json::json; -use crate::test_utils::ClusterSandboxBuilder; +use crate::test_utils::{ClusterSandboxBuilder, STANDALONE_NODE_NAME}; #[tokio::test] async fn test_update_doc_mapping_restart_indexing_pipeline() { let index_id = "update-restart-ingest"; quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxBuilder::default() - .add_node([ - QuickwitService::Searcher, - QuickwitService::Metastore, - QuickwitService::Indexer, - QuickwitService::ControlPlane, - QuickwitService::Janitor, - ]) - .build_and_start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; { // Wait for indexer to fully start. // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_stats() .indexing() .await @@ -60,7 +50,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { // Create index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .indexes() .create( json!({ @@ -85,7 +75,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .node_health() .is_live() .await @@ -93,7 +83,10 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { ); // Wait until indexing pipelines are started. - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines(STANDALONE_NODE_NAME, 1) + .await + .unwrap(); let payload = (0..1000).fold(String::new(), |mut buffer, id| { writeln!(&mut buffer, "{{\"body\": {id}}}").unwrap(); @@ -103,7 +96,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { // ingest some documents with old doc mapping. // we *don't* use local ingest to use a normal indexing pipeline sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, IngestSource::Str(payload.clone()), @@ -125,7 +118,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { // the pipeline gets killed and restarted (in practice as this cluster is very lightly loaded, // it will almost always kill the pipeline before these documents are committed) sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, IngestSource::Str(payload.clone()), @@ -138,7 +131,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { // Update index sandbox - .rest_client(QuickwitService::Searcher) + .rest_client(STANDALONE_NODE_NAME) .indexes() .update( index_id, @@ -166,7 +159,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { // the pipeline gets killed and restarted. In practice this will almost always use the new // mapping on a lightly loaded cluster. sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, IngestSource::Str(payload.clone()), @@ -186,7 +179,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { // we ingest again, definitely with the up to date doc mapper this time sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .ingest( index_id, IngestSource::Str(payload.clone()), @@ -204,7 +197,7 @@ async fn test_update_doc_mapping_restart_indexing_pipeline() { .unwrap(); let splits = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client(STANDALONE_NODE_NAME) .splits(index_id) .list(ListSplitsQueryParams::default()) .await diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs index 8c9fc5621f3..b2bddeed3f4 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs @@ -26,11 +26,11 @@ use crate::test_utils::{ClusterSandboxBuilder, ingest}; async fn test_update_search_settings_on_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .add_node([QuickwitService::Janitor]) + .add_node("searcher", [QuickwitService::Searcher]) + .add_node("metastore", [QuickwitService::Metastore]) + .add_node("indexer", [QuickwitService::Indexer]) + .add_node("control-plane", [QuickwitService::ControlPlane]) + .add_node("janitor", [QuickwitService::Janitor]) .build_and_start() .await; @@ -39,7 +39,7 @@ async fn test_update_search_settings_on_multi_nodes_cluster() { // The starting time is a bit long for a cluster. tokio::time::sleep(Duration::from_secs(3)).await; let indexing_service_counters = sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_stats() .indexing() .await @@ -49,7 +49,7 @@ async fn test_update_search_settings_on_multi_nodes_cluster() { // Create an index sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .create( r#" @@ -73,7 +73,7 @@ async fn test_update_search_settings_on_multi_nodes_cluster() { .unwrap(); assert!( sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .node_health() .is_live() .await @@ -81,10 +81,13 @@ async fn test_update_search_settings_on_multi_nodes_cluster() { ); // Wait until indexing pipelines are started - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + sandbox + .wait_for_indexing_pipelines("indexer", 1) + .await + .unwrap(); ingest( - &sandbox.rest_client(QuickwitService::Indexer), + &sandbox.rest_client("indexer"), "my-updatable-index", ingest_json!({"title": "first", "body": "first record"}), CommitType::Auto, @@ -101,7 +104,7 @@ async fn test_update_search_settings_on_multi_nodes_cluster() { // Update the index to also search `body` by default, the same search should // now have 1 hit sandbox - .rest_client(QuickwitService::Indexer) + .rest_client("indexer") .indexes() .update( "my-updatable-index",