-
Notifications
You must be signed in to change notification settings - Fork 42
feat: support forward ws #74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds forward WebSocket support to aiocqhttp, enabling the library to act as a WebSocket client that connects to OneBot servers. This complements the existing HTTP and reverse WebSocket modes.
Changes:
- Implemented
WebSocketForwardApiclass for forward WebSocket connections with automatic connection management and dual message routing - Updated
UnifiedApito support three connection modes with priority: Forward WebSocket > Reverse WebSocket > HTTP - Added optional
websockets>=8.0dependency with new extras_require configurations
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| aiocqhttp/api_impl.py | Adds WebSocketForwardApi class with connection management, message routing, and updates UnifiedApi priority logic |
| aiocqhttp/init.py | Integrates WebSocket URL detection and configures appropriate API based on api_root type |
| setup.py | Adds websockets dependency to extras_require for forward-ws and all extras |
| test_forward_ws.py | Basic functionality tests for HTTP mode, WebSocket detection, and backward compatibility |
| test_stability.py | Tests for early API access, inheritance safety, and reconfiguration scenarios |
| demo_forward_ws.py | Demo script showing forward WebSocket usage and migration examples |
| FORWARD_WEBSOCKET.md | Complete user documentation for forward WebSocket support |
| IMPLEMENTATION_SUMMARY.md | Technical implementation summary and verification notes |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| except Exception as e: | ||
| self._logger.error(f'Error in message loop: {e}') |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a general exception occurs in the message loop (line 359), it's logged but the loop continues. However, if the exception is due to a serious issue with the connection object itself, continuing to loop could lead to repeated errors and log spam. Consider breaking the loop or adding a backoff mechanism for repeated errors. Additionally, the exception context doesn't include enough information to diagnose issues - consider logging the message data or connection state.
| await self._event_handler(data) | ||
| except Exception as e: | ||
| self._logger.error(f'Error handling event: {e}') | ||
| asyncio.create_task(_run_handler()) |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The event handler task is created but not awaited or tracked. If an exception occurs in the event handler after the task is created, it will be silently logged but the task reference is lost. This could lead to issues with proper cleanup and debugging. Consider tracking these tasks or using a task group for better lifecycle management.
| asyncio.create_task(_run_handler()) | |
| # Track handler tasks to avoid losing references and improve lifecycle management | |
| if not hasattr(self, "_handler_tasks"): | |
| self._handler_tasks = set() | |
| task = asyncio.create_task(_run_handler()) | |
| self._handler_tasks.add(task) | |
| task.add_done_callback(self._handler_tasks.discard) |
| async def _ensure_connected(self): | ||
| if self._is_connection_closed(): | ||
| async with self._connect_lock: | ||
| if self._is_connection_closed(): | ||
| await self._connect() |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a potential race condition here. If multiple concurrent API calls are made before the connection is established, they could all pass the _is_connection_closed() check before any of them acquire the lock, potentially leading to multiple connection attempts. While the lock prevents actual concurrent connects, the futures created before connection establishment could timeout. Consider checking connection status again after acquiring the lock but before calling _ensure_connected() in the call_action method.
| if 'echo' in data and data['echo'] in self._response_futures: | ||
| future = self._response_futures[data['echo']] |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The message routing logic checks if data['echo'] is in self._response_futures, but data['echo'] could be any type (string, int, dict, etc.). If the OneBot server echoes back a complex structure (like a dict) that happens to match the string UUID key used for API responses, this could lead to unexpected behavior. Consider adding type checking to ensure data['echo'] is a string before looking it up in the futures dict.
| if 'echo' in data and data['echo'] in self._response_futures: | |
| future = self._response_futures[data['echo']] | |
| echo = data.get('echo') | |
| if isinstance(echo, str) and echo in self._response_futures: | |
| future = self._response_futures[echo] |
| def _schedule_auto_connect(self) -> None: | ||
| try: | ||
| loop = asyncio.get_running_loop() | ||
| except RuntimeError: | ||
| try: | ||
| loop = asyncio.get_event_loop() | ||
| except RuntimeError: | ||
| loop = None | ||
|
|
||
| if not loop: | ||
| self._logger.warning('No event loop available for forward WebSocket auto-connect') | ||
| return | ||
|
|
||
| try: | ||
| self._auto_connect_task = loop.create_task(self._auto_connect()) | ||
| except Exception as e: | ||
| self._logger.error(f'Failed to schedule forward WebSocket auto-connect: {e}') | ||
|
|
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The auto-connect task created in _schedule_auto_connect is not properly tracked or awaited. If the bot is shut down before this task completes, it may lead to warnings about unawaited tasks or incomplete cleanup. Consider storing the task and ensuring it's cancelled/awaited in the close() method.
| timeout=self._timeout_sec | ||
| ) | ||
| self._running = True | ||
| asyncio.create_task(self._message_loop()) |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The task created for the message loop is not stored or tracked. If the WebSocket connection closes unexpectedly or the bot shuts down, this task may not be properly cleaned up. Consider storing the task reference and cancelling it in the close() method to ensure graceful shutdown.
| async def close(self): | ||
| self._running = False | ||
| if self._connection: | ||
| await self._connection.close() | ||
| self._connection = None |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the close() method, the auto-connect task (if it exists) should also be cancelled. Currently, only the connection is closed, but the _auto_connect_task created in line 257 may still be running, which could lead to attempts to reconnect after the API has been closed.
| event_handler=self._handle_event | ||
| ) | ||
| except ImportError as e: | ||
| self.logger.error(f"Failed to create WebSocketForwardApi: {e}") |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the ImportError is raised and re-raised here, the WebSocketForwardApi initialization fails but leaves the bot in an inconsistent state. The _api instance already exists but won't have a properly configured _wsf_api. Consider whether the error should be caught and the bot should fall back to HTTP mode instead, or document that forward WebSocket is a hard requirement when a ws:// URL is provided.
| self.logger.error(f"Failed to create WebSocketForwardApi: {e}") | |
| self.logger.error(f"Failed to create WebSocketForwardApi: {e}") | |
| # Avoid leaving the unified API in a potentially inconsistent state | |
| # if forward WebSocket support cannot be initialized. | |
| self._api = None |
| raise ApiNotAvailable | ||
|
|
||
| echo = str(uuid.uuid4()) | ||
| future = asyncio.get_event_loop().create_future() |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using asyncio.get_event_loop() is deprecated in Python 3.10+ and can cause issues. Consider using asyncio.get_running_loop() instead, which is the recommended approach for code already running in an async context. This matches the pattern used elsewhere in the codebase (e.g., line 108 in ResultStore.fetch).
| future = asyncio.get_event_loop().create_future() | |
| future = asyncio.get_running_loop().create_future() |
| async def call_action(self, action: str, **params) -> Any: | ||
| await self._ensure_connected() | ||
| if not self._connection: | ||
| raise ApiNotAvailable | ||
|
|
||
| echo = str(uuid.uuid4()) | ||
| future = asyncio.get_event_loop().create_future() | ||
| self._response_futures[echo] = future | ||
|
|
||
| request = { | ||
| 'action': action, | ||
| 'params': params, | ||
| 'echo': echo | ||
| } | ||
|
|
||
| try: | ||
| await self._connection.send(json.dumps(request, ensure_ascii=False)) | ||
| response = await asyncio.wait_for(future, timeout=self._timeout_sec) | ||
| return _handle_api_result(response) | ||
| except asyncio.TimeoutError: | ||
| raise NetworkError('WebSocket API call timeout') | ||
| finally: | ||
| self._response_futures.pop(echo, None) |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the connection fails during _ensure_connected() and raises a NetworkError, the future in _response_futures will never be resolved, leading to a timeout. While the timeout is handled, the future remains in the dict until the timeout occurs. Consider handling connection errors before creating the future, or ensure proper cleanup of the future if connection fails after it's created.
| timeout_sec: float, event_handler: Optional[Callable]): | ||
| super().__init__() | ||
| if not websockets: | ||
| raise ImportError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这种情况不应该 raise ImportError,而是 RuntimeError 吧
| access_token=access_token, | ||
| timeout_sec=api_timeout_sec, | ||
| event_handler=self._handle_event) | ||
| except ImportError as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里建议 catch 所有 Exception 然后日志里会显示具体原因
| self._auto_connect_task = None | ||
| self._schedule_auto_connect() | ||
|
|
||
| def _schedule_auto_connect(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个在非 async context 下会无法启动,是不是应该通过 before serving 来启动更保险?同时可以使用 asyncio.create_task
| raise ApiNotAvailable | ||
|
|
||
| echo = str(uuid.uuid4()) | ||
| future = asyncio.get_event_loop().create_future() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应该是 running loop
| def _is_connection_closed(self) -> bool: | ||
| if not self._connection: | ||
| return True | ||
| if hasattr(self._connection, "closed"): | ||
| return bool(self._connection.closed) | ||
| if hasattr(self._connection, "close_code"): | ||
| return self._connection.close_code is not None | ||
| state = getattr(self._connection, "state", None) | ||
| if state is not None: | ||
| return str(state).lower() in {"closing", "closed"} | ||
| return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里为什么会这样
| extra_headers=headers, | ||
| timeout=self._timeout_sec) | ||
| self._running = True | ||
| asyncio.create_task(self._message_loop()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个task没有被track到,会被丢失
| except Exception as e: | ||
| self._logger.error(f'Error handling event: {e}') | ||
|
|
||
| asyncio.create_task(_run_handler()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同样的,task 没有被 track 到
Co-authored-by: Ju4tCode <[email protected]>
变更
测试