refactor: Support dynamic pool sizing in WebSocket ConnectionManager#2797
Conversation
…a env vars Implements a ConnectionManager that reads pool bounds from environment variables (WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE, WS_POOL_CLEANUP_INTERVAL) with type-safe defaults. Supports register/unregister/broadcast/cleanup. Closes ritesh-1918#2631
|
Someone is attempting to deploy a commit to the ritesh Team on Vercel. A member of the Team first needs to authorize it. |
📝 WalkthroughWalkthroughA new ChangesWebSocket Connection Manager with Per-Company Pooling
Estimated Code Review Effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/services/websocket_manager.py`:
- Around line 33-38: The from_env classmethod in PoolConfig currently trusts raw
env values and can crash or produce invalid configs; update PoolConfig.from_env
to validate and normalize the three env-derived fields (min_size, max_size,
cleanup_interval): parse integers safely with a fallback/default when parsing
fails, enforce min_size >= 0, cleanup_interval > 0 (replace non-positive with a
sane default like 300), and ensure min_size <= max_size (swap or clamp values if
necessary). Use the existing symbol names (from_env, PoolConfig,
WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE, WS_POOL_CLEANUP_INTERVAL) so callers remain
unchanged and add minimal logging or warnings when fallbacks are used.
- Around line 77-78: get_connections currently returns the live mutable set from
self._connections, letting callers mutate manager state; change it to return an
immutable snapshot (e.g., a frozenset) or a shallow copy of the set so callers
cannot alter internal state. Update the get_connections method to fetch the set
via self._connections.get(company_id, set()) and return a snapshot
(frozenset(...) or set(...)) instead of the original object, preserving the
original key and value types but preventing external mutation of the manager's
_connections.
- Around line 64-76: The _active_count is changed unconditionally in
register/unregister causing drift; update register(self, company_id, connection)
to only increment _active_count when the connection was not already present in
self._connections[company_id] (i.e., check membership before add), and update
unregister(self, company_id, connection) to only decrement _active_count when
discard actually removed an existing connection (i.e., check membership before
discard or detect removal), keeping _connections and _active_count consistent;
refer to the methods register and unregister and the attributes _connections and
_active_count when making the change.
- Around line 110-123: The cleanup loop lifecycle is not idempotent and
cancellation-unsafe: modify start_cleanup_loop to only create the background
task if self._cleanup_task is None or finished (check task.done()), and in
stop_cleanup_loop cancel and then await the task completion (wrap await in
try/except asyncio.CancelledError and optionally asyncio.TimeoutError) before
setting self._cleanup_task = None; ensure you handle Race conditions by checking
task existence again after awaiting and log appropriately — update the methods
start_cleanup_loop and stop_cleanup_loop accordingly (use self._cleanup_task,
_loop coroutine, and asyncio.create_task references).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7e3b97cd-2d50-4a91-8b8d-4957a4bac3cd
📒 Files selected for processing (1)
backend/services/websocket_manager.py
| def from_env(cls) -> "PoolConfig": | ||
| return cls( | ||
| min_size=int(os.getenv("WS_POOL_MIN_SIZE", "10")), | ||
| max_size=int(os.getenv("WS_POOL_MAX_SIZE", "100")), | ||
| cleanup_interval=int(os.getenv("WS_POOL_CLEANUP_INTERVAL", "300")), | ||
| ) |
There was a problem hiding this comment.
Validate and normalize environment pool settings before using them.
from_env() currently trusts raw env values. Non-integer values can crash initialization, and non-positive bounds (especially cleanup interval) can lead to invalid runtime behavior.
💡 Suggested fix
@@
`@classmethod`
def from_env(cls) -> "PoolConfig":
- return cls(
- min_size=int(os.getenv("WS_POOL_MIN_SIZE", "10")),
- max_size=int(os.getenv("WS_POOL_MAX_SIZE", "100")),
- cleanup_interval=int(os.getenv("WS_POOL_CLEANUP_INTERVAL", "300")),
- )
+ def _parse_int(name: str, default: int) -> int:
+ raw = os.getenv(name, str(default))
+ try:
+ return int(raw)
+ except ValueError:
+ logger.warning("Invalid %s=%r; using default=%d", name, raw, default)
+ return default
+
+ min_size = max(1, _parse_int("WS_POOL_MIN_SIZE", 10))
+ max_size = max(1, _parse_int("WS_POOL_MAX_SIZE", 100))
+ cleanup_interval = max(1, _parse_int("WS_POOL_CLEANUP_INTERVAL", 300))
+
+ if min_size > max_size:
+ logger.warning(
+ "WS_POOL_MIN_SIZE (%d) > WS_POOL_MAX_SIZE (%d); clamping min_size",
+ min_size, max_size
+ )
+ min_size = max_size
+
+ return cls(min_size=min_size, max_size=max_size, cleanup_interval=cleanup_interval)Also applies to: 113-113
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/services/websocket_manager.py` around lines 33 - 38, The from_env
classmethod in PoolConfig currently trusts raw env values and can crash or
produce invalid configs; update PoolConfig.from_env to validate and normalize
the three env-derived fields (min_size, max_size, cleanup_interval): parse
integers safely with a fallback/default when parsing fails, enforce min_size >=
0, cleanup_interval > 0 (replace non-positive with a sane default like 300), and
ensure min_size <= max_size (swap or clamp values if necessary). Use the
existing symbol names (from_env, PoolConfig, WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE,
WS_POOL_CLEANUP_INTERVAL) so callers remain unchanged and add minimal logging or
warnings when fallbacks are used.
| if company_id not in self._connections: | ||
| self._connections[company_id] = set() | ||
| self._connections[company_id].add(connection) | ||
| self._active_count += 1 | ||
| return True | ||
|
|
||
| def unregister(self, company_id: str, connection: object) -> None: | ||
| if company_id in self._connections: | ||
| self._connections[company_id].discard(connection) | ||
| if not self._connections[company_id]: | ||
| del self._connections[company_id] | ||
| self._active_count = max(0, self._active_count - 1) | ||
|
|
There was a problem hiding this comment.
Fix _active_count drift in register/unregister paths.
_active_count is incremented/decremented without checking whether set membership actually changed. This can desync capacity tracking and reject valid connections.
💡 Suggested fix
@@
def register(self, company_id: str, connection: object) -> bool:
@@
- if company_id not in self._connections:
- self._connections[company_id] = set()
- self._connections[company_id].add(connection)
+ company_pool = self._connections.setdefault(company_id, set())
+ if connection in company_pool:
+ return True
+ company_pool.add(connection)
self._active_count += 1
return True
@@
def unregister(self, company_id: str, connection: object) -> None:
- if company_id in self._connections:
- self._connections[company_id].discard(connection)
- if not self._connections[company_id]:
- del self._connections[company_id]
- self._active_count = max(0, self._active_count - 1)
+ company_pool = self._connections.get(company_id)
+ if company_pool is None:
+ return
+ if connection in company_pool:
+ company_pool.remove(connection)
+ self._active_count = max(0, self._active_count - 1)
+ if not company_pool:
+ del self._connections[company_id]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/services/websocket_manager.py` around lines 64 - 76, The
_active_count is changed unconditionally in register/unregister causing drift;
update register(self, company_id, connection) to only increment _active_count
when the connection was not already present in self._connections[company_id]
(i.e., check membership before add), and update unregister(self, company_id,
connection) to only decrement _active_count when discard actually removed an
existing connection (i.e., check membership before discard or detect removal),
keeping _connections and _active_count consistent; refer to the methods register
and unregister and the attributes _connections and _active_count when making the
change.
| def get_connections(self, company_id: str) -> Set[object]: | ||
| return self._connections.get(company_id, set()) |
There was a problem hiding this comment.
Do not expose the internal mutable connection set.
Returning the live set lets callers mutate manager state directly, bypassing accounting and cleanup invariants. Return a snapshot instead.
💡 Suggested fix
def get_connections(self, company_id: str) -> Set[object]:
- return self._connections.get(company_id, set())
+ return set(self._connections.get(company_id, set()))🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/services/websocket_manager.py` around lines 77 - 78, get_connections
currently returns the live mutable set from self._connections, letting callers
mutate manager state; change it to return an immutable snapshot (e.g., a
frozenset) or a shallow copy of the set so callers cannot alter internal state.
Update the get_connections method to fetch the set via
self._connections.get(company_id, set()) and return a snapshot (frozenset(...)
or set(...)) instead of the original object, preserving the original key and
value types but preventing external mutation of the manager's _connections.
| async def start_cleanup_loop(self): | ||
| async def _loop(): | ||
| while True: | ||
| await asyncio.sleep(self.config.cleanup_interval) | ||
| await self.cleanup_stale() | ||
|
|
||
| self._cleanup_task = asyncio.create_task(_loop()) | ||
| logger.info(f"Cleanup loop started (interval={self.config.cleanup_interval}s)") | ||
|
|
||
| async def stop_cleanup_loop(self): | ||
| if self._cleanup_task: | ||
| self._cleanup_task.cancel() | ||
| self._cleanup_task = None | ||
| logger.info("Cleanup loop stopped") |
There was a problem hiding this comment.
Make cleanup-loop lifecycle idempotent and cancellation-safe.
start_cleanup_loop() can spawn multiple background tasks on repeated calls. stop_cleanup_loop() cancels without awaiting completion.
💡 Suggested fix
@@
import asyncio
+from contextlib import suppress
@@
async def start_cleanup_loop(self):
+ if self._cleanup_task and not self._cleanup_task.done():
+ return
async def _loop():
while True:
await asyncio.sleep(self.config.cleanup_interval)
await self.cleanup_stale()
@@
async def stop_cleanup_loop(self):
- if self._cleanup_task:
- self._cleanup_task.cancel()
- self._cleanup_task = None
- logger.info("Cleanup loop stopped")
+ task = self._cleanup_task
+ if not task:
+ return
+ self._cleanup_task = None
+ task.cancel()
+ with suppress(asyncio.CancelledError):
+ await task
+ logger.info("Cleanup loop stopped")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/services/websocket_manager.py` around lines 110 - 123, The cleanup
loop lifecycle is not idempotent and cancellation-unsafe: modify
start_cleanup_loop to only create the background task if self._cleanup_task is
None or finished (check task.done()), and in stop_cleanup_loop cancel and then
await the task completion (wrap await in try/except asyncio.CancelledError and
optionally asyncio.TimeoutError) before setting self._cleanup_task = None;
ensure you handle Race conditions by checking task existence again after
awaiting and log appropriately — update the methods start_cleanup_loop and
stop_cleanup_loop accordingly (use self._cleanup_task, _loop coroutine, and
asyncio.create_task references).
Summary
Enables the WebSocket ConnectionManager to dynamically load connection pool parameters from environment variable values.
Changes
backend/services/websocket_manager.pywith aConnectionManagerclassWS_POOL_MIN_SIZE(default: 10),WS_POOL_MAX_SIZE(default: 100),WS_POOL_CLEANUP_INTERVAL(default: 300s)load()/get_instance()pattern for easy integrationCloses #2631
Summary by CodeRabbit
Release Notes