diff --git a/fns_cli/client.py b/fns_cli/client.py index 13ac9b8..98fbe96 100644 --- a/fns_cli/client.py +++ b/fns_cli/client.py @@ -34,8 +34,10 @@ def __init__(self, config: AppConfig) -> None: self._handlers: dict[str, Callable[..., Coroutine]] = {} self._binary_handler: Callable[..., Coroutine] | None = None self._on_reconnect: Callable[[], Coroutine] | None = None + self._reconnect_task: asyncio.Task | None = None self._msg_queue: list[str | bytes] = [] self._ready_event = asyncio.Event() + self._send_lock = asyncio.Lock() def on_reconnect(self, handler: Callable[[], Coroutine]) -> None: self._on_reconnect = handler @@ -60,13 +62,14 @@ async def send_bytes(self, data: bytes) -> None: await self._raw_send(data) async def _raw_send(self, data: str | bytes) -> None: - if self.ws is None: - self._msg_queue.append(data) - return - try: - await self.ws.send(data) - except websockets.ConnectionClosed: - self._msg_queue.append(data) + async with self._send_lock: + if self.ws is None: + self._msg_queue.append(data) + return + try: + await self.ws.send(data) + except websockets.ConnectionClosed: + self._msg_queue.append(data) async def _flush_queue(self) -> None: queue, self._msg_queue = self._msg_queue, [] @@ -121,24 +124,16 @@ async def _connect(self) -> None: ) log.info("Connecting to %s", url) - # Heartbeat: the server sends Ping every 25s and closes the connection - # if it sees no frame within 60s (PingWait). The websockets library - # auto-responds to server pings with a pong, so the server's deadline - # is always refreshed as long as the connection is actually alive. - # - # We also send our own pings to detect the reverse case: a half-open - # TCP connection or a proxy that silently drops the server's pings. - # The interval/timeout are generous because the server's gws write - # queue can hold our Pong behind large binary frames during heavy - # chunk transfer — a 30s-or-less timeout false-triggers on initial - # sync. 45s + 90s gives roughly 135s worst-case dead-connection - # detection, which is comfortably past any observed Pong delay while - # still catching genuine network failures. + # The server already sends keepalive pings and will close dead + # connections on its side. During large attachment uploads, our own + # client-initiated ping/pong cycle causes false timeouts and forces an + # expensive reconnect + full resync loop, which then delays realtime + # local pushes even further. Rely on the server heartbeat here. self.ws = await websockets.connect( url, max_size=128 * 1024 * 1024, - ping_interval=45, - ping_timeout=90, + ping_interval=None, + ping_timeout=None, close_timeout=10, ) log.info("WebSocket connected, sending auth") @@ -169,7 +164,7 @@ async def _handle_text(self, raw: str) -> None: except Exception: log.exception("Handler error for %s", msg.action) else: - log.debug("Unhandled action: %s", msg.action) + log.warning("Unhandled action from server: %s", msg.action) async def _handle_binary(self, raw: bytes) -> None: if self._binary_handler and len(raw) > 42 and raw[:2] == b"00": @@ -196,10 +191,9 @@ async def _on_auth_response(self, msg: WSMessage) -> None: await self._flush_queue() self._ready_event.set() if self._connect_count > 1 and self._on_reconnect: - try: - await self._on_reconnect() - except Exception: - log.exception("Reconnect handler error") + if self._reconnect_task and not self._reconnect_task.done(): + self._reconnect_task.cancel() + self._reconnect_task = asyncio.create_task(self._run_reconnect_handler()) else: err = data.get("msg", data.get("message", "unknown")) log.error("Authentication failed (code=%s): %s", code, err) @@ -207,6 +201,15 @@ async def _on_auth_response(self, msg: WSMessage) -> None: if self.ws: await self.ws.close() + async def _run_reconnect_handler(self) -> None: + try: + assert self._on_reconnect is not None + await self._on_reconnect() + except asyncio.CancelledError: + raise + except Exception: + log.exception("Reconnect handler error") + async def wait_ready(self, timeout: float = 30) -> bool: try: await asyncio.wait_for(self._ready_event.wait(), timeout) @@ -216,5 +219,7 @@ async def wait_ready(self, timeout: float = 30) -> bool: async def close(self) -> None: self._running = False + if self._reconnect_task and not self._reconnect_task.done(): + self._reconnect_task.cancel() if self.ws: await self.ws.close() diff --git a/fns_cli/config.py b/fns_cli/config.py index e1d46ce..182f47b 100644 --- a/fns_cli/config.py +++ b/fns_cli/config.py @@ -20,6 +20,7 @@ class SyncConfig: sync_notes: bool = True sync_files: bool = True sync_config: bool = True + upload_concurrency: int = 2 exclude_patterns: list[str] = field( default_factory=lambda: [".git/**", ".trash/**", "*.tmp"] ) @@ -85,6 +86,7 @@ def load_config(path: str) -> AppConfig: sync_notes=s.get("sync_notes", True), sync_files=s.get("sync_files", True), sync_config=s.get("sync_config", True), + upload_concurrency=max(1, s.get("upload_concurrency", 2)), exclude_patterns=s.get( "exclude_patterns", [".git/**", ".trash/**", "*.tmp"] ), diff --git a/fns_cli/file_sync.py b/fns_cli/file_sync.py index a000d2d..9a25101 100644 --- a/fns_cli/file_sync.py +++ b/fns_cli/file_sync.py @@ -2,8 +2,10 @@ from __future__ import annotations +import asyncio import logging import os +import time import uuid from pathlib import Path from typing import TYPE_CHECKING @@ -20,6 +22,7 @@ ACTION_FILE_SYNC_RENAME, ACTION_FILE_SYNC_UPDATE, ACTION_FILE_UPLOAD, + ACTION_FILE_UPLOAD_ACK, ACTION_FILE_UPLOAD_CHECK, WSMessage, build_binary_chunk, @@ -65,11 +68,21 @@ def __init__(self, engine: SyncEngine) -> None: self.vault_path = engine.vault_path self._sync_complete = False self._download_sessions: dict[str, _DownloadSession] = {} + self._pending_download_paths: set[str] = set() self._expected_modify = 0 self._expected_delete = 0 self._received_modify = 0 self._received_delete = 0 self._got_end = False + self._pending_last_time = 0 + self._last_sync_activity_monotonic = time.monotonic() + self._upload_tasks: set[asyncio.Task] = set() + upload_concurrency = getattr(self.config.sync, "upload_concurrency", 2) + if not isinstance(upload_concurrency, int) or upload_concurrency < 1: + upload_concurrency = 2 + self._upload_worker_count = upload_concurrency + self._upload_workers: set[asyncio.Task] = set() + self._upload_queue: asyncio.Queue = asyncio.Queue() # See NoteSync._echo_hashes — same semantics here. Updated on both # inbound (server write / chunk finalize / rename / delete) and # outbound (push_upload / push_delete) so the cache tracks the most @@ -88,16 +101,35 @@ def register_handlers(self) -> None: ws.on(ACTION_FILE_SYNC_MTIME, self._on_sync_mtime) ws.on(ACTION_FILE_SYNC_CHUNK_DOWNLOAD, self._on_chunk_download_start) ws.on(ACTION_FILE_UPLOAD, self._on_upload_session) + ws.on(ACTION_FILE_UPLOAD_ACK, self._on_upload_ack) ws.on(ACTION_FILE_SYNC_END, self._on_sync_end) ws.on_binary(self._on_binary_chunk) def _reset_counters(self) -> None: self._sync_complete = False self._got_end = False + self._download_sessions.clear() + self._pending_download_paths.clear() self._expected_modify = 0 self._expected_delete = 0 self._received_modify = 0 self._received_delete = 0 + self._pending_last_time = 0 + self._last_sync_activity_monotonic = time.monotonic() + + def _mark_sync_activity(self) -> None: + self._last_sync_activity_monotonic = time.monotonic() + + def is_stalled(self, stale_seconds: float) -> bool: + if self._sync_complete or not self._got_end: + return False + if self._pending_download_paths or self._download_sessions: + return False + total_expected = self._expected_modify + self._expected_delete + total_received = self._received_modify + self._received_delete + if total_received >= total_expected: + return False + return (time.monotonic() - self._last_sync_activity_monotonic) >= stale_seconds async def request_sync(self) -> None: self._reset_counters() @@ -166,28 +198,75 @@ async def _on_upload_session(self, msg: WSMessage) -> None: log.warning("Upload requested but file missing: %s", rel_path) return - log.info("Uploading %s (sessionId=%s, chunkSize=%d)", rel_path, session_id[:8], chunk_size) + await self._ensure_upload_workers() + completion = asyncio.get_running_loop().create_future() + await self._upload_queue.put((session_id, chunk_size, rel_path, full, completion)) + task = asyncio.create_task(self._await_upload_completion(completion)) + self._upload_tasks.add(task) + task.add_done_callback(self._upload_tasks.discard) + + async def _ensure_upload_workers(self) -> None: + while len(self._upload_workers) < self._upload_worker_count: + task = asyncio.create_task(self._upload_queue_worker()) + self._upload_workers.add(task) + task.add_done_callback(self._upload_workers.discard) + + async def _await_upload_completion(self, completion: asyncio.Future) -> None: + await completion + + async def _upload_queue_worker(self) -> None: + while True: + session_id, chunk_size, rel_path, full, completion = await self._upload_queue.get() + try: + await self._upload_session_worker(session_id, chunk_size, rel_path, full) + except Exception as exc: + if not completion.done(): + completion.set_exception(exc) + else: + if not completion.done(): + completion.set_result(None) + finally: + self._upload_queue.task_done() + + async def _upload_session_worker( + self, + session_id: str, + chunk_size: int, + rel_path: str, + full: Path, + ) -> None: + log.info( + "Uploading %s (sessionId=%s, chunkSize=%d)", + rel_path, + session_id[:8], + chunk_size, + ) + + file_data = full.read_bytes() + total = len(file_data) + idx = 0 + offset = 0 + while offset < total: + end = min(offset + chunk_size, total) + chunk = build_binary_chunk(session_id, idx, file_data[offset:end]) + await self.engine.ws_client.send_bytes(chunk) + offset = end + idx += 1 + await asyncio.sleep(0) + if total == 0: + chunk = build_binary_chunk(session_id, 0, b"") + await self.engine.ws_client.send_bytes(chunk) + log.info("Upload complete: %s (%d chunks)", rel_path, idx) - try: - file_data = full.read_bytes() - total = len(file_data) - idx = 0 - offset = 0 - while offset < total: - end = min(offset + chunk_size, total) - chunk = build_binary_chunk(session_id, idx, file_data[offset:end]) - await self.engine.ws_client.send_bytes(chunk) - offset = end - idx += 1 - if total == 0: - chunk = build_binary_chunk(session_id, 0, b"") - await self.engine.ws_client.send_bytes(chunk) - log.info("Upload complete: %s (%d chunks)", rel_path, idx) - except Exception: - log.exception("Upload failed for %s", rel_path) + async def _on_upload_ack(self, msg: WSMessage) -> None: + data = _extract_inner(msg.data) + rel_path = data.get("path", "") + if rel_path: + log.debug("← FileUploadAck: %s", rel_path) async def _on_sync_update(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) + self._mark_sync_activity() rel_path: str = data.get("path", "") content = data.get("content") mtime = data.get("mtime", 0) @@ -198,6 +277,7 @@ async def _on_sync_update(self, msg: WSMessage) -> None: if content is None: # Attachment files: server sends metadata only, we must request # a chunked download via FileChunkDownload. + self._pending_download_paths.add(rel_path) log.info("← FileSyncUpdate (requesting chunk download): %s", rel_path) await self._request_chunk_download(rel_path, data) self._received_modify += 1 @@ -237,6 +317,7 @@ async def _request_chunk_download(self, rel_path: str, data: dict) -> None: async def _on_sync_delete(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) + self._mark_sync_activity() rel_path: str = data.get("path", "") if not rel_path: return @@ -257,6 +338,7 @@ async def _on_sync_delete(self, msg: WSMessage) -> None: async def _on_sync_rename(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) + self._mark_sync_activity() old_path: str = data.get("oldPath", "") new_path: str = data.get("path", "") if not old_path or not new_path: @@ -280,6 +362,7 @@ async def _on_sync_rename(self, msg: WSMessage) -> None: async def _on_sync_mtime(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) + self._mark_sync_activity() rel_path: str = data.get("path", "") mtime = data.get("mtime", 0) if not rel_path or not mtime: @@ -294,6 +377,7 @@ async def _on_sync_mtime(self, msg: WSMessage) -> None: async def _on_chunk_download_start(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) + self._mark_sync_activity() session_id: str = data.get("sessionId", "") rel_path: str = data.get("path", "") size: int = data.get("size", 0) @@ -307,9 +391,13 @@ async def _on_chunk_download_start(self, msg: WSMessage) -> None: "← FileSyncChunkDownload start: %s (%d bytes, %d chunks)", rel_path, size, total_chunks, ) + if total_chunks <= 0: + await self._finalize_empty_download(rel_path) + return self._download_sessions[session_id] = _DownloadSession( path=rel_path, size=size, total_chunks=total_chunks, chunk_size=chunk_size, ) + self._pending_download_paths.discard(rel_path) async def _on_binary_chunk(self, session_id: str, chunk_index: int, data: bytes) -> None: session = self._download_sessions.get(session_id) @@ -322,6 +410,7 @@ async def _on_binary_chunk(self, session_id: str, chunk_index: int, data: bytes) await self._finalize_download(session_id, session) async def _finalize_download(self, session_id: str, session: _DownloadSession) -> None: + self._mark_sync_activity() rel_path = session.path full = self.vault_path / rel_path try: @@ -336,14 +425,34 @@ async def _finalize_download(self, session_id: str, session: _DownloadSession) - log.exception("Failed to write downloaded file %s", rel_path) finally: self._download_sessions.pop(session_id, None) + self._pending_download_paths.discard(rel_path) + self._check_complete() + + async def _finalize_empty_download(self, rel_path: str) -> None: + self._mark_sync_activity() + full = self.vault_path / rel_path + try: + if full.exists() and full.is_dir(): + log.warning( + "Skipping zero-chunk download for directory path: %s", + rel_path, + ) + else: + full.parent.mkdir(parents=True, exist_ok=True) + full.write_bytes(b"") + self._echo_hashes[rel_path] = file_content_hash_binary(full) + log.info("← Zero-chunk download complete: %s", rel_path) + except Exception: + log.exception("Failed to finalize zero-chunk download %s", rel_path) + finally: + self._pending_download_paths.discard(rel_path) + self._check_complete() async def _on_sync_end(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) + self._mark_sync_activity() last_time = data.get("lastTime", 0) - if last_time: - self.engine.state.last_file_sync_time = last_time - self.engine.state.save() - + self._pending_last_time = last_time self._expected_modify = data.get("needModifyCount", 0) self._expected_delete = data.get("needDeleteCount", 0) need_upload = data.get("needUploadCount", 0) @@ -354,23 +463,33 @@ async def _on_sync_end(self, msg: WSMessage) -> None: last_time, self._expected_modify, self._expected_delete, need_upload, ) - total = self._expected_modify + self._expected_delete - if total == 0: - self._sync_complete = True - else: - self._check_complete() + self._check_complete() def _check_complete(self) -> None: if not self._got_end: return total_expected = self._expected_modify + self._expected_delete total_received = self._received_modify + self._received_delete - if total_received >= total_expected: + if total_received >= total_expected and not self._pending_download_paths and not self._download_sessions: log.info( "FileSync complete: %d modified, %d deleted", self._received_modify, self._received_delete, ) self._sync_complete = True + self._commit_last_time() + elif total_received >= total_expected and (self._pending_download_paths or self._download_sessions): + log.info( + "FileSync detail messages complete, waiting for %d pending downloads and %d active sessions", + len(self._pending_download_paths), + len(self._download_sessions), + ) + + def _commit_last_time(self) -> None: + if self._pending_last_time: + log.info("Committing file lastTime=%d", self._pending_last_time) + self.engine.state.last_file_sync_time = self._pending_last_time + self.engine.state.save() + self._pending_last_time = 0 def _collect_local_files(self) -> list[dict]: """Collect non-note, non-excluded local files with hashes for FileSync.""" diff --git a/fns_cli/folder_sync.py b/fns_cli/folder_sync.py new file mode 100644 index 0000000..838267a --- /dev/null +++ b/fns_cli/folder_sync.py @@ -0,0 +1,86 @@ +"""Folder sync protocol: apply server-pushed folder create/delete/rename events.""" + +from __future__ import annotations + +import logging +import shutil +from pathlib import Path +from typing import TYPE_CHECKING + +from .protocol import ( + ACTION_FOLDER_SYNC_DELETE, + ACTION_FOLDER_SYNC_MODIFY, + ACTION_FOLDER_SYNC_RENAME, + WSMessage, +) + +if TYPE_CHECKING: + from .sync_engine import SyncEngine + +log = logging.getLogger("fns_cli.folder_sync") + + +def _extract_inner(msg_data: dict) -> dict: + if isinstance(msg_data, dict) and "data" in msg_data: + inner = msg_data["data"] + if isinstance(inner, dict): + return inner + return msg_data if isinstance(msg_data, dict) else {} + + +class FolderSync: + def __init__(self, engine: SyncEngine) -> None: + self.engine = engine + self.vault_path = engine.vault_path + + def register_handlers(self) -> None: + ws = self.engine.ws_client + ws.on(ACTION_FOLDER_SYNC_MODIFY, self._on_sync_modify) + ws.on(ACTION_FOLDER_SYNC_DELETE, self._on_sync_delete) + ws.on(ACTION_FOLDER_SYNC_RENAME, self._on_sync_rename) + + async def _on_sync_modify(self, msg: WSMessage) -> None: + data = _extract_inner(msg.data) + rel_path: str = data.get("path", "") + if not rel_path: + return + + full = self.vault_path / rel_path + try: + full.mkdir(parents=True, exist_ok=True) + log.info("← FolderSyncModify: %s", rel_path) + except Exception: + log.exception("Failed to create folder %s", rel_path) + + async def _on_sync_delete(self, msg: WSMessage) -> None: + data = _extract_inner(msg.data) + rel_path: str = data.get("path", "") + if not rel_path: + return + + full = self.vault_path / rel_path + try: + if full.exists(): + shutil.rmtree(full) + log.info("← FolderSyncDelete: %s", rel_path) + except Exception: + log.exception("Failed to delete folder %s", rel_path) + + async def _on_sync_rename(self, msg: WSMessage) -> None: + data = _extract_inner(msg.data) + old_path: str = data.get("oldPath", "") + new_path: str = data.get("path", "") + if not old_path or not new_path: + return + + old_full = self.vault_path / old_path + new_full = self.vault_path / new_path + try: + new_full.parent.mkdir(parents=True, exist_ok=True) + if old_full.exists(): + old_full.rename(new_full) + else: + new_full.mkdir(parents=True, exist_ok=True) + log.info("← FolderSyncRename: %s → %s", old_path, new_path) + except Exception: + log.exception("Failed to rename folder %s → %s", old_path, new_path) diff --git a/fns_cli/note_sync.py b/fns_cli/note_sync.py index 7e61159..d136187 100644 --- a/fns_cli/note_sync.py +++ b/fns_cli/note_sync.py @@ -18,6 +18,7 @@ ACTION_NOTE_SYNC_MODIFY, ACTION_NOTE_SYNC_MTIME, ACTION_NOTE_SYNC_NEED_PUSH, + ACTION_NOTE_SYNC_RENAME, WSMessage, ) @@ -50,6 +51,7 @@ def __init__(self, engine: SyncEngine) -> None: self._received_modify = 0 self._received_delete = 0 self._got_end = False + self._pending_last_time = 0 # Per-path "last known synced state" — updated on BOTH inbound # (server → local write) and outbound (local → server push). Value # is the content hash, or _DELETED sentinel for an absent file. @@ -77,6 +79,7 @@ def register_handlers(self) -> None: ws = self.engine.ws_client ws.on(ACTION_NOTE_SYNC_MODIFY, self._on_sync_modify) ws.on(ACTION_NOTE_SYNC_DELETE, self._on_sync_delete) + ws.on(ACTION_NOTE_SYNC_RENAME, self._on_sync_rename) ws.on(ACTION_NOTE_SYNC_MTIME, self._on_sync_mtime) ws.on(ACTION_NOTE_SYNC_NEED_PUSH, self._on_sync_need_push) ws.on(ACTION_NOTE_SYNC_END, self._on_sync_end) @@ -203,6 +206,32 @@ async def _on_sync_delete(self, msg: WSMessage) -> None: self._received_delete += 1 self._check_all_received() + async def _on_sync_rename(self, msg: WSMessage) -> None: + data = _extract_inner(msg.data) + old_path: str = data.get("oldPath", "") + new_path: str = data.get("path", "") + if not old_path or not new_path: + return + + self._echo_hashes[old_path] = _DELETED + + old_full = self.vault_path / old_path + new_full = self.vault_path / new_path + try: + if old_full.exists(): + new_full.parent.mkdir(parents=True, exist_ok=True) + old_full.rename(new_full) + try: + text = new_full.read_text(encoding="utf-8") + except Exception: + log.exception("Failed to read renamed note %s", new_path) + else: + self._echo_hashes[new_path] = content_hash(text) + log.info("← NoteSyncRename: %s → %s", old_path, new_path) + self._try_remove_empty_parent(old_full) + except Exception: + log.exception("Failed to rename %s → %s", old_path, new_path) + async def _on_sync_mtime(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) rel_path: str = data.get("path", "") @@ -232,10 +261,7 @@ async def _on_sync_end(self, msg: WSMessage) -> None: last_time = data.get("lastTime", 0) self._expected_modify = data.get("needModifyCount", 0) self._expected_delete = data.get("needDeleteCount", 0) - - if last_time: - self.engine.state.last_note_sync_time = last_time - self.engine.state.save() + self._pending_last_time = last_time self._got_end = True log.info( @@ -249,6 +275,7 @@ async def _on_sync_end(self, msg: WSMessage) -> None: total_expected = self._expected_modify + self._expected_delete if total_expected == 0: self._sync_complete = True + self._commit_last_time() else: self._check_all_received() @@ -261,6 +288,7 @@ def _reset_counters(self) -> None: self._expected_delete = 0 self._received_modify = 0 self._received_delete = 0 + self._pending_last_time = 0 def _check_all_received(self) -> None: if not self._got_end: @@ -274,6 +302,14 @@ def _check_all_received(self) -> None: self._received_delete, ) self._sync_complete = True + self._commit_last_time() + + def _commit_last_time(self) -> None: + if self._pending_last_time: + log.info("Committing note lastTime=%d", self._pending_last_time) + self.engine.state.last_note_sync_time = self._pending_last_time + self.engine.state.save() + self._pending_last_time = 0 def _try_remove_empty_parent(self, file_path: Path) -> None: parent = file_path.parent diff --git a/fns_cli/protocol.py b/fns_cli/protocol.py index 7cd9cd8..9e426a4 100644 --- a/fns_cli/protocol.py +++ b/fns_cli/protocol.py @@ -44,8 +44,12 @@ ACTION_FILE_SYNC_MTIME = "FileSyncMtime" ACTION_FILE_SYNC_CHUNK_DOWNLOAD = "FileSyncChunkDownload" ACTION_FILE_UPLOAD = "FileUpload" +ACTION_FILE_UPLOAD_ACK = "FileUploadAck" ACTION_FILE_SYNC_END = "FileSyncEnd" +ACTION_FOLDER_SYNC_MODIFY = "FolderSyncModify" +ACTION_FOLDER_SYNC_DELETE = "FolderSyncDelete" +ACTION_FOLDER_SYNC_RENAME = "FolderSyncRename" ACTION_FOLDER_SYNC_END = "FolderSyncEnd" # ── Status codes ────────────────────────────────────────────────────── diff --git a/fns_cli/sync_engine.py b/fns_cli/sync_engine.py index 4ad4ef7..7a53fa3 100644 --- a/fns_cli/sync_engine.py +++ b/fns_cli/sync_engine.py @@ -10,6 +10,7 @@ from .client import WSClient from .config import AppConfig from .file_sync import FileSync +from .folder_sync import FolderSync from .note_sync import NoteSync from .state import SyncState @@ -24,6 +25,7 @@ def __init__(self, config: AppConfig) -> None: self.state = SyncState.load(self.vault_path) self.note_sync = NoteSync(self) self.file_sync = FileSync(self) + self.folder_sync = FolderSync(self) self._ignored_files: set[str] = set() self._watch_enabled = False @@ -89,6 +91,7 @@ async def on_local_rename(self, new_rel: str, old_rel: str) -> None: def _register_handlers(self) -> None: self.note_sync.register_handlers() self.file_sync.register_handlers() + self.folder_sync.register_handlers() async def run(self) -> None: """Connect, do initial sync, then watch for changes indefinitely.""" @@ -227,6 +230,14 @@ async def _wait_file_sync(self, timeout: float = 60) -> None: loop = asyncio.get_running_loop() deadline = loop.time() + timeout while not self.file_sync.is_sync_complete: + if self.file_sync.is_stalled(stale_seconds=5): + log.warning( + "FileSync stalled after FileSyncEnd " + "(expected=%d, received=%d); continuing with watcher enabled", + self.file_sync._expected_modify + self.file_sync._expected_delete, + self.file_sync._received_modify + self.file_sync._received_delete, + ) + break if loop.time() > deadline: log.warning("FileSync timed out after %.0fs", timeout) break diff --git a/fns_cli/watcher.py b/fns_cli/watcher.py index f67e19d..8b2a2fb 100644 --- a/fns_cli/watcher.py +++ b/fns_cli/watcher.py @@ -33,10 +33,18 @@ def __init__(self, engine: SyncEngine, loop: asyncio.AbstractEventLoop) -> None: self.engine = engine self.loop = loop self._pending: dict[str, asyncio.TimerHandle] = {} + self._known_files: set[str] = set() + self._seed_known_files() def _rel(self, abs_path: str) -> str: return Path(abs_path).relative_to(self.engine.vault_path).as_posix() + def _rel_or_none(self, abs_path: str) -> str | None: + try: + return self._rel(abs_path) + except ValueError: + return None + def _schedule(self, key: str, coro_factory): handle = self._pending.pop(key, None) if handle: @@ -48,6 +56,22 @@ def _fire(): self._pending[key] = self.loop.call_later(DEBOUNCE_SECONDS, _fire) + def _seed_known_files(self) -> None: + for fp in self.engine.vault_path.rglob("*"): + if not fp.is_file(): + continue + rel = fp.relative_to(self.engine.vault_path).as_posix() + if self.engine.is_excluded(rel): + continue + self._known_files.add(rel) + + def _track_file(self, rel_path: str) -> None: + if not self.engine.is_excluded(rel_path): + self._known_files.add(rel_path) + + def _untrack_file(self, rel_path: str) -> None: + self._known_files.discard(rel_path) + # ── watchdog callbacks (called from observer thread) ───────────── def on_created(self, event): @@ -59,6 +83,7 @@ def on_created(self, event): return if self.engine.is_ignored(rel) or self.engine.is_excluded(rel): return + self._track_file(rel) self._schedule(f"mod:{rel}", lambda: self.engine.on_local_change(rel)) def on_modified(self, event): @@ -70,15 +95,12 @@ def on_modified(self, event): return if self.engine.is_ignored(rel) or self.engine.is_excluded(rel): return + self._track_file(rel) self._schedule(f"mod:{rel}", lambda: self.engine.on_local_change(rel)) def on_deleted(self, event): - # Directory delete: watchdog on most platforms already fires per-file - # delete events for children before the directory event, so we only - # need to process individual files here. If a platform skips those - # child events we cannot reconstruct them (the files are already - # gone), so there is nothing more we can do at this layer. if event.is_directory: + self._handle_directory_delete(event) return try: rel = self._rel(event.src_path) @@ -86,6 +108,7 @@ def on_deleted(self, event): return if self.engine.is_ignored(rel) or self.engine.is_excluded(rel): return + self._untrack_file(rel) self._schedule(f"del:{rel}", lambda: self.engine.on_local_delete(rel)) def on_moved(self, event): @@ -97,13 +120,39 @@ def on_moved(self, event): # computing each old path by swapping the renamed prefix. self._handle_directory_move(event) return - try: - old_rel = self._rel(event.src_path) - new_rel = self._rel(event.dest_path) - except ValueError: + old_rel = self._rel_or_none(event.src_path) + new_rel = self._rel_or_none(event.dest_path) + if old_rel is None and new_rel is None: + return + if old_rel is None: + if self.engine.is_ignored(new_rel) or self.engine.is_excluded(new_rel): + return + self._track_file(new_rel) + self._schedule(f"mod:{new_rel}", lambda n=new_rel: self.engine.on_local_change(n)) + return + if new_rel is None: + if self.engine.is_ignored(old_rel) or self.engine.is_excluded(old_rel): + return + self._untrack_file(old_rel) + self._schedule(f"del:{old_rel}", lambda o=old_rel: self.engine.on_local_delete(o)) return self._schedule_move_transition(old_rel, new_rel) + def _handle_directory_delete(self, event) -> None: + rel_dir = self._rel_or_none(event.src_path) + if rel_dir is None: + return + prefix = f"{rel_dir}/" + victims = sorted( + rel for rel in self._known_files + if rel == rel_dir or rel.startswith(prefix) + ) + for rel in victims: + if self.engine.is_ignored(rel) or self.engine.is_excluded(rel): + continue + self._untrack_file(rel) + self._schedule(f"del:{rel}", lambda p=rel: self.engine.on_local_delete(p)) + def _handle_directory_move(self, event) -> None: """Enumerate a renamed directory's children and schedule per-file renames.""" new_dir = Path(event.dest_path) @@ -137,20 +186,25 @@ def _schedule_move_transition(self, old_rel: str, new_rel: str) -> None: new_excluded = self.engine.is_excluded(new_rel) if old_excluded and new_excluded: + self._untrack_file(old_rel) return if not old_excluded and new_excluded: + self._untrack_file(old_rel) self._schedule( f"del:{old_rel}", lambda o=old_rel: self.engine.on_local_delete(o), ) return if old_excluded and not new_excluded: + self._track_file(new_rel) self._schedule( f"mod:{new_rel}", lambda n=new_rel: self.engine.on_local_change(n), ) return + self._untrack_file(old_rel) + self._track_file(new_rel) self._schedule( f"mv:{old_rel}:{new_rel}", lambda o=old_rel, n=new_rel: self.engine.on_local_rename(n, o), diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..4e73f82 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,79 @@ +"""Unit tests for WSClient reconnect behaviour.""" + +from __future__ import annotations + +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock + +from fns_cli.client import WSClient +from fns_cli.protocol import WSMessage + + +def _make_config() -> MagicMock: + config = MagicMock() + config.client.reconnect_base_delay = 1 + config.client.reconnect_max_retries = 3 + config.server.token = "token" + config.ws_api = "wss://example.com" + return config + + +class TestWSClientReconnect(unittest.IsolatedAsyncioTestCase): + + async def test_auth_response_does_not_block_on_reconnect_sync(self): + client = WSClient(_make_config()) + client._connect_count = 2 + client._raw_send = AsyncMock() + client._flush_queue = AsyncMock() + + started = asyncio.Event() + release = asyncio.Event() + + async def on_reconnect(): + started.set() + await release.wait() + + client.on_reconnect(on_reconnect) + + await client._on_auth_response(WSMessage("Authorization", {"code": 1})) + + self.assertTrue(client.is_authenticated) + self.assertTrue(client._ready_event.is_set()) + self.assertIsNotNone(client._reconnect_task) + await asyncio.wait_for(started.wait(), timeout=1) + self.assertFalse(client._reconnect_task.done()) + + release.set() + await asyncio.wait_for(client._reconnect_task, timeout=1) + + async def test_raw_send_is_serialized(self): + client = WSClient(_make_config()) + + active = 0 + max_active = 0 + release = asyncio.Event() + + class FakeWS: + async def send(self, _data): + nonlocal active, max_active + active += 1 + max_active = max(max_active, active) + await release.wait() + active -= 1 + + client.ws = FakeWS() + + first = asyncio.create_task(client._raw_send("a")) + await asyncio.sleep(0) + second = asyncio.create_task(client._raw_send("b")) + await asyncio.sleep(0.05) + + self.assertEqual(max_active, 1) + + release.set() + await asyncio.wait_for(asyncio.gather(first, second), timeout=1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_file_sync.py b/tests/test_file_sync.py index c32ddcc..298652d 100644 --- a/tests/test_file_sync.py +++ b/tests/test_file_sync.py @@ -12,9 +12,11 @@ import unittest from pathlib import Path from unittest.mock import AsyncMock, MagicMock +from unittest.mock import patch from fns_cli.file_sync import FileSync from fns_cli.protocol import ( + ACTION_FILE_SYNC_CHUNK_DOWNLOAD, ACTION_FILE_SYNC_DELETE, ACTION_FILE_SYNC_END, ACTION_FILE_SYNC_UPDATE, @@ -27,12 +29,14 @@ def _make_engine(vault_path: Path) -> MagicMock: config = MagicMock() config.server.vault = "test-vault" config.sync.file_chunk_size = 1024 * 1024 + config.sync.upload_concurrency = 1 state = MagicMock() state.last_file_sync_time = 0 ws = MagicMock() ws.send = AsyncMock() + ws.send_bytes = AsyncMock() engine = MagicMock() engine.config = config @@ -175,6 +179,149 @@ async def test_unexpected_content_type_does_not_stall(self): await self.fs._on_sync_update(msg) self.assertTrue(self.fs.is_sync_complete) + async def test_last_time_commits_only_after_chunk_download_finishes(self): + session_id = "12345678-1234-1234-1234-123456789012" + + await self._send_end(need_modify=1, last_time=4321) + self.assertEqual(self.engine.state.last_file_sync_time, 0) + + msg = _wrap(ACTION_FILE_SYNC_UPDATE, { + "path": "asset.bin", + "content": None, + "size": 3, + "totalChunks": 1, + "chunkSize": 3, + }) + await self.fs._on_sync_update(msg) + self.assertEqual(self.engine.state.last_file_sync_time, 0) + self.assertIn("asset.bin", self.fs._pending_download_paths) + + start = _wrap(ACTION_FILE_SYNC_CHUNK_DOWNLOAD, { + "sessionId": session_id, + "path": "asset.bin", + "size": 3, + "totalChunks": 1, + "chunkSize": 3, + }) + await self.fs._on_chunk_download_start(start) + await self.fs._on_binary_chunk(session_id, 0, b"abc") + + self.assertTrue(self.fs.is_sync_complete) + self.assertEqual(self.engine.state.last_file_sync_time, 4321) + self.engine.state.save.assert_called() + self.assertEqual((self.vault / "asset.bin").read_bytes(), b"abc") + + async def test_zero_chunk_download_completes_immediately(self): + msg = _wrap(ACTION_FILE_SYNC_UPDATE, { + "path": "empty.bin", + "content": None, + "size": 0, + "totalChunks": 0, + "chunkSize": 1024, + }) + start = _wrap(ACTION_FILE_SYNC_CHUNK_DOWNLOAD, { + "sessionId": "12345678-1234-1234-1234-123456789012", + "path": "empty.bin", + "size": 0, + "totalChunks": 0, + "chunkSize": 1024, + }) + + await self._send_end(need_modify=1, last_time=5555) + await self.fs._on_sync_update(msg) + await self.fs._on_chunk_download_start(start) + + self.assertTrue(self.fs.is_sync_complete) + self.assertEqual(self.engine.state.last_file_sync_time, 5555) + self.assertEqual((self.vault / "empty.bin").read_bytes(), b"") + + async def test_request_sync_clears_stale_download_state(self): + self.fs._download_sessions["stale"] = object() + self.fs._pending_download_paths.add("stale.bin") + + await self.fs.request_sync() + + self.assertEqual(self.fs._download_sessions, {}) + self.assertEqual(self.fs._pending_download_paths, set()) + + async def test_upload_session_is_scheduled_without_blocking_handler(self): + target = self.vault / "asset.bin" + target.write_bytes(b"abc") + + gate = asyncio.Event() + + async def slow_send_bytes(_data): + await gate.wait() + + self.engine.ws_client.send_bytes.side_effect = slow_send_bytes + + msg = _wrap("FileUpload", { + "sessionId": "12345678-1234-1234-1234-123456789012", + "chunkSize": 1, + "path": "asset.bin", + }) + await self.fs._on_upload_session(msg) + + self.assertEqual(len(self.fs._upload_tasks), 1) + gate.set() + await asyncio.wait_for(asyncio.gather(*self.fs._upload_tasks), timeout=1) + + async def test_is_stalled_when_end_counts_never_arrive(self): + await self._send_end(need_delete=1) + + with patch("fns_cli.file_sync.time.monotonic", return_value=self.fs._last_sync_activity_monotonic + 6): + self.assertTrue(self.fs.is_stalled(5)) + + async def test_is_not_stalled_while_download_pending(self): + await self._send_end(need_modify=1) + self.fs._pending_download_paths.add("asset.bin") + + with patch("fns_cli.file_sync.time.monotonic", return_value=self.fs._last_sync_activity_monotonic + 6): + self.assertFalse(self.fs.is_stalled(5)) + + async def test_upload_sessions_respect_concurrency_limit(self): + first = self.vault / "first.bin" + second = self.vault / "second.bin" + first.write_bytes(b"a") + second.write_bytes(b"b") + + started = asyncio.Event() + release = asyncio.Event() + active = 0 + max_active = 0 + + async def blocking_send_bytes(_data): + nonlocal active, max_active + active += 1 + max_active = max(max_active, active) + started.set() + await release.wait() + active -= 1 + + self.engine.ws_client.send_bytes.side_effect = blocking_send_bytes + + msg1 = _wrap("FileUpload", { + "sessionId": "12345678-1234-1234-1234-123456789012", + "chunkSize": 1, + "path": "first.bin", + }) + msg2 = _wrap("FileUpload", { + "sessionId": "22345678-1234-1234-1234-123456789012", + "chunkSize": 1, + "path": "second.bin", + }) + + await self.fs._on_upload_session(msg1) + await self.fs._on_upload_session(msg2) + await asyncio.wait_for(started.wait(), timeout=1) + await asyncio.sleep(0.05) + + self.assertEqual(max_active, 1) + self.assertEqual(self.engine.ws_client.send_bytes.await_count, 1) + + release.set() + await asyncio.wait_for(asyncio.gather(*self.fs._upload_tasks), timeout=1) + class TestFileSyncWatcherIgnore(unittest.TestCase): """Watcher on_moved must honour is_ignored to avoid echo-backs.""" @@ -256,6 +403,50 @@ def test_move_into_excluded_schedules_delete(self): handler._pending["del:old.png"].cancel() loop.close() + def test_move_from_outside_into_vault_schedules_upload(self): + with tempfile.TemporaryDirectory() as tmp: + vault = Path(tmp) + outside = Path(tempfile.mkdtemp()) + try: + handler, engine, loop = self._make_handler(vault, set()) + + from watchdog.events import FileMovedEvent + ev = FileMovedEvent( + str(outside / "incoming.png"), + str(vault / "incoming.png"), + ) + handler.on_moved(ev) + + self.assertIn("mod:incoming.png", handler._pending) + handler._pending["mod:incoming.png"].cancel() + loop.close() + finally: + for child in outside.iterdir(): + child.unlink() + outside.rmdir() + + def test_move_from_vault_to_outside_schedules_delete(self): + with tempfile.TemporaryDirectory() as tmp: + vault = Path(tmp) + outside = Path(tempfile.mkdtemp()) + try: + handler, engine, loop = self._make_handler(vault, set()) + + from watchdog.events import FileMovedEvent + ev = FileMovedEvent( + str(vault / "outgoing.png"), + str(outside / "outgoing.png"), + ) + handler.on_moved(ev) + + self.assertIn("del:outgoing.png", handler._pending) + handler._pending["del:outgoing.png"].cancel() + loop.close() + finally: + for child in outside.iterdir(): + child.unlink() + outside.rmdir() + def test_directory_move_into_excluded_schedules_child_deletes(self): with tempfile.TemporaryDirectory() as tmp: vault = Path(tmp) @@ -282,6 +473,30 @@ def test_directory_move_into_excluded_schedules_child_deletes(self): handler._pending["del:old/b.txt"].cancel() loop.close() + def test_directory_delete_schedules_child_file_deletes(self): + with tempfile.TemporaryDirectory() as tmp: + vault = Path(tmp) + doomed = vault / "12345" + doomed.mkdir(parents=True) + (doomed / "a.md").write_text("a", encoding="utf-8") + (doomed / "b.png").write_bytes(b"png") + (doomed / "nested").mkdir() + (doomed / "nested" / "c.txt").write_text("c", encoding="utf-8") + + handler, engine, loop = self._make_handler(vault, set()) + + from watchdog.events import DirDeletedEvent + ev = DirDeletedEvent(str(vault / "12345")) + handler.on_deleted(ev) + + self.assertIn("del:12345/a.md", handler._pending) + self.assertIn("del:12345/b.png", handler._pending) + self.assertIn("del:12345/nested/c.txt", handler._pending) + handler._pending["del:12345/a.md"].cancel() + handler._pending["del:12345/b.png"].cancel() + handler._pending["del:12345/nested/c.txt"].cancel() + loop.close() + if __name__ == "__main__": unittest.main() diff --git a/tests/test_folder_sync.py b/tests/test_folder_sync.py new file mode 100644 index 0000000..77a949a --- /dev/null +++ b/tests/test_folder_sync.py @@ -0,0 +1,68 @@ +"""Unit tests for FolderSync server-pushed folder events.""" + +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +from fns_cli.folder_sync import FolderSync +from fns_cli.protocol import WSMessage + + +def _make_engine(vault_path: Path) -> MagicMock: + config = MagicMock() + ws = MagicMock() + ws.send = AsyncMock() + + engine = MagicMock() + engine.config = config + engine.vault_path = vault_path + engine.ws_client = ws + return engine + + +def _wrap(action: str, inner: dict) -> WSMessage: + return WSMessage(action, {"data": inner}) + + +class TestFolderSync(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.vault = Path(self._tmp.name) + self.fs = FolderSync(_make_engine(self.vault)) + + async def asyncTearDown(self): + self._tmp.cleanup() + + async def test_modify_creates_folder(self): + await self.fs._on_sync_modify(_wrap("FolderSyncModify", {"path": "foo/bar"})) + self.assertTrue((self.vault / "foo" / "bar").is_dir()) + + async def test_rename_moves_folder(self): + old_dir = self.vault / "foo" / "bar" + old_dir.mkdir(parents=True) + (old_dir / "note.md").write_text("x", encoding="utf-8") + + await self.fs._on_sync_rename(_wrap("FolderSyncRename", { + "oldPath": "foo/bar", + "path": "foo/baz", + })) + + self.assertFalse((self.vault / "foo" / "bar").exists()) + self.assertTrue((self.vault / "foo" / "baz" / "note.md").exists()) + + async def test_delete_removes_folder_tree(self): + doomed = self.vault / "foo" / "bar" + doomed.mkdir(parents=True) + (doomed / "note.md").write_text("x", encoding="utf-8") + + await self.fs._on_sync_delete(_wrap("FolderSyncDelete", {"path": "foo"})) + + self.assertFalse((self.vault / "foo").exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_note_sync.py b/tests/test_note_sync.py new file mode 100644 index 0000000..dbdeec1 --- /dev/null +++ b/tests/test_note_sync.py @@ -0,0 +1,87 @@ +"""Unit tests for NoteSync inbound state handling and rename behaviour.""" + +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +from fns_cli.note_sync import NoteSync +from fns_cli.protocol import ( + ACTION_NOTE_SYNC_END, + ACTION_NOTE_SYNC_MODIFY, + ACTION_NOTE_SYNC_RENAME, + WSMessage, +) + + +def _make_engine(vault_path: Path) -> MagicMock: + config = MagicMock() + config.server.vault = "test-vault" + + state = MagicMock() + state.last_note_sync_time = 0 + + ws = MagicMock() + ws.send = AsyncMock() + + engine = MagicMock() + engine.config = config + engine.vault_path = vault_path + engine.state = state + engine.ws_client = ws + engine.is_excluded = MagicMock(return_value=False) + return engine + + +def _wrap(action: str, inner: dict) -> WSMessage: + return WSMessage(action, {"data": inner}) + + +class TestNoteSyncInbound(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.vault = Path(self._tmp.name) + self.engine = _make_engine(self.vault) + self.ns = NoteSync(self.engine) + + async def asyncTearDown(self): + self._tmp.cleanup() + + async def test_last_time_commits_only_after_all_details_arrive(self): + await self.ns._on_sync_end(_wrap(ACTION_NOTE_SYNC_END, { + "lastTime": 1234, + "needModifyCount": 1, + "needDeleteCount": 0, + "needUploadCount": 0, + })) + self.assertEqual(self.engine.state.last_note_sync_time, 0) + + await self.ns._on_sync_modify(_wrap(ACTION_NOTE_SYNC_MODIFY, { + "path": "note.md", + "content": "hello", + "mtime": 0, + })) + + self.assertTrue(self.ns.is_sync_complete) + self.assertEqual(self.engine.state.last_note_sync_time, 1234) + self.engine.state.save.assert_called() + + async def test_sync_rename_moves_note_on_disk(self): + old_path = self.vault / "folder" / "old.md" + old_path.parent.mkdir(parents=True, exist_ok=True) + old_path.write_text("hello", encoding="utf-8") + + await self.ns._on_sync_rename(_wrap(ACTION_NOTE_SYNC_RENAME, { + "oldPath": "folder/old.md", + "path": "folder/new.md", + })) + + self.assertFalse((self.vault / "folder" / "old.md").exists()) + self.assertEqual((self.vault / "folder" / "new.md").read_text(encoding="utf-8"), "hello") + + +if __name__ == "__main__": + unittest.main()