Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 42 additions & 31 deletions crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,29 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::task::JoinHandle;

#[derive(Clone)]
pub struct FlussAdmin {
admin_gateway: ServerConnection,
#[allow(dead_code)]
metadata: Arc<Metadata>,
#[allow(dead_code)]
rpc_client: Arc<RpcClient>,
}

impl FlussAdmin {
pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
let admin_con =
connections
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
|| Error::UnexpectedError {
message: "Coordinator server not found in cluster metadata".to_string(),
source: None,
},
)?)
.await?;

Ok(FlussAdmin {
admin_gateway: admin_con,
pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
FlussAdmin {
metadata,
rpc_client: connections,
})
}
}
Comment on lines 46 to +52
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlussAdmin::new changed from an async fn returning Result<Self> to a sync constructor returning Self. Since FlussAdmin is a public type, this is an API-breaking change for downstream crates and also changes when/where initialization errors surface (constructor can no longer fail).

Consider keeping the existing public constructor signature (or adding a new new_unchecked/from_parts constructor for internal use) so callers depending on fallible/async initialization don’t break.

Copilot uses AI. Check for mistakes.

async fn admin_gateway(&self) -> Result<ServerConnection> {
let cluster = self.metadata.get_cluster();
let coordinator = cluster.get_coordinator_server().ok_or_else(|| {
Error::UnexpectedError {
message: "Coordinator server not found in cluster metadata".to_string(),
source: None,
}
})?;
self.rpc_client.get_connection(coordinator).await
}

pub async fn create_database(
Expand All @@ -71,7 +69,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(CreateDatabaseRequest::new(
database_name,
database_descriptor,
Expand All @@ -88,7 +87,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(CreateTableRequest::new(
table_path,
table_descriptor,
Expand All @@ -104,15 +104,17 @@ impl FlussAdmin {
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(DropTableRequest::new(table_path, ignore_if_not_exists))
.await?;
Ok(())
}

pub async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(GetTableRequest::new(table_path))
.await?;

Expand Down Expand Up @@ -144,7 +146,8 @@ impl FlussAdmin {
/// List all tables in the given database
pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(ListTablesRequest::new(database_name))
.await?;
Ok(response.table_name)
Expand All @@ -162,7 +165,8 @@ impl FlussAdmin {
partial_partition_spec: Option<&PartitionSpec>,
) -> Result<Vec<PartitionInfo>> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(ListPartitionInfosRequest::new(
table_path,
partial_partition_spec,
Expand All @@ -179,7 +183,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(CreatePartitionRequest::new(
table_path,
partition_spec,
Expand All @@ -197,7 +202,8 @@ impl FlussAdmin {
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(DropPartitionRequest::new(
table_path,
partition_spec,
Expand All @@ -210,7 +216,8 @@ impl FlussAdmin {
/// Check if a table exists
pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(TableExistsRequest::new(table_path))
.await?;
Ok(response.exists)
Expand All @@ -224,7 +231,8 @@ impl FlussAdmin {
cascade: bool,
) -> Result<()> {
let _response = self
.admin_gateway
.admin_gateway()
.await?
.request(DropDatabaseRequest::new(
database_name,
ignore_if_not_exists,
Expand All @@ -237,7 +245,8 @@ impl FlussAdmin {
/// List all databases
pub async fn list_databases(&self) -> Result<Vec<String>> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(ListDatabasesRequest::new())
.await?;
Ok(response.database_name)
Expand All @@ -246,7 +255,8 @@ impl FlussAdmin {
/// Check if a database exists
pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(DatabaseExistsRequest::new(database_name))
.await?;
Ok(response.exists)
Expand All @@ -255,7 +265,7 @@ impl FlussAdmin {
/// Get database information
pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> {
let request = GetDatabaseInfoRequest::new(database_name);
let response = self.admin_gateway.request(request).await?;
let response = self.admin_gateway().await?.request(request).await?;

// Convert proto response to DatabaseInfo
let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?;
Expand All @@ -278,7 +288,8 @@ impl FlussAdmin {
/// Get the latest lake snapshot for a table
pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
let response = self
.admin_gateway
.admin_gateway()
.await?
.request(GetLatestLakeSnapshotRequest::new(table_path))
.await?;

Expand Down
23 changes: 20 additions & 3 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::WriterClient;
use crate::client::{WriterClient};
use crate::client::admin::FlussAdmin;
use crate::client::metadata::Metadata;
use crate::client::table::FlussTable;
Expand All @@ -24,7 +24,6 @@ use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;

use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;

Expand All @@ -33,6 +32,7 @@ pub struct FlussConnection {
network_connects: Arc<RpcClient>,
args: Config,
writer_client: RwLock<Option<Arc<WriterClient>>>,
admin_client: RwLock<Option<FlussAdmin>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use RwLock<Option<Arc<FlussAdmin>>>

}

impl FlussConnection {
Expand Down Expand Up @@ -60,6 +60,7 @@ impl FlussConnection {
network_connects: connections.clone(),
args: arg.clone(),
writer_client: Default::default(),
admin_client: RwLock::new(None),
})
}

Expand All @@ -76,7 +77,23 @@ impl FlussConnection {
}

pub async fn get_admin(&self) -> Result<FlussAdmin> {
FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
// 1. Fast path: return cached instance if already initialized.
if let Some(admin) = self.admin_client.read().as_ref() {
return Ok(admin.clone());
}

// 2. Slow path: acquire write lock.
let mut admin_guard = self.admin_client.write();

// 3. Double-check: another thread may have initialized while we waited.
if let Some(admin) = admin_guard.as_ref() {
return Ok(admin.clone());
}

// 4. Initialize and cache.
let admin = FlussAdmin::new(self.network_connects.clone(), self.metadata.clone());
*admin_guard = Some(admin.clone());
Ok(admin)
Comment on lines 79 to +96
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_admin() now always succeeds and caches a FlussAdmin without verifying that the coordinator exists / is reachable. This is a behavior change: previously get_admin() could fail early (e.g., missing coordinator in metadata), and the integration readiness check in tests/integration/utils.rs relies on get_admin().await.is_ok() as part of cluster readiness.

If get_admin() is intended to remain a readiness/validation point, consider performing a lightweight async validation (e.g., resolve coordinator + RpcClient::get_connection) before caching/returning. Since that introduces an await, prefer an async single-init primitive (tokio::sync::OnceCell / async mutex) rather than holding a parking_lot lock across an await.

Copilot uses AI. Check for mistakes.
}

pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
Expand Down
Loading