diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index d517664467..e7fe9028d3 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -253,28 +253,77 @@ impl DistributedRuntime { } // Start health check manager if enabled + // IMPORTANT: Delay canary health check start until system is ready + // This prevents race conditions where canary checks fire before NATS endpoints + // are fully subscribed and ready to receive requests. if config.health_check_enabled { - let health_check_config = crate::health_check::HealthCheckConfig { - canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs), - request_timeout: std::time::Duration::from_secs( - config.health_check_request_timeout_secs, - ), - }; + let drt_clone = distributed_runtime.clone(); + let canary_wait_time_secs = config.canary_wait_time_secs; + let request_timeout_secs = config.health_check_request_timeout_secs; + + tokio::spawn(async move { + tracing::info!("Waiting for system readiness before starting canary health checks"); + + // Poll readiness with exponential backoff + let mut wait_time = std::time::Duration::from_millis(100); + let max_wait = std::time::Duration::from_secs(300); // 5 minutes for large model loading + let mut total_waited = std::time::Duration::ZERO; + + while total_waited < max_wait { + let (healthy, endpoints) = drt_clone.system_health().lock().get_health_status(); + if healthy { + tracing::info!( + "System ready after {:?} (endpoints: {:?}), starting canary health check manager", + total_waited, + endpoints.keys().collect::>() + ); + break; + } + + // Log progress every 10 seconds + if total_waited.as_secs() > 0 && total_waited.as_secs().is_multiple_of(10) { + tracing::debug!( + "Still waiting for system readiness... ({:?} elapsed, endpoints: {:?})", + total_waited, + endpoints + ); + } - // Start the health check manager (spawns per-endpoint monitoring tasks) - match crate::health_check::start_health_check_manager( - distributed_runtime.clone(), - Some(health_check_config), - ) - .await - { - Ok(()) => tracing::info!( - "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)", - config.canary_wait_time_secs, - config.health_check_request_timeout_secs - ), - Err(e) => tracing::error!("Health check manager failed to start: {}", e), - } + tokio::time::sleep(wait_time).await; + total_waited += wait_time; + // Exponential backoff: 100ms, 200ms, 400ms, 800ms, 1600ms, max 5s + wait_time = std::cmp::min(wait_time * 2, std::time::Duration::from_secs(5)); + } + + if total_waited >= max_wait { + let (_, endpoints) = drt_clone.system_health().lock().get_health_status(); + tracing::warn!( + "System not ready after {:?} (endpoints: {:?}), starting canary health checks anyway", + max_wait, + endpoints + ); + } + + // Now start the health check manager + let health_check_config = crate::health_check::HealthCheckConfig { + canary_wait_time: std::time::Duration::from_secs(canary_wait_time_secs), + request_timeout: std::time::Duration::from_secs(request_timeout_secs), + }; + + match crate::health_check::start_health_check_manager( + drt_clone, + Some(health_check_config), + ) + .await + { + Ok(()) => tracing::info!( + "Canary health check manager started (canary_wait_time: {}s, request_timeout: {}s)", + canary_wait_time_secs, + request_timeout_secs + ), + Err(e) => tracing::error!("Canary health check manager failed to start: {}", e), + } + }); } Ok(distributed_runtime)