Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 165 additions & 137 deletions quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions quickwit/quickwit-integration-tests/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
mod cluster_sandbox;
mod shutdown;

pub(crate) const STANDALONE_NODE_NAME: &str = "standalone";

pub(crate) use cluster_sandbox::{ClusterSandbox, ClusterSandboxBuilder, ingest};
10 changes: 2 additions & 8 deletions quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;

use quickwit_actors::ActorExitStatus;
use quickwit_common::tower::BoxFutureInfaillible;
use quickwit_config::service::QuickwitService;
use quickwit_proto::types::NodeId;
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::task::JoinHandle;

Expand All @@ -26,19 +24,15 @@ type NodeJoinHandle = JoinHandle<Result<HashMap<String, ActorExitStatus>, anyhow
pub(crate) struct NodeShutdownHandle {
sender: Sender<()>,
receiver: Receiver<()>,
pub node_services: HashSet<QuickwitService>,
pub node_id: NodeId,
join_handle_opt: Option<NodeJoinHandle>,
}

impl NodeShutdownHandle {
pub(crate) fn new(node_id: NodeId, node_services: HashSet<QuickwitService>) -> Self {
pub(crate) fn new() -> Self {
let (sender, receiver) = watch::channel(());
Self {
sender,
receiver,
node_id,
node_services,
join_handle_opt: None,
}
}
Expand Down
38 changes: 22 additions & 16 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ use hyper_util::rt::TokioExecutor;
use quickwit_config::service::QuickwitService;
use quickwit_serve::SearchRequestQueryString;

use crate::test_utils::ClusterSandboxBuilder;
use crate::test_utils::{ClusterSandboxBuilder, STANDALONE_NODE_NAME};

#[tokio::test]
async fn test_ui_redirect_on_get() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let node_config = sandbox.node_configs.first().unwrap();
let node_config = sandbox.node_configs().next().unwrap();
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(30))
.http2_only(true)
.build_http();
let root_uri = format!("http://{}/", node_config.0.rest_config.listen_addr)
let root_uri = format!("http://{}/", node_config.rest_config.listen_addr)
.parse::<hyper::Uri>()
.unwrap();
let response = client.get(root_uri.clone()).await.unwrap();
Expand All @@ -52,7 +52,7 @@ async fn test_standalone_server() {
{
// The indexing service should be running.
let counters = sandbox
.rest_client(QuickwitService::Indexer)
.rest_client(STANDALONE_NODE_NAME)
.node_stats()
.indexing()
.await
Expand All @@ -63,7 +63,7 @@ async fn test_standalone_server() {
{
// Create an dynamic index.
sandbox
.rest_client(QuickwitService::Indexer)
.rest_client(STANDALONE_NODE_NAME)
.indexes()
.create(
r#"
Expand All @@ -83,7 +83,7 @@ async fn test_standalone_server() {
// Index should be searchable
assert_eq!(
sandbox
.rest_client(QuickwitService::Indexer)
.rest_client(STANDALONE_NODE_NAME)
.search(
"my-new-index",
SearchRequestQueryString {
Expand All @@ -97,7 +97,10 @@ async fn test_standalone_server() {
.num_hits,
0
);
sandbox.wait_for_indexing_pipelines(1).await.unwrap();
sandbox
.wait_for_indexing_pipelines(STANDALONE_NODE_NAME, 1)
.await
.unwrap();
}
sandbox.shutdown().await.unwrap();
}
Expand All @@ -106,16 +109,16 @@ async fn test_standalone_server() {
async fn test_multi_nodes_cluster() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node([QuickwitService::Indexer])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.add_node("searcher", [QuickwitService::Searcher])
.add_node("metastore", [QuickwitService::Metastore])
.add_node("indexer", [QuickwitService::Indexer])
.add_node("control-plane", [QuickwitService::ControlPlane])
.add_node("janitor", [QuickwitService::Janitor])
.build_and_start()
.await;

sandbox
.rest_client(QuickwitService::Indexer)
.rest_client("indexer")
.indexes()
.create(
r#"
Expand All @@ -136,19 +139,22 @@ async fn test_multi_nodes_cluster() {

assert!(
sandbox
.rest_client(QuickwitService::Indexer)
.rest_client("indexer")
.node_health()
.is_live()
.await
.unwrap()
);

// Assert that at least 1 indexing pipelines is successfully started
sandbox.wait_for_indexing_pipelines(1).await.unwrap();
sandbox
.wait_for_indexing_pipelines("indexer", 1)
.await
.unwrap();

// Check that search is working
let search_response_empty = sandbox
.rest_client(QuickwitService::Searcher)
.rest_client("searcher")
.search(
"my-new-multi-node-index",
SearchRequestQueryString {
Expand Down
33 changes: 23 additions & 10 deletions quickwit/quickwit-integration-tests/src/tests/ingest_v1_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use quickwit_config::ConfigFormat;
use quickwit_config::service::QuickwitService;
use quickwit_metastore::SplitState;
use quickwit_rest_client::rest_client::CommitType;
use quickwit_rest_client::rest_client::{CommitType, QuickwitClientBuilder};
use serde_json::json;

use crate::ingest_json;
Expand All @@ -27,14 +27,16 @@ use crate::test_utils::{ClusterSandboxBuilder, ingest};
#[tokio::test]
async fn test_ingest_v1_happy_path() {
let sandbox = ClusterSandboxBuilder::default()
.use_legacy_ingest()
.add_node([QuickwitService::Indexer])
.add_node([QuickwitService::Searcher])
.add_node([
QuickwitService::ControlPlane,
QuickwitService::Janitor,
QuickwitService::Metastore,
])
.add_node("indexer", [QuickwitService::Indexer])
.add_node("searcher", [QuickwitService::Searcher])
.add_node(
"other",
[
QuickwitService::ControlPlane,
QuickwitService::Janitor,
QuickwitService::Metastore,
],
)
.build_and_start()
.await;

Expand All @@ -51,7 +53,18 @@ async fn test_ingest_v1_happy_path() {
commit_timeout_secs: 1
"#
);
let indexer_client = sandbox.rest_client_legacy_indexer();
let indexer_node_config = sandbox
.node_configs()
.find(|cfg| cfg.node_id.as_str() == "indexer")
.unwrap();
let url = reqwest::Url::parse(&format!(
"http://{}",
indexer_node_config.rest_config.listen_addr
))
.unwrap();
let indexer_client = QuickwitClientBuilder::new(url)
.use_legacy_ingest(true)
.build();
indexer_client
.indexes()
.create(index_config, ConfigFormat::Yaml, false)
Expand Down
Loading
Loading