fix: added check to get_admin() before creating new admin #369
toxicteddy00077 wants to merge 3 commits into apache:main
@luoyuxia could you review this, and help me refine this further?
Pull request overview
This PR addresses issue #319 by adding lazy caching for the FlussAdmin instance inside FlussConnection so repeated get_admin() calls don’t create a brand-new admin client each time.
Changes:
- Added an admin_client cache (RwLock<Option<Arc<FlussAdmin>>>) to FlussConnection.
- Updated get_admin() to return a cached admin instance when available (otherwise initialize and store one).
- Made FlussAdmin clonable to preserve the existing Result<FlussAdmin> return type.
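The lazy caching described in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Admin` and `Connection` are hypothetical stand-ins for FlussAdmin and FlussConnection, and std's `RwLock` is used so the snippet compiles without external crates.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for FlussAdmin; the real type holds an RPC client and metadata.
struct Admin;

// Hypothetical stand-in for FlussConnection, reduced to the cache field.
struct Connection {
    admin_client: RwLock<Option<Arc<Admin>>>,
}

impl Connection {
    fn new() -> Self {
        Connection { admin_client: RwLock::new(None) }
    }

    fn get_admin(&self) -> Arc<Admin> {
        // Fast path: hold the read lock only long enough to clone the cached Arc.
        let cached = self.admin_client.read().unwrap().clone();
        if let Some(admin) = cached {
            return admin;
        }

        // Slow path: take the write lock, then re-check, because another
        // thread may have filled the cache while we were waiting.
        let mut guard = self.admin_client.write().unwrap();
        if let Some(admin) = guard.as_ref() {
            return Arc::clone(admin);
        }

        let admin = Arc::new(Admin);
        *guard = Some(Arc::clone(&admin));
        admin
    }
}

fn main() {
    let conn = Connection::new();
    let first = conn.get_admin();
    let second = conn.get_admin();
    // Both calls hand back the same allocation.
    assert!(Arc::ptr_eq(&first, &second));
}
```

Cloning the `Option<Arc<_>>` out of the read guard in its own statement releases the read lock before the write lock is requested, which matters with std's `RwLock` because it is not reentrant.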
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| crates/fluss/src/client/connection.rs | Adds cached storage and a double-checked init path for get_admin() to avoid per-call construction. |
| crates/fluss/src/client/admin.rs | Adds Clone to FlussAdmin so cached admin can be returned without changing the public signature. |
charlesdong1991 left a comment
Nice PR! Left a couple comments!
    // under the License.

    use crate::client::WriterClient;
    use crate::client::{WriterClient, admin};
FlussAdmin is already imported below; by adding admin, it can be reached in two ways. I think we should have a consistent way of importing modules.
force-pushed from 5293c9d to 7a122a6
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
    use std::sync::Arc;
    use tokio::task::JoinHandle;

    #[derive(Clone)]
Adding #[derive(Clone)] to the public FlussAdmin type expands the public API surface and implicitly documents that cloning is a supported operation. Since a clone will share the same underlying ServerConnection (it’s an Arc), this can be surprising if callers expect an independent/fresh connection. If cloning isn’t required for this PR, consider removing it; if it is required, consider documenting the clone semantics explicitly (or returning Arc<FlussAdmin> instead of relying on Clone).
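To make the clone semantics concrete, here is a small sketch under stated assumptions: `ServerConnection` and `Admin` below are hypothetical stand-ins, with the admin holding its connection behind an `Arc` as the comment describes. It only shows that a derived `Clone` copies the pointer, not the connection.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real ServerConnection.
struct ServerConnection {
    addr: String,
}

// Hypothetical stand-in for FlussAdmin: the derived Clone clones the Arc,
// so every clone points at the same ServerConnection.
#[derive(Clone)]
struct Admin {
    gateway: Arc<ServerConnection>,
}

// True when both admins share one underlying connection object.
fn shares_connection(a: &Admin, b: &Admin) -> bool {
    Arc::ptr_eq(&a.gateway, &b.gateway)
}

fn main() {
    let original = Admin {
        gateway: Arc::new(ServerConnection { addr: "coordinator:9123".to_string() }),
    };
    let copy = original.clone();

    // The clone is not an independent connection.
    assert!(shares_connection(&original, &copy));
    println!("both admins talk to {}", copy.gateway.addr);
}
```

A caller expecting a fresh connection from `clone()` would therefore be surprised, which is the reason given for either documenting the behavior or returning `Arc<FlussAdmin>` directly.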
    #[derive(Clone)]
force-pushed from 7a122a6 to f3c415e
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
    let admin = self
        .admin_client
        .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
        .await?;
    Ok(admin.clone())
Caching FlussAdmin in a tokio::sync::OnceCell makes the admin instance effectively permanent for the lifetime of the connection. If the coordinator changes (metadata refresh) or the underlying ServerConnection becomes poisoned, get_admin() will keep returning the same cached instance and callers may be unable to recover without constructing a new FlussConnection. Consider using a cache that supports invalidation/refresh (e.g., async lock around an Option<FlussAdmin>), or make FlussAdmin acquire/refresh its admin_gateway on demand when the current connection is poisoned or when requests return InvalidCoordinatorException.
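One way to read the "cache that supports invalidation" suggestion is sketched below. It is an assumption-laden illustration: `Admin`, `AdminCache`, the `generation` field, and `invalidate()` are all hypothetical names, and a std `Mutex` stands in for the async lock the comment mentions so the snippet runs without tokio.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for FlussAdmin; `generation` only exists to make
// rebuilt instances distinguishable in this sketch.
struct Admin {
    generation: u64,
}

struct State {
    cached: Option<Arc<Admin>>,
    next_generation: u64,
}

// Hypothetical cache that, unlike a once-cell, can be cleared and refilled.
struct AdminCache {
    state: Mutex<State>,
}

impl AdminCache {
    fn new() -> Self {
        AdminCache {
            state: Mutex::new(State { cached: None, next_generation: 0 }),
        }
    }

    // Return the cached admin, building one if the slot is empty.
    fn get(&self) -> Arc<Admin> {
        let mut state = self.state.lock().unwrap();
        if let Some(admin) = &state.cached {
            return Arc::clone(admin);
        }
        let admin = Arc::new(Admin { generation: state.next_generation });
        state.next_generation += 1;
        state.cached = Some(Arc::clone(&admin));
        admin
    }

    // Drop the cached admin, e.g. after a poisoned connection is detected.
    fn invalidate(&self) {
        self.state.lock().unwrap().cached = None;
    }
}

fn main() {
    let cache = AdminCache::new();
    assert_eq!(cache.get().generation, 0);
    assert_eq!(cache.get().generation, 0); // served from the cache

    cache.invalidate();
    assert_eq!(cache.get().generation, 1); // rebuilt after invalidation
}
```

Callers that still hold an `Arc` to the old instance keep it alive after `invalidate()`, so this only helps callers that ask the cache again.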
force-pushed from 5c377cb to dac8f93
@charlesdong1991 Please have a look at the latest commit and let me know the general direction I should take. Is using
I think for the purpose of this ticket we can use OnceCell, and the direction is right. But I think we should avoid
Thanks for working on this @toxicteddy00077! The direction is right, we want to cache the admin like Java does. But we need to fix one thing first: FlussAdmin currently stores a concrete ServerConnection at construction time, which means a cached admin would be stuck with a dead connection if the coordinator restarts. Java avoids this because its FlussAdmin resolves the coordinator per-call via GatewayClientProxy. Here's how to do it:

    // admin.rs:
    async fn admin_gateway(&self) -> Result<ServerConnection> {
        let coordinator = self
            .metadata
            .get_cluster()
            .get_coordinator_server()
            .ok_or_else(|| Error::UnexpectedError {
                message: "Coordinator server not found in cluster metadata".to_string(),
                source: None,
            })?;
        self.rpc_client.get_connection(coordinator).await
    }

    // connection.rs:

But if we can still break the signature, I would just go with Arc and sync method, and no Clone derive with Arc
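The effect of resolving the coordinator per call, rather than at construction, can be illustrated with a synchronous sketch. Everything here is a hypothetical stand-in: `Metadata` is reduced to one optional address, and `admin_gateway()` returns that address as a `String` instead of opening a ServerConnection.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for cluster metadata: just the coordinator address.
struct Metadata {
    coordinator: RwLock<Option<String>>,
}

impl Metadata {
    fn coordinator(&self) -> Option<String> {
        self.coordinator.read().unwrap().clone()
    }

    // Simulates a metadata refresh after the coordinator moves.
    fn update(&self, addr: Option<&str>) {
        *self.coordinator.write().unwrap() = addr.map(str::to_string);
    }
}

// Hypothetical stand-in for FlussAdmin: it stores the metadata handle,
// not a resolved connection, so construction cannot fail.
struct Admin {
    metadata: Arc<Metadata>,
}

impl Admin {
    fn new(metadata: Arc<Metadata>) -> Self {
        Admin { metadata }
    }

    // Resolve the coordinator on every call, so a cached Admin follows
    // metadata changes instead of pinning the first address it saw.
    fn admin_gateway(&self) -> Result<String, String> {
        self.metadata
            .coordinator()
            .ok_or_else(|| "Coordinator server not found in cluster metadata".to_string())
    }
}

fn main() {
    let metadata = Arc::new(Metadata { coordinator: RwLock::new(None) });
    let admin = Admin::new(Arc::clone(&metadata));

    // The missing-coordinator error now surfaces at call time.
    assert!(admin.admin_gateway().is_err());

    metadata.update(Some("host-a:9123"));
    assert_eq!(admin.admin_gateway().unwrap(), "host-a:9123");

    // After a coordinator change the same Admin picks up the new address.
    metadata.update(Some("host-b:9123"));
    assert_eq!(admin.admin_gateway().unwrap(), "host-b:9123");
}
```

With this shape the constructor becomes infallible and synchronous, which is what makes a plain cached `Arc` workable.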
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
force-pushed from dac8f93 to d75ef69
Thank you very much @fresh-borzoni. I have understood and made the changes, and they seem to test fine. Like you mentioned, I have just used
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
    impl FlussAdmin {
        pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
            let admin_con =
                connections
                    .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
                        || Error::UnexpectedError {
                            message: "Coordinator server not found in cluster metadata".to_string(),
                            source: None,
                        },
                    )?)
                    .await?;

            Ok(FlussAdmin {
                admin_gateway: admin_con,
        pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
            FlussAdmin {
                metadata,
                rpc_client: connections,
            })
        }
    }
FlussAdmin::new changed from an async fn returning Result<Self> to a sync constructor returning Self. Since FlussAdmin is a public type, this is an API-breaking change for downstream crates and also changes when/where initialization errors surface (constructor can no longer fail).
Consider keeping the existing public constructor signature (or adding a new new_unchecked/from_parts constructor for internal use) so callers depending on fallible/async initialization don’t break.
    pub async fn get_admin(&self) -> Result<FlussAdmin> {
        FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
        // 1. Fast path: return cached instance if already initialized.
        if let Some(admin) = self.admin_client.read().as_ref() {
            return Ok(admin.clone());
        }

        // 2. Slow path: acquire write lock.
        let mut admin_guard = self.admin_client.write();

        // 3. Double-check: another thread may have initialized while we waited.
        if let Some(admin) = admin_guard.as_ref() {
            return Ok(admin.clone());
        }

        // 4. Initialize and cache.
        let admin = FlussAdmin::new(self.network_connects.clone(), self.metadata.clone());
        *admin_guard = Some(admin.clone());
        Ok(admin)
get_admin() now always succeeds and caches a FlussAdmin without verifying that the coordinator exists / is reachable. This is a behavior change: previously get_admin() could fail early (e.g., missing coordinator in metadata), and the integration readiness check in tests/integration/utils.rs relies on get_admin().await.is_ok() as part of cluster readiness.
If get_admin() is intended to remain a readiness/validation point, consider performing a lightweight async validation (e.g., resolve coordinator + RpcClient::get_connection) before caching/returning. Since that introduces an await, prefer an async single-init primitive (tokio::sync::OnceCell / async mutex) rather than holding a parking_lot lock across an await.
fresh-borzoni left a comment
@toxicteddy00077 TY, LGTM overall, left comment, PTAL
Let's be consistent with Arcs. It will change the public API signature, but Arc auto-derefs to FlussAdmin, so it will only break on explicit type annotations, which is unlikely.
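A short sketch of the auto-deref point, with hypothetical names throughout (`Admin`, `describe()`, `get_admin_old()`, `get_admin_new()`): call sites that let the compiler infer the type keep compiling when the return type becomes `Arc<Admin>`, and only explicit annotations need updating.

```rust
use std::sync::Arc;

// Hypothetical stand-in for FlussAdmin with one illustrative method.
struct Admin;

impl Admin {
    fn describe(&self) -> &'static str {
        "admin"
    }
}

// Hypothetical old and new shapes of the get_admin() return type.
fn get_admin_old() -> Admin {
    Admin
}

fn get_admin_new() -> Arc<Admin> {
    Arc::new(Admin)
}

fn main() {
    // Inferred binding: method calls go through Deref, so this line
    // compiles unchanged against either signature.
    let admin = get_admin_new();
    assert_eq!(admin.describe(), "admin");

    // Explicit annotation: this is the kind of call site that has to change.
    // let admin: Admin = get_admin_new(); // error: expected `Admin`, found `Arc<Admin>`
    let annotated: Arc<Admin> = get_admin_new();
    assert_eq!(annotated.describe(), get_admin_old().describe());
}
```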
    network_connects: Arc<RpcClient>,
    args: Config,
    writer_client: RwLock<Option<Arc<WriterClient>>>,
    admin_client: RwLock<Option<FlussAdmin>>,
It's better to use RwLock<Option<Arc<FlussAdmin>>>
Purpose
Linked issue: close #319
Description
I have followed a similar pattern to get_or_create_writer_client, with all necessary async checks. However, since the return type is still Result<FlussAdmin>, I return a clone for now. The correct way would be to use Result<Arc<FlussAdmin>>. Please let me know if I have overlooked some part or misunderstood the issue.