Network resilience: connection health, plugin reconnection, and state visibility #288
chipsenkbeil wants to merge 9 commits into master
Conversation
…ction Move socket2 from unix-only to cross-platform dependencies and configure SO_KEEPALIVE (15s idle, 5s interval) on all TCP connections: client connect, client reconnect, and server accept. Keepalive failures log a warning but do not fail the connection.
Add max_heartbeat_failures (default 3) to ServerConfig. The connection loop now counts consecutive non-WouldBlock heartbeat write errors and terminates the connection when the threshold is reached. Counter resets on any successful write. Setting the value to 0 disables the feature. Backward-compatible via serde default.
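The counting rule above can be sketched as a small std-only helper (names hypothetical; the real logic lives inline in the connection loop):

```rust
use std::io;

/// Tracks consecutive heartbeat write failures. `max = 0` disables the check.
struct HeartbeatFailures {
    count: u8,
    max: u8,
}

impl HeartbeatFailures {
    fn new(max: u8) -> Self {
        Self { count: 0, max }
    }

    /// Feed the result of a heartbeat write. Returns true when the
    /// connection should be terminated.
    fn record(&mut self, result: &io::Result<()>) -> bool {
        match result {
            Ok(()) => {
                self.count = 0; // any successful write resets the counter
                false
            }
            // WouldBlock means the outgoing buffer is full, not a dead peer
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => false,
            Err(_) => {
                self.count = self.count.saturating_add(1);
                self.max != 0 && self.count >= self.max
            }
        }
    }
}
```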
…to Plugin trait Extend the Plugin trait with default reconnect() (returns Unsupported) and reconnect_strategy() (returns Fail). All three plugins override both: Host/SSH/Docker delegate reconnect to connect and return ExponentialBackoff with backend-appropriate parameters (Docker gets more retries for slow daemon restarts).
…ination Add ShutdownSender type and ServerRef::shutdown_sender() for lightweight shutdown signaling. SSH backend spawns a health monitor that polls session.is_closed() every 2s. Docker backend polls daemon ping and container state every 5s. When the backend dies, the health monitor triggers server shutdown, dropping the in-memory transport so the manager can detect the disconnection.
ManagerConnection::spawn() now clones the UntypedClient's ConnectionWatcher and optionally spawns a monitor task that sends the connection ID through a death notification channel when the connection transitions to Disconnected. ManagerServer wires this up for all connections created via connect().
Add SubscribeConnectionEvents and Reconnect request variants, plus ConnectionStateChanged, SubscribedConnectionEvents, and ReconnectInitiated response variants to the manager protocol. Add event broadcast infrastructure to ManagerServer and client methods to ManagerClient. Add Serialize/Deserialize to ConnectionState for wire transport.
Add handle_reconnection() to orchestrate plugin reconnection when a connection dies: looks up plugin by scheme, checks reconnect_strategy(), retries with backoff, and hot-swaps the connection on success. Add replace_client() on ManagerConnection for atomic client replacement with task abort/respawn. Add NonInteractiveAuthenticator for background reconnection using key-based auth. Make ReconnectStrategy backoff helpers public for reuse.
Add subscribe_and_display_connection_events() for background event display in long-running CLI commands (Shell, Api, Spawn, Ssh). Shell format prints to stderr, JSON to stdout. Add `distant client reconnect` subcommand to manually trigger reconnection by connection ID.
Add --no-reconnect flag to Connect, Launch, and Ssh subcommands to disable automatic reconnection on connection loss. Add --heartbeat-interval and --max-heartbeat-failures flags to server Listen subcommand for configuring heartbeat behavior. Wire no_reconnect through options Map into the manager's reconnection orchestration.
Force-pushed from b63b224 to a12a240
// ---------------------------------------------------------------
// ConnectionState serde round-trip tests
// ---------------------------------------------------------------
Don't need separators like this
#[test]
fn connection_state_should_reject_non_string_json_types() {
    // Number
These comments are unnecessary
pub use inmemory::*;

mod tcp;
pub(crate) use tcp::configure_tcp_keepalive;
Does this one function need to be pulled up to be available? Nothing exposed on the TCP transport or anywhere else to do this?
let req = ManagerRequest::SubscribeConnectionEvents;
let json = serde_json::to_string(&req).unwrap();
let val: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(val["type"], "subscribe_connection_events");
Can we not make this a more generic name vs the really long subscribe_connection_events? Feels like we should make this more generic in case there are other events we'd subscribe to in the future. We could just call this subscribe and then we receive event or events as responses after a subscribed response that contain the connection events. We'd just need a generic event enum that has a type (connection for now) with a specific payload.
Or if our response is ConnectionStateChanged that could be the event type connection_state_changed with a body that has whatever else we need.
/// Unsolicited notification of a connection state change.
/// Sent to clients that have subscribed via `SubscribeConnectionEvents`.
ConnectionStateChanged {
    /// Id of the connection whose state changed
    id: ConnectionId,
    /// New connection state
    state: ConnectionState,
},

/// Confirmation that connection event subscription was established.
SubscribedConnectionEvents,
Once again, we should make this be something like Subscribed and then Event (or Events) which has an inner type Event(Event) where the inner type is something like Event::ConnectionStateChanged { id: ConnectionId, state: ConnectionState }.
Could have the type be event and then have a subtype of connection_state_changed or something like that. Also, that's still too long of a name. What is something more concise we can use?
// ---------------------------------------------------------------
// ConnectionStateChanged serde round-trip
// ---------------------------------------------------------------
Stop including these
…plan Move docs/mount-tests-PRD.md → PRD.md and docs/mount-tests-progress.md → PROGRESS.md so they live alongside README.md as canonical project references. Correct the stale "9 FP failures remain" status — the FP suite has been at 37/37 since 86d794d, total at 228/228. Embed the full Network Resilience + Mount Health plan into PRD.md as a new "Plan: Network Resilience + Mount Health" section so the plan survives context compaction. PROGRESS.md gains an "Active plan" pointer at the top plus a Phase 0 checklist (0a–0j) tracking the incorporation of PR #288 and a Phase 1–6 checklist for the mount health work that follows. Update .claude/commands/mount-test-loop.md to point at the new top-level paths.
…epalive Move socket2 from unix-only to cross-platform dependencies and configure SO_KEEPALIVE (15s idle, 5s probe interval) on every TCP stream owned by distant: client connect, transport reconnect, and listener accept. Cherry-picked from #288 (commit 61e48c0). Address review comment 2933801998 ("Does this one function need to be pulled up to be available?") by exposing keepalive through the public TcpTransport surface instead of a `pub(crate) use` re-export of a free helper: - TcpTransport gains a `set_keepalive(&self) -> io::Result<()>` method that does the SO_KEEPALIVE configuration via socket2::SockRef. - TcpTransport gains `from_accepted(stream, peer_addr) -> Self` which the listener uses to wrap accepted streams; it sets keepalive internally. - TcpListener::accept stops reaching into a private helper and just delegates to TcpTransport::from_accepted. - TcpTransport::connect / Reconnectable::reconnect call the new set_keepalive method internally so callers get keepalive for free. Keepalive failures log a warning but do not fail the connection.
Add max_heartbeat_failures (default 3) to ServerConfig. The connection loop now counts consecutive non-WouldBlock heartbeat write errors and terminates the connection when the threshold is reached. The counter resets on any successful write (heartbeat or response). Setting the value to 0 disables the feature. Backward-compatible via serde default. Cherry-picked from #288 (commit fa40953).
Extend the Plugin trait with default reconnect() (returns Unsupported) and reconnect_strategy() (returns Fail). All three plugins override both: Host, SSH, and Docker delegate reconnect to connect and return ExponentialBackoff with backend-appropriate parameters:

- Host: 3 retries / 2s base / 30s max / 60s timeout
- SSH: 5 retries / 2s base / 30s max / 30s timeout
- Docker: 10 retries / 1s base / 60s max / 30s timeout (slow daemon restarts deserve more patience)

Cherry-picked from #288 (commit 3660e62). Strip the new separator-style test section comments per review comments 2915971580 / 2933755107 / 2933823312 (CLAUDE.md anti-pattern #11) and rename per-crate test functions to disambiguate them across crates (docker_reconnect_strategy_returns_*, ssh_reconnect_strategy_returns_*).
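The trait extension and a per-plugin override can be sketched as follows, with simplified synchronous signatures and stand-in types (the real trait is async and richer; only the Docker parameters above are taken from the commit):

```rust
use std::time::Duration;

/// Simplified stand-in for the real strategy type.
#[derive(Debug, PartialEq)]
enum ReconnectStrategy {
    Fail,
    ExponentialBackoff {
        max_retries: u32,
        base: Duration,
        max_sleep: Duration,
        timeout: Duration,
    },
}

trait Plugin {
    fn connect(&self) -> Result<(), String>;

    /// Default: plugins that predate this change do not reconnect.
    fn reconnect(&self) -> Result<(), String> {
        Err("unsupported".to_string())
    }

    /// Default: never retry unless a plugin opts in.
    fn reconnect_strategy(&self) -> ReconnectStrategy {
        ReconnectStrategy::Fail
    }
}

struct DockerPlugin;

impl Plugin for DockerPlugin {
    fn connect(&self) -> Result<(), String> {
        Ok(())
    }

    // Delegate reconnect to connect, as the real plugins do.
    fn reconnect(&self) -> Result<(), String> {
        self.connect()
    }

    // Docker gets more retries to ride out slow daemon restarts.
    fn reconnect_strategy(&self) -> ReconnectStrategy {
        ReconnectStrategy::ExponentialBackoff {
            max_retries: 10,
            base: Duration::from_secs(1),
            max_sleep: Duration::from_secs(60),
            timeout: Duration::from_secs(30),
        }
    }
}
```

Default methods keep the change backward-compatible: third-party plugins that never heard of reconnection compile unchanged and simply report Unsupported/Fail.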
…ination Add ShutdownSender type and ServerRef::shutdown_sender() for lightweight shutdown signaling. The SSH backend spawns a health monitor that polls api.is_session_closed() every 2s. The Docker backend polls daemon ping and container state every 5s. When the backend dies, the health monitor triggers server shutdown, dropping the in-memory transport so the manager can detect the disconnection.

Add ApiServerHandler::from_arc(Arc<T>) so the in-memory server can share its API instance with a health-monitor task that needs to query backend liveness. ChannelPool::is_closed and SshApi::is_session_closed are async on file-mount because the russh handle lives behind a tokio Mutex (added for tcpip_forward, see russh#658). The poll loop awaits both.

Cherry-picked from #288 (commit 993ed8d). Resolved conflicts in distant-ssh/src/lib.rs (file-mount's tunnel state + SshApi 5-arg constructor needed to coexist with PR #288's Arc<SshApi> wrapping and ssh_health_monitor). Also added the `options` field to two test cases in distant-core/src/api.rs that the file-mount branch added since the cherry-pick base.
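The poll-and-shutdown pattern can be sketched with std threads and channels standing in for tokio tasks and the real ShutdownSender (all names here are hypothetical):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Poll `is_closed` at `interval`; when it reports true, fire a one-shot
/// shutdown signal and exit. Stands in for the SSH/Docker health monitors.
fn spawn_health_monitor(
    is_closed: impl Fn() -> bool + Send + 'static,
    interval: Duration,
    shutdown_tx: mpsc::Sender<()>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || loop {
        if is_closed() {
            // Backend died: trigger server shutdown. Dropping the server's
            // in-memory transport is what lets the manager observe it.
            let _ = shutdown_tx.send(());
            return;
        }
        thread::sleep(interval);
    })
}
```

The key design point survives the simplification: the backend server terminates *itself* when its underlying session dies, rather than waiting for the manager to notice a silent transport.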
ManagerConnection::spawn() now clones the UntypedClient's ConnectionWatcher and optionally spawns a monitor task that sends the connection ID through a death notification channel when the connection transitions to Disconnected. ManagerServer wires this up for all connections created via connect().

For now the death-handling task in ManagerServer::new just logs the disconnect — full reconnection orchestration arrives in step 0h (adapted from PR #288 commit aa035a8). Step 0f only plumbs the watcher through.

Cherry-picked from #288 (commit 594c3ca). Resolved conflicts in distant-core/src/net/manager/server.rs to keep file-mount's mount/tunnel struct fields and ManagerServer::new constructor alongside the new death_tx field.
…ations Adds the manager-side push protocol that PR #288 began, reshaped per the review thread on #288 (comments 2933812110, 2933814790, 2933821911, 2933826601). Instead of bespoke SubscribeConnectionEvents / SubscribedConnectionEvents / ConnectionStateChanged / Reconnect / ReconnectInitiated variants, the protocol now exposes a generic three-piece API:

ManagerRequest::Subscribe { topics: Vec<EventTopic> }
ManagerRequest::Unsubscribe
ManagerRequest::Reconnect { id: ConnectionId }

ManagerResponse::Subscribed
ManagerResponse::Unsubscribed
ManagerResponse::Event { event: Event }
ManagerResponse::ReconnectInitiated { id: ConnectionId }

A new `distant-core/src/net/manager/data/event.rs` module defines:

- `EventTopic { All, Connection, Mount }` — subscribers filter on topics; `All` matches every variant, present and future. `Mount` is reserved (no producers yet — Phase 1 of the mount-health work ships `Event::MountState` together with the typed `MountStatus` enum).
- `Event { ConnectionState { id, state } }` — a tagged event enum. Future variants (mount, tunnel, server status) plug in here without protocol additions.
- `Event::topic(&self) -> EventTopic` — used by the dispatcher in step 0h to filter pushed events for clients that subscribed to specific topics.

Wire shape (JSON):

{"type":"subscribe","topics":["connection","mount"]}
{"type":"event","event":{"type":"connection_state","id":7,"state":"reconnecting"}}

To make this protocol layer fully functional in step 0h:

- ConnectionState gains `Serialize`/`Deserialize` (snake_case) so it can ride the wire as part of `Event::ConnectionState`.
- ReconnectStrategy::initial_sleep_duration and adjust_sleep are promoted from private to `pub` so the orchestration in 0h can drive its own retry loop.

Stub handlers in `ManagerServer` return `Error` responses for the new request variants; step 0h replaces them with the real broadcast::channel + handle_reconnection wiring.
Add handle_reconnection() to orchestrate plugin reconnection when a
connection dies. The death loop in ManagerServer::new now drives this
function instead of just logging the disconnect:
1. Read the connection's destination + options under a brief read lock.
2. Look up the plugin by scheme.
3. If reconnect_strategy() is Fail, broadcast Disconnected and stop.
4. Honor the `no_reconnect` option from the CLI flag (added in 0i).
5. Broadcast Reconnecting and enter the retry loop, sleeping per the
plugin's strategy and timing each attempt against strategy.timeout().
6. On success, hot-swap the connection via ManagerConnection::replace_client
and broadcast Connected. On exhaustion, broadcast Disconnected.
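Steps 5–6 can be sketched as a synchronous retry loop. Sleeping and the reconnect attempt are injected so the sketch stays testable; the per-attempt timeout check is a rough stand-in for the real timing against strategy.timeout(), and all names are hypothetical:

```rust
use std::time::{Duration, Instant};

struct Backoff {
    max_retries: u32,
    base: Duration,
    max_sleep: Duration,
    timeout: Duration,
}

/// Drive reconnect attempts per the strategy. `attempt` stands in for
/// plugin.reconnect(); `sleep` is injected so tests need not wait.
fn retry_with_backoff(
    strategy: &Backoff,
    mut attempt: impl FnMut() -> Result<(), String>,
    mut sleep: impl FnMut(Duration),
) -> Result<(), String> {
    let mut delay = strategy.base;
    for _ in 0..strategy.max_retries {
        let started = Instant::now();
        match attempt() {
            // Success: caller hot-swaps the client and broadcasts Connected
            Ok(()) => return Ok(()),
            Err(_) if started.elapsed() > strategy.timeout => {
                return Err("attempt exceeded timeout".into());
            }
            Err(_) => {
                sleep(delay);
                // exponential growth, capped at max_sleep
                delay = (delay * 2).min(strategy.max_sleep);
            }
        }
    }
    // Exhausted: caller broadcasts Disconnected
    Err("retries exhausted".into())
}
```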
Add ManagerConnection::replace_client which aborts old request /
response / monitor tasks, mints a fresh action task with a new
request_tx, and spawns a new connection monitor with the death_tx.
Existing channels are invalidated by design — callers must re-open
them after replacement.
Add NonInteractiveAuthenticator: a no-prompt Authenticator used during
background reconnection. challenge() fails with PermissionDenied
(callers using key-file or ssh-agent auth never invoke it); verify()
auto-accepts host verification because the host was verified on the
original connect.
Wire the protocol stubs from step 0g into the real implementations:
- Subscribe { topics } now spawns a forwarder task that drains the
broadcast bus, filters events by the requested topics
(EventTopic::All matches everything), and pushes
ManagerResponse::Event { event } back through the channel reply.
- Unsubscribe acks immediately. The forwarder task tied to the channel
exits naturally when the reply stream closes; per-channel teardown
while keeping the channel open is a future refinement.
- Reconnect { id } verifies the connection exists, then spawns
handle_reconnection in the background and returns ReconnectInitiated.
State transitions arrive later as Event::ConnectionState pushes.
Replace the placeholder publish helper from PR #288 with
publish_connection_state, which sends Event::ConnectionState into the
broadcast::Sender<Event> bus. ManagerServer gains an event_tx field;
broadcast::channel<Event> capacity is 16.
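The forwarder's topic filtering can be sketched with std channels standing in for the tokio broadcast bus (simplified event types; names hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Clone, Copy, PartialEq, Debug)]
enum EventTopic {
    All,
    Connection,
    Mount,
}

#[derive(Clone, Debug, PartialEq)]
enum Event {
    ConnectionState { id: u64, state: &'static str },
}

impl Event {
    fn topic(&self) -> EventTopic {
        match self {
            Event::ConnectionState { .. } => EventTopic::Connection,
        }
    }
}

/// Drain the event bus, forwarding only events whose topic matches the
/// subscription (`All` matches everything). Exits when the bus closes or
/// the subscriber hangs up, mirroring the forwarder task's natural teardown.
fn spawn_forwarder(
    bus: mpsc::Receiver<Event>,
    topics: Vec<EventTopic>,
    reply: mpsc::Sender<Event>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for event in bus {
            let matches = topics
                .iter()
                .any(|t| *t == EventTopic::All || *t == event.topic());
            if matches && reply.send(event).is_err() {
                return; // subscriber hung up
            }
        }
    })
}
```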
Cherry-picked from #288 (commit aa035a8) and
adapted to:
- Use the generic Subscribe/Event protocol from step 0g instead of
the bespoke SubscribeConnectionEvents/ConnectionStateChanged.
- Coexist with the file-mount branch's mount + tunnel struct fields
and request handlers.
- Match the existing reply_err helper and the file-mount tunnel +
mount handler ordering inside the request match.
Imports the new ConnectionState serde tests and ReconnectStrategy
{initial_sleep_duration, adjust_sleep} unit tests from the upstream
commit, with separator-style comments stripped per the review thread.
Cherry-picked from #288 (commit c40c543), adapted to use the generic Subscribe/Event protocol from step 0g instead of the bespoke SubscribeConnectionEvents helpers PR #288 originally shipped.

CLI helper changes:
- src/cli/common/client.rs: replace subscribe_and_display_connection_events(client, format) with subscribe_and_display_events(client, topics, format), accepting a Vec<EventTopic>. Long-running CLI commands (Shell, Api, Spawn, Ssh) now subscribe with [Connection, Mount] so a backgrounded mount drop surfaces in the same stderr/JSON stream as connection drops. JSON shape mirrors the wire format: {"type":"event","event":{"type":"connection_state",...}}.
- A new display_event() helper renders each Event variant in both Format::Shell and Format::Json, ready to be extended for the Event::MountState variant that lands in Phase 1.

ManagerClient API:
- ManagerClient::subscribe(topics) → io::Result<Mailbox<...>>: sends Subscribe { topics }, waits for the Subscribed ack, then returns the mailbox so callers don't see the ack mixed with events.
- ManagerClient::unsubscribe() → io::Result<()>: best-effort hint.
- ManagerClient::reconnect(id) → io::Result<()>: sends Reconnect { id }, waits for ReconnectInitiated. The actual state transitions arrive later as Event::ConnectionState pushes on any open subscription.

CLI command:
- distant client reconnect <id> uses ManagerClient::reconnect. Format::Json prints {"type":"reconnect_initiated","id":<id>}; Format::Shell prints a Ui::success line.
Cherry-picked from #288 (commit a12a240). Add --no-reconnect to Connect, Launch, and Ssh client subcommands to disable automatic reconnection on connection loss. The flag is plumbed through the options Map (`no_reconnect=true`) into the manager's reconnection orchestration, where handle_reconnection checks for it before doing any work and broadcasts Disconnected straight away.

Add --heartbeat-interval and --max-heartbeat-failures to the server Listen subcommand for configuring the heartbeat counter introduced in step 0c.

Renamed notify_state_change → publish_connection_state at one follow-on call site that came in with this commit's no_reconnect check (the rest were renamed in step 0h).
Mark all 0a–0j and Phases 1–5 boxes as complete in PROGRESS.md with the commit hashes for each. Phase 6 (this commit) is the docs roll-up itself.

Update PRD.md status section to reflect 228/228 mount tests + 2291 distant-core lib tests passing, with the highlight bullet list of what landed (generic Subscribe/Event protocol, MountStatus enum, per-mount monitor, kill-leak fix, network resilience stack from PR #288, CLI flags). Document the deferred items (HLT-01..04, EVT-01..02, granular per-backend probes, process audit, Windows VM testing) so the next session has a clear list to work from.

docs/CHANGELOG.md gains an Unreleased section listing every user-facing addition and the two breaking changes (`MountInfo.status` enum, `kill(id)` cleans mounts).
Summary
Full-stack connection resilience system addressing #229 (client hangs on network switch) and #192 (connection state notifications).
- TCP keepalive on all connections plus a heartbeat failure counter (--max-heartbeat-failures)
- reconnect() and reconnect_strategy() trait methods with per-plugin backoff (Host: 3 retries, SSH: 5 retries, Docker: 10 retries)
- Backend health monitors: SSH polls session.is_closed(), Docker polls daemon ping + container state; both trigger server self-termination on failure
- ConnectionWatcher death notifications trigger automatic reconnection orchestration with hot-swap of the underlying client
- SubscribeConnectionEvents, Reconnect, ConnectionStateChanged request/response variants for push-based state events
- [distant] Connection {id}: reconnecting/connected/disconnected status in long-running commands (Shell, Api, Spawn, Ssh); distant client reconnect <id> manual trigger
- --no-reconnect on Connect/Launch/Ssh; --heartbeat-interval and --max-heartbeat-failures on server Listen

9 commits across 26 files, +3296/-66 lines. All changes have comprehensive unit tests.
Test plan
- cargo test --all-features --workspace passes (verified locally)
- RUSTFLAGS="-Dwarnings" cargo clippy --all-features --workspace --all-targets clean
- distant://, disable network → see "reconnecting" → re-enable → see "connected"
- distant client reconnect <id> triggers reconnection
- --no-reconnect prevents automatic reconnection
- --heartbeat-interval and --max-heartbeat-failures configure server behavior