From f3c415e89c14101f80149e55f912a33b6b8b5314 Mon Sep 17 00:00:00 2001 From: Aryamaan Singh Date: Sun, 22 Feb 2026 16:03:02 +0530 Subject: [PATCH 1/2] Fixed get_admin() to check before creating new admin --- crates/fluss/src/client/admin.rs | 1 + crates/fluss/src/client/connection.rs | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 3012f85c..4972ec78 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -36,6 +36,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::task::JoinHandle; +#[derive(Clone)] pub struct FlussAdmin { admin_gateway: ServerConnection, #[allow(dead_code)] diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index a17e57fb..a159466f 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::WriterClient; +use crate::client::{WriterClient}; use crate::client::admin::FlussAdmin; use crate::client::metadata::Metadata; use crate::client::table::FlussTable; @@ -23,6 +23,7 @@ use crate::config::Config; use crate::rpc::RpcClient; use parking_lot::RwLock; use std::sync::Arc; +use tokio::sync::OnceCell; use crate::error::{Error, FlussError, Result}; use crate::metadata::TablePath; @@ -32,6 +33,7 @@ pub struct FlussConnection { network_connects: Arc, args: Config, writer_client: RwLock>>, + admin_client: OnceCell, } impl FlussConnection { @@ -44,6 +46,7 @@ impl FlussConnection { network_connects: connections.clone(), args: arg.clone(), writer_client: Default::default(), + admin_client: OnceCell::new(), }) } @@ -60,7 +63,16 @@ impl FlussConnection { } pub async fn get_admin(&self) -> Result { - FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await + // Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin + // holds a reference to RpcClient, which manages connection reuse and re-acquisition + // when a cached connection becomes poisoned. Subsequent calls clone cheaply — + // all internal fields (ServerConnection, Arc, Arc) are + // Arc-backed so cloning is just a reference-count bump. + let admin = self + .admin_client + .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone())) + .await?; + Ok(admin.clone()) } pub fn get_or_create_writer_client(&self) -> Result> { From 44f8f47c37f520437ba036c1c2a9b3b11a1be6e6 Mon Sep 17 00:00:00 2001 From: Aryamaan Singh <71913204+toxicteddy00077@users.noreply.github.com> Date: Tue, 3 Mar 2026 21:16:29 +0530 Subject: [PATCH 2/2] Update crates/fluss/src/client/connection.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- bindings/cpp/src/lib.rs | 2 +- bindings/cpp/test/test_utils.h | 17 +- bindings/python/src/admin.rs | 4 +- bindings/python/src/table.rs | 4 +- bindings/python/test/conftest.py | 247 ++++++++++++++---- .../src/example_partitioned_kv_table.rs | 11 +- crates/fluss/src/client/admin.rs | 73 +++--- crates/fluss/src/client/connection.rs | 35 +-- crates/fluss/tests/integration/utils.rs | 3 +- 9 files changed, 282 insertions(+), 114 deletions(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index c310fc83..6d0d73a3 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -552,7 +552,7 @@ pub struct Connection { } pub struct Admin { - inner: fcore::client::FlussAdmin, + inner: Arc, } pub struct Table { diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h index 98d119a5..76818309 100644 --- a/bindings/cpp/test/test_utils.h +++ b/bindings/cpp/test/test_utils.h @@ -281,8 +281,21 @@ class FlussTestEnvironment : public ::testing::Environment { if (result.Ok()) { auto admin_result = connection_.GetAdmin(admin_); if (admin_result.Ok()) { - std::cout << "Connected to Fluss cluster." << std::endl; - return; + // Check if CoordinatorEventProcessor is actually ready by performing a real + // RPC call. GetAdmin().Ok() always returns true, so we must probe further. + std::vector dbs; + auto list_result = admin_.ListDatabases(dbs); + // Also verify at least one TabletServer is available. + std::vector nodes; + auto nodes_result = admin_.GetServerNodes(nodes); + if (list_result.Ok() && nodes_result.Ok() && + std::any_of(nodes.begin(), nodes.end(), + [](const fluss::ServerNode& n) { + return n.server_type == "TabletServer"; + })) { + std::cout << "Connected to Fluss cluster." << std::endl; + return; + } } } std::cout << "Waiting for Fluss cluster to be ready..." << std::endl; diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index 703b1334..405dabf5 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -532,9 +532,9 @@ impl FlussAdmin { impl FlussAdmin { // Internal method to create FlussAdmin from core admin - pub fn from_core(admin: fcore::client::FlussAdmin) -> Self { + pub fn from_core(admin: Arc) -> Self { Self { - __admin: Arc::new(admin), + __admin: admin, } } } diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 660cd6be..8c9ea0e4 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1902,7 +1902,7 @@ macro_rules! with_scanner { #[pyclass] pub struct LogScanner { scanner: ScannerKind, - admin: fcore::client::FlussAdmin, + admin: Arc, table_info: fcore::metadata::TableInfo, /// The projected Arrow schema to use for empty table creation projected_schema: SchemaRef, @@ -2207,7 +2207,7 @@ impl LogScanner { impl LogScanner { fn new( scanner: ScannerKind, - admin: fcore::client::FlussAdmin, + admin: Arc, table_info: fcore::metadata::TableInfo, projected_schema: SchemaRef, projected_row_type: fcore::metadata::RowType, diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py index 0a969e84..7e5f8358 100644 --- a/bindings/python/test/conftest.py +++ b/bindings/python/test/conftest.py @@ -20,23 +20,50 @@ If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster. Otherwise, a Fluss cluster is started automatically via testcontainers. +The first pytest-xdist worker to run starts the cluster; other workers +detect it via port check and reuse it (matching the C++ test pattern). +Containers are cleaned up after all workers finish via pytest_unconfigure. + Run with: - uv run maturin develop && uv run pytest test/ -v + uv run maturin develop && uv run pytest test/ -v -n auto """ +import asyncio import os import socket +import subprocess import time +# Disable testcontainers Ryuk reaper for xdist runs — it would kill +# containers when the first worker exits, while others are still running. +# We handle cleanup ourselves in pytest_unconfigure. +# In single-process mode, keep Ryuk as a safety net for hard crashes. +if "PYTEST_XDIST_WORKER" in os.environ: + os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true") + import pytest import pytest_asyncio import fluss FLUSS_IMAGE = "apache/fluss" -FLUSS_VERSION = "0.8.0-incubating" +FLUSS_VERSION = "0.9.0-incubating" BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS") +# Container / network names +NETWORK_NAME = "fluss-python-test-network" +ZOOKEEPER_NAME = "zookeeper-python-test" +COORDINATOR_NAME = "coordinator-server-python-test" +TABLET_SERVER_NAME = "tablet-server-python-test" + +# Fixed host ports (must match across workers) +COORDINATOR_PORT = 9123 +TABLET_SERVER_PORT = 9124 +PLAIN_CLIENT_PORT = 9223 +PLAIN_CLIENT_TABLET_PORT = 9224 + +ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT] + def _wait_for_port(host, port, timeout=60): """Wait for a TCP port to become available.""" @@ -44,40 +71,82 @@ def _wait_for_port(host, port, timeout=60): while time.time() - start < timeout: try: with socket.create_connection((host, port), timeout=1): - return + return True except (ConnectionRefusedError, TimeoutError, OSError): time.sleep(1) - raise TimeoutError(f"Port {port} on {host} not available after {timeout}s") + return False -@pytest.fixture(scope="session") -def fluss_cluster(): - """Start a Fluss cluster using testcontainers, or use an existing one.""" - if BOOTSTRAP_SERVERS_ENV: - yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV) +def _all_ports_ready(timeout=60): + """Wait for all cluster ports to become available.""" + deadline = time.time() + timeout + for port in ALL_PORTS: + remaining = deadline - time.time() + if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining): + return False + return True + + +def _run_cmd(cmd): + """Run a command (list form), return exit code.""" + return subprocess.run(cmd, capture_output=True).returncode + + +def _wait_for_coordinator_ready(host, port, timeout=60): + """Poll list_databases() until the CoordinatorEventProcessor is ready.""" + start = time.time() + while time.time() - start < timeout: + try: + async def _probe(): + cfg = fluss.Config({"bootstrap.servers": f"{host}:{port}"}) + conn = await fluss.FlussConnection.create(cfg) + try: + admin = await conn.get_admin() + await admin.list_databases() + return True + finally: + conn.close() + + if asyncio.run(_probe()): + return + except Exception: + pass + time.sleep(1) + raise TimeoutError( + f"Fluss coordinator on {host}:{port} did not become ready after {timeout}s. " + "CoordinatorEventProcessor may not be initialised." + ) + + +def _start_cluster(): + """Start the Fluss Docker cluster via testcontainers. + + If another worker already started the cluster (detected via port check), + reuse it. If container creation fails (name conflict from a racing worker), + wait for the other worker's cluster to become ready. + """ + # Reuse cluster started by another parallel worker or previous run. + if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1): + print("Reusing existing cluster via port check.") return from testcontainers.core.container import DockerContainer - from testcontainers.core.network import Network - network = Network() - network.create() + print("Starting Fluss cluster via testcontainers...") - zookeeper = ( - DockerContainer("zookeeper:3.9.2") - .with_network(network) - .with_name("zookeeper-python-test") - ) + # Create a named network via Docker CLI (idempotent, avoids orphaned + # random-named networks when multiple xdist workers race). + _run_cmd(["docker", "network", "create", NETWORK_NAME]) sasl_jaas = ( "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required" ' user_admin="admin-secret" user_alice="alice-secret";' ) coordinator_props = "\n".join([ - "zookeeper.address: zookeeper-python-test:2181", - "bind.listeners: INTERNAL://coordinator-server-python-test:0," - " CLIENT://coordinator-server-python-test:9123," - " PLAIN_CLIENT://coordinator-server-python-test:9223", + f"zookeeper.address: {ZOOKEEPER_NAME}:2181", + f"bind.listeners: INTERNAL://{COORDINATOR_NAME}:0," + f" CLIENT://{COORDINATOR_NAME}:9123," + f" PLAIN_CLIENT://{COORDINATOR_NAME}:9223", "advertised.listeners: CLIENT://localhost:9123," " PLAIN_CLIENT://localhost:9223", "internal.listener.name: INTERNAL", @@ -87,21 +156,11 @@ def fluss_cluster(): "netty.server.num-network-threads: 1", "netty.server.num-worker-threads: 3", ]) - coordinator = ( - DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}") - .with_network(network) - .with_name("coordinator-server-python-test") - .with_bind_ports(9123, 9123) - .with_bind_ports(9223, 9223) - .with_command("coordinatorServer") - .with_env("FLUSS_PROPERTIES", coordinator_props) - ) - tablet_props = "\n".join([ - "zookeeper.address: zookeeper-python-test:2181", - "bind.listeners: INTERNAL://tablet-server-python-test:0," - " CLIENT://tablet-server-python-test:9123," - " PLAIN_CLIENT://tablet-server-python-test:9223", + f"zookeeper.address: {ZOOKEEPER_NAME}:2181", + f"bind.listeners: INTERNAL://{TABLET_SERVER_NAME}:0," + f" CLIENT://{TABLET_SERVER_NAME}:9123," + f" PLAIN_CLIENT://{TABLET_SERVER_NAME}:9223", "advertised.listeners: CLIENT://localhost:9124," " PLAIN_CLIENT://localhost:9224", "internal.listener.name: INTERNAL", @@ -112,42 +171,122 @@ def fluss_cluster(): "netty.server.num-network-threads: 1", "netty.server.num-worker-threads: 3", ]) + + zookeeper = ( + DockerContainer("zookeeper:3.9.2") + .with_kwargs(network=NETWORK_NAME) + .with_name(ZOOKEEPER_NAME) + ) + coordinator = ( + DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}") + .with_kwargs(network=NETWORK_NAME) + .with_name(COORDINATOR_NAME) + .with_bind_ports(9123, 9123) + .with_bind_ports(9223, 9223) + .with_command("coordinatorServer") + .with_env("FLUSS_PROPERTIES", coordinator_props) + ) tablet_server = ( DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}") - .with_network(network) - .with_name("tablet-server-python-test") + .with_kwargs(network=NETWORK_NAME) + .with_name(TABLET_SERVER_NAME) .with_bind_ports(9123, 9124) .with_bind_ports(9223, 9224) .with_command("tabletServer") .with_env("FLUSS_PROPERTIES", tablet_props) ) - zookeeper.start() - coordinator.start() - tablet_server.start() + try: + zookeeper.start() + coordinator.start() + tablet_server.start() + except Exception as e: + # Another worker may have started containers with the same names. + # Wait for the cluster to become ready instead of failing. + print(f"Container start failed ({e}), waiting for cluster from another worker...") + if _all_ports_ready(): + return + raise - _wait_for_port("localhost", 9123) - _wait_for_port("localhost", 9124) - _wait_for_port("localhost", 9223) - _wait_for_port("localhost", 9224) - # Extra wait for cluster to fully initialize - time.sleep(10) + if not _all_ports_ready(): + raise RuntimeError("Cluster listeners did not become ready") + + _wait_for_coordinator_ready("localhost", PLAIN_CLIENT_PORT) + print("Fluss cluster started successfully.") + + +def _stop_cluster(): + """Stop and remove the Fluss Docker cluster containers.""" + for name in [TABLET_SERVER_NAME, COORDINATOR_NAME, ZOOKEEPER_NAME]: + subprocess.run(["docker", "rm", "-f", name], capture_output=True) + subprocess.run(["docker", "network", "rm", NETWORK_NAME], capture_output=True) - # (plaintext_bootstrap, sasl_bootstrap) - yield ("127.0.0.1:9223", "127.0.0.1:9123") - tablet_server.stop() - coordinator.stop() - zookeeper.stop() - network.remove() +async def _connect_with_retry(bootstrap_servers, timeout=60): + """Connect to the Fluss cluster with retries until it's fully ready. + + Waits until both the coordinator and at least one tablet server are + available, matching the Rust wait_for_cluster_ready pattern. + """ + config = fluss.Config({"bootstrap.servers": bootstrap_servers}) + start = time.time() + last_err = None + while time.time() - start < timeout: + conn = None + try: + conn = await fluss.FlussConnection.create(config) + admin = await conn.get_admin() + nodes = await admin.get_server_nodes() + if any(n.server_type == "TabletServer" for n in nodes): + return conn + last_err = RuntimeError("No TabletServer available yet") + except Exception as e: + last_err = e + if conn is not None: + conn.close() + await asyncio.sleep(1) + raise RuntimeError( + f"Could not connect to cluster after {timeout}s: {last_err}" + ) + + +def pytest_unconfigure(config): + """Clean up Docker containers after all xdist workers finish. + + Runs once on the controller process (or the single process when + not using xdist). Workers are identified by the 'workerinput' attr. + """ + if BOOTSTRAP_SERVERS_ENV: + return + if hasattr(config, "workerinput"): + return # This is a worker, skip + _stop_cluster() + + +@pytest.fixture(scope="session") +def fluss_cluster(): + """Start a Fluss cluster using testcontainers, or use an existing one.""" + if BOOTSTRAP_SERVERS_ENV: + sasl_env = os.environ.get( + "FLUSS_SASL_BOOTSTRAP_SERVERS", BOOTSTRAP_SERVERS_ENV + ) + yield (BOOTSTRAP_SERVERS_ENV, sasl_env) + return + + _start_cluster() + + # (plaintext_bootstrap, sasl_bootstrap) + yield ( + f"127.0.0.1:{PLAIN_CLIENT_PORT}", + f"127.0.0.1:{COORDINATOR_PORT}", + ) @pytest_asyncio.fixture(scope="session") async def connection(fluss_cluster): """Session-scoped connection to the Fluss cluster (plaintext).""" plaintext_addr, _sasl_addr = fluss_cluster - config = fluss.Config({"bootstrap.servers": plaintext_addr}) - conn = await fluss.FlussConnection.create(config) + conn = await _connect_with_retry(plaintext_addr) yield conn conn.close() diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index 9cd2e7df..70fd6229 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -22,6 +22,7 @@ use fluss::error::Result; use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::collections::HashMap; +use std::sync::Arc; #[tokio::main] #[allow(dead_code)] @@ -46,7 +47,7 @@ pub async fn main() -> Result<()> { let table_path = TablePath::new("fluss", "partitioned_kv_example"); - let mut admin = conn.get_admin().await?; + let admin = conn.get_admin().await?; admin .create_table(&table_path, &table_descriptor, true) .await?; @@ -55,9 +56,9 @@ pub async fn main() -> Result<()> { admin.get_table_info(&table_path).await? ); - create_partition(&table_path, &mut admin, "APAC", 1).await; - create_partition(&table_path, &mut admin, "EMEA", 2).await; - create_partition(&table_path, &mut admin, "US", 3).await; + create_partition(&table_path, &admin, "APAC", 1).await; + create_partition(&table_path, &admin, "EMEA", 2).await; + create_partition(&table_path, &admin, "US", 3).await; let table = conn.get_table(&table_path).await?; let table_upsert = table.new_upsert()?; @@ -129,7 +130,7 @@ pub async fn main() -> Result<()> { Ok(()) } -async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) { +async fn create_partition(table_path: &TablePath, admin: &Arc, region: &str, zone: i64) { let mut partition_values = HashMap::new(); partition_values.insert("region".to_string(), region.to_string()); partition_values.insert("zone".to_string(), zone.to_string()); diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index fbc90cca..f7199a17 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -37,32 +37,28 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::task::JoinHandle; -#[derive(Clone)] pub struct FlussAdmin { - admin_gateway: ServerConnection, - #[allow(dead_code)] metadata: Arc, - #[allow(dead_code)] rpc_client: Arc, } impl FlussAdmin { - pub async fn new(connections: Arc, metadata: Arc) -> Result { - let admin_con = - connections - .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else( - || Error::UnexpectedError { - message: "Coordinator server not found in cluster metadata".to_string(), - source: None, - }, - )?) - .await?; - - Ok(FlussAdmin { - admin_gateway: admin_con, + pub fn new(connections: Arc, metadata: Arc) -> Self { + FlussAdmin { metadata, rpc_client: connections, - }) + } + } + + async fn admin_gateway(&self) -> Result { + let cluster = self.metadata.get_cluster(); + let coordinator = cluster.get_coordinator_server().ok_or_else(|| { + Error::UnexpectedError { + message: "Coordinator server not found in cluster metadata".to_string(), + source: None, + } + })?; + self.rpc_client.get_connection(coordinator).await } pub async fn create_database( @@ -72,7 +68,8 @@ impl FlussAdmin { ignore_if_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(CreateDatabaseRequest::new( database_name, database_descriptor, @@ -89,7 +86,8 @@ impl FlussAdmin { ignore_if_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(CreateTableRequest::new( table_path, table_descriptor, @@ -105,7 +103,8 @@ impl FlussAdmin { ignore_if_not_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(DropTableRequest::new(table_path, ignore_if_not_exists)) .await?; Ok(()) @@ -113,7 +112,8 @@ impl FlussAdmin { pub async fn get_table_info(&self, table_path: &TablePath) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(GetTableRequest::new(table_path)) .await?; @@ -145,7 +145,8 @@ impl FlussAdmin { /// List all tables in the given database pub async fn list_tables(&self, database_name: &str) -> Result> { let response = self - .admin_gateway + .admin_gateway() + .await? .request(ListTablesRequest::new(database_name)) .await?; Ok(response.table_name) @@ -163,7 +164,8 @@ impl FlussAdmin { partial_partition_spec: Option<&PartitionSpec>, ) -> Result> { let response = self - .admin_gateway + .admin_gateway() + .await? .request(ListPartitionInfosRequest::new( table_path, partial_partition_spec, @@ -180,7 +182,8 @@ impl FlussAdmin { ignore_if_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(CreatePartitionRequest::new( table_path, partition_spec, @@ -198,7 +201,8 @@ impl FlussAdmin { ignore_if_not_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(DropPartitionRequest::new( table_path, partition_spec, @@ -211,7 +215,8 @@ impl FlussAdmin { /// Check if a table exists pub async fn table_exists(&self, table_path: &TablePath) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(TableExistsRequest::new(table_path)) .await?; Ok(response.exists) @@ -225,7 +230,8 @@ impl FlussAdmin { cascade: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(DropDatabaseRequest::new( database_name, ignore_if_not_exists, @@ -238,7 +244,8 @@ impl FlussAdmin { /// List all databases pub async fn list_databases(&self) -> Result> { let response = self - .admin_gateway + .admin_gateway() + .await? .request(ListDatabasesRequest::new()) .await?; Ok(response.database_name) @@ -247,7 +254,8 @@ impl FlussAdmin { /// Check if a database exists pub async fn database_exists(&self, database_name: &str) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(DatabaseExistsRequest::new(database_name)) .await?; Ok(response.exists) @@ -256,7 +264,7 @@ impl FlussAdmin { /// Get database information pub async fn get_database_info(&self, database_name: &str) -> Result { let request = GetDatabaseInfoRequest::new(database_name); - let response = self.admin_gateway.request(request).await?; + let response = self.admin_gateway().await?.request(request).await?; // Convert proto response to DatabaseInfo let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?; @@ -279,7 +287,8 @@ impl FlussAdmin { /// Get the latest lake snapshot for a table pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(GetLatestLakeSnapshotRequest::new(table_path)) .await?; diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 9b657ce9..23d85784 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -23,9 +23,7 @@ use crate::config::Config; use crate::rpc::RpcClient; use parking_lot::RwLock; use std::sync::Arc; -use tokio::sync::OnceCell; use std::time::Duration; - use crate::error::{Error, FlussError, Result}; use crate::metadata::TablePath; @@ -34,7 +32,7 @@ pub struct FlussConnection { network_connects: Arc, args: Config, writer_client: RwLock>>, - admin_client: OnceCell, + admin_client: RwLock>>, } impl FlussConnection { @@ -62,7 +60,7 @@ impl FlussConnection { network_connects: connections.clone(), args: arg.clone(), writer_client: Default::default(), - admin_client: OnceCell::new(), + admin_client: RwLock::new(None), }) } @@ -78,17 +76,24 @@ impl FlussConnection { &self.args } - pub async fn get_admin(&self) -> Result { - // Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin - // holds a reference to RpcClient, which manages connection reuse and re-acquisition - // when a cached connection becomes poisoned. Subsequent calls clone cheaply — - // all internal fields (ServerConnection, Arc, Arc) are - // Arc-backed so cloning is just a reference-count bump. - let admin = self - .admin_client - .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone())) - .await?; - Ok(admin.clone()) + pub async fn get_admin(&self) -> Result> { + // 1. Fast path: return cached instance if already initialized. + if let Some(admin) = self.admin_client.read().as_ref() { + return Ok(admin.clone()); + } + + // 2. Slow path: acquire write lock. + let mut admin_guard = self.admin_client.write(); + + // 3. Double-check: another thread may have initialized while we waited. + if let Some(admin) = admin_guard.as_ref() { + return Ok(admin.clone()); + } + + // 4. Initialize and cache. + let admin = Arc::new(FlussAdmin::new(self.network_connects.clone(), self.metadata.clone())); + *admin_guard = Some(admin.clone()); + Ok(admin) } pub fn get_or_create_writer_client(&self) -> Result> { diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index b53abc86..baa74452 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -109,7 +109,8 @@ pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) { let connection = cluster .get_fluss_connection_with_sasl(username, password) .await; - if connection.get_admin().await.is_ok() + let admin = connection.get_admin().await.expect("get_admin should not fail"); + if admin.list_databases().await.is_ok() && connection .get_metadata() .get_cluster()