diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 284eec89..82254ea9 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -556,7 +556,7 @@ pub struct Connection { } pub struct Admin { - inner: fcore::client::FlussAdmin, + inner: Arc, } pub struct Table { diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h index 17a1da76..1ff7e281 100644 --- a/bindings/cpp/test/test_utils.h +++ b/bindings/cpp/test/test_utils.h @@ -306,7 +306,6 @@ class FlussTestEnvironment : public ::testing::Environment { if (result.Ok()) { auto admin_result = connection_.GetAdmin(admin_); if (admin_result.Ok()) { - // check tablet server is available std::vector nodes; auto nodes_result = admin_.GetServerNodes(nodes); if (nodes_result.Ok() && diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index 703b1334..5f4e45d5 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -532,10 +532,8 @@ impl FlussAdmin { impl FlussAdmin { // Internal method to create FlussAdmin from core admin - pub fn from_core(admin: fcore::client::FlussAdmin) -> Self { - Self { - __admin: Arc::new(admin), - } + pub fn from_core(admin: Arc) -> Self { + Self { __admin: admin } } } diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 660cd6be..8c9ea0e4 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1902,7 +1902,7 @@ macro_rules! with_scanner { #[pyclass] pub struct LogScanner { scanner: ScannerKind, - admin: fcore::client::FlussAdmin, + admin: Arc, table_info: fcore::metadata::TableInfo, /// The projected Arrow schema to use for empty table creation projected_schema: SchemaRef, @@ -2207,7 +2207,7 @@ impl LogScanner { impl LogScanner { fn new( scanner: ScannerKind, - admin: fcore::client::FlussAdmin, + admin: Arc, table_info: fcore::metadata::TableInfo, projected_schema: SchemaRef, projected_row_type: fcore::metadata::RowType, diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index 9cd2e7df..ba49934e 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -46,7 +46,7 @@ pub async fn main() -> Result<()> { let table_path = TablePath::new("fluss", "partitioned_kv_example"); - let mut admin = conn.get_admin().await?; + let admin = conn.get_admin().await?; admin .create_table(&table_path, &table_descriptor, true) .await?; @@ -55,9 +55,9 @@ pub async fn main() -> Result<()> { admin.get_table_info(&table_path).await? ); - create_partition(&table_path, &mut admin, "APAC", 1).await; - create_partition(&table_path, &mut admin, "EMEA", 2).await; - create_partition(&table_path, &mut admin, "US", 3).await; + create_partition(&table_path, &admin, "APAC", 1).await; + create_partition(&table_path, &admin, "EMEA", 2).await; + create_partition(&table_path, &admin, "US", 3).await; let table = conn.get_table(&table_path).await?; let table_upsert = table.new_upsert()?; @@ -129,7 +129,7 @@ pub async fn main() -> Result<()> { Ok(()) } -async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) { +async fn create_partition(table_path: &TablePath, admin: &FlussAdmin, region: &str, zone: i64) { let mut partition_values = HashMap::new(); partition_values.insert("region".to_string(), region.to_string()); partition_values.insert("zone".to_string(), zone.to_string()); diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 7a79e5ed..7f1f64ef 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -38,30 +38,28 @@ use std::sync::Arc; use tokio::task::JoinHandle; pub struct FlussAdmin { - admin_gateway: ServerConnection, - #[allow(dead_code)] metadata: Arc, - #[allow(dead_code)] rpc_client: Arc, } impl FlussAdmin { - pub async fn new(connections: Arc, metadata: Arc) -> Result { - let admin_con = - connections - .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else( - || Error::UnexpectedError { - message: "Coordinator server not found in cluster metadata".to_string(), - source: None, - }, - )?) - .await?; - - Ok(FlussAdmin { - admin_gateway: admin_con, + pub fn new(connections: Arc, metadata: Arc) -> Self { + FlussAdmin { metadata, rpc_client: connections, - }) + } + } + + async fn admin_gateway(&self) -> Result { + let cluster = self.metadata.get_cluster(); + let coordinator = + cluster + .get_coordinator_server() + .ok_or_else(|| Error::UnexpectedError { + message: "Coordinator server not found in cluster metadata".to_string(), + source: None, + })?; + self.rpc_client.get_connection(coordinator).await } pub async fn create_database( @@ -71,7 +69,8 @@ impl FlussAdmin { ignore_if_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(CreateDatabaseRequest::new( database_name, database_descriptor, @@ -88,7 +87,8 @@ impl FlussAdmin { ignore_if_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(CreateTableRequest::new( table_path, table_descriptor, @@ -104,7 +104,8 @@ impl FlussAdmin { ignore_if_not_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(DropTableRequest::new(table_path, ignore_if_not_exists)) .await?; Ok(()) @@ -112,7 +113,8 @@ impl FlussAdmin { pub async fn get_table_info(&self, table_path: &TablePath) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(GetTableRequest::new(table_path)) .await?; @@ -144,7 +146,8 @@ impl FlussAdmin { /// List all tables in the given database pub async fn list_tables(&self, database_name: &str) -> Result> { let response = self - .admin_gateway + .admin_gateway() + .await? .request(ListTablesRequest::new(database_name)) .await?; Ok(response.table_name) @@ -162,7 +165,8 @@ impl FlussAdmin { partial_partition_spec: Option<&PartitionSpec>, ) -> Result> { let response = self - .admin_gateway + .admin_gateway() + .await? .request(ListPartitionInfosRequest::new( table_path, partial_partition_spec, @@ -179,7 +183,8 @@ impl FlussAdmin { ignore_if_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(CreatePartitionRequest::new( table_path, partition_spec, @@ -197,7 +202,8 @@ impl FlussAdmin { ignore_if_not_exists: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(DropPartitionRequest::new( table_path, partition_spec, @@ -210,7 +216,8 @@ impl FlussAdmin { /// Check if a table exists pub async fn table_exists(&self, table_path: &TablePath) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(TableExistsRequest::new(table_path)) .await?; Ok(response.exists) @@ -224,7 +231,8 @@ impl FlussAdmin { cascade: bool, ) -> Result<()> { let _response = self - .admin_gateway + .admin_gateway() + .await? .request(DropDatabaseRequest::new( database_name, ignore_if_not_exists, @@ -237,7 +245,8 @@ impl FlussAdmin { /// List all databases pub async fn list_databases(&self) -> Result> { let response = self - .admin_gateway + .admin_gateway() + .await? .request(ListDatabasesRequest::new()) .await?; Ok(response.database_name) @@ -246,7 +255,8 @@ impl FlussAdmin { /// Check if a database exists pub async fn database_exists(&self, database_name: &str) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(DatabaseExistsRequest::new(database_name)) .await?; Ok(response.exists) @@ -255,7 +265,7 @@ impl FlussAdmin { /// Get database information pub async fn get_database_info(&self, database_name: &str) -> Result { let request = GetDatabaseInfoRequest::new(database_name); - let response = self.admin_gateway.request(request).await?; + let response = self.admin_gateway().await?.request(request).await?; // Convert proto response to DatabaseInfo let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?; @@ -278,7 +288,8 @@ impl FlussAdmin { /// Get the latest lake snapshot for a table pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result { let response = self - .admin_gateway + .admin_gateway() + .await? .request(GetLatestLakeSnapshotRequest::new(table_path)) .await?; diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 78e9362b..a66f8c59 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -20,6 +20,8 @@ use crate::client::admin::FlussAdmin; use crate::client::metadata::Metadata; use crate::client::table::FlussTable; use crate::config::Config; +use crate::error::{Error, FlussError, Result}; +use crate::metadata::TablePath; use crate::rpc::RpcClient; use parking_lot::RwLock; use std::sync::Arc; @@ -37,6 +39,7 @@ pub struct FlussConnection { network_connects: Arc, args: Config, writer_client: RwLock>>, + admin_client: RwLock>>, } impl FlussConnection { @@ -66,6 +69,7 @@ impl FlussConnection { network_connects: connections.clone(), args: arg.clone(), writer_client: Default::default(), + admin_client: RwLock::new(None), }) } @@ -81,8 +85,27 @@ impl FlussConnection { &self.args } - pub async fn get_admin(&self) -> Result { - FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await + pub async fn get_admin(&self) -> Result> { + // 1. Fast path: return cached instance if already initialized. + if let Some(admin) = self.admin_client.read().as_ref() { + return Ok(admin.clone()); + } + + // 2. Slow path: acquire write lock. + let mut admin_guard = self.admin_client.write(); + + // 3. Double-check: another thread may have initialized while we waited. + if let Some(admin) = admin_guard.as_ref() { + return Ok(admin.clone()); + } + + // 4. Initialize and cache. + let admin = Arc::new(FlussAdmin::new( + self.network_connects.clone(), + self.metadata.clone(), + )); + *admin_guard = Some(admin.clone()); + Ok(admin) } pub fn get_or_create_writer_client(&self) -> Result> { diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index b53abc86..970b84ae 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -109,12 +109,11 @@ pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) { let connection = cluster .get_fluss_connection_with_sasl(username, password) .await; - if connection.get_admin().await.is_ok() - && connection - .get_metadata() - .get_cluster() - .get_one_available_server() - .is_some() + if connection + .get_metadata() + .get_cluster() + .get_one_available_server() + .is_some() { return; }