Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ably/realtime/connectionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ async def __get_transport_params(self) -> dict:
params["v"] = protocol_version
if self.connection_details:
params["resume"] = self.connection_details.connection_key
# RTN2a: Set format to msgpack if use_binary_protocol is enabled
if self.options.use_binary_protocol:
params["format"] = "msgpack"
return params

async def close_impl(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion ably/realtime/realtime_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ def _on_message(self, proto_msg: dict) -> None:
elif action == ProtocolMessageAction.MESSAGE:
messages = []
try:
messages = Message.from_encoded_array(proto_msg.get('messages'), context=self.__decoding_context)
messages = Message.from_encoded_array(proto_msg.get('messages'),
cipher=self.cipher, context=self.__decoding_context)
self.__decoding_context.last_message_id = messages[-1].id
self.__channel_serial = channel_serial
Comment on lines +561 to 564
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n ably/realtime/realtime_channel.py | sed -n '555,575p'

Repository: ably/ably-python

Length of output: 1352


🏁 Script executed:

rg -n --type=py '\.from_encoded_array\s*\(' --max-count=20

Repository: ably/ably-python

Length of output: 487


🏁 Script executed:

# Find the Message.from_encoded_array definition to understand its signature
ast-grep --pattern 'def from_encoded_array($$$)'

Repository: ably/ably-python

Length of output: 262


🏁 Script executed:

# Search for tests or comments about MESSAGE action with empty/missing messages
rg -n --type=py 'ProtocolMessageAction\.MESSAGE|proto_msg.*messages' -A 3 -B 3 | head -50

Repository: ably/ably-python

Length of output: 3967


🏁 Script executed:

# Check if there are any proto message construction examples in tests
rg -n --type=py 'messages.*\[\]|messages.*None' -B 2 -A 2 | head -40

Repository: ably/ably-python

Length of output: 2758


🏁 Script executed:

# Look at exception handling patterns in similar code sections
cat -n ably/realtime/realtime_channel.py | sed -n '540,580p'

Repository: ably/ably-python

Length of output: 2486


🏁 Script executed:

# Check if there's any validation or protocol specification for MESSAGE action messages field
rg -n --type=py 'MESSAGE|messages' ably/realtime/realtime_channel.py | grep -i 'protocol\|spec\|doc' | head -20

Repository: ably/ably-python

Length of output: 283


🏁 Script executed:

# Look at how proto_msg is constructed/parsed to understand if messages key is always present
rg -n --type=py 'def _on_message|proto_msg.*=' ably/realtime/realtime_channel.py | head -10

Repository: ably/ably-python

Length of output: 115


🏁 Script executed:

# Check if there are any edge cases in tests that exercise empty message scenarios
rg -n --type=py 'messages.*=.*\[\]' test/ -B 2 -A 2 | head -30

Repository: ably/ably-python

Length of output: 2124


Guard against missing/empty messages when updating last_message_id.
Line 561–563 can crash with TypeError (if proto_msg.get('messages') is None) or IndexError (if the list is empty). The exception handler at line 565 only catches AblyException, so these errors will propagate uncaught.

-            messages = Message.from_encoded_array(proto_msg.get('messages'),
-                                                  cipher=self.cipher, context=self.__decoding_context)
-            self.__decoding_context.last_message_id = messages[-1].id
+            raw_messages = proto_msg.get('messages') or []
+            messages = Message.from_encoded_array(
+                raw_messages,
+                cipher=self.cipher,
+                context=self.__decoding_context,
+            )
+            if messages:
+                self.__decoding_context.last_message_id = messages[-1].id
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
messages = Message.from_encoded_array(proto_msg.get('messages'),
cipher=self.cipher, context=self.__decoding_context)
self.__decoding_context.last_message_id = messages[-1].id
self.__channel_serial = channel_serial
raw_messages = proto_msg.get('messages') or []
messages = Message.from_encoded_array(
raw_messages,
cipher=self.cipher,
context=self.__decoding_context,
)
if messages:
self.__decoding_context.last_message_id = messages[-1].id
self.__channel_serial = channel_serial

except AblyException as e:
Expand Down
29 changes: 24 additions & 5 deletions ably/transport/websockettransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from enum import IntEnum
from typing import TYPE_CHECKING

import msgpack

from ably.http.httputils import HttpUtils
from ably.types.connectiondetails import ConnectionDetails
from ably.util.eventemitter import EventEmitter
Expand Down Expand Up @@ -71,6 +73,7 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic
self.is_disposed = False
self.host = host
self.params = params
self.format = params.get('format', 'json')
super().__init__()

def connect(self):
Expand Down Expand Up @@ -189,12 +192,23 @@ async def ws_read_loop(self):
raise AblyException('ws_read_loop started with no websocket', 500, 50000)
try:
async for raw in self.websocket:
msg = json.loads(raw)
task = asyncio.create_task(self.on_protocol_message(msg))
task.add_done_callback(self.on_protcol_message_handled)
# Decode based on format
try:
msg = self.decode_raw_websocket_frame(raw)
task = asyncio.create_task(self.on_protocol_message(msg))
task.add_done_callback(self.on_protcol_message_handled)
except Exception as e:
log.exception(
f"WebSocketTransport.decode(): Unexpected exception handling channel message: {e}"
)
except ConnectionClosedOK:
return

def decode_raw_websocket_frame(self, raw: str | bytes) -> dict:
if self.format == 'msgpack':
return msgpack.unpackb(raw, raw=False)
return json.loads(raw)

def on_protcol_message_handled(self, task):
try:
exception = task.exception()
Expand Down Expand Up @@ -231,8 +245,13 @@ async def close(self):
async def send(self, message: dict):
if self.websocket is None:
raise Exception()
raw_msg = json.dumps(message)
log.info(f'WebSocketTransport.send(): sending {raw_msg}')
# Encode based on format
if self.format == 'msgpack':
raw_msg = msgpack.packb(message, use_bin_type=True)
log.info(f'WebSocketTransport.send(): sending msgpack message (length: {len(raw_msg)} bytes)')
else:
raw_msg = json.dumps(message)
log.info(f'WebSocketTransport.send(): sending {raw_msg}')
await self.websocket.send(raw_msg)

def set_idle_timer(self, timeout: float):
Expand Down
Loading
Loading