1212from test .ably .utils import BaseAsyncTestCase , WaitableEvent , assert_waiter
1313
1414
15+ @pytest .mark .parametrize ("transport" , ["json" , "msgpack" ], ids = ["JSON" , "MsgPack" ])
1516class TestRealtimeChannelPublish (BaseAsyncTestCase ):
1617 """Tests for RTN7 spec - Message acknowledgment"""
1718
1819 @pytest .fixture (autouse = True )
19- async def setup (self ):
20+ async def setup (self , transport ):
2021 self .test_vars = await TestApp .get_test_vars ()
22+ self .use_binary_protocol = True if transport == 'msgpack' else False
2123
2224 # RTN7a - Basic ACK/NACK functionality
2325 async def test_publish_returns_ack_on_success (self ):
2426 """RTN7a: Verify that publish awaits ACK from server"""
25- ably = await TestApp .get_ably_realtime ()
27+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
2628 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
2729
2830 channel = ably .channels .get ('test_ack_channel' )
@@ -35,7 +37,7 @@ async def test_publish_returns_ack_on_success(self):
3537
3638 async def test_publish_raises_on_nack (self ):
3739 """RTN7a: Verify that publish raises exception when NACK is received"""
38- ably = await TestApp .get_ably_realtime ()
40+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
3941 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
4042
4143 channel = ably .channels .get ('test_nack_channel' )
@@ -77,7 +79,7 @@ async def send_and_nack(message):
7779 # RTN7b - msgSerial incrementing
7880 async def test_msgserial_increments_sequentially (self ):
7981 """RTN7b: Verify that msgSerial increments for each message"""
80- ably = await TestApp .get_ably_realtime ()
82+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
8183 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
8284
8385 channel = ably .channels .get ('test_msgserial_channel' )
@@ -109,7 +111,7 @@ async def capture_serial(message):
109111 # RTN7e - Fail pending messages on SUSPENDED, CLOSED, FAILED
110112 async def test_pending_messages_fail_on_suspended (self ):
111113 """RTN7e: Verify pending messages fail when connection enters SUSPENDED state"""
112- ably = await TestApp .get_ably_realtime ()
114+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
113115 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
114116
115117 channel = ably .channels .get ('test_suspended_channel' )
@@ -154,7 +156,7 @@ async def check_pending():
154156
155157 async def test_pending_messages_fail_on_failed (self ):
156158 """RTN7e: Verify pending messages fail when connection enters FAILED state"""
157- ably = await TestApp .get_ably_realtime ()
159+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
158160 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
159161
160162 channel = ably .channels .get ('test_failed_channel' )
@@ -196,7 +198,7 @@ async def check_pending():
196198 async def test_fail_on_disconnected_when_queue_messages_false (self ):
197199 """RTN7d: Verify pending messages fail on DISCONNECTED if queueMessages is false"""
198200 # Create client with queueMessages=False
199- ably = await TestApp .get_ably_realtime (queue_messages = False )
201+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol , queue_messages = False )
200202 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
201203
202204 channel = ably .channels .get ('test_disconnected_channel' )
@@ -237,7 +239,7 @@ async def check_pending():
237239 async def test_queue_on_disconnected_when_queue_messages_true (self ):
238240 """RTN7d: Verify messages are queued (not failed) on DISCONNECTED when queueMessages is true"""
239241 # Create client with queueMessages=True (default)
240- ably = await TestApp .get_ably_realtime (queue_messages = True )
242+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol , queue_messages = True )
241243 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
242244
243245 channel = ably .channels .get ('test_queue_channel' )
@@ -286,7 +288,7 @@ async def check_disconnected():
286288 # RTN19a2 - Reset msgSerial on new connectionId
287289 async def test_msgserial_resets_on_new_connection_id (self ):
288290 """RTN19a2: Verify msgSerial resets to 0 when connectionId changes"""
289- ably = await TestApp .get_ably_realtime ()
291+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
290292 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
291293
292294 channel = ably .channels .get ('test_reset_serial_channel' )
@@ -323,7 +325,7 @@ async def test_msgserial_resets_on_new_connection_id(self):
323325
324326 async def test_msgserial_not_reset_on_same_connection_id (self ):
325327 """RTN19a2: Verify msgSerial is NOT reset when connectionId stays the same"""
326- ably = await TestApp .get_ably_realtime ()
328+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
327329 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
328330
329331 channel = ably .channels .get ('test_same_connection_channel' )
@@ -361,7 +363,7 @@ async def test_msgserial_not_reset_on_same_connection_id(self):
361363 # Test that multiple messages get correct msgSerial values
362364 async def test_multiple_messages_concurrent (self ):
363365 """RTN7b: Test that multiple concurrent publishes get sequential msgSerials"""
364- ably = await TestApp .get_ably_realtime ()
366+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
365367 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
366368
367369 channel = ably .channels .get ('test_concurrent_channel' )
@@ -384,7 +386,7 @@ async def test_multiple_messages_concurrent(self):
384386 # RTN19a - Resend messages awaiting ACK on reconnect
385387 async def test_pending_messages_resent_on_reconnect (self ):
386388 """RTN19a: Verify messages awaiting ACK are resent when transport reconnects"""
387- ably = await TestApp .get_ably_realtime ()
389+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
388390 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
389391
390392 channel = ably .channels .get ('test_resend_channel' )
@@ -438,7 +440,7 @@ async def check_pending():
438440
439441 async def test_msgserial_preserved_on_resume (self ):
440442 """RTN19a2: Verify msgSerial counter is preserved when resuming (same connectionId)"""
441- ably = await TestApp .get_ably_realtime ()
443+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
442444 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
443445
444446 channel = ably .channels .get ('test_preserve_serial_channel' )
@@ -489,7 +491,7 @@ async def check_pending():
489491
490492 async def test_msgserial_reset_on_failed_resume (self ):
491493 """RTN19a2: Verify msgSerial counter is reset when resume fails (new connectionId)"""
492- ably = await TestApp .get_ably_realtime ()
494+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
493495 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
494496
495497 channel = ably .channels .get ('test_reset_serial_resume_channel' )
@@ -541,7 +543,7 @@ async def check_pending():
541543 # Test ACK with count > 1
542544 async def test_ack_with_multiple_count (self ):
543545 """RTN7a/RTN7b: Test that ACK with count > 1 completes multiple messages"""
544- ably = await TestApp .get_ably_realtime ()
546+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
545547 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
546548
547549 channel = ably .channels .get ('test_multi_ack_channel' )
@@ -590,7 +592,7 @@ async def check_pending():
590592 async def test_queued_messages_sent_before_channel_reattach (self ):
591593 """RTL3d + RTL6c2: Verify queued messages are sent immediately on reconnection,
592594 without waiting for channel reattachment to complete"""
593- ably = await TestApp .get_ably_realtime (queue_messages = True )
595+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol , queue_messages = True )
594596 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
595597
596598 channel = ably .channels .get ('test_rtl3d_rtl6c2_channel' )
@@ -682,7 +684,7 @@ async def check_sent_queued_messages():
682684 # RSL1i - Message size limit tests
683685 async def test_publish_message_exceeding_size_limit (self ):
684686 """RSL1i: Verify that publishing a message exceeding the size limit raises an exception"""
685- ably = await TestApp .get_ably_realtime ()
687+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
686688 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
687689
688690 channel = ably .channels .get ('test_size_limit_channel' )
@@ -703,7 +705,7 @@ async def test_publish_message_exceeding_size_limit(self):
703705
704706 async def test_publish_message_within_size_limit (self ):
705707 """RSL1i: Verify that publishing a message within the size limit succeeds"""
706- ably = await TestApp .get_ably_realtime ()
708+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
707709 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
708710
709711 channel = ably .channels .get ('test_size_ok_channel' )
@@ -721,7 +723,9 @@ async def test_publish_message_within_size_limit(self):
721723 # RTL6g - Client ID validation tests
722724 async def test_publish_with_matching_client_id (self ):
723725 """RTL6g2: Verify that publishing with explicit matching clientId succeeds"""
724- ably = await TestApp .get_ably_realtime (client_id = 'test_client_123' )
726+ ably = await TestApp .get_ably_realtime (
727+ use_binary_protocol = self .use_binary_protocol , client_id = 'test_client_123'
728+ )
725729 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
726730
727731 channel = ably .channels .get ('test_client_id_channel' )
@@ -737,7 +741,9 @@ async def test_publish_with_matching_client_id(self):
737741
738742 async def test_publish_with_null_client_id_when_identified (self ):
739743 """RTL6g1: Verify that publishing with null clientId gets populated by server when client is identified"""
740- ably = await TestApp .get_ably_realtime (client_id = 'test_client_456' )
744+ ably = await TestApp .get_ably_realtime (
745+ use_binary_protocol = self .use_binary_protocol , client_id = 'test_client_456'
746+ )
741747 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
742748
743749 channel = ably .channels .get ('test_null_client_id_channel' )
@@ -750,7 +756,9 @@ async def test_publish_with_null_client_id_when_identified(self):
750756
751757 async def test_publish_with_mismatched_client_id_fails (self ):
752758 """RTL6g3: Verify that publishing with mismatched clientId is rejected"""
753- ably = await TestApp .get_ably_realtime (client_id = 'test_client_789' )
759+ ably = await TestApp .get_ably_realtime (
760+ use_binary_protocol = self .use_binary_protocol , client_id = 'test_client_789'
761+ )
754762 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
755763
756764 channel = ably .channels .get ('test_mismatch_client_id_channel' )
@@ -770,7 +778,9 @@ async def test_publish_with_mismatched_client_id_fails(self):
770778
771779 async def test_publish_with_wildcard_client_id_fails (self ):
772780 """RTL6g3: Verify that publishing with wildcard clientId is rejected"""
773- ably = await TestApp .get_ably_realtime (client_id = 'test_client_wildcard' )
781+ ably = await TestApp .get_ably_realtime (
782+ use_binary_protocol = self .use_binary_protocol , client_id = 'test_client_wildcard'
783+ )
774784 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
775785
776786 channel = ably .channels .get ('test_wildcard_client_id_channel' )
@@ -791,7 +801,7 @@ async def test_publish_with_wildcard_client_id_fails(self):
791801 # RTL6i - Data type variation tests
792802 async def test_publish_with_string_data (self ):
793803 """RTL6i: Verify that publishing with string data succeeds"""
794- ably = await TestApp .get_ably_realtime ()
804+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
795805 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
796806
797807 channel = ably .channels .get ('test_string_data_channel' )
@@ -804,7 +814,7 @@ async def test_publish_with_string_data(self):
804814
805815 async def test_publish_with_json_object_data (self ):
806816 """RTL6i: Verify that publishing with JSON object data succeeds"""
807- ably = await TestApp .get_ably_realtime ()
817+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
808818 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
809819
810820 channel = ably .channels .get ('test_json_object_channel' )
@@ -823,7 +833,7 @@ async def test_publish_with_json_object_data(self):
823833
824834 async def test_publish_with_json_array_data (self ):
825835 """RTL6i: Verify that publishing with JSON array data succeeds"""
826- ably = await TestApp .get_ably_realtime ()
836+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
827837 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
828838
829839 channel = ably .channels .get ('test_json_array_channel' )
@@ -837,7 +847,7 @@ async def test_publish_with_json_array_data(self):
837847
838848 async def test_publish_with_null_data (self ):
839849 """RTL6i3: Verify that publishing with null data succeeds"""
840- ably = await TestApp .get_ably_realtime ()
850+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
841851 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
842852
843853 channel = ably .channels .get ('test_null_data_channel' )
@@ -850,7 +860,7 @@ async def test_publish_with_null_data(self):
850860
851861 async def test_publish_with_null_name (self ):
852862 """RTL6i3: Verify that publishing with null name succeeds"""
853- ably = await TestApp .get_ably_realtime ()
863+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
854864 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
855865
856866 channel = ably .channels .get ('test_null_name_channel' )
@@ -863,7 +873,7 @@ async def test_publish_with_null_name(self):
863873
864874 async def test_publish_message_array (self ):
865875 """RTL6i2: Verify that publishing an array of messages succeeds"""
866- ably = await TestApp .get_ably_realtime ()
876+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
867877 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
868878
869879 channel = ably .channels .get ('test_message_array_channel' )
@@ -882,7 +892,7 @@ async def test_publish_message_array(self):
882892 # RTL6c4 - Channel state validation tests
883893 async def test_publish_fails_on_suspended_channel (self ):
884894 """RTL6c4: Verify that publishing on a SUSPENDED channel fails"""
885- ably = await TestApp .get_ably_realtime ()
895+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
886896 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
887897
888898 channel = ably .channels .get ('test_suspended_channel' )
@@ -905,7 +915,7 @@ async def test_publish_fails_on_suspended_channel(self):
905915
906916 async def test_publish_fails_on_failed_channel (self ):
907917 """RTL6c4: Verify that publishing on a FAILED channel fails"""
908- ably = await TestApp .get_ably_realtime ()
918+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
909919 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
910920
911921 channel = ably .channels .get ('test_failed_channel' )
@@ -929,10 +939,10 @@ async def test_publish_fails_on_failed_channel(self):
929939 # RSL1k - Idempotent publishing test
930940 async def test_idempotent_realtime_publishing (self ):
931941 """RSL1k2, RSL1k5: Verify that messages with explicit IDs can be published for idempotent behavior"""
932- ably = await TestApp .get_ably_realtime ()
942+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
933943 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
934944
935- channel = ably .channels .get ('test_idempotent_channel ' )
945+ channel = ably .channels .get (f'test_idempotent_channel_ { self . use_binary_protocol } ' )
936946 await channel .attach ()
937947
938948 idempotent_id = 'test-msg-id-12345'
@@ -980,7 +990,7 @@ def on_message(message):
980990 async def test_publish_with_encryption (self ):
981991 """Verify that encrypted messages can be published and received correctly"""
982992 # Create connection with binary protocol enabled
983- ably = await TestApp .get_ably_realtime (use_binary_protocol = True )
993+ ably = await TestApp .get_ably_realtime (use_binary_protocol = self . use_binary_protocol )
984994 await asyncio .wait_for (ably .connection .once_async (ConnectionState .CONNECTED ), timeout = 5 )
985995
986996 # Get channel with encryption enabled
@@ -994,7 +1004,6 @@ async def test_publish_with_encryption(self):
9941004 def on_message (message ):
9951005 nonlocal received_data
9961006 try :
997- # message.decode()
9981007 received_data = message .data
9991008 data_received .finish ()
10001009 except Exception as e :
0 commit comments