|
11 | 11 | from ably.util.eventemitter import EventEmitter |
12 | 12 | from ably.util.exceptions import AblyException |
13 | 13 | from ably.util.helper import Timer, unix_time_ms |
14 | | -from websockets.client import WebSocketClientProtocol, connect as ws_connect |
| 14 | +try: |
| 15 | + # websockets 15+ preferred imports |
| 16 | + from websockets import ClientConnection as WebSocketClientProtocol, connect as ws_connect |
| 17 | +except ImportError: |
| 18 | + # websockets 14 and earlier fallback |
| 19 | + from websockets.client import WebSocketClientProtocol, connect as ws_connect |
| 20 | + |
15 | 21 | from websockets.exceptions import ConnectionClosedOK, WebSocketException |
16 | 22 |
|
17 | 23 | if TYPE_CHECKING: |
@@ -73,24 +79,33 @@ def on_ws_connect_done(self, task: asyncio.Task): |
73 | 79 |
|
74 | 80 | async def ws_connect(self, ws_url, headers): |
75 | 81 | try: |
76 | | - async with ws_connect(ws_url, extra_headers=headers) as websocket: |
77 | | - log.info(f'ws_connect(): connection established to {ws_url}') |
78 | | - self._emit('connected') |
79 | | - self.websocket = websocket |
80 | | - self.read_loop = self.connection_manager.options.loop.create_task(self.ws_read_loop()) |
81 | | - self.read_loop.add_done_callback(self.on_read_loop_done) |
82 | | - try: |
83 | | - await self.read_loop |
84 | | - except WebSocketException as err: |
85 | | - if not self.is_disposed: |
86 | | - await self.dispose() |
87 | | - self.connection_manager.deactivate_transport(err) |
| 82 | + # Use additional_headers for websockets 15+, fallback to extra_headers for older versions |
| 83 | + try: |
| 84 | + async with ws_connect(ws_url, additional_headers=headers) as websocket: |
| 85 | + await self._handle_websocket_connection(ws_url, websocket) |
| 86 | + except TypeError: |
| 87 | + # Fallback for websockets 14 and earlier |
| 88 | + async with ws_connect(ws_url, extra_headers=headers) as websocket: |
| 89 | + await self._handle_websocket_connection(ws_url, websocket) |
88 | 90 | except (WebSocketException, socket.gaierror) as e: |
89 | 91 | exception = AblyException(f'Error opening websocket connection: {e}', 400, 40000) |
90 | 92 | log.exception(f'WebSocketTransport.ws_connect(): Error opening websocket connection: {exception}') |
91 | 93 | self._emit('failed', exception) |
92 | 94 | raise exception |
93 | 95 |
|
| 96 | + async def _handle_websocket_connection(self, ws_url, websocket): |
| 97 | + log.info(f'ws_connect(): connection established to {ws_url}') |
| 98 | + self._emit('connected') |
| 99 | + self.websocket = websocket |
| 100 | + self.read_loop = self.connection_manager.options.loop.create_task(self.ws_read_loop()) |
| 101 | + self.read_loop.add_done_callback(self.on_read_loop_done) |
| 102 | + try: |
| 103 | + await self.read_loop |
| 104 | + except WebSocketException as err: |
| 105 | + if not self.is_disposed: |
| 106 | + await self.dispose() |
| 107 | + self.connection_manager.deactivate_transport(err) |
| 108 | + |
94 | 109 | async def on_protocol_message(self, msg): |
95 | 110 | self.on_activity() |
96 | 111 | log.debug(f'WebSocketTransport.on_protocol_message(): received protocol message: {msg}') |
|
0 commit comments