-
Notifications
You must be signed in to change notification settings - Fork 34
fix: added check to get_admin() before creating new admin #369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use crate::client::WriterClient; | ||
| use crate::client::{WriterClient}; | ||
| use crate::client::admin::FlussAdmin; | ||
| use crate::client::metadata::Metadata; | ||
| use crate::client::table::FlussTable; | ||
|
|
@@ -24,7 +24,6 @@ use crate::rpc::RpcClient; | |
| use parking_lot::RwLock; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
|
|
||
| use crate::error::{Error, FlussError, Result}; | ||
| use crate::metadata::TablePath; | ||
|
|
||
|
|
@@ -33,6 +32,7 @@ pub struct FlussConnection { | |
| network_connects: Arc<RpcClient>, | ||
| args: Config, | ||
| writer_client: RwLock<Option<Arc<WriterClient>>>, | ||
| admin_client: RwLock<Option<FlussAdmin>>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to use |
||
| } | ||
|
|
||
| impl FlussConnection { | ||
|
|
@@ -60,6 +60,7 @@ impl FlussConnection { | |
| network_connects: connections.clone(), | ||
| args: arg.clone(), | ||
| writer_client: Default::default(), | ||
| admin_client: RwLock::new(None), | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -76,7 +77,23 @@ impl FlussConnection { | |
| } | ||
|
|
||
| pub async fn get_admin(&self) -> Result<FlussAdmin> { | ||
| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await | ||
| // 1. Fast path: return cached instance if already initialized. | ||
| if let Some(admin) = self.admin_client.read().as_ref() { | ||
| return Ok(admin.clone()); | ||
| } | ||
|
|
||
| // 2. Slow path: acquire write lock. | ||
| let mut admin_guard = self.admin_client.write(); | ||
|
|
||
| // 3. Double-check: another thread may have initialized while we waited. | ||
| if let Some(admin) = admin_guard.as_ref() { | ||
| return Ok(admin.clone()); | ||
| } | ||
|
|
||
| // 4. Initialize and cache. | ||
| let admin = FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()); | ||
| *admin_guard = Some(admin.clone()); | ||
| Ok(admin) | ||
|
Comment on lines
79
to
+96
|
||
| } | ||
|
|
||
| pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FlussAdmin::newchanged from anasync fnreturningResult<Self>to a sync constructor returningSelf. SinceFlussAdminis a public type, this is an API-breaking change for downstream crates and also changes when/where initialization errors surface (constructor can no longer fail).Consider keeping the existing public constructor signature (or adding a new
new_unchecked/from_partsconstructor for internal use) so callers depending on fallible/async initialization don’t break.