
fix: added check to get_admin() before creating new admin #369

Open
toxicteddy00077 wants to merge 3 commits into apache:main from toxicteddy00077:fluss-admin-inst-and-cache

Conversation

@toxicteddy00077

Purpose

Linked issue: close #319

Description

I have followed a similar pattern to get_or_create_writer_client, with all the necessary async checks. However, since the return type is still Result<FlussAdmin>, I return a clone for now. The correct way would be to return Result<Arc<FlussAdmin>>. Please let me know if I have overlooked something or misunderstood the issue.

@toxicteddy00077
Author

@luoyuxia could you review this, and help me refine this further?


Copilot AI left a comment


Pull request overview

This PR addresses issue #319 by adding lazy caching for the FlussAdmin instance inside FlussConnection so repeated get_admin() calls don’t create a brand-new admin client each time.

Changes:

  • Added an admin_client cache (RwLock<Option<Arc<FlussAdmin>>>) to FlussConnection.
  • Updated get_admin() to return a cached admin instance when available (otherwise initialize and store one).
  • Made FlussAdmin clonable to preserve the existing Result<FlussAdmin> return type.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File | Description
crates/fluss/src/client/connection.rs | Adds cached storage and a double-checked init path for get_admin() to avoid per-call construction.
crates/fluss/src/client/admin.rs | Adds Clone to FlussAdmin so cached admin can be returned without changing the public signature.


Contributor

@charlesdong1991 charlesdong1991 left a comment


Nice PR! Left a couple comments!

 // under the License.

-use crate::client::WriterClient;
+use crate::client::{WriterClient, admin};
Contributor


FlussAdmin is already imported below, so adding admin here means it can be reached in two ways. I think we should import modules in one consistent way.


Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.



use std::sync::Arc;
use tokio::task::JoinHandle;

#[derive(Clone)]

Copilot AI Mar 2, 2026


Adding #[derive(Clone)] to the public FlussAdmin type expands the public API surface and implicitly documents that cloning is a supported operation. Since a clone will share the same underlying ServerConnection (it’s an Arc), this can be surprising if callers expect an independent/fresh connection. If cloning isn’t required for this PR, consider removing it; if it is required, consider documenting the clone semantics explicitly (or returning Arc<FlussAdmin> instead of relying on Clone).

Suggested change
#[derive(Clone)]
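
To illustrate the clone semantics described in this comment, here is a minimal sketch. `Connection` and `Admin` are hypothetical stand-ins, not the real `ServerConnection` and `FlussAdmin`, and the request counter exists only to make the sharing visible. It shows that a derived `Clone` on a struct holding an `Arc` copies the pointer, so both handles talk to the same underlying connection.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for ServerConnection; not the real Fluss type.
struct Connection {
    requests_sent: Mutex<u32>,
}

// Hypothetical stand-in for FlussAdmin: it owns an Arc to the connection,
// so the derived Clone only bumps the reference count.
#[derive(Clone)]
struct Admin {
    gateway: Arc<Connection>,
}

impl Admin {
    fn new() -> Self {
        Admin {
            gateway: Arc::new(Connection {
                requests_sent: Mutex::new(0),
            }),
        }
    }

    // Pretend to send a request by incrementing the shared counter.
    fn send(&self) {
        *self.gateway.requests_sent.lock().unwrap() += 1;
    }

    // Number of requests sent over the underlying connection so far.
    fn requests_sent(&self) -> u32 {
        *self.gateway.requests_sent.lock().unwrap()
    }
}
```

In this sketch, calling `send()` on a clone is observable through the original handle, which is the behaviour a caller expecting a fresh connection might find surprising.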

@toxicteddy00077 toxicteddy00077 force-pushed the fluss-admin-inst-and-cache branch from 7a122a6 to f3c415e Compare March 2, 2026 06:30

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.



Comment on lines +87 to +91
+let admin = self
+    .admin_client
+    .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
+    .await?;
+Ok(admin.clone())

Copilot AI Mar 2, 2026


Caching FlussAdmin in a tokio::sync::OnceCell makes the admin instance effectively permanent for the lifetime of the connection. If the coordinator changes (metadata refresh) or the underlying ServerConnection becomes poisoned, get_admin() will keep returning the same cached instance and callers may be unable to recover without constructing a new FlussConnection. Consider using a cache that supports invalidation/refresh (e.g., async lock around an Option<FlussAdmin>), or make FlussAdmin acquire/refresh its admin_gateway on demand when the current connection is poisoned or when requests return InvalidCoordinatorException.
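
The trade-off raised here can be sketched with the standard library alone. This is an illustration under assumptions: `std::sync::OnceLock` stands in for `tokio::sync::OnceCell`, `std::sync::RwLock` stands in for an async lock, and `Admin`, `PinnedCache`, and `RefreshableCache` are hypothetical names. The set-once cell keeps whatever was built first, while the `Option` behind a lock can be cleared and rebuilt.

```rust
use std::sync::{Arc, OnceLock, RwLock};

// Hypothetical admin that remembers which coordinator it was built for.
struct Admin {
    coordinator: String,
}

// Set-once cache. OnceLock is a synchronous stand-in for tokio::sync::OnceCell.
struct PinnedCache {
    admin: OnceLock<Arc<Admin>>,
}

impl PinnedCache {
    fn get(&self, current_coordinator: &str) -> Arc<Admin> {
        // The closure runs only on the first call; later calls ignore the argument.
        self.admin
            .get_or_init(|| {
                Arc::new(Admin {
                    coordinator: current_coordinator.to_string(),
                })
            })
            .clone()
    }
}

// Cache that supports invalidation: an Option behind a lock.
struct RefreshableCache {
    admin: RwLock<Option<Arc<Admin>>>,
}

impl RefreshableCache {
    fn get(&self, current_coordinator: &str) -> Arc<Admin> {
        // Fast path: reuse the cached admin if one exists.
        if let Some(admin) = self.admin.read().unwrap().as_ref() {
            return admin.clone();
        }
        // Slow path: build one unless another thread already did.
        let mut guard = self.admin.write().unwrap();
        guard
            .get_or_insert_with(|| {
                Arc::new(Admin {
                    coordinator: current_coordinator.to_string(),
                })
            })
            .clone()
    }

    // Drop the cached admin so the next get() rebuilds it.
    fn invalidate(&self) {
        *self.admin.write().unwrap() = None;
    }
}
```

With `PinnedCache`, recovering from a coordinator change requires a new cache (in the PR's terms, a new connection); with `RefreshableCache`, calling `invalidate()` is enough.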

@toxicteddy00077 toxicteddy00077 force-pushed the fluss-admin-inst-and-cache branch from 5c377cb to dac8f93 Compare March 3, 2026 17:11
@toxicteddy00077
Author

@charlesdong1991 Please have a look at the latest commit and let me know the general direction I should take. Is using OnceCell<FlussAdmin> the correct approach for the admin client type?

@charlesdong1991
Contributor

I think for the purpose of this ticket we can use OnceCell, and the direction is right.

But IMHO we should avoid derive(Clone); we can do that by wrapping the cached admin in an Arc.

@fresh-borzoni
Contributor

fresh-borzoni commented Mar 3, 2026

Thanks for working on this @toxicteddy00077!

The direction is right, we want to cache the admin like Java does. But we need to fix one thing first: FlussAdmin currently stores a concrete ServerConnection at construction time, which means a cached admin would be stuck with a dead connection if the coordinator restarts. Java avoids this because its FlussAdmin resolves the coordinator per-call via GatewayClientProxy.

Here's how to do it:

// admin.rs:

  • Remove admin_gateway field, remove both #[allow(dead_code)] on metadata and rpc_client, add #[derive(Clone)] (now safe - struct is just two Arcs)
  • Make new() sync, rewrite to just store the arcs
  • Add a helper that resolves the coordinator connection per call, something like this:
    async fn admin_gateway(&self) -> Result<ServerConnection> {
        let coordinator = self
            .metadata
            .get_cluster()
            .get_coordinator_server()
            .ok_or_else(|| Error::UnexpectedError {
                message: "Coordinator server not found in cluster metadata".to_string(),
                source: None,
            })?;
        self.rpc_client.get_connection(coordinator).await
    }
  • Update all methods, mechanical
    // before
    self.admin_gateway.request(SomeRequest::new(...)).await?;
    // after
    self.admin_gateway().await?.request(SomeRequest::new(...)).await?;

// connection.rs:
Add a cache field admin_client: RwLock<Option<FlussAdmin>> and use double-checked locking in get_admin(), the same pattern as get_or_create_writer_client() right above it.
get_admin() should still return Result<FlussAdmin> and stay async, even though it no longer awaits internally, so that it remains compatible with the previous signature.
With this, the public API stays async fn get_admin(&self) -> Result<FlussAdmin>, and the bindings are untouched.

But if we can still break the signature, I would just return an Arc<FlussAdmin> from a sync method, and with the Arc there is no need for the Clone derive.
cc @luoyuxia
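
The per-call resolution idea above can be sketched in a synchronous, self-contained form. Everything here is a simplified assumption: `Metadata`, `Admin`, `set_coordinator`, and `list_databases` are hypothetical stand-ins, the gateway is just an address string, and errors are plain `String`s instead of the crate's `Error` type. The point is only that an admin holding shared metadata follows a coordinator change, and that a missing coordinator now fails at call time rather than at construction.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for cluster metadata; the coordinator may change or vanish.
struct Metadata {
    coordinator: RwLock<Option<String>>,
}

impl Metadata {
    fn coordinator(&self) -> Option<String> {
        self.coordinator.read().unwrap().clone()
    }

    fn set_coordinator(&self, address: Option<&str>) {
        *self.coordinator.write().unwrap() = address.map(str::to_string);
    }
}

// Hypothetical admin that stores only shared state, so Clone is cheap.
#[derive(Clone)]
struct Admin {
    metadata: Arc<Metadata>,
}

impl Admin {
    // Infallible and synchronous: nothing is resolved at construction time.
    fn new(metadata: Arc<Metadata>) -> Self {
        Admin { metadata }
    }

    // Resolve the coordinator on every call instead of storing a connection.
    fn admin_gateway(&self) -> Result<String, String> {
        self.metadata
            .coordinator()
            .ok_or_else(|| "Coordinator server not found in cluster metadata".to_string())
    }

    // Example request: goes to whichever coordinator is current right now.
    fn list_databases(&self) -> Result<String, String> {
        let gateway = self.admin_gateway()?;
        Ok(format!("ListDatabases sent to {gateway}"))
    }
}
```

A cached `Admin` built this way would not need invalidation when the coordinator moves, because it never pins a specific connection.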

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@toxicteddy00077 toxicteddy00077 force-pushed the fluss-admin-inst-and-cache branch from dac8f93 to d75ef69 Compare March 7, 2026 04:28
@toxicteddy00077
Author

Thank you very much @fresh-borzoni. I have understood and made the changes, and they seem to test fine. As you mentioned, I have used RwLock rather than OnceCell for admin_client.


Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.



Comment on lines 46 to +52
 impl FlussAdmin {
-    pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
-        let admin_con =
-            connections
-                .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
-                    || Error::UnexpectedError {
-                        message: "Coordinator server not found in cluster metadata".to_string(),
-                        source: None,
-                    },
-                )?)
-                .await?;
-
-        Ok(FlussAdmin {
-            admin_gateway: admin_con,
+    pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        FlussAdmin {
             metadata,
             rpc_client: connections,
-        })
+        }
     }

Copilot AI Mar 7, 2026


FlussAdmin::new changed from an async fn returning Result<Self> to a sync constructor returning Self. Since FlussAdmin is a public type, this is an API-breaking change for downstream crates and also changes when/where initialization errors surface (constructor can no longer fail).

Consider keeping the existing public constructor signature (or adding a new new_unchecked/from_parts constructor for internal use) so callers depending on fallible/async initialization don’t break.

Comment on lines 79 to +96
 pub async fn get_admin(&self) -> Result<FlussAdmin> {
-    FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+    // 1. Fast path: return cached instance if already initialized.
+    if let Some(admin) = self.admin_client.read().as_ref() {
+        return Ok(admin.clone());
+    }
+
+    // 2. Slow path: acquire write lock.
+    let mut admin_guard = self.admin_client.write();
+
+    // 3. Double-check: another thread may have initialized while we waited.
+    if let Some(admin) = admin_guard.as_ref() {
+        return Ok(admin.clone());
+    }
+
+    // 4. Initialize and cache.
+    let admin = FlussAdmin::new(self.network_connects.clone(), self.metadata.clone());
+    *admin_guard = Some(admin.clone());
+    Ok(admin)

Copilot AI Mar 7, 2026


get_admin() now always succeeds and caches a FlussAdmin without verifying that the coordinator exists / is reachable. This is a behavior change: previously get_admin() could fail early (e.g., missing coordinator in metadata), and the integration readiness check in tests/integration/utils.rs relies on get_admin().await.is_ok() as part of cluster readiness.

If get_admin() is intended to remain a readiness/validation point, consider performing a lightweight async validation (e.g., resolve coordinator + RpcClient::get_connection) before caching/returning. Since that introduces an await, prefer an async single-init primitive (tokio::sync::OnceCell / async mutex) rather than holding a parking_lot lock across an await.
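
One way to keep `get_admin()` as a validation point is to check the coordinator before touching the cache, and to cache only on success. The sketch below is synchronous and uses `std::sync::RwLock` as a stand-in, so it sidesteps the lock-across-await concern rather than solving it; `Metadata`, `Admin`, `Connection`, and the `String` error are all hypothetical simplifications.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for cluster metadata.
struct Metadata {
    coordinator: RwLock<Option<String>>,
}

// Hypothetical admin that resolves the coordinator from shared metadata.
struct Admin {
    metadata: Arc<Metadata>,
}

impl Admin {
    fn coordinator(&self) -> Option<String> {
        self.metadata.coordinator.read().unwrap().clone()
    }
}

// Hypothetical stand-in for FlussConnection.
struct Connection {
    metadata: Arc<Metadata>,
    admin_client: RwLock<Option<Arc<Admin>>>,
}

impl Connection {
    fn get_admin(&self) -> Result<Arc<Admin>, String> {
        // Fast path: an admin was already validated and cached.
        if let Some(admin) = self.admin_client.read().unwrap().as_ref() {
            return Ok(admin.clone());
        }

        // Validate before taking the cache write lock, so a failure caches nothing.
        if self.metadata.coordinator.read().unwrap().is_none() {
            return Err("Coordinator server not found in cluster metadata".to_string());
        }

        // Slow path: insert unless another thread won the race.
        let mut guard = self.admin_client.write().unwrap();
        let admin = guard.get_or_insert_with(|| {
            Arc::new(Admin {
                metadata: self.metadata.clone(),
            })
        });
        Ok(admin.clone())
    }
}
```

In an async version, the validation step would be the part that awaits, which is why the comment above recommends an async single-init primitive over holding a blocking lock during it.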

Contributor

@fresh-borzoni fresh-borzoni left a comment


@toxicteddy00077 TY, LGTM overall, left comment, PTAL

Let's be consistent with Arcs. It will change the public API signature, but Arc<FlussAdmin> auto-derefs to FlussAdmin, so it will only break callers that use an explicit type annotation, which is unlikely.

network_connects: Arc<RpcClient>,
args: Config,
writer_client: RwLock<Option<Arc<WriterClient>>>,
admin_client: RwLock<Option<FlussAdmin>>,
Contributor


It's better to use RwLock<Option<Arc<FlussAdmin>>>
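
A minimal sketch of what this suggestion might look like, assuming `std::sync::RwLock` in place of the lock type used in the crate and hypothetical `Admin` and `Connection` stand-ins. It caches an `Arc<Admin>` with double-checked locking and returns the `Arc` directly, so no `Clone` derive is needed on the admin type.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for FlussAdmin; note there is no Clone derive.
struct Admin {
    name: String,
}

impl Admin {
    fn describe(&self) -> String {
        format!("admin for {}", self.name)
    }
}

// Hypothetical stand-in for FlussConnection.
struct Connection {
    admin_client: RwLock<Option<Arc<Admin>>>,
}

impl Connection {
    fn get_admin(&self) -> Arc<Admin> {
        // Fast path: return the cached instance if already initialized.
        if let Some(admin) = self.admin_client.read().unwrap().as_ref() {
            return Arc::clone(admin);
        }

        // Slow path: take the write lock and check again, since another
        // thread may have initialized the cache while we waited.
        let mut guard = self.admin_client.write().unwrap();
        if let Some(admin) = guard.as_ref() {
            return Arc::clone(admin);
        }

        // Initialize and cache; only the Arc is cloned, never the Admin.
        let admin = Arc::new(Admin {
            name: "cluster".to_string(),
        });
        *guard = Some(Arc::clone(&admin));
        admin
    }
}
```

Callers can keep writing `conn.get_admin().describe()` because `Arc<Admin>` dereferences to `Admin`; only code that spells out the type, such as `let admin: Admin = ...`, would need to change.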


Development

Successfully merging this pull request may close these issues.

Cache Admin instance in FlussConnection

4 participants