Skip to content

Conversation

@a0gent
Copy link

@a0gent a0gent commented Feb 1, 2026

变更

  • 文档补充正向 WebSocket 使用方式与依赖安装说明
  • README 与 API docstring 同步更新为支持正向 WebSocket

测试

  • 已运行 yapf/flake8
  • 已经在 AstrBot 测试。可以收发消息

Copilot AI review requested due to automatic review settings February 1, 2026 07:03
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds forward WebSocket support to aiocqhttp, enabling the library to act as a WebSocket client that connects to OneBot servers. This complements the existing HTTP and reverse WebSocket modes.

Changes:

  • Implemented WebSocketForwardApi class for forward WebSocket connections with automatic connection management and dual message routing
  • Updated UnifiedApi to support three connection modes with priority: Forward WebSocket > Reverse WebSocket > HTTP
  • Added optional websockets>=8.0 dependency with new extras_require configurations

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 14 comments.

Show a summary per file
File Description
aiocqhttp/api_impl.py Adds WebSocketForwardApi class with connection management, message routing, and updates UnifiedApi priority logic
aiocqhttp/init.py Integrates WebSocket URL detection and configures appropriate API based on api_root type
setup.py Adds websockets dependency to extras_require for forward-ws and all extras
test_forward_ws.py Basic functionality tests for HTTP mode, WebSocket detection, and backward compatibility
test_stability.py Tests for early API access, inheritance safety, and reconfiguration scenarios
demo_forward_ws.py Demo script showing forward WebSocket usage and migration examples
FORWARD_WEBSOCKET.md Complete user documentation for forward WebSocket support
IMPLEMENTATION_SUMMARY.md Technical implementation summary and verification notes

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +358 to +359
except Exception as e:
self._logger.error(f'Error in message loop: {e}')
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a general exception occurs in the message loop (line 359), it's logged but the loop continues. However, if the exception is due to a serious issue with the connection object itself, continuing to loop could lead to repeated errors and log spam. Consider breaking the loop or adding a backoff mechanism for repeated errors. Additionally, the exception context doesn't include enough information to diagnose issues - consider logging the message data or connection state.

Copilot uses AI. Check for mistakes.
await self._event_handler(data)
except Exception as e:
self._logger.error(f'Error handling event: {e}')
asyncio.create_task(_run_handler())
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event handler task is created but not awaited or tracked. If an exception occurs in the event handler after the task is created, it will be silently logged but the task reference is lost. This could lead to issues with proper cleanup and debugging. Consider tracking these tasks or using a task group for better lifecycle management.

Suggested change
asyncio.create_task(_run_handler())
# Track handler tasks to avoid losing references and improve lifecycle management
if not hasattr(self, "_handler_tasks"):
self._handler_tasks = set()
task = asyncio.create_task(_run_handler())
self._handler_tasks.add(task)
task.add_done_callback(self._handler_tasks.discard)

Copilot uses AI. Check for mistakes.
Comment on lines +291 to +295
async def _ensure_connected(self):
if self._is_connection_closed():
async with self._connect_lock:
if self._is_connection_closed():
await self._connect()
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential race condition here. If multiple concurrent API calls are made before the connection is established, they could all pass the _is_connection_closed() check before any of them acquire the lock, potentially leading to multiple connection attempts. While the lock prevents actual concurrent connects, the futures created before connection establishment could timeout. Consider checking connection status again after acquiring the lock but before calling _ensure_connected() in the call_action method.

Copilot uses AI. Check for mistakes.
Comment on lines +366 to +367
if 'echo' in data and data['echo'] in self._response_futures:
future = self._response_futures[data['echo']]
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message routing logic checks if data['echo'] is in self._response_futures, but data['echo'] could be any type (string, int, dict, etc.). If the OneBot server echoes back a complex structure (like a dict) that happens to match the string UUID key used for API responses, this could lead to unexpected behavior. Consider adding type checking to ensure data['echo'] is a string before looking it up in the futures dict.

Suggested change
if 'echo' in data and data['echo'] in self._response_futures:
future = self._response_futures[data['echo']]
echo = data.get('echo')
if isinstance(echo, str) and echo in self._response_futures:
future = self._response_futures[echo]

Copilot uses AI. Check for mistakes.
Comment on lines 243 to 260
def _schedule_auto_connect(self) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = None

if not loop:
self._logger.warning('No event loop available for forward WebSocket auto-connect')
return

try:
self._auto_connect_task = loop.create_task(self._auto_connect())
except Exception as e:
self._logger.error(f'Failed to schedule forward WebSocket auto-connect: {e}')

Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The auto-connect task created in _schedule_auto_connect is not properly tracked or awaited. If the bot is shut down before this task completes, it may lead to warnings about unawaited tasks or incomplete cleanup. Consider storing the task and ensuring it's cancelled/awaited in the close() method.

Copilot uses AI. Check for mistakes.
timeout=self._timeout_sec
)
self._running = True
asyncio.create_task(self._message_loop())
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task created for the message loop is not stored or tracked. If the WebSocket connection closes unexpectedly or the bot shuts down, this task may not be properly cleaned up. Consider storing the task reference and cancelling it in the close() method to ensure graceful shutdown.

Copilot uses AI. Check for mistakes.
Comment on lines +379 to +383
async def close(self):
self._running = False
if self._connection:
await self._connection.close()
self._connection = None
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the close() method, the auto-connect task (if it exists) should also be cancelled. Currently, only the connection is closed, but the _auto_connect_task created in line 257 may still be running, which could lead to attempts to reconnect after the API has been closed.

Copilot uses AI. Check for mistakes.
event_handler=self._handle_event
)
except ImportError as e:
self.logger.error(f"Failed to create WebSocketForwardApi: {e}")
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the ImportError is raised and re-raised here, the WebSocketForwardApi initialization fails but leaves the bot in an inconsistent state. The _api instance already exists but won't have a properly configured _wsf_api. Consider whether the error should be caught and the bot should fall back to HTTP mode instead, or document that forward WebSocket is a hard requirement when a ws:// URL is provided.

Suggested change
self.logger.error(f"Failed to create WebSocketForwardApi: {e}")
self.logger.error(f"Failed to create WebSocketForwardApi: {e}")
# Avoid leaving the unified API in a potentially inconsistent state
# if forward WebSocket support cannot be initialized.
self._api = None

Copilot uses AI. Check for mistakes.
raise ApiNotAvailable

echo = str(uuid.uuid4())
future = asyncio.get_event_loop().create_future()
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using asyncio.get_event_loop() is deprecated in Python 3.10+ and can cause issues. Consider using asyncio.get_running_loop() instead, which is the recommended approach for code already running in an async context. This matches the pattern used elsewhere in the codebase (e.g., line 108 in ResultStore.fetch).

Suggested change
future = asyncio.get_event_loop().create_future()
future = asyncio.get_running_loop().create_future()

Copilot uses AI. Check for mistakes.
Comment on lines 267 to 289
async def call_action(self, action: str, **params) -> Any:
await self._ensure_connected()
if not self._connection:
raise ApiNotAvailable

echo = str(uuid.uuid4())
future = asyncio.get_event_loop().create_future()
self._response_futures[echo] = future

request = {
'action': action,
'params': params,
'echo': echo
}

try:
await self._connection.send(json.dumps(request, ensure_ascii=False))
response = await asyncio.wait_for(future, timeout=self._timeout_sec)
return _handle_api_result(response)
except asyncio.TimeoutError:
raise NetworkError('WebSocket API call timeout')
finally:
self._response_futures.pop(echo, None)
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the connection fails during _ensure_connected() and raises a NetworkError, the future in _response_futures will never be resolved, leading to a timeout. While the timeout is handled, the future remains in the dict until the timeout occurs. Consider handling connection errors before creating the future, or ensure proper cleanup of the future if connection fails after it's created.

Copilot uses AI. Check for mistakes.
timeout_sec: float, event_handler: Optional[Callable]):
super().__init__()
if not websockets:
raise ImportError(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种情况不应该 raise ImportError,而是 RuntimeError 吧

access_token=access_token,
timeout_sec=api_timeout_sec,
event_handler=self._handle_event)
except ImportError as e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里建议 catch 所有 Exception 然后日志里会显示具体原因

self._auto_connect_task = None
self._schedule_auto_connect()

def _schedule_auto_connect(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个在非 async context 下会无法启动,是不是应该通过 before serving 来启动更保险?同时可以使用 asyncio.create_task

raise ApiNotAvailable

echo = str(uuid.uuid4())
future = asyncio.get_event_loop().create_future()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该是 running loop

Comment on lines +295 to +305
def _is_connection_closed(self) -> bool:
if not self._connection:
return True
if hasattr(self._connection, "closed"):
return bool(self._connection.closed)
if hasattr(self._connection, "close_code"):
return self._connection.close_code is not None
state = getattr(self._connection, "state", None)
if state is not None:
return str(state).lower() in {"closing", "closed"}
return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么会这样

extra_headers=headers,
timeout=self._timeout_sec)
self._running = True
asyncio.create_task(self._message_loop())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个task没有被track到,会被丢失

except Exception as e:
self._logger.error(f'Error handling event: {e}')

asyncio.create_task(_run_handler())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同样的,task 没有被 track 到

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

2 participants