diff --git a/.gitignore b/.gitignore index aaffb97..df13dc9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ vault/ +test-vault/ +test-config.yaml __pycache__/ *.pyc .fns_state.json @@ -7,3 +9,7 @@ fns_cli.log dist/ build/ .env + +# Local integration test artifacts — not committed +test_sync_local.py +test_sync_report.log diff --git a/doc/server-protocol.md b/doc/server-protocol.md new file mode 100644 index 0000000..2f4b2db --- /dev/null +++ b/doc/server-protocol.md @@ -0,0 +1,172 @@ +# FastNodeSync 服务端协议与实现分析 + +> 本文档供 Claude Code / 后续 agent 阅读,避免重复发现服务端行为细节。基于 [haierkeys/fast-note-sync-service](https://github.com/haierkeys/fast-note-sync-service) 的 `master` 分支源码分析。 + +## 0. 为什么写这份文档 + +客户端多次在心跳、断连、广播方向上踩坑。根本原因是服务端行为在 README 里没有完整描述,需要看 Go 源码才能理解。本文档把**影响客户端实现的关键行为**记下来,新的维护者不用再从头挖源码。 + +--- + +## 1. 连接 & 鉴权 + +- **Endpoint**: `GET /api/user/sync`,升级为标准 WebSocket (RFC 6455) +- **消息封装**: 文本帧统一格式 `Action|JSON`,用管道符分隔 + - 例:`Authorization|"eyJhbGciOi..."` + - 响应体的 JSON 部分结构见 `docs/ws_api.md` 的 `Res` 定义 +- **鉴权流程**: + 1. 客户端连上后立刻发 `Authorization|` + 2. 服务端校验 JWT → 校验用户存在 → 返回 `Authorization` 响应(含版本信息) + 3. **鉴权成功后服务端才启动 PingLoop**(见 `pkg/app/websocket.go:826`) +- **鉴权失败**:服务端 `WriteClose(1000, []byte("AuthorizationFaild"))`,注意拼写是 "Faild",不是 "Failed" + +--- + +## 2. 心跳:服务端主动 Ping(关键) + +源码:`pkg/app/websocket.go` + +```go +const ( + WSPingInterval = 25 // 服务端每 25 秒发一次 Ping + WSPingWait = 60 // 60 秒收不到任何帧就超时断连 +) +``` + +### 2.1 服务端行为 + +- **服务端主动 `WritePing(nil)`**:鉴权成功后启动的 `PingLoop` 每 25 秒发 Ping 给客户端(`websocket.go:400-429`) +- **Deadline 机制**:`OnOpen` 把连接 deadline 设为 now+60s,每次 `OnPing` / `OnPong` 都续到 now+60s(`websocket.go:942, 996, 1001`) +- **Deadline 过期** → 底层 `gws` 库自动关闭连接 + +### 2.2 客户端该怎么做(踩过多次坑后的结论) + +✅ **靠库自动回 Pong** 保持服务端 deadline 新鲜——Python `websockets` 库默认就会自动响应服务端 Ping,这步不用自己做。 + +❌ **不要做"N 秒没收到业务消息就断开"的 watchdog**:服务端的 Ping/Pong 是 WS 协议层帧,不会传到应用层消息回调。如果按业务消息计算 idle,空闲期永远误判为死连接。 + +✅ **仍要启用我方 ping,但参数要宽松**,用于检测半开连接或代理吞 ping 帧的兜底。不启用就在网络异常时可能永远挂住。 + +✅ **Python 推荐配置**: +```python +websockets.connect( + url, + ping_interval=45, # 每 45s 发 ping(避开服务端的 25s 节奏) + ping_timeout=90, # pong 90s 内不来才视为死连接 +) +``` + +`ping_timeout` 必须远大于"可能的业务写入延迟"。实测在 Zeabur 部署下,大量 chunk 下载期间服务端 gws 的写队列会让 Pong 被阻塞数十秒;timeout 设 30s 或 60s 都会在初始同步期被误触发。90s 是观测过的安全下限。 + +--- + +## 3. 多客户端广播(mac 编辑 → ubuntu 收到) + +源码:`pkg/app/websocket.go:482-564` + `internal/routers/websocket_router/ws_note.go:358` + +服务端对每个**用户 ID** 维护一个 `ConnStorage`(`UserClients` 字段),同一个 token 登录的所有连接都在里面。 + +**关键机制**:当客户端 A 执行 `NoteModify`,服务端会调用: + +```go +c.BroadcastResponse(code.Success.WithData( + dto.NoteSyncModifyMessage{...} +).WithVault(params.Vault), isExcludeSelf, dto.NoteSyncModify) +``` + +→ **主动推送 `NoteSyncModify` 给该用户的所有其他连接**(`isExcludeSelf=true` 排除发起者自己)。 + +这意味着: +- mac 编辑 → 立刻通过 WS 推送到 ubuntu 客户端,**不需要 ubuntu 端主动 poll** +- 如果客户端这一刻没连上(重连中 / 死锁中 / 网络断开),**这条推送会丢**,不会被服务端缓存重发 +- 重连后客户端必须发新的 `NoteSync` 请求(带上 `lastTime`)来拉回这段时间错过的变更 + +覆盖的广播动作: +| 动作 | 广播 action | +|---|---| +| `NoteModify` | `NoteSyncModify` | +| `NoteDelete` | `NoteSyncDelete` | +| `NoteRename` | `NoteSyncRename` | +| `FolderModify` / `FolderDelete` / `FolderRename` | 对应的 `FolderSync*` | +| `FileUpload` / `FileDelete` / `FileRename` | 对应的 `FileSync*` | +| `SettingModify` / `SettingDelete` | 对应的 `SettingSync*` | + +--- + +## 4. 同步协议:先 End,后 Detail + +服务端采用"先返回统计结束消息,再逐条推送详情"的模式(见 `docs/SyncProtocol.md`)。 + +### 4.1 以 NoteSync 为例 + +``` +Client → NoteSync (带 context, vault, lastTime) +Server → NoteSyncEnd (先到: {needModifyCount, needDeleteCount, ...}) +Server → NoteSyncModify (逐条) +Server → NoteSyncDelete (逐条) +... +``` + +### 4.2 context 透传 + +请求里的 `context` 字段(UUID 或时间戳)会原样回传到所有后续消息。客户端可以用 `context` 匹配: +- 区分并发的多个同步请求 +- 对账 `NoteSyncEnd` 的 count 与实际收到的 detail 数量 + +### 4.3 涉及的模块 + +| 模块 | Request | End Type | Detail Types | +|---|---|---|---| +| 笔记 | `NoteSync` | `NoteSyncEnd` | `NoteSyncModify`, `NoteSyncDelete`, `NoteSyncMtime`, `NoteSyncNeedPush` | +| 文件夹 | `FolderSync` | `FolderSyncEnd` | `FolderSyncModify`, `FolderSyncDelete` | +| 设置 | `SettingSync` | `SettingSyncEnd` | `SettingSyncModify`, `SettingSyncDelete`, `SettingSyncMtime`, `SettingSyncNeedUpload` | +| 文件/附件 | `FileSync` | `FileSyncEnd` | `FileSyncUpdate`, `FileSyncDelete`, `FileSyncMtime`, `FileUpload` | + +--- + +## 5. 二进制分块传输 + +附件/大文件用二进制 WebSocket 帧(`gws.OpcodeBinary`)传输,而不是走 Base64 编码的 JSON。 + +- 二进制帧前 2 字节是路由前缀(如 `"00"` 表示下载分块) +- 之后的 42 字节是分块头(sessionId + chunkIndex 等) +- 剩余是 payload +- 客户端解析见 `fns_cli/protocol.py:parse_binary_chunk` + +**注意**:服务端在大量二进制分块传输期间可能导致 Pong 回程被阻塞,这就是为什么 `ping_timeout` 必须宽松(见 §2.2)。 + +--- + +## 6. 服务端代码导航 + +| 场景 | 文件 | +|---|---| +| WS 升级、注册、PingLoop、广播 | `pkg/app/websocket.go` | +| 鉴权、OnOpen/OnClose/OnPing | `pkg/app/websocket.go:690-1050` | +| 笔记路由(Modify/Delete/Rename/Sync) | `internal/routers/websocket_router/ws_note.go` | +| 文件路由 | `internal/routers/websocket_router/ws_file.go` | +| 文件夹路由 | `internal/routers/websocket_router/ws_folder.go` | +| 设置路由 | `internal/routers/websocket_router/ws_setting.go` | +| DTO(消息结构) | `internal/dto/*_dto_ws.go` | +| 官方协议文档 | `docs/SyncProtocol.md`, `docs/ws_api.md` | + +--- + +## 7. 客户端踩过的坑 & 教训 + +| 坑 | 现象 | 根因 | 修法 | +|---|---|---|---| +| Idle watchdog 误触发 | 30~90s 空闲后自动断开重连(issue #9) | 服务端 Ping 在 WS 协议层,不传到业务消息回调;watchdog 按业务消息计 idle 会错杀连接 | 删掉 watchdog,用 websockets 库自带的 ping_interval/ping_timeout | +| ping_timeout 在大量 chunk 下载期间误触发 | `keepalive ping timeout` 1011 错误 | 服务端 gws 写队列被二进制分块撑满,Pong 回程被延迟数十秒 | `ping_timeout=90`(远大于观测到的最坏 Pong 延迟) | +| 服务端推送丢失 | 另一端编辑后没收到 | 本端正好在重连/死锁期间,broadcast 帧丢失 | 重连后的 `_on_reconnect` 回调里重新发起 `NoteSync` / `FileSync` 拉回增量(已实现) | +| Echo push-back 环 | 收到一条 NoteSyncModify 之后 CLI 又把同样内容推回服务端 | 本地写盘触发 watcher;时间窗口版 `ignore_file` 在高负载下 watchdog 事件延迟到窗口外失效 | 内容哈希去重:`_echo_hashes[path]` 记下每次 server 推来的 hash;`push_modify` 前比对,相同即跳过 | +| 在 receive handler 里 `await sleep()` 阻塞 | 初始同步 178 条 modify × 2s = 356s 卡死 | client.py `_handle_text` 是串行 await handler 的,handler 里 sleep 会卡整个消息消费 | handler 不做任何时间等待;echo 抑制改纯同步的哈希查表 | + +--- + +## 8. 后续排查的快捷入口 + +- 想确认服务端是否还在线:看日志有没有 `← Authorization` 回复 +- 想确认为什么推送没到:看日志里最后一次 `← NoteSync*` / `← FileSync*` 时间戳,对照那段时间本端连接状态 +- 想确认心跳是否工作:**客户端层面看不到 Ping/Pong**(被 websockets 库吃掉),只能靠"长时间空闲后是否断开"来反推 +- 服务端 healthz:`GET https://fastnode.zeabur.app/api/health`(如果暴露了) diff --git a/fns_cli/client.py b/fns_cli/client.py index 7d1eb65..13ac9b8 100644 --- a/fns_cli/client.py +++ b/fns_cli/client.py @@ -1,10 +1,9 @@ -"""WebSocket client: connect, authenticate, send/receive, reconnect, heartbeat.""" +"""WebSocket client: connect, authenticate, send/receive, reconnect.""" from __future__ import annotations import asyncio import logging -import time from typing import Any, Callable, Coroutine import websockets @@ -37,7 +36,6 @@ def __init__(self, config: AppConfig) -> None: self._on_reconnect: Callable[[], Coroutine] | None = None self._msg_queue: list[str | bytes] = [] self._ready_event = asyncio.Event() - self._last_received_at: float = 0.0 def on_reconnect(self, handler: Callable[[], Coroutine]) -> None: self._on_reconnect = handler @@ -123,14 +121,26 @@ async def _connect(self) -> None: ) log.info("Connecting to %s", url) + # Heartbeat: the server sends Ping every 25s and closes the connection + # if it sees no frame within 60s (PingWait). The websockets library + # auto-responds to server pings with a pong, so the server's deadline + # is always refreshed as long as the connection is actually alive. + # + # We also send our own pings to detect the reverse case: a half-open + # TCP connection or a proxy that silently drops the server's pings. + # The interval/timeout are generous because the server's gws write + # queue can hold our Pong behind large binary frames during heavy + # chunk transfer — a 30s-or-less timeout false-triggers on initial + # sync. 45s + 90s gives roughly 135s worst-case dead-connection + # detection, which is comfortably past any observed Pong delay while + # still catching genuine network failures. self.ws = await websockets.connect( url, max_size=128 * 1024 * 1024, - ping_interval=None, - ping_timeout=None, + ping_interval=45, + ping_timeout=90, close_timeout=10, ) - self._last_received_at = time.monotonic() log.info("WebSocket connected, sending auth") auth_raw = f"{ACTION_AUTHORIZATION}{SEPARATOR}{self.config.server.token}" @@ -138,33 +148,13 @@ async def _connect(self) -> None: async def _listen(self) -> None: assert self.ws is not None - watchdog = asyncio.create_task(self._inactivity_watchdog()) - try: - async for raw in self.ws: - if isinstance(raw, bytes): - await self._handle_binary(raw) - else: - await self._handle_text(raw) - finally: - watchdog.cancel() - - async def _inactivity_watchdog(self) -> None: - """Close the connection if no message is received for 2 × heartbeat_interval.""" - interval = self.config.client.heartbeat_interval - deadline = interval * 2 - while True: - await asyncio.sleep(interval) - idle = time.monotonic() - self._last_received_at - if idle >= deadline: - log.warning( - "No data received for %.0fs — closing for reconnect", idle - ) - if self.ws: - await self.ws.close() - return + async for raw in self.ws: + if isinstance(raw, bytes): + await self._handle_binary(raw) + else: + await self._handle_text(raw) async def _handle_text(self, raw: str) -> None: - self._last_received_at = time.monotonic() msg = decode_message(raw) log.debug("← %s | %s", msg.action, str(msg.data)[:200]) @@ -182,7 +172,6 @@ async def _handle_text(self, raw: str) -> None: log.debug("Unhandled action: %s", msg.action) async def _handle_binary(self, raw: bytes) -> None: - self._last_received_at = time.monotonic() if self._binary_handler and len(raw) > 42 and raw[:2] == b"00": try: sid, idx, data = parse_binary_chunk(raw[2:]) diff --git a/fns_cli/file_sync.py b/fns_cli/file_sync.py index 93e5500..a000d2d 100644 --- a/fns_cli/file_sync.py +++ b/fns_cli/file_sync.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio import logging import os import uuid @@ -31,6 +30,9 @@ log = logging.getLogger("fns_cli.file_sync") +# Sentinel stored in _echo_hashes to mark a just-received delete. +_DELETED = "__deleted__" + def _extract_inner(msg_data: dict) -> dict: """Server wraps payloads as {code, status, message, data: {actual fields}}.""" @@ -68,6 +70,11 @@ def __init__(self, engine: SyncEngine) -> None: self._received_modify = 0 self._received_delete = 0 self._got_end = False + # See NoteSync._echo_hashes — same semantics here. Updated on both + # inbound (server write / chunk finalize / rename / delete) and + # outbound (push_upload / push_delete) so the cache tracks the most + # recently known synced state, not just what the server pushed. + self._echo_hashes: dict[str, str] = {} @property def is_sync_complete(self) -> bool: @@ -111,20 +118,29 @@ async def push_upload(self, rel_path: str) -> None: if not full.exists(): return + hash_ = file_content_hash_binary(full) + if self._echo_hashes.get(rel_path) == hash_: + return + stat = full.stat() msg = WSMessage(ACTION_FILE_UPLOAD_CHECK, { "vault": self.config.server.vault, "path": rel_path, "pathHash": path_hash(rel_path), - "contentHash": file_content_hash_binary(full), + "contentHash": hash_, "size": stat.st_size, "ctime": int(stat.st_ctime * 1000), "mtime": int(stat.st_mtime * 1000), }) log.info("FileUploadCheck → %s (%d bytes)", rel_path, stat.st_size) await self.engine.ws_client.send(msg) + # Record the outbound hash so a later revert-to-previous-content + # still differs from the cache and triggers a real upload. + self._echo_hashes[rel_path] = hash_ async def push_delete(self, rel_path: str) -> None: + if self._echo_hashes.get(rel_path) == _DELETED: + return msg = WSMessage(ACTION_FILE_DELETE, { "vault": self.config.server.vault, "path": rel_path, @@ -132,6 +148,7 @@ async def push_delete(self, rel_path: str) -> None: }) log.info("FileDelete → %s", rel_path) await self.engine.ws_client.send(msg) + self._echo_hashes[rel_path] = _DELETED # ── Server → Client handlers ───────────────────────────────────── @@ -188,7 +205,6 @@ async def _on_sync_update(self, msg: WSMessage) -> None: return full = self.vault_path / rel_path - self.engine.ignore_file(rel_path) try: full.parent.mkdir(parents=True, exist_ok=True) if isinstance(content, str): @@ -200,12 +216,12 @@ async def _on_sync_update(self, msg: WSMessage) -> None: if mtime and full.exists(): ts = mtime / 1000.0 os.utime(full, (ts, ts)) + # Record echo hash after the write is durable on disk. + if full.exists(): + self._echo_hashes[rel_path] = file_content_hash_binary(full) log.info("← FileSyncUpdate: %s", rel_path) except Exception: log.exception("Failed to write file %s", rel_path) - finally: - await asyncio.sleep(0.6) - self.engine.unignore_file(rel_path) self._received_modify += 1 self._check_complete() @@ -224,8 +240,10 @@ async def _on_sync_delete(self, msg: WSMessage) -> None: rel_path: str = data.get("path", "") if not rel_path: return + + self._echo_hashes[rel_path] = _DELETED + full = self.vault_path / rel_path - self.engine.ignore_file(rel_path) try: if full.exists(): full.unlink() @@ -233,9 +251,6 @@ async def _on_sync_delete(self, msg: WSMessage) -> None: self._try_remove_empty_parent(full) except Exception: log.exception("Failed to delete file %s", rel_path) - finally: - await asyncio.sleep(0.6) - self.engine.unignore_file(rel_path) self._received_delete += 1 self._check_complete() @@ -247,21 +262,21 @@ async def _on_sync_rename(self, msg: WSMessage) -> None: if not old_path or not new_path: return + # The rename arrives as a single server message but the watcher will + # observe it as delete(old) + create(new). Prime the echo cache for + # both paths. + self._echo_hashes[old_path] = _DELETED old_full = self.vault_path / old_path new_full = self.vault_path / new_path - self.engine.ignore_file(old_path) - self.engine.ignore_file(new_path) try: if old_full.exists(): new_full.parent.mkdir(parents=True, exist_ok=True) old_full.rename(new_full) + if new_full.exists(): + self._echo_hashes[new_path] = file_content_hash_binary(new_full) log.info("← FileSyncRename: %s → %s", old_path, new_path) except Exception: log.exception("Failed to rename file %s → %s", old_path, new_path) - finally: - await asyncio.sleep(0.6) - self.engine.unignore_file(old_path) - self.engine.unignore_file(new_path) async def _on_sync_mtime(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) @@ -309,19 +324,18 @@ async def _on_binary_chunk(self, session_id: str, chunk_index: int, data: bytes) async def _finalize_download(self, session_id: str, session: _DownloadSession) -> None: rel_path = session.path full = self.vault_path / rel_path - self.engine.ignore_file(rel_path) try: full.parent.mkdir(parents=True, exist_ok=True) with open(full, "wb") as f: for i in range(session.total_chunks): f.write(session.chunks.get(i, b"")) + if full.exists(): + self._echo_hashes[rel_path] = file_content_hash_binary(full) log.info("← Chunked download complete: %s", rel_path) except Exception: log.exception("Failed to write downloaded file %s", rel_path) finally: self._download_sessions.pop(session_id, None) - await asyncio.sleep(0.6) - self.engine.unignore_file(rel_path) async def _on_sync_end(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) diff --git a/fns_cli/note_sync.py b/fns_cli/note_sync.py index 83cc2d5..7e61159 100644 --- a/fns_cli/note_sync.py +++ b/fns_cli/note_sync.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio import logging import os import uuid @@ -27,6 +26,9 @@ log = logging.getLogger("fns_cli.note_sync") +# Sentinel stored in _echo_hashes to mark a just-received delete. +_DELETED = "__deleted__" + def _extract_inner(msg_data: dict) -> dict: """Server wraps payloads as {code, status, message, data: {actual fields}}.""" @@ -48,6 +50,24 @@ def __init__(self, engine: SyncEngine) -> None: self._received_modify = 0 self._received_delete = 0 self._got_end = False + # Per-path "last known synced state" — updated on BOTH inbound + # (server → local write) and outbound (local → server push). Value + # is the content hash, or _DELETED sentinel for an absent file. + # + # Push flow: if current hash == cache → skip (echo); else push and + # update cache to the new hash/tombstone. + # Receive flow: after applying the server change to disk, update the + # cache to match the new on-disk state. + # + # Updating on *outbound* is critical for two cases the inbound-only + # cache got wrong: + # * revert: server=A → user edits to B, push → user reverts to A. + # Without outbound update the cache still reads A and the revert + # push is dropped as an echo. + # * tombstone reuse: server deletes → user recreates same path. + # Without outbound update the later local delete is dropped + # because the cache still holds _DELETED. + self._echo_hashes: dict[str, str] = {} @property def is_sync_complete(self) -> bool: @@ -89,7 +109,7 @@ async def request_full_sync(self) -> None: log.info("Requesting full NoteSync with %d local notes", len(notes)) await self.engine.ws_client.send(msg) - async def push_modify(self, rel_path: str) -> None: + async def push_modify(self, rel_path: str, *, force: bool = False) -> None: full = self.vault_path / rel_path if not full.exists(): return @@ -99,20 +119,29 @@ async def push_modify(self, rel_path: str) -> None: log.exception("Failed to read %s", rel_path) return + hash_ = content_hash(text) + if not force and self._echo_hashes.get(rel_path) == hash_: + return + stat = full.stat() msg = WSMessage(ACTION_NOTE_MODIFY, { "vault": self.config.server.vault, "path": rel_path, "pathHash": path_hash(rel_path), "content": text, - "contentHash": content_hash(text), + "contentHash": hash_, "ctime": int(stat.st_ctime * 1000), "mtime": int(stat.st_mtime * 1000), }) log.info("NoteModify → %s", rel_path) await self.engine.ws_client.send(msg) + # Record the outbound hash so an echoed broadcast from the server + # (or a repeated watcher event from our own write) is recognized. + self._echo_hashes[rel_path] = hash_ async def push_delete(self, rel_path: str) -> None: + if self._echo_hashes.get(rel_path) == _DELETED: + return msg = WSMessage(ACTION_NOTE_DELETE, { "vault": self.config.server.vault, "path": rel_path, @@ -120,6 +149,7 @@ async def push_delete(self, rel_path: str) -> None: }) log.info("NoteDelete → %s", rel_path) await self.engine.ws_client.send(msg) + self._echo_hashes[rel_path] = _DELETED async def push_rename(self, new_rel: str, old_rel: str) -> None: await self.push_modify(new_rel) @@ -137,19 +167,17 @@ async def _on_sync_modify(self, msg: WSMessage) -> None: return full = self.vault_path / rel_path - self.engine.ignore_file(rel_path) try: full.parent.mkdir(parents=True, exist_ok=True) full.write_text(content, encoding="utf-8") if mtime: ts = mtime / 1000.0 os.utime(full, (ts, ts)) + # Update the cache only after the new content is durable on disk. + self._echo_hashes[rel_path] = content_hash(content) log.info("← NoteSyncModify: %s", rel_path) except Exception: log.exception("Failed to write %s", rel_path) - finally: - await asyncio.sleep(0.6) - self.engine.unignore_file(rel_path) self._received_modify += 1 self._check_all_received() @@ -159,18 +187,18 @@ async def _on_sync_delete(self, msg: WSMessage) -> None: rel_path: str = data.get("path", "") if not rel_path: return + full = self.vault_path / rel_path - self.engine.ignore_file(rel_path) try: if full.exists(): full.unlink() log.info("← NoteSyncDelete: %s", rel_path) self._try_remove_empty_parent(full) + # Whether the file existed or not, the on-disk state is now + # "absent" if we got here without an exception. + self._echo_hashes[rel_path] = _DELETED except Exception: log.exception("Failed to delete %s", rel_path) - finally: - await asyncio.sleep(0.6) - self.engine.unignore_file(rel_path) self._received_delete += 1 self._check_all_received() @@ -195,7 +223,9 @@ async def _on_sync_need_push(self, msg: WSMessage) -> None: if not rel_path: return log.info("← NoteSyncNeedPush: %s", rel_path) - await self.push_modify(rel_path) + # NeedPush is an explicit server request to re-send the local content; + # it must bypass the normal echo suppression check. + await self.push_modify(rel_path, force=True) async def _on_sync_end(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) diff --git a/fns_cli/watcher.py b/fns_cli/watcher.py index e9cca16..f67e19d 100644 --- a/fns_cli/watcher.py +++ b/fns_cli/watcher.py @@ -73,6 +73,11 @@ def on_modified(self, event): self._schedule(f"mod:{rel}", lambda: self.engine.on_local_change(rel)) def on_deleted(self, event): + # Directory delete: watchdog on most platforms already fires per-file + # delete events for children before the directory event, so we only + # need to process individual files here. If a platform skips those + # child events we cannot reconstruct them (the files are already + # gone), so there is nothing more we can do at this layer. if event.is_directory: return try: @@ -85,21 +90,70 @@ def on_deleted(self, event): def on_moved(self, event): if event.is_directory: + # On Windows (and commonly on macOS) a directory rename is a + # single atomic event — watchdog does NOT emit child events — + # so if we ignore it the server never learns about the rename. + # Enumerate the new path's files and emit a rename for each, + # computing each old path by swapping the renamed prefix. + self._handle_directory_move(event) return try: old_rel = self._rel(event.src_path) new_rel = self._rel(event.dest_path) except ValueError: return - if ( - self.engine.is_ignored(old_rel) - or self.engine.is_ignored(new_rel) - or self.engine.is_excluded(new_rel) - ): + self._schedule_move_transition(old_rel, new_rel) + + def _handle_directory_move(self, event) -> None: + """Enumerate a renamed directory's children and schedule per-file renames.""" + new_dir = Path(event.dest_path) + try: + old_dir_rel = self._rel(event.src_path) + new_dir_rel = self._rel(event.dest_path) + except ValueError: return + if not new_dir.exists(): + return + for child in new_dir.rglob("*"): + if not child.is_file(): + continue + try: + new_rel = self._rel(str(child)) + except ValueError: + continue + # child sits under new_dir_rel/...; compute old_rel by replacing + # the directory prefix. + if not new_rel.startswith(new_dir_rel + "/"): + continue + tail = new_rel[len(new_dir_rel) + 1:] + old_rel = f"{old_dir_rel}/{tail}" if old_dir_rel else tail + self._schedule_move_transition(old_rel, new_rel) + + def _schedule_move_transition(self, old_rel: str, new_rel: str) -> None: + if self.engine.is_ignored(old_rel) or self.engine.is_ignored(new_rel): + return + + old_excluded = self.engine.is_excluded(old_rel) + new_excluded = self.engine.is_excluded(new_rel) + + if old_excluded and new_excluded: + return + if not old_excluded and new_excluded: + self._schedule( + f"del:{old_rel}", + lambda o=old_rel: self.engine.on_local_delete(o), + ) + return + if old_excluded and not new_excluded: + self._schedule( + f"mod:{new_rel}", + lambda n=new_rel: self.engine.on_local_change(n), + ) + return + self._schedule( f"mv:{old_rel}:{new_rel}", - lambda: self.engine.on_local_rename(new_rel, old_rel), + lambda o=old_rel, n=new_rel: self.engine.on_local_rename(n, o), ) diff --git a/tests/test_echo_cache.py b/tests/test_echo_cache.py new file mode 100644 index 0000000..f89c3a0 --- /dev/null +++ b/tests/test_echo_cache.py @@ -0,0 +1,258 @@ +"""Regression tests for the content-hash echo cache. + +The cache must track "this path's most recently announced state" — updated on +both inbound (server → local) and outbound (local → server) transitions — +so legitimate user edits that happen to *restore* a previous value still get +pushed. + +Two scenarios cover the bugs that shipped in the inbound-only version: + + 1. receive A → push B → revert to A + Inbound-only cache would still read A after step 2, so the A revert in + step 3 would be treated as an echo and dropped. + + 2. receive delete → recreate → delete again + Inbound-only tombstone would persist, so the final delete would be + dropped. +""" + +from __future__ import annotations + +import asyncio +import tempfile +import unittest +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +from fns_cli.file_sync import FileSync +from fns_cli.note_sync import NoteSync +from fns_cli.protocol import ( + ACTION_FILE_SYNC_DELETE, + ACTION_FILE_SYNC_UPDATE, + ACTION_NOTE_SYNC_DELETE, + ACTION_NOTE_SYNC_MODIFY, + ACTION_NOTE_SYNC_NEED_PUSH, + WSMessage, +) + + +def _make_engine(vault_path: Path) -> MagicMock: + config = MagicMock() + config.server.vault = "test-vault" + config.sync.file_chunk_size = 1024 * 1024 + + state = MagicMock() + state.last_note_sync_time = 0 + state.last_file_sync_time = 0 + + ws = MagicMock() + ws.send = AsyncMock() + + engine = MagicMock() + engine.config = config + engine.vault_path = vault_path + engine.state = state + engine.ws_client = ws + engine.ignore_file = MagicMock() + engine.unignore_file = MagicMock() + engine.is_excluded = MagicMock(return_value=False) + return engine + + +def _wrap(action: str, inner: dict) -> WSMessage: + return WSMessage(action, {"data": inner}) + + +def _sent_actions(engine: MagicMock) -> list[str]: + """Extract the Action name of each WSMessage passed to ws.send.""" + return [call.args[0].action for call in engine.ws_client.send.await_args_list] + + +class TestNoteEchoCache(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.vault = Path(self._tmp.name) + self.engine = _make_engine(self.vault) + self.ns = NoteSync(self.engine) + + async def asyncTearDown(self): + self._tmp.cleanup() + + async def _inbound_modify(self, rel: str, content: str) -> None: + msg = _wrap(ACTION_NOTE_SYNC_MODIFY, { + "path": rel, "content": content, "mtime": 0, + }) + await self.ns._on_sync_modify(msg) + + async def _inbound_delete(self, rel: str) -> None: + msg = _wrap(ACTION_NOTE_SYNC_DELETE, {"path": rel}) + await self.ns._on_sync_delete(msg) + + async def test_revert_after_push_still_pushes(self): + """receive A → push B → revert to A: the revert must be pushed.""" + rel = "note.md" + target = self.vault / rel + + # 1. Server hands us A. + await self._inbound_modify(rel, "A") + self.assertEqual(target.read_text(encoding="utf-8"), "A") + + # 2. User edits locally to B and we push. + target.write_text("B", encoding="utf-8") + await self.ns.push_modify(rel) + + # 3. User reverts to A. The cache must NOT suppress this. + target.write_text("A", encoding="utf-8") + await self.ns.push_modify(rel) + + actions = _sent_actions(self.engine) + self.assertEqual( + actions.count("NoteModify"), 2, + f"expected two NoteModify pushes (B and A), got {actions}", + ) + + async def test_delete_after_recreate_still_pushes(self): + """receive delete → local recreate → local delete: last delete must be pushed.""" + rel = "note.md" + target = self.vault / rel + target.write_text("initial", encoding="utf-8") + + # 1. Server tells us to delete. + await self._inbound_delete(rel) + self.assertFalse(target.exists()) + + # 2. User recreates and we push. + target.write_text("fresh", encoding="utf-8") + await self.ns.push_modify(rel) + + # 3. User deletes again locally. + target.unlink() + await self.ns.push_delete(rel) + + actions = _sent_actions(self.engine) + self.assertIn("NoteModify", actions) + self.assertEqual( + actions.count("NoteDelete"), 1, + f"expected the final delete to be pushed, got {actions}", + ) + + async def test_need_push_bypasses_echo_suppression(self): + """NeedPush is an explicit server request and must force a re-send.""" + rel = "note.md" + + await self._inbound_modify(rel, "A") + await self.ns._on_sync_need_push( + _wrap(ACTION_NOTE_SYNC_NEED_PUSH, {"path": rel}) + ) + + actions = _sent_actions(self.engine) + self.assertEqual( + actions.count("NoteModify"), 1, + f"expected NeedPush to force a NoteModify, got {actions}", + ) + + async def test_failed_inbound_modify_does_not_poison_cache(self): + """A failed server write must not suppress the next real local push.""" + rel = "note.md" + target = self.vault / rel + + with patch.object(Path, "write_text", side_effect=OSError("boom")): + await self._inbound_modify(rel, "A") + + self.assertNotIn(rel, self.ns._echo_hashes) + + target.write_text("A", encoding="utf-8") + await self.ns.push_modify(rel) + + actions = _sent_actions(self.engine) + self.assertEqual( + actions.count("NoteModify"), 1, + f"expected local push after failed inbound write, got {actions}", + ) + + async def test_failed_inbound_delete_does_not_poison_cache(self): + """A failed server delete must not suppress a later real local delete.""" + rel = "note.md" + target = self.vault / rel + target.write_text("A", encoding="utf-8") + + with patch.object(Path, "unlink", side_effect=OSError("boom")): + await self._inbound_delete(rel) + + self.assertNotEqual(self.ns._echo_hashes.get(rel), "_DELETED") + + target.unlink() + await self.ns.push_delete(rel) + + actions = _sent_actions(self.engine) + self.assertEqual( + actions.count("NoteDelete"), 1, + f"expected local delete after failed inbound delete, got {actions}", + ) + + +class TestFileEchoCache(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.vault = Path(self._tmp.name) + self.engine = _make_engine(self.vault) + self.fs = FileSync(self.engine) + + async def asyncTearDown(self): + self._tmp.cleanup() + + async def _inbound_update_text(self, rel: str, content: str) -> None: + msg = _wrap(ACTION_FILE_SYNC_UPDATE, { + "path": rel, "content": content, "mtime": 0, + }) + await self.fs._on_sync_update(msg) + + async def _inbound_delete(self, rel: str) -> None: + msg = _wrap(ACTION_FILE_SYNC_DELETE, {"path": rel}) + await self.fs._on_sync_delete(msg) + + async def test_revert_after_push_still_pushes(self): + rel = "config.json" + target = self.vault / rel + + await self._inbound_update_text(rel, "A") + self.assertEqual(target.read_text(encoding="utf-8"), "A") + + target.write_text("B", encoding="utf-8") + await self.fs.push_upload(rel) + + target.write_text("A", encoding="utf-8") + await self.fs.push_upload(rel) + + actions = _sent_actions(self.engine) + self.assertEqual( + actions.count("FileUploadCheck"), 2, + f"expected two FileUploadCheck pushes, got {actions}", + ) + + async def test_delete_after_recreate_still_pushes(self): + rel = "pic.png" + target = self.vault / rel + target.write_bytes(b"initial") + + await self._inbound_delete(rel) + self.assertFalse(target.exists()) + + target.write_bytes(b"fresh") + await self.fs.push_upload(rel) + + target.unlink() + await self.fs.push_delete(rel) + + actions = _sent_actions(self.engine) + self.assertIn("FileUploadCheck", actions) + self.assertEqual( + actions.count("FileDelete"), 1, + f"expected the final delete to be pushed, got {actions}", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_file_sync.py b/tests/test_file_sync.py index cefccca..c32ddcc 100644 --- a/tests/test_file_sync.py +++ b/tests/test_file_sync.py @@ -179,11 +179,18 @@ async def test_unexpected_content_type_does_not_stall(self): class TestFileSyncWatcherIgnore(unittest.TestCase): """Watcher on_moved must honour is_ignored to avoid echo-backs.""" - def _make_handler(self, vault: Path, ignored: set[str]): + def _make_handler( + self, + vault: Path, + ignored: set[str], + excluded: set[str] | None = None, + ): from fns_cli.watcher import _VaultEventHandler engine = _make_engine(vault) engine.is_ignored = lambda rel: rel in ignored + excluded = excluded or set() + engine.is_excluded = lambda rel: rel in excluded loop = asyncio.new_event_loop() handler = _VaultEventHandler(engine, loop) return handler, engine, loop @@ -229,6 +236,52 @@ def test_move_not_ignored_is_scheduled(self): handler._pending["mv:old.png:new.png"].cancel() loop.close() + def test_move_into_excluded_schedules_delete(self): + with tempfile.TemporaryDirectory() as tmp: + vault = Path(tmp) + handler, engine, loop = self._make_handler( + vault, + set(), + {"trash/new.png"}, + ) + + from watchdog.events import FileMovedEvent + ev = FileMovedEvent( + str(vault / "old.png"), + str(vault / "trash" / "new.png"), + ) + handler.on_moved(ev) + + self.assertIn("del:old.png", handler._pending) + handler._pending["del:old.png"].cancel() + loop.close() + + def test_directory_move_into_excluded_schedules_child_deletes(self): + with tempfile.TemporaryDirectory() as tmp: + vault = Path(tmp) + (vault / "trash" / "renamed").mkdir(parents=True) + (vault / "trash" / "renamed" / "a.txt").write_text("a", encoding="utf-8") + (vault / "trash" / "renamed" / "b.txt").write_text("b", encoding="utf-8") + + handler, engine, loop = self._make_handler( + vault, + set(), + {"trash/renamed/a.txt", "trash/renamed/b.txt"}, + ) + + from watchdog.events import DirMovedEvent + ev = DirMovedEvent( + str(vault / "old"), + str(vault / "trash" / "renamed"), + ) + handler.on_moved(ev) + + self.assertIn("del:old/a.txt", handler._pending) + self.assertIn("del:old/b.txt", handler._pending) + handler._pending["del:old/a.txt"].cancel() + handler._pending["del:old/b.txt"].cancel() + loop.close() + if __name__ == "__main__": unittest.main()