Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ pub struct Connection {
}

pub struct Admin {
inner: fcore::client::FlussAdmin,
inner: Arc<fcore::client::FlussAdmin>,
}

pub struct Table {
Expand Down
1 change: 0 additions & 1 deletion bindings/cpp/test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ class FlussTestEnvironment : public ::testing::Environment {
if (result.Ok()) {
auto admin_result = connection_.GetAdmin(admin_);
if (admin_result.Ok()) {
// check tablet server is available
std::vector<fluss::ServerNode> nodes;
auto nodes_result = admin_.GetServerNodes(nodes);
if (nodes_result.Ok() &&
Expand Down
6 changes: 2 additions & 4 deletions bindings/python/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,8 @@ impl FlussAdmin {

impl FlussAdmin {
// Internal method to create FlussAdmin from core admin
pub fn from_core(admin: fcore::client::FlussAdmin) -> Self {
Self {
__admin: Arc::new(admin),
}
pub fn from_core(admin: Arc<fcore::client::FlussAdmin>) -> Self {
Self { __admin: admin }
}
}

Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,7 @@ macro_rules! with_scanner {
#[pyclass]
pub struct LogScanner {
scanner: ScannerKind,
admin: fcore::client::FlussAdmin,
admin: Arc<fcore::client::FlussAdmin>,
table_info: fcore::metadata::TableInfo,
/// The projected Arrow schema to use for empty table creation
projected_schema: SchemaRef,
Expand Down Expand Up @@ -2207,7 +2207,7 @@ impl LogScanner {
impl LogScanner {
fn new(
scanner: ScannerKind,
admin: fcore::client::FlussAdmin,
admin: Arc<fcore::client::FlussAdmin>,
table_info: fcore::metadata::TableInfo,
projected_schema: SchemaRef,
projected_row_type: fcore::metadata::RowType,
Expand Down
10 changes: 5 additions & 5 deletions crates/examples/src/example_partitioned_kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn main() -> Result<()> {

let table_path = TablePath::new("fluss", "partitioned_kv_example");

let mut admin = conn.get_admin().await?;
let admin = conn.get_admin().await?;
admin
.create_table(&table_path, &table_descriptor, true)
.await?;
Expand All @@ -55,9 +55,9 @@ pub async fn main() -> Result<()> {
admin.get_table_info(&table_path).await?
);

create_partition(&table_path, &mut admin, "APAC", 1).await;
create_partition(&table_path, &mut admin, "EMEA", 2).await;
create_partition(&table_path, &mut admin, "US", 3).await;
create_partition(&table_path, &admin, "APAC", 1).await;
create_partition(&table_path, &admin, "EMEA", 2).await;
create_partition(&table_path, &admin, "US", 3).await;

let table = conn.get_table(&table_path).await?;
let table_upsert = table.new_upsert()?;
Expand Down Expand Up @@ -129,7 +129,7 @@ pub async fn main() -> Result<()> {
Ok(())
}

async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) {
async fn create_partition(table_path: &TablePath, admin: &FlussAdmin, region: &str, zone: i64) {
let mut partition_values = HashMap::new();
partition_values.insert("region".to_string(), region.to_string());
partition_values.insert("zone".to_string(), zone.to_string());
Expand Down
73 changes: 42 additions & 31 deletions crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,28 @@ use std::sync::Arc;
use tokio::task::JoinHandle;

pub struct FlussAdmin {
admin_gateway: ServerConnection,
#[allow(dead_code)]
metadata: Arc<Metadata>,
#[allow(dead_code)]
rpc_client: Arc<RpcClient>,
}

impl FlussAdmin {
pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
let admin_con =
connections
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
|| Error::UnexpectedError {
message: "Coordinator server not found in cluster metadata".to_string(),
source: None,
},
)?)
.await?;

Ok(FlussAdmin {
admin_gateway: admin_con,
pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
FlussAdmin {
metadata,
rpc_client: connections,
})
}
}
Comment on lines 45 to +51
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlussAdmin::new changed from an async fn returning Result<Self> to a sync constructor returning Self. Since FlussAdmin is a public type, this is an API-breaking change for downstream crates and also changes when/where initialization errors surface (constructor can no longer fail).

Consider keeping the existing public constructor signature (or adding a new new_unchecked/from_parts constructor for internal use) so callers depending on fallible/async initialization don’t break.

Copilot uses AI. Check for mistakes.

async fn admin_gateway(&self) -> Result<ServerConnection> {
let cluster = self.metadata.get_cluster();
let coordinator =
cluster
.get_coordinator_server()
.ok_or_else(|| Error::UnexpectedError {
message: "Coordinator server not found in cluster metadata".to_string(),
source: None,
})?;
self.rpc_client.get_connection(coordinator).await
}

pub async fn create_database(
Expand All @@ -71,7 +69,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(CreateDatabaseRequest::new(
database_name,
database_descriptor,
Expand All @@ -88,7 +87,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(CreateTableRequest::new(
table_path,
table_descriptor,
Expand All @@ -104,15 +104,17 @@ impl FlussAdmin {
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(DropTableRequest::new(table_path, ignore_if_not_exists))
.await?;
Ok(())
}

pub async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(GetTableRequest::new(table_path))
.await?;

Expand Down Expand Up @@ -144,7 +146,8 @@ impl FlussAdmin {
/// List all tables in the given database
pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(ListTablesRequest::new(database_name))
.await?;
Ok(response.table_name)
Expand All @@ -162,7 +165,8 @@ impl FlussAdmin {
partial_partition_spec: Option<&PartitionSpec>,
) -> Result<Vec<PartitionInfo>> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(ListPartitionInfosRequest::new(
table_path,
partial_partition_spec,
Expand All @@ -179,7 +183,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(CreatePartitionRequest::new(
table_path,
partition_spec,
Expand All @@ -197,7 +202,8 @@ impl FlussAdmin {
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(DropPartitionRequest::new(
table_path,
partition_spec,
Expand All @@ -210,7 +216,8 @@ impl FlussAdmin {
/// Check if a table exists
pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(TableExistsRequest::new(table_path))
.await?;
Ok(response.exists)
Expand All @@ -224,7 +231,8 @@ impl FlussAdmin {
cascade: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(DropDatabaseRequest::new(
database_name,
ignore_if_not_exists,
Expand All @@ -237,7 +245,8 @@ impl FlussAdmin {
/// List all databases
pub async fn list_databases(&self) -> Result<Vec<String>> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(ListDatabasesRequest::new())
.await?;
Ok(response.database_name)
Expand All @@ -246,7 +255,8 @@ impl FlussAdmin {
/// Check if a database exists
pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(DatabaseExistsRequest::new(database_name))
.await?;
Ok(response.exists)
Expand All @@ -255,7 +265,7 @@ impl FlussAdmin {
/// Get database information
pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> {
let request = GetDatabaseInfoRequest::new(database_name);
let response = self.admin_gateway.request(request).await?;
let response = self.admin_gateway().await?.request(request).await?;

// Convert proto response to DatabaseInfo
let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?;
Expand All @@ -278,7 +288,8 @@ impl FlussAdmin {
/// Get the latest lake snapshot for a table
pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(GetLatestLakeSnapshotRequest::new(table_path))
.await?;

Expand Down
27 changes: 25 additions & 2 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::client::admin::FlussAdmin;
use crate::client::metadata::Metadata;
use crate::client::table::FlussTable;
use crate::config::Config;
use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;
use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;
Expand All @@ -37,6 +39,7 @@ pub struct FlussConnection {
network_connects: Arc<RpcClient>,
args: Config,
writer_client: RwLock<Option<Arc<WriterClient>>>,
admin_client: RwLock<Option<Arc<FlussAdmin>>>,
}

impl FlussConnection {
Expand Down Expand Up @@ -66,6 +69,7 @@ impl FlussConnection {
network_connects: connections.clone(),
args: arg.clone(),
writer_client: Default::default(),
admin_client: RwLock::new(None),
})
}

Expand All @@ -81,8 +85,27 @@ impl FlussConnection {
&self.args
}

pub async fn get_admin(&self) -> Result<FlussAdmin> {
FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
// 1. Fast path: return cached instance if already initialized.
if let Some(admin) = self.admin_client.read().as_ref() {
return Ok(admin.clone());
}

// 2. Slow path: acquire write lock.
let mut admin_guard = self.admin_client.write();

// 3. Double-check: another thread may have initialized while we waited.
if let Some(admin) = admin_guard.as_ref() {
return Ok(admin.clone());
}

// 4. Initialize and cache.
let admin = Arc::new(FlussAdmin::new(
self.network_connects.clone(),
self.metadata.clone(),
));
*admin_guard = Some(admin.clone());
Ok(admin)
}

pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
Expand Down
11 changes: 5 additions & 6 deletions crates/fluss/tests/integration/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,11 @@ pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) {
let connection = cluster
.get_fluss_connection_with_sasl(username, password)
.await;
if connection.get_admin().await.is_ok()
Copy link
Contributor

@fresh-borzoni fresh-borzoni Mar 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shall just remove this check and rely on get_one_available_server() later in the code, no need for list_databases check anywhere.

&& connection
.get_metadata()
.get_cluster()
.get_one_available_server()
.is_some()
if connection
.get_metadata()
.get_cluster()
.get_one_available_server()
.is_some()
{
return;
}
Expand Down