@@ -21,6 +21,9 @@ use crate::{
2121 transports:: etcd:: Client as EtcdClient ,
2222} ;
2323
/// Default interval for periodic reconciliation of `instance_avail` with
/// `instance_source` (restores instances removed via `report_instance_down`).
const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_secs(5);
26+
2427#[ derive( Clone , Debug ) ]
2528pub struct Client {
2629 // This is me
@@ -35,11 +38,24 @@ pub struct Client {
3538 instance_avail_tx : Arc < tokio:: sync:: watch:: Sender < Vec < u64 > > > ,
3639 // Watch receiver for available instance IDs (for cloning to external subscribers)
3740 instance_avail_rx : tokio:: sync:: watch:: Receiver < Vec < u64 > > ,
41+ /// Interval for periodic reconciliation of instance_avail with instance_source.
42+ /// This ensures instances removed via `report_instance_down` are eventually restored.
43+ reconcile_interval : Duration ,
3844}
3945
4046impl Client {
/// Client with auto-discovered instances using the key-value store.
///
/// Delegates to `with_reconcile_interval` with [`DEFAULT_RECONCILE_INTERVAL`],
/// so `instance_avail` is periodically reset to match `instance_source`.
pub(crate) async fn new(endpoint: Endpoint) -> Result<Self> {
    Self::with_reconcile_interval(endpoint, DEFAULT_RECONCILE_INTERVAL).await
}
51+
52+ /// Create a client with a custom reconcile interval.
53+ /// The reconcile interval controls how often `instance_avail` is reset to match
54+ /// `instance_source`, restoring any instances removed via `report_instance_down`.
55+ pub ( crate ) async fn with_reconcile_interval (
56+ endpoint : Endpoint ,
57+ reconcile_interval : Duration ,
58+ ) -> Result < Self > {
4359 tracing:: trace!(
4460 "Client::new_dynamic: Creating dynamic client for endpoint: {}" ,
4561 endpoint. id( )
@@ -54,6 +70,7 @@ impl Client {
5470 instance_free : Arc :: new ( ArcSwap :: from ( Arc :: new ( vec ! [ ] ) ) ) ,
5571 instance_avail_tx : Arc :: new ( avail_tx) ,
5672 instance_avail_rx : avail_rx,
73+ reconcile_interval,
5774 } ;
5875 client. monitor_instance_source ( ) ;
5976 Ok ( client)
@@ -132,7 +149,13 @@ impl Client {
132149 }
133150
134151 /// Monitor the key-value instance source and update instance_avail.
152+ ///
153+ /// This function also performs periodic reconciliation: if `instance_source` hasn't
154+ /// changed for `reconcile_interval`, we reset `instance_avail` to match
155+ /// `instance_source`. This ensures instances removed via `report_instance_down`
156+ /// are eventually restored even if the discovery source doesn't emit updates.
135157 fn monitor_instance_source ( & self ) {
158+ let reconcile_interval = self . reconcile_interval ;
136159 let cancel_token = self . endpoint . drt ( ) . primary_token ( ) ;
137160 let client = self . clone ( ) ;
138161 let endpoint_id = self . endpoint . id ( ) ;
@@ -152,11 +175,20 @@ impl Client {
152175 // Send update to watch channel subscribers
153176 let _ = client. instance_avail_tx . send ( instance_ids) ;
154177
155- if let Err ( err) = rx. changed ( ) . await {
156- tracing:: error!(
157- "monitor_instance_source: The Sender is dropped: {err}, endpoint={endpoint_id}" ,
158- ) ;
159- cancel_token. cancel ( ) ;
178+ tokio:: select! {
179+ result = rx. changed( ) => {
180+ if let Err ( err) = result {
181+ tracing:: error!(
182+ "monitor_instance_source: The Sender is dropped: {err}, endpoint={endpoint_id}" ,
183+ ) ;
184+ cancel_token. cancel( ) ;
185+ }
186+ }
187+ _ = tokio:: time:: sleep( reconcile_interval) => {
188+ tracing:: trace!(
189+ "monitor_instance_source: periodic reconciliation for endpoint={endpoint_id}" ,
190+ ) ;
191+ }
160192 }
161193 }
162194 } ) ;
@@ -241,3 +273,118 @@ impl Client {
241273 Ok ( instance_source)
242274 }
243275}
276+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DistributedRuntime, Runtime, distributed::DistributedConfig};

    /// Instances dropped via `report_instance_down` must reappear once the
    /// reconciliation tick resets `instance_avail` from `instance_source`.
    #[tokio::test]
    async fn test_instance_reconciliation() {
        const TEST_RECONCILE_INTERVAL: Duration = Duration::from_millis(100);

        let runtime = Runtime::from_current().unwrap();
        // process_local keeps the test self-contained: no etcd/nats needed.
        let distributed =
            DistributedRuntime::new(runtime.clone(), DistributedConfig::process_local())
                .await
                .unwrap();
        let namespace = distributed
            .namespace("test_reconciliation".to_string())
            .unwrap();
        let component = namespace.component("test_component".to_string()).unwrap();
        let endpoint = component.endpoint("test_endpoint".to_string());

        // A short interval keeps the wall-clock cost of this test low.
        let client = Client::with_reconcile_interval(endpoint, TEST_RECONCILE_INTERVAL)
            .await
            .unwrap();

        // Nothing is registered yet, so the available set starts out empty.
        assert!(client.instance_ids_avail().is_empty());

        // Seed instance_avail directly so reconciliation has something to undo.
        // NOTE(review): the monitor task is already running; if a reconcile
        // tick fires between this store and the asserts below they could race.
        client.instance_avail.store(Arc::new(vec![1, 2, 3]));
        assert_eq!(**client.instance_ids_avail(), vec![1u64, 2, 3]);

        // Removing instance 2 takes effect immediately.
        client.report_instance_down(2);
        assert_eq!(**client.instance_ids_avail(), vec![1u64, 3]);

        // Allow one full reconcile tick plus slack. Because instance_source is
        // empty, reconciliation resets instance_avail back to the empty set.
        tokio::time::sleep(TEST_RECONCILE_INTERVAL + Duration::from_millis(50)).await;

        assert!(
            client.instance_ids_avail().is_empty(),
            "After reconciliation, instance_avail should match instance_source"
        );

        runtime.shutdown();
    }

    /// `report_instance_down` removes exactly the reported instance id,
    /// leaving the others untouched.
    #[tokio::test]
    async fn test_report_instance_down() {
        let runtime = Runtime::from_current().unwrap();
        // process_local keeps the test self-contained: no etcd/nats needed.
        let distributed =
            DistributedRuntime::new(runtime.clone(), DistributedConfig::process_local())
                .await
                .unwrap();
        let namespace = distributed
            .namespace("test_report_down".to_string())
            .unwrap();
        let component = namespace.component("test_component".to_string()).unwrap();
        let endpoint = component.endpoint("test_endpoint".to_string());

        let client = endpoint.client().await.unwrap();

        // Seed the available set with known ids.
        client.instance_avail.store(Arc::new(vec![1, 2, 3]));
        assert_eq!(**client.instance_ids_avail(), vec![1u64, 2, 3]);

        client.report_instance_down(2);

        let remaining = client.instance_ids_avail();
        assert!(remaining.contains(&1), "Instance 1 should still be available");
        assert!(
            !remaining.contains(&2),
            "Instance 2 should be removed after report_instance_down"
        );
        assert!(remaining.contains(&3), "Instance 3 should still be available");

        runtime.shutdown();
    }

    /// Subscribers obtained via `instance_avail_watcher` observe the set
    /// after an instance is reported down.
    #[tokio::test]
    async fn test_instance_avail_watcher() {
        let runtime = Runtime::from_current().unwrap();
        // process_local keeps the test self-contained: no etcd/nats needed.
        let distributed =
            DistributedRuntime::new(runtime.clone(), DistributedConfig::process_local())
                .await
                .unwrap();
        let namespace = distributed.namespace("test_watcher".to_string()).unwrap();
        let component = namespace.component("test_component".to_string()).unwrap();
        let endpoint = component.endpoint("test_endpoint".to_string());

        let client = endpoint.client().await.unwrap();
        let watcher = client.instance_avail_watcher();

        // Seed the available set, then knock out instance 2; the removal is
        // expected to be pushed through the watch channel.
        client.instance_avail.store(Arc::new(vec![1, 2, 3]));
        client.report_instance_down(2);

        // NOTE(review): borrow() reads the latest value but does not prove a
        // change notification fired — presumably report_instance_down sends on
        // instance_avail_tx; confirm against its implementation (e.g. via
        // Receiver::has_changed).
        let snapshot = watcher.borrow().clone();
        assert_eq!(snapshot, vec![1, 3]);

        runtime.shutdown();
    }
}
0 commit comments