Skip to content

refactor: Support dynamic pool sizing in WebSocket ConnectionManager#2797

Open
singhanurag0317-bit wants to merge 1 commit into
ritesh-1918:mainfrom
singhanurag0317-bit:fix/issue-2631-websocket-pool-sizing
Open

refactor: Support dynamic pool sizing in WebSocket ConnectionManager#2797
singhanurag0317-bit wants to merge 1 commit into
ritesh-1918:mainfrom
singhanurag0317-bit:fix/issue-2631-websocket-pool-sizing

Conversation

@singhanurag0317-bit

@singhanurag0317-bit singhanurag0317-bit commented Jun 12, 2026

Copy link
Copy Markdown

Summary

Enables the WebSocket ConnectionManager to dynamically load connection pool parameters from environment variable values.

Changes

  • Created backend/services/websocket_manager.py with a ConnectionManager class
  • Pool bounds configurable via WS_POOL_MIN_SIZE (default: 10), WS_POOL_MAX_SIZE (default: 100), WS_POOL_CLEANUP_INTERVAL (default: 300s)
  • Singleton load() / get_instance() pattern for easy integration
  • register/unregister/broadcast/cleanup_stale methods with type-safe bounds
  • Automatic stale connection cleanup loop

Closes #2631

Summary by CodeRabbit

Release Notes

  • New Features
    • Added WebSocket connection manager with per-company connection pooling to optimize resource usage.
    • Implemented automatic detection and cleanup of inactive connections.
    • Configured dynamic connection capacity limits based on environment variables.
    • Established connection lifecycle management for improved real-time communication stability.

…a env vars

Implements a ConnectionManager that reads pool bounds from environment
variables (WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE, WS_POOL_CLEANUP_INTERVAL)
with type-safe defaults. Supports register/unregister/broadcast/cleanup.

Closes ritesh-1918#2631
@vercel

vercel Bot commented Jun 12, 2026

Copy link
Copy Markdown

Someone is attempting to deploy a commit to the ritesh Team on Vercel.

A member of the Team first needs to authorize it.

@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

A new websocket_manager.py module adds per-company connection pooling with environment-driven capacity limits and asynchronous stale-connection cleanup. The module exports a singleton ConnectionManager instance that tracks active connections, enforces pool size constraints, broadcasts messages, and periodically removes stale connections.

Changes

WebSocket Connection Manager with Per-Company Pooling

Layer / File(s) Summary
Configuration and Setup
backend/services/websocket_manager.py
Module-level logger initialization and PoolConfig dataclass read pool parameters (WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE, WS_POOL_CLEANUP_INTERVAL) from environment via from_env().
Connection Manager Core Operations
backend/services/websocket_manager.py
ConnectionManager class maintains per-company_id connection sets, tracks _active_count, enforces max_size on register(), updates counts on unregister(), retrieves connections with get_connections(), and sends messages via broadcast() with per-connection error logging.
Stale Connection Cleanup System
backend/services/websocket_manager.py
cleanup_stale() iterates connections, identifies stale ones via a closed attribute, removes them from sets, adjusts active count, and logs cleanup events. start_cleanup_loop() launches a periodic background task that calls cleanup_stale() at cleanup_interval; stop_cleanup_loop() cancels the task.
Singleton Initialization and Accessor
backend/services/websocket_manager.py
Module-level _instance storage and load() initializer construct PoolConfig/ConnectionManager once from environment and log resolved configuration. get_instance() returns the singleton or None.

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 A manager for sockets, per company it pools,
With stale-connection cleanup and sensible rules,
Async tasks hop through the night,
Broadcast messages sent just right,
Environment-driven config keeps limits all cool! 🌳

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and concisely describes the main change: adding dynamic pool sizing support to the WebSocket ConnectionManager, which is the primary objective of this refactoring.
Linked Issues check ✅ Passed The PR fully implements all coding requirements from issue #2631: environment variable configuration for pool sizing, type safety bounds, optimized logic, and clean coding practices as described.
Out of Scope Changes check ✅ Passed All changes are directly scoped to the new websocket_manager.py module and align with the dynamic pool sizing and configuration refactoring objectives outlined in issue #2631.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/services/websocket_manager.py`:
- Around line 33-38: The from_env classmethod in PoolConfig currently trusts raw
env values and can crash or produce invalid configs; update PoolConfig.from_env
to validate and normalize the three env-derived fields (min_size, max_size,
cleanup_interval): parse integers safely with a fallback/default when parsing
fails, enforce min_size >= 0, cleanup_interval > 0 (replace non-positive with a
sane default like 300), and ensure min_size <= max_size (swap or clamp values if
necessary). Use the existing symbol names (from_env, PoolConfig,
WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE, WS_POOL_CLEANUP_INTERVAL) so callers remain
unchanged and add minimal logging or warnings when fallbacks are used.
- Around line 77-78: get_connections currently returns the live mutable set from
self._connections, letting callers mutate manager state; change it to return an
immutable snapshot (e.g., a frozenset) or a shallow copy of the set so callers
cannot alter internal state. Update the get_connections method to fetch the set
via self._connections.get(company_id, set()) and return a snapshot
(frozenset(...) or set(...)) instead of the original object, preserving the
original key and value types but preventing external mutation of the manager's
_connections.
- Around line 64-76: The _active_count is changed unconditionally in
register/unregister causing drift; update register(self, company_id, connection)
to only increment _active_count when the connection was not already present in
self._connections[company_id] (i.e., check membership before add), and update
unregister(self, company_id, connection) to only decrement _active_count when
discard actually removed an existing connection (i.e., check membership before
discard or detect removal), keeping _connections and _active_count consistent;
refer to the methods register and unregister and the attributes _connections and
_active_count when making the change.
- Around line 110-123: The cleanup loop lifecycle is not idempotent and
cancellation-unsafe: modify start_cleanup_loop to only create the background
task if self._cleanup_task is None or finished (check task.done()), and in
stop_cleanup_loop cancel and then await the task completion (wrap await in
try/except asyncio.CancelledError and optionally asyncio.TimeoutError) before
setting self._cleanup_task = None; ensure you handle Race conditions by checking
task existence again after awaiting and log appropriately — update the methods
start_cleanup_loop and stop_cleanup_loop accordingly (use self._cleanup_task,
_loop coroutine, and asyncio.create_task references).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7e3b97cd-2d50-4a91-8b8d-4957a4bac3cd

📥 Commits

Reviewing files that changed from the base of the PR and between da8faf2 and 9d85580.

📒 Files selected for processing (1)
  • backend/services/websocket_manager.py

Comment on lines +33 to +38
def from_env(cls) -> "PoolConfig":
return cls(
min_size=int(os.getenv("WS_POOL_MIN_SIZE", "10")),
max_size=int(os.getenv("WS_POOL_MAX_SIZE", "100")),
cleanup_interval=int(os.getenv("WS_POOL_CLEANUP_INTERVAL", "300")),
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate and normalize environment pool settings before using them.

from_env() currently trusts raw env values. Non-integer values can crash initialization, and non-positive bounds (especially cleanup interval) can lead to invalid runtime behavior.

💡 Suggested fix
@@
     `@classmethod`
     def from_env(cls) -> "PoolConfig":
-        return cls(
-            min_size=int(os.getenv("WS_POOL_MIN_SIZE", "10")),
-            max_size=int(os.getenv("WS_POOL_MAX_SIZE", "100")),
-            cleanup_interval=int(os.getenv("WS_POOL_CLEANUP_INTERVAL", "300")),
-        )
+        def _parse_int(name: str, default: int) -> int:
+            raw = os.getenv(name, str(default))
+            try:
+                return int(raw)
+            except ValueError:
+                logger.warning("Invalid %s=%r; using default=%d", name, raw, default)
+                return default
+
+        min_size = max(1, _parse_int("WS_POOL_MIN_SIZE", 10))
+        max_size = max(1, _parse_int("WS_POOL_MAX_SIZE", 100))
+        cleanup_interval = max(1, _parse_int("WS_POOL_CLEANUP_INTERVAL", 300))
+
+        if min_size > max_size:
+            logger.warning(
+                "WS_POOL_MIN_SIZE (%d) > WS_POOL_MAX_SIZE (%d); clamping min_size",
+                min_size, max_size
+            )
+            min_size = max_size
+
+        return cls(min_size=min_size, max_size=max_size, cleanup_interval=cleanup_interval)

Also applies to: 113-113

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/services/websocket_manager.py` around lines 33 - 38, The from_env
classmethod in PoolConfig currently trusts raw env values and can crash or
produce invalid configs; update PoolConfig.from_env to validate and normalize
the three env-derived fields (min_size, max_size, cleanup_interval): parse
integers safely with a fallback/default when parsing fails, enforce min_size >=
0, cleanup_interval > 0 (replace non-positive with a sane default like 300), and
ensure min_size <= max_size (swap or clamp values if necessary). Use the
existing symbol names (from_env, PoolConfig, WS_POOL_MIN_SIZE, WS_POOL_MAX_SIZE,
WS_POOL_CLEANUP_INTERVAL) so callers remain unchanged and add minimal logging or
warnings when fallbacks are used.

Comment on lines +64 to +76
if company_id not in self._connections:
self._connections[company_id] = set()
self._connections[company_id].add(connection)
self._active_count += 1
return True

def unregister(self, company_id: str, connection: object) -> None:
if company_id in self._connections:
self._connections[company_id].discard(connection)
if not self._connections[company_id]:
del self._connections[company_id]
self._active_count = max(0, self._active_count - 1)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix _active_count drift in register/unregister paths.

_active_count is incremented/decremented without checking whether set membership actually changed. This can desync capacity tracking and reject valid connections.

💡 Suggested fix
@@
     def register(self, company_id: str, connection: object) -> bool:
@@
-        if company_id not in self._connections:
-            self._connections[company_id] = set()
-        self._connections[company_id].add(connection)
+        company_pool = self._connections.setdefault(company_id, set())
+        if connection in company_pool:
+            return True
+        company_pool.add(connection)
         self._active_count += 1
         return True
@@
     def unregister(self, company_id: str, connection: object) -> None:
-        if company_id in self._connections:
-            self._connections[company_id].discard(connection)
-            if not self._connections[company_id]:
-                del self._connections[company_id]
-            self._active_count = max(0, self._active_count - 1)
+        company_pool = self._connections.get(company_id)
+        if company_pool is None:
+            return
+        if connection in company_pool:
+            company_pool.remove(connection)
+            self._active_count = max(0, self._active_count - 1)
+        if not company_pool:
+            del self._connections[company_id]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/services/websocket_manager.py` around lines 64 - 76, The
_active_count is changed unconditionally in register/unregister causing drift;
update register(self, company_id, connection) to only increment _active_count
when the connection was not already present in self._connections[company_id]
(i.e., check membership before add), and update unregister(self, company_id,
connection) to only decrement _active_count when discard actually removed an
existing connection (i.e., check membership before discard or detect removal),
keeping _connections and _active_count consistent; refer to the methods register
and unregister and the attributes _connections and _active_count when making the
change.

Comment on lines +77 to +78
def get_connections(self, company_id: str) -> Set[object]:
return self._connections.get(company_id, set())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not expose the internal mutable connection set.

Returning the live set lets callers mutate manager state directly, bypassing accounting and cleanup invariants. Return a snapshot instead.

💡 Suggested fix
     def get_connections(self, company_id: str) -> Set[object]:
-        return self._connections.get(company_id, set())
+        return set(self._connections.get(company_id, set()))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/services/websocket_manager.py` around lines 77 - 78, get_connections
currently returns the live mutable set from self._connections, letting callers
mutate manager state; change it to return an immutable snapshot (e.g., a
frozenset) or a shallow copy of the set so callers cannot alter internal state.
Update the get_connections method to fetch the set via
self._connections.get(company_id, set()) and return a snapshot (frozenset(...)
or set(...)) instead of the original object, preserving the original key and
value types but preventing external mutation of the manager's _connections.

Comment on lines +110 to +123
async def start_cleanup_loop(self):
async def _loop():
while True:
await asyncio.sleep(self.config.cleanup_interval)
await self.cleanup_stale()

self._cleanup_task = asyncio.create_task(_loop())
logger.info(f"Cleanup loop started (interval={self.config.cleanup_interval}s)")

async def stop_cleanup_loop(self):
if self._cleanup_task:
self._cleanup_task.cancel()
self._cleanup_task = None
logger.info("Cleanup loop stopped")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make cleanup-loop lifecycle idempotent and cancellation-safe.

start_cleanup_loop() can spawn multiple background tasks on repeated calls. stop_cleanup_loop() cancels without awaiting completion.

💡 Suggested fix
@@
 import asyncio
+from contextlib import suppress
@@
     async def start_cleanup_loop(self):
+        if self._cleanup_task and not self._cleanup_task.done():
+            return
         async def _loop():
             while True:
                 await asyncio.sleep(self.config.cleanup_interval)
                 await self.cleanup_stale()
@@
     async def stop_cleanup_loop(self):
-        if self._cleanup_task:
-            self._cleanup_task.cancel()
-            self._cleanup_task = None
-            logger.info("Cleanup loop stopped")
+        task = self._cleanup_task
+        if not task:
+            return
+        self._cleanup_task = None
+        task.cancel()
+        with suppress(asyncio.CancelledError):
+            await task
+        logger.info("Cleanup loop stopped")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/services/websocket_manager.py` around lines 110 - 123, The cleanup
loop lifecycle is not idempotent and cancellation-unsafe: modify
start_cleanup_loop to only create the background task if self._cleanup_task is
None or finished (check task.done()), and in stop_cleanup_loop cancel and then
await the task completion (wrap await in try/except asyncio.CancelledError and
optionally asyncio.TimeoutError) before setting self._cleanup_task = None;
ensure you handle Race conditions by checking task existence again after
awaiting and log appropriately — update the methods start_cleanup_loop and
stop_cleanup_loop accordingly (use self._cleanup_task, _loop coroutine, and
asyncio.create_task references).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

refactor: Support dynamic pool sizing in WebSocket ConnectionManager

1 participant