Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions fns_cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ def __init__(self, config: AppConfig) -> None:
self._handlers: dict[str, Callable[..., Coroutine]] = {}
self._binary_handler: Callable[..., Coroutine] | None = None
self._on_reconnect: Callable[[], Coroutine] | None = None
self._reconnect_task: asyncio.Task | None = None
self._msg_queue: list[str | bytes] = []
self._ready_event = asyncio.Event()
self._send_lock = asyncio.Lock()

def on_reconnect(self, handler: Callable[[], Coroutine]) -> None:
self._on_reconnect = handler
Expand All @@ -60,13 +62,14 @@ async def send_bytes(self, data: bytes) -> None:
await self._raw_send(data)

async def _raw_send(self, data: str | bytes) -> None:
if self.ws is None:
self._msg_queue.append(data)
return
try:
await self.ws.send(data)
except websockets.ConnectionClosed:
self._msg_queue.append(data)
async with self._send_lock:
if self.ws is None:
self._msg_queue.append(data)
return
try:
await self.ws.send(data)
except websockets.ConnectionClosed:
self._msg_queue.append(data)

async def _flush_queue(self) -> None:
queue, self._msg_queue = self._msg_queue, []
Expand Down Expand Up @@ -121,24 +124,16 @@ async def _connect(self) -> None:
)
log.info("Connecting to %s", url)

# Heartbeat: the server sends Ping every 25s and closes the connection
# if it sees no frame within 60s (PingWait). The websockets library
# auto-responds to server pings with a pong, so the server's deadline
# is always refreshed as long as the connection is actually alive.
#
# We also send our own pings to detect the reverse case: a half-open
# TCP connection or a proxy that silently drops the server's pings.
# The interval/timeout are generous because the server's gws write
# queue can hold our Pong behind large binary frames during heavy
# chunk transfer — a 30s-or-less timeout false-triggers on initial
# sync. 45s + 90s gives roughly 135s worst-case dead-connection
# detection, which is comfortably past any observed Pong delay while
# still catching genuine network failures.
# The server already sends keepalive pings and will close dead
# connections on its side. During large attachment uploads, our own
# client-initiated ping/pong cycle causes false timeouts and forces an
# expensive reconnect + full resync loop, which then delays realtime
# local pushes even further. Rely on the server heartbeat here.
self.ws = await websockets.connect(
url,
max_size=128 * 1024 * 1024,
ping_interval=45,
ping_timeout=90,
ping_interval=None,
ping_timeout=None,
close_timeout=10,
)
log.info("WebSocket connected, sending auth")
Expand Down Expand Up @@ -169,7 +164,7 @@ async def _handle_text(self, raw: str) -> None:
except Exception:
log.exception("Handler error for %s", msg.action)
else:
log.debug("Unhandled action: %s", msg.action)
log.warning("Unhandled action from server: %s", msg.action)

async def _handle_binary(self, raw: bytes) -> None:
if self._binary_handler and len(raw) > 42 and raw[:2] == b"00":
Expand All @@ -196,17 +191,25 @@ async def _on_auth_response(self, msg: WSMessage) -> None:
await self._flush_queue()
self._ready_event.set()
if self._connect_count > 1 and self._on_reconnect:
try:
await self._on_reconnect()
except Exception:
log.exception("Reconnect handler error")
if self._reconnect_task and not self._reconnect_task.done():
self._reconnect_task.cancel()
self._reconnect_task = asyncio.create_task(self._run_reconnect_handler())
else:
err = data.get("msg", data.get("message", "unknown"))
log.error("Authentication failed (code=%s): %s", code, err)
self._running = False
if self.ws:
await self.ws.close()

async def _run_reconnect_handler(self) -> None:
try:
assert self._on_reconnect is not None
await self._on_reconnect()
except asyncio.CancelledError:
raise
except Exception:
log.exception("Reconnect handler error")

async def wait_ready(self, timeout: float = 30) -> bool:
try:
await asyncio.wait_for(self._ready_event.wait(), timeout)
Expand All @@ -216,5 +219,7 @@ async def wait_ready(self, timeout: float = 30) -> bool:

async def close(self) -> None:
self._running = False
if self._reconnect_task and not self._reconnect_task.done():
self._reconnect_task.cancel()
if self.ws:
await self.ws.close()
2 changes: 2 additions & 0 deletions fns_cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class SyncConfig:
sync_notes: bool = True
sync_files: bool = True
sync_config: bool = True
upload_concurrency: int = 2
exclude_patterns: list[str] = field(
default_factory=lambda: [".git/**", ".trash/**", "*.tmp"]
)
Expand Down Expand Up @@ -85,6 +86,7 @@ def load_config(path: str) -> AppConfig:
sync_notes=s.get("sync_notes", True),
sync_files=s.get("sync_files", True),
sync_config=s.get("sync_config", True),
upload_concurrency=max(1, s.get("upload_concurrency", 2)),
exclude_patterns=s.get(
"exclude_patterns", [".git/**", ".trash/**", "*.tmp"]
),
Expand Down
Loading
Loading