fix: added check to get_admin() before creating new admin #369
toxicteddy00077 wants to merge 4 commits into `apache:main`
Conversation
@luoyuxia could you review this, and help me refine it further?
Pull request overview
This PR addresses issue #319 by adding lazy caching for the FlussAdmin instance inside FlussConnection so repeated get_admin() calls don’t create a brand-new admin client each time.
Changes:
- Added an `admin_client` cache (`RwLock<Option<Arc<FlussAdmin>>>`) to `FlussConnection`.
- Updated `get_admin()` to return a cached admin instance when available (otherwise initialize and store one).
- Made `FlussAdmin` clonable to preserve the existing `Result<FlussAdmin>` return type.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `crates/fluss/src/client/connection.rs` | Adds cached storage and a double-checked init path for `get_admin()` to avoid per-call construction. |
| `crates/fluss/src/client/admin.rs` | Adds `Clone` to `FlussAdmin` so the cached admin can be returned without changing the public signature. |
charlesdong1991 left a comment:
Nice PR! Left a couple of comments!
```diff
 // under the License.

-use crate::client::WriterClient;
+use crate::client::{WriterClient, admin};
```
`FlussAdmin` is already imported below; by adding `admin`, it can be reached in two ways. I think we should have a consistent way of importing modules.
Force-pushed 5293c9d to 7a122a6.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
crates/fluss/src/client/admin.rs (outdated)
```diff
 use std::sync::Arc;
 use tokio::task::JoinHandle;

+#[derive(Clone)]
```
Adding #[derive(Clone)] to the public FlussAdmin type expands the public API surface and implicitly documents that cloning is a supported operation. Since a clone will share the same underlying ServerConnection (it’s an Arc), this can be surprising if callers expect an independent/fresh connection. If cloning isn’t required for this PR, consider removing it; if it is required, consider documenting the clone semantics explicitly (or returning Arc<FlussAdmin> instead of relying on Clone).
```diff
+#[derive(Clone)]
```
I don't think we need `Clone` anymore, since we use `Arc` now.
Force-pushed 7a122a6 to f3c415e.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
```rust
let admin = self
    .admin_client
    .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
    .await?;
Ok(admin.clone())
```
Caching FlussAdmin in a tokio::sync::OnceCell makes the admin instance effectively permanent for the lifetime of the connection. If the coordinator changes (metadata refresh) or the underlying ServerConnection becomes poisoned, get_admin() will keep returning the same cached instance and callers may be unable to recover without constructing a new FlussConnection. Consider using a cache that supports invalidation/refresh (e.g., async lock around an Option<FlussAdmin>), or make FlussAdmin acquire/refresh its admin_gateway on demand when the current connection is poisoned or when requests return InvalidCoordinatorException.
Force-pushed 5c377cb to dac8f93.
@charlesdong1991 Please have a look at the latest commit and let me know the general direction I should take. Is using …
I think for the purpose of this ticket we can use `OnceCell`, and the direction is right. But I think we should avoid …
Thanks for working on this @toxicteddy00077! The direction is right; we want to cache the admin like Java does. But we need to fix one thing first: `FlussAdmin` currently stores a concrete `ServerConnection` at construction time, which means a cached admin would be stuck with a dead connection if the coordinator restarts. Java avoids this because its `FlussAdmin` resolves the coordinator per call via `GatewayClientProxy`. Here's how to do it:

```rust
// admin.rs:
async fn admin_gateway(&self) -> Result<ServerConnection> {
    let coordinator = self
        .metadata
        .get_cluster()
        .get_coordinator_server()
        .ok_or_else(|| Error::UnexpectedError {
            message: "Coordinator server not found in cluster metadata".to_string(),
            source: None,
        })?;
    self.rpc_client.get_connection(coordinator).await
}

// connection.rs: …
```

But if we can still break the signature, I would just go with `Arc` and a sync method, and no `Clone` derive with `Arc`.
Force-pushed dac8f93 to d75ef69.
Thank you very much @fresh-borzoni, I have understood and made the changes; it seems to test fine. Like you mentioned, I have just used …
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
```diff
 impl FlussAdmin {
-    pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
-        let admin_con = connections
-            .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
-                || Error::UnexpectedError {
-                    message: "Coordinator server not found in cluster metadata".to_string(),
-                    source: None,
-                },
-            )?)
-            .await?;
-
-        Ok(FlussAdmin {
-            admin_gateway: admin_con,
+    pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        FlussAdmin {
             metadata,
             rpc_client: connections,
-        })
+        }
     }
 }
```
FlussAdmin::new changed from an async fn returning Result<Self> to a sync constructor returning Self. Since FlussAdmin is a public type, this is an API-breaking change for downstream crates and also changes when/where initialization errors surface (constructor can no longer fail).
Consider keeping the existing public constructor signature (or adding a new new_unchecked/from_parts constructor for internal use) so callers depending on fallible/async initialization don’t break.
```diff
 pub async fn get_admin(&self) -> Result<FlussAdmin> {
-    FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+    // 1. Fast path: return cached instance if already initialized.
+    if let Some(admin) = self.admin_client.read().as_ref() {
+        return Ok(admin.clone());
+    }
+
+    // 2. Slow path: acquire write lock.
+    let mut admin_guard = self.admin_client.write();
+
+    // 3. Double-check: another thread may have initialized while we waited.
+    if let Some(admin) = admin_guard.as_ref() {
+        return Ok(admin.clone());
+    }
+
+    // 4. Initialize and cache.
+    let admin = FlussAdmin::new(self.network_connects.clone(), self.metadata.clone());
+    *admin_guard = Some(admin.clone());
+    Ok(admin)
```
get_admin() now always succeeds and caches a FlussAdmin without verifying that the coordinator exists / is reachable. This is a behavior change: previously get_admin() could fail early (e.g., missing coordinator in metadata), and the integration readiness check in tests/integration/utils.rs relies on get_admin().await.is_ok() as part of cluster readiness.
If get_admin() is intended to remain a readiness/validation point, consider performing a lightweight async validation (e.g., resolve coordinator + RpcClient::get_connection) before caching/returning. Since that introduces an await, prefer an async single-init primitive (tokio::sync::OnceCell / async mutex) rather than holding a parking_lot lock across an await.
PTAL, if some integration tests depend on `get_admin().await.is_ok()`, we need to fix that.
I think we call `admin.get_server_nodes()` in Python at least, so just check that other clients do this as well.
fresh-borzoni left a comment:
@toxicteddy00077 TY, LGTM overall, left comment, PTAL
Let's be consistent with `Arc`s. It will change the public API signature, but `Arc` auto-derefs to `FlussAdmin`, so it will only break on explicit type annotations, which is unlikely.
```diff
 network_connects: Arc<RpcClient>,
 args: Config,
 writer_client: RwLock<Option<Arc<WriterClient>>>,
+admin_client: RwLock<Option<FlussAdmin>>,
```
It's better to use `RwLock<Option<Arc<FlussAdmin>>>`.
I've done as you mentioned. It does break the bindings; should I correct those as well?
Force-pushed 7c1fa55 to 37a2c4d.
@toxicteddy00077 Looked through, left a comment. Also, the integration tests readiness check is still not fixed. PTAL
crates/fluss/src/client/admin.rs (outdated)
```diff
 use std::sync::Arc;
 use tokio::task::JoinHandle;

+#[derive(Clone)]
```
I don't think we need `Clone` anymore, since we use `Arc` now.
Force-pushed 37a2c4d to a312e02.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Force-pushed a312e02 to 44f8f47.
fresh-borzoni left a comment:
@toxicteddy00077 TY, left some comments PTAL
I don't think we need the `list_databases` check.
```diff
 }

-async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) {
+async fn create_partition(table_path: &TablePath, admin: &Arc<FlussAdmin>, region: &str, zone: i64) {
```
Why not `&FlussAdmin`?
Good catch, refactored
bindings/python/test/conftest.py (outdated)
```diff
 start = time.time()
 while time.time() - start < timeout:
     try:
+        async def _probe():
```
It's wasteful to redefine it on every loop iteration.
bindings/python/test/conftest.py (outdated)
```diff
 return subprocess.run(cmd, capture_output=True).returncode


+def _wait_for_coordinator_ready(host, port, timeout=60):
```
TBH, this is not necessary for Python; we have `_connect_with_retry`.
```diff
 let connection = cluster
     .get_fluss_connection_with_sasl(username, password)
     .await;
 if connection.get_admin().await.is_ok()
```
I think we should just remove this check and rely on `get_one_available_server()` later in the code; no need for a `list_databases` check anywhere.
Force-pushed 502b549 to 63d6da7.
fresh-borzoni left a comment:
@toxicteddy00077 Ty, LGTM
cc @luoyuxia
The current implementation assumes we don't want to break the existing API using `get_admin`, so it stays async/`Result` as it was before; the cleaner version would have been to just use `Arc`, of course.
Purpose
Linked issue: close #319
Description
I have followed a similar pattern to `get_or_create_writer_client`, with all necessary async checks. However, since the return type is still `Result<FlussAdmin>`, I return a clone for now. The correct way would be to use `Result<Arc<FlussAdmin>>`. Please let me know if I have overlooked some part or misunderstood the issue.