Fix realtime sync startup and directory deletes #11
Reviewer's Guide

Improves realtime sync robustness by deferring lastTime commits until all details/downloads are complete, adding upload worker concurrency limits, enhancing watcher behavior for directory and cross-boundary moves/deletes, introducing folder-level sync handlers, and making WS client reconnect and send behavior non-blocking and serialized.

Sequence diagram for file upload queue and workers

sequenceDiagram
participant Server
participant Client as WSClient
participant FileSync
participant UploadQueue
participant Worker as UploadWorker
Server->>Client: ACTION_FILE_UPLOAD (FileUpload)
Client->>FileSync: _on_upload_session(msg)
FileSync->>FileSync: _ensure_upload_workers()
FileSync->>UploadQueue: put(session_id, chunk_size, rel_path, full, completion)
FileSync->>Worker: start _upload_queue_worker()
FileSync->>FileSync: _await_upload_completion(completion)
loop per queued upload
Worker->>UploadQueue: get()
Worker->>FileSync: _upload_session_worker(session_id, chunk_size, rel_path, full)
FileSync->>FileSync: read_bytes(full)
loop for each chunk
FileSync->>Client: send_bytes(binary_chunk)
end
Worker-->>FileSync: completion result
end
Server-->>Client: ACTION_FILE_UPLOAD_ACK (FileUploadAck)
Client->>FileSync: _on_upload_ack(msg)
FileSync->>FileSync: log ack and continue
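The queue/worker handshake above can be sketched in plain asyncio. This is a minimal model rather than the PR's code: `upload_session` stands in for `_upload_session_worker`, the file I/O is simulated, and the completion-future plumbing mirrors what the diagram names `_await_upload_completion(completion)`.

```python
import asyncio

async def upload_session(session_id: str, rel_path: str) -> str:
    # Stand-in for _upload_session_worker: in the PR this reads the file
    # and streams binary chunks over the websocket.
    await asyncio.sleep(0)
    return f"{session_id}:{rel_path}"

async def upload_queue_worker(queue: asyncio.Queue) -> None:
    # Long-lived worker: pulls one queued upload at a time and reports the
    # outcome through a per-upload completion future.
    while True:
        session_id, rel_path, completion = await queue.get()
        try:
            result = await upload_session(session_id, rel_path)
        except Exception as exc:
            if not completion.done():
                completion.set_exception(exc)
        else:
            if not completion.done():
                completion.set_result(result)
        finally:
            queue.task_done()

async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(upload_queue_worker(queue)) for _ in range(2)]
    loop = asyncio.get_running_loop()
    completions = []
    for i in range(4):
        fut: asyncio.Future = loop.create_future()
        completions.append(fut)
        queue.put_nowait((f"s{i}", f"file{i}.bin", fut))
    results = await asyncio.gather(*completions)
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

print(asyncio.run(main()))
# → ['s0:file0.bin', 's1:file1.bin', 's2:file2.bin', 's3:file3.bin']
```

Because each upload carries its own future, callers can await a single upload's outcome while the pool bounds how many run concurrently.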
Sequence diagram for initial file sync with stall detection and lastTime commit

sequenceDiagram
participant Server
participant Client as WSClient
participant FileSync
participant Engine as SyncEngine
participant Watcher
Engine->>FileSync: request_sync()
FileSync->>Client: send FileSyncRequest
Server-->>Client: FileSyncUpdate / FileSyncDelete / FileSyncChunkDownload
Client->>FileSync: _on_sync_update / _on_sync_delete / _on_chunk_download_start
FileSync->>FileSync: _mark_sync_activity()
FileSync->>FileSync: track _download_sessions and _pending_download_paths
Server-->>Client: FileSyncEnd(lastTime, counts)
Client->>FileSync: _on_sync_end(msg)
FileSync->>FileSync: store _pending_last_time
FileSync->>FileSync: set expected counts
FileSync->>FileSync: _check_complete()
alt all details and downloads complete
FileSync->>FileSync: _check_complete()
FileSync->>FileSync: _commit_last_time()
FileSync->>Engine: is_sync_complete = True
else still pending downloads or missing details
loop until complete or stalled
Engine->>FileSync: is_sync_complete?
Engine->>FileSync: is_stalled(stale_seconds=5)?
end
alt stalled before completion
Engine->>Engine: log stall warning
end
end
Engine->>Watcher: start observer and enable realtime sync
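The stall-detection side of this flow reduces to a monotonic-clock watermark that every handler bumps. A minimal sketch, with method names that only loosely mirror `_mark_sync_activity` and `is_stalled(stale_seconds=5)`:

```python
import time

class SyncProgress:
    # Sketch of the stall-detection watermark: every sync handler calls
    # mark_activity(), and the engine polls is_stalled() while waiting
    # for the expected counts and downloads to finish.
    def __init__(self) -> None:
        self._last_activity = time.monotonic()
        self._complete = False

    def mark_activity(self) -> None:
        self._last_activity = time.monotonic()

    def mark_complete(self) -> None:
        self._complete = True

    def is_stalled(self, stale_seconds: float = 5.0) -> bool:
        # A completed sync can never be "stalled".
        if self._complete:
            return False
        return time.monotonic() - self._last_activity > stale_seconds

progress = SyncProgress()
progress.mark_activity()
print(progress.is_stalled())  # → False (activity just happened)
```

Using `time.monotonic()` rather than wall-clock time keeps the check immune to system clock adjustments mid-sync.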
Sequence diagram for watcher handling cross-boundary moves and file tracking

sequenceDiagram
participant FS as Filesystem
participant Watcher
participant Engine as SyncEngine
FS-->>Watcher: on_created(event)
Watcher->>Watcher: _rel(event.src_path)
Watcher->>Watcher: _track_file(rel)
Watcher->>Engine: schedule on_local_change(rel)
FS-->>Watcher: on_deleted(event)
alt file delete
Watcher->>Watcher: _rel(event.src_path)
Watcher->>Watcher: _untrack_file(rel)
Watcher->>Engine: schedule on_local_delete(rel)
else directory delete
Watcher->>Watcher: _handle_directory_delete(event)
Watcher->>Watcher: enumerate victims from _known_files
loop each victim
Watcher->>Watcher: _untrack_file(rel)
Watcher->>Engine: schedule on_local_delete(rel)
end
end
FS-->>Watcher: on_moved(event)
Watcher->>Watcher: old_rel = _rel_or_none(src_path)
Watcher->>Watcher: new_rel = _rel_or_none(dest_path)
alt old_rel None, new_rel not None
Watcher->>Watcher: _track_file(new_rel)
Watcher->>Engine: schedule on_local_change(new_rel)
else new_rel None, old_rel not None
Watcher->>Watcher: _untrack_file(old_rel)
Watcher->>Engine: schedule on_local_delete(old_rel)
else both not None and are files
Watcher->>Watcher: _schedule_move_transition(old_rel, new_rel)
Watcher->>Watcher: update _known_files
Watcher->>Engine: schedule on_local_rename(new_rel, old_rel)
end
Updated class diagram for sync and client components

classDiagram
class SyncEngine {
+vault_path: Path
+config: AppConfig
+state: SyncState
+note_sync: NoteSync
+file_sync: FileSync
+folder_sync: FolderSync
+ws_client: Client
+on_local_change(rel_path: str) async
+on_local_delete(rel_path: str) async
+on_local_rename(new_rel: str, old_rel: str) async
+run() async
+_wait_file_sync(timeout: float) async
}
class SyncConfig {
+sync_notes: bool
+sync_files: bool
+sync_config: bool
+upload_concurrency: int
+exclude_patterns: list~str~
}
class FileSync {
+engine: SyncEngine
+vault_path: Path
+_sync_complete: bool
+_download_sessions: dict~str, _DownloadSession~
+_pending_download_paths: set~str~
+_expected_modify: int
+_expected_delete: int
+_received_modify: int
+_received_delete: int
+_got_end: bool
+_pending_last_time: int
+_last_sync_activity_monotonic: float
+_upload_tasks: set~Task~
+_upload_worker_count: int
+_upload_workers: set~Task~
+_upload_queue: asyncio.Queue
+register_handlers() void
+request_sync() async
+is_sync_complete: bool
+is_stalled(stale_seconds: float) bool
+_on_upload_session(msg: WSMessage) async
+_on_upload_ack(msg: WSMessage) async
+_upload_queue_worker() async
+_upload_session_worker(session_id: str, chunk_size: int, rel_path: str, full: Path) async
+_on_sync_update(msg: WSMessage) async
+_on_sync_delete(msg: WSMessage) async
+_on_sync_rename(msg: WSMessage) async
+_on_sync_mtime(msg: WSMessage) async
+_on_chunk_download_start(msg: WSMessage) async
+_on_binary_chunk(session_id: str, chunk_index: int, data: bytes) async
+_finalize_download(session_id: str, session: _DownloadSession) async
+_finalize_empty_download(rel_path: str) async
+_on_sync_end(msg: WSMessage) async
+_check_complete() void
+_commit_last_time() void
}
class NoteSync {
+engine: SyncEngine
+vault_path: Path
+_sync_complete: bool
+_expected_modify: int
+_expected_delete: int
+_received_modify: int
+_received_delete: int
+_got_end: bool
+_pending_last_time: int
+_echo_hashes: dict~str, str~
+register_handlers() void
+request_sync() async
+_on_sync_modify(msg: WSMessage) async
+_on_sync_delete(msg: WSMessage) async
+_on_sync_rename(msg: WSMessage) async
+_on_sync_mtime(msg: WSMessage) async
+_on_sync_need_push(msg: WSMessage) async
+_on_sync_end(msg: WSMessage) async
+_reset_counters() void
+_check_all_received() void
+_commit_last_time() void
}
class FolderSync {
+engine: SyncEngine
+vault_path: Path
+register_handlers() void
+_on_sync_modify(msg: WSMessage) async
+_on_sync_delete(msg: WSMessage) async
+_on_sync_rename(msg: WSMessage) async
}
class Watcher {
+engine: SyncEngine
+loop: asyncio.AbstractEventLoop
+_pending: dict~str, TimerHandle~
+_known_files: set~str~
+on_created(event) void
+on_modified(event) void
+on_deleted(event) void
+on_moved(event) void
+_seed_known_files() void
+_track_file(rel_path: str) void
+_untrack_file(rel_path: str) void
+_handle_directory_delete(event) void
+_handle_directory_move(event) void
+_schedule_move_transition(old_rel: str, new_rel: str) void
}
class Client {
+config: AppConfig
+ws: WebSocketClientProtocol
+_handlers: dict~str, Callable~
+_binary_handler: Callable
+_on_reconnect: Callable
+_reconnect_task: asyncio.Task
+_msg_queue: list~str|bytes~
+_ready_event: asyncio.Event
+_send_lock: asyncio.Lock
+connect() async
+send_json(data: dict) async
+send_bytes(data: bytes) async
+_raw_send(data: str|bytes) async
+_flush_queue() async
+_on_auth_response(msg: WSMessage) async
+_run_reconnect_handler() async
+wait_ready(timeout: float) async bool
+close() async
}
SyncEngine --> SyncConfig : uses
SyncEngine *-- FileSync
SyncEngine *-- NoteSync
SyncEngine *-- FolderSync
SyncEngine *-- Watcher
SyncEngine *-- Client
FileSync ..> WSMessage
NoteSync ..> WSMessage
FolderSync ..> WSMessage
Watcher ..> SyncEngine
Client ..> WSMessage
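The Client's non-blocking send behavior implied by `_msg_queue`, `_ready_event`, and `_send_lock` can be modeled without a real socket. A sketch under the assumption that sends issued before auth completes are buffered and later flushed in order (the list transport is a stand-in for the websocket):

```python
import asyncio

class QueuedSender:
    # Sketch only: messages sent before the connection is ready are
    # queued, then flushed in order under a lock once auth completes.
    def __init__(self) -> None:
        self.sent: list[str] = []       # stand-in for the websocket
        self._queue: list[str] = []
        self._ready = asyncio.Event()
        self._lock = asyncio.Lock()

    async def send(self, data: str) -> None:
        if not self._ready.is_set():
            self._queue.append(data)     # buffer until the socket is ready
            return
        async with self._lock:           # serialize concurrent writers
            self.sent.append(data)

    async def mark_ready(self) -> None:
        # Analogous to auth success: flush the backlog, then allow
        # direct sends.
        self._ready.set()
        async with self._lock:
            while self._queue:
                self.sent.append(self._queue.pop(0))

async def demo() -> list[str]:
    s = QueuedSender()
    await s.send("auth")   # queued: not ready yet
    await s.send("sync")   # queued
    await s.mark_ready()   # flush in order
    await s.send("ping")   # sent directly
    return s.sent

print(asyncio.run(demo()))  # → ['auth', 'sync', 'ping']
```

The key property is that `send` never blocks on a dead connection: it either queues or takes the lock briefly, which matches the "non-blocking and serialized" goal from the reviewer's guide.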
Hey - I've found 2 issues, and left some high level feedback:

- The new FileSync upload worker pool (`_upload_queue_worker` + `_upload_workers` and `_upload_tasks`) runs indefinitely and is never shut down or drained on client shutdown or reconnect; consider adding lifecycle management (e.g. cancellation/awaiting on close or reconnect) to avoid leaking background tasks tied to an old connection.
- In `FolderSync._on_sync_rename`, the `else` branch for a missing `old_full` calls `new_full.parent.mkdir(parents=True, exist_ok=True)` and then `new_full.mkdir(parents=True, exist_ok=True)`, which redundantly recreates the parent chain and may be clearer and safer if the second call is just `new_full.mkdir(exist_ok=True)`.
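One possible shape for the lifecycle management the first point asks for — a sketch, not the PR's code: drain whatever is queued, then cancel and await every worker so nothing outlives the connection.

```python
import asyncio

async def worker(queue: asyncio.Queue) -> None:
    # Minimal stand-in for _upload_queue_worker.
    while True:
        await queue.get()
        queue.task_done()

async def shutdown_workers(workers: set[asyncio.Task], queue: asyncio.Queue) -> None:
    # Drain queued work, then cancel and await the workers so no task
    # stays tied to a closed or reconnected client.
    await queue.join()
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

async def demo() -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    workers = {asyncio.create_task(worker(queue)) for _ in range(2)}
    queue.put_nowait("upload-1")
    await shutdown_workers(workers, queue)
    return all(t.cancelled() for t in workers)

print(asyncio.run(demo()))  # → True
```

Calling something like this from `Client.close()` and before spawning a fresh pool on reconnect would address the leak.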
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new FileSync upload worker pool (`_upload_queue_worker` + `_upload_workers` and `_upload_tasks`) runs indefinitely and is never shut down or drained on client shutdown or reconnect; consider adding lifecycle management (e.g. cancellation/awaiting on close or reconnect) to avoid leaking background tasks tied to an old connection.
- In `FolderSync._on_sync_rename`, the `else` branch for a missing `old_full` calls `new_full.parent.mkdir(parents=True, exist_ok=True)` and then `new_full.mkdir(parents=True, exist_ok=True)`, which redundantly recreates the parent chain and may be clearer and safer if the second call is just `new_full.mkdir(exist_ok=True)`.
## Individual Comments
### Comment 1
<location path="fns_cli/file_sync.py" line_range="80-83" />
<code_context>
+ self._pending_last_time = 0
+ self._last_sync_activity_monotonic = time.monotonic()
+ self._upload_tasks: set[asyncio.Task] = set()
+ upload_concurrency = getattr(self.config.sync, "upload_concurrency", 2)
+ if not isinstance(upload_concurrency, int) or upload_concurrency < 1:
+ upload_concurrency = 2
+ self._upload_worker_count = upload_concurrency
+ self._upload_workers: set[asyncio.Task] = set()
+ self._upload_queue: asyncio.Queue = asyncio.Queue()
</code_context>
<issue_to_address>
**suggestion (performance):** Consider capping upload_concurrency to avoid unbounded worker creation on pathological configs.
Since `upload_concurrency` is only validated as ≥1, a very large value (e.g. thousands) will create that many long-lived workers. Please clamp this to a sane upper bound (e.g. a small fixed max or something based on CPU count), and consider enforcing the same limit when reading it in `load_config`, to avoid resource exhaustion from a bad config value.
Suggested implementation:
```python
import asyncio
import os
```
```python
self._pending_last_time = 0
self._last_sync_activity_monotonic = time.monotonic()
self._upload_tasks: set[asyncio.Task] = set()
# Determine desired upload concurrency from config, with validation and capping.
upload_concurrency = getattr(self.config.sync, "upload_concurrency", 2)
if not isinstance(upload_concurrency, int) or upload_concurrency < 1:
upload_concurrency = 2
# Cap concurrency to avoid unbounded worker creation on pathological configs.
# Use a small multiple of CPU count as a sane upper bound, with a fallback default.
cpu_count = os.cpu_count() or 4
max_upload_concurrency = max(2, cpu_count * 4)
upload_concurrency = min(upload_concurrency, max_upload_concurrency)
self._upload_worker_count = upload_concurrency
self._upload_workers: set[asyncio.Task] = set()
self._upload_queue: asyncio.Queue = asyncio.Queue()
```
To fully implement your suggestion:
1. In the config-loading path (likely `load_config` or equivalent), apply the same validation and capping logic to `sync.upload_concurrency`, so an excessively large value is never accepted into the config object in the first place.
2. Consider centralizing the cap logic (e.g. a helper function like `get_capped_upload_concurrency(config_value: int | None) -> int`) to avoid divergence between config validation and runtime usage.
</issue_to_address>
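A sketch of the centralized helper this comment proposes; the name `get_capped_upload_concurrency` and the CPU-based upper bound come from the suggestion itself, and the exact cap is a judgment call:

```python
import os

def get_capped_upload_concurrency(config_value) -> int:
    # Single source of truth for validation and capping, callable from
    # both load_config and FileSync.__init__ so the two never diverge.
    default = 2
    if not isinstance(config_value, int) or config_value < 1:
        return default
    cpu_count = os.cpu_count() or 4
    max_concurrency = max(2, cpu_count * 4)  # sane upper bound
    return min(config_value, max_concurrency)

print(get_capped_upload_concurrency(None))  # → 2 (falls back to default)
print(get_capped_upload_concurrency(10 ** 6))  # clamped to the CPU-based cap
```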
### Comment 2
<location path="fns_cli/file_sync.py" line_range="220-225" />
<code_context>
+ async def _await_upload_completion(self, completion: asyncio.Future) -> None:
+ await completion
+
+ async def _upload_queue_worker(self) -> None:
+ while True:
+ session_id, chunk_size, rel_path, full, completion = await self._upload_queue.get()
+ try:
+ await self._upload_session_worker(session_id, chunk_size, rel_path, full)
+ except Exception as exc:
+ if not completion.done():
+ completion.set_exception(exc)
+ else:
+ if not completion.done():
+ completion.set_result(None)
+ finally:
+ self._upload_queue.task_done()
+
+ async def _upload_session_worker(
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Upload worker error handling surfaces the exception but drops context and logs nothing.
In `_upload_queue_worker`, exceptions from `_upload_session_worker` are only passed into `completion` and never logged. If the caller doesn’t log when awaiting `completion` (or never awaits it), failures may be effectively invisible and lack context like `rel_path`/`session_id`. Consider logging a short error with those identifiers inside the `except` so upload failures are traceable even when the awaiter doesn’t log them.
```suggestion
try:
await self._upload_session_worker(session_id, chunk_size, rel_path, full)
except Exception as exc:
log.exception(
"Upload failed for %s (sessionId=%s, chunkSize=%d)",
rel_path,
session_id[:8],
chunk_size,
)
if not completion.done():
completion.set_exception(exc)
else:
```
</issue_to_address>
Summary by Sourcery
Improve realtime sync robustness and responsiveness across files, notes, folders, and the WebSocket client.