@@ -113,6 +113,66 @@ impl<A: Adapter> Client<A> {
113113 }
114114 }
115115
116+ //just add latency to connect for test purpose
117+ #[ cfg( feature = "__test_harness" ) ]
118+ fn sock_connect_for_test (
119+ self : & Arc < Self > ,
120+ auth : Option < Value > ,
121+ ns_path : & str ,
122+ esocket : & Arc < engineioxide:: Socket < SocketData < A > > > ,
123+ ) {
124+ #[ cfg( feature = "tracing" ) ]
125+ tracing:: debug!( "auth: {:?}" , auth) ;
126+ let protocol: ProtocolVersion = esocket. protocol . into ( ) ;
127+ let connect = async move |ns : Arc < Namespace < A > > , esocket : Arc < EIoSocket < SocketData < A > > > | {
128+ // add latency to connect
129+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 3 ) ) . await ;
130+ if ns. connect ( esocket. id , esocket. clone ( ) , auth) . await . is_ok ( ) {
131+ // cancel the connect timeout task for v5
132+ if let Some ( tx) = esocket. data . connect_recv_tx . lock ( ) . unwrap ( ) . take ( ) {
133+ tx. send ( ( ) ) . ok ( ) ;
134+ }
135+ }
136+ } ;
137+
138+ if let Some ( ns) = self . get_ns ( ns_path) {
139+ tokio:: spawn ( connect ( ns, esocket. clone ( ) ) ) ;
140+ } else if let Ok ( Match { value : ns_ctr, .. } ) = self . router . read ( ) . unwrap ( ) . at ( ns_path) {
141+ let path = Str :: copy_from_slice ( ns_path) ;
142+ let ns = ns_ctr. get_new_ns ( path. clone ( ) , & self . adapter_state , & self . config ) ;
143+ let this = self . clone ( ) ;
144+ let esocket = esocket. clone ( ) ;
145+ let adapter = ns. adapter . clone ( ) ;
146+ let on_success = move || {
147+ this. nsps . write ( ) . unwrap ( ) . insert ( path, ns. clone ( ) ) ;
148+ tokio:: spawn ( connect ( ns, esocket) ) ;
149+ } ;
150+ // We "ask" the adapter implementation to manage the init response itself
151+ socketioxide_core:: adapter:: Spawnable :: spawn ( adapter. init ( on_success) ) ;
152+ } else if protocol == ProtocolVersion :: V4 && ns_path == "/" {
153+ #[ cfg( feature = "tracing" ) ]
154+ tracing:: error!(
155+ "the root namespace \" /\" must be defined before any connection for protocol V4 (legacy)!"
156+ ) ;
157+ esocket. close ( EIoDisconnectReason :: TransportClose ) ;
158+ } else {
159+ let path = Str :: copy_from_slice ( ns_path) ;
160+ let packet = self
161+ . parser ( )
162+ . encode ( Packet :: connect_error ( path, "Invalid namespace" ) ) ;
163+ let _ = match packet {
164+ Value :: Str ( p, _) => esocket. emit ( p) . map_err ( |_e| {
165+ #[ cfg( feature = "tracing" ) ]
166+ tracing:: error!( "error while sending invalid namespace packet: {}" , _e) ;
167+ } ) ,
168+ Value :: Bytes ( p) => esocket. emit_binary ( p) . map_err ( |_e| {
169+ #[ cfg( feature = "tracing" ) ]
170+ tracing:: error!( "error while sending invalid namespace packet: {}" , _e) ;
171+ } ) ,
172+ } ;
173+ }
174+ }
175+
116176 /// Propagate a packet to its target namespace
117177 fn sock_propagate_packet ( & self , packet : Packet , sid : Sid ) -> Result < ( ) , Error > {
118178 if let Some ( ns) = self . get_ns ( & packet. ns ) {
@@ -426,6 +486,7 @@ mod test {
426486 use tokio:: sync:: mpsc;
427487
428488 use crate :: adapter:: LocalAdapter ;
489+ use std:: time:: Duration ;
429490 const CONNECT_TIMEOUT : std:: time:: Duration = std:: time:: Duration :: from_millis ( 50 ) ;
430491
431492 fn create_client ( ) -> Arc < super :: Client < LocalAdapter > > {
@@ -490,4 +551,50 @@ mod test {
490551 . await
491552 . unwrap_err ( ) ;
492553 }
554+ #[ derive( Debug , Clone ) ]
555+ struct MockConnectTimeoutHandler ( Arc < super :: Client < LocalAdapter > > ) ;
556+
557+ impl EngineIoHandler for MockConnectTimeoutHandler {
558+ type Data = <Client < LocalAdapter > as engineioxide:: handler:: EngineIoHandler >:: Data ;
559+
560+ fn on_connect ( self : Arc < Self > , socket : Arc < EIoSocket < Self :: Data > > ) {
561+ socket. data . io . set ( SocketIo :: from ( self . 0 . clone ( ) ) ) . ok ( ) ;
562+
563+ self . 0 . sock_connect_for_test ( None , "/" , & socket) ;
564+ }
565+
566+ fn on_disconnect ( & self , socket : Arc < EIoSocket < Self :: Data > > , reason : EIoDisconnectReason ) {
567+ self . 0 . clone ( ) . on_disconnect ( socket, reason) ;
568+ }
569+
570+ fn on_message ( self : & Arc < Self > , msg : Str , socket : Arc < EIoSocket < Self :: Data > > ) {
571+ self . 0 . clone ( ) . on_message ( msg, socket) ;
572+ }
573+
574+ fn on_binary ( self : & Arc < Self > , data : Bytes , socket : Arc < EIoSocket < Self :: Data > > ) {
575+ self . 0 . clone ( ) . on_binary ( data, socket) ;
576+ }
577+ }
578+
579+ #[ tokio:: test]
580+ async fn should_not_reserve_socket_if_connect_time_out ( ) {
581+ let client = create_client ( ) ;
582+ let client = Arc :: new ( MockConnectTimeoutHandler ( client) ) ;
583+ let sid = Sid :: new ( ) ;
584+ let sock = EIoSocket :: new_dummy ( sid, Box :: new ( move |_, _| { } ) ) ;
585+ // connect to ns but spawned fn `connect` will be stuck
586+ client. clone ( ) . on_connect ( sock. clone ( ) ) ;
587+ //spawned fn `connect` is stuck ,so client don't keep heartbeat
588+ client
589+ . clone ( )
590+ . on_disconnect ( sock. clone ( ) , EIoDisconnectReason :: HeartbeatTimeout ) ;
591+ //equal to engineio.close_session(_,_)
592+ sock. close_internal_rx ( ) . await ;
593+ //wait for spawned fn `connect(crates/socketioxide/src/client.rs:129)` finish
594+ tokio:: time:: sleep ( Duration :: from_secs ( 4 ) ) . await ;
595+ let guard = client. 0 . nsps . read ( ) . unwrap ( ) ;
596+ let ns = guard. get ( "/" ) . unwrap ( ) ;
597+ // ns should not reserve socket
598+ assert ! ( ns. get_socket( sid) . is_err( ) ) ;
599+ }
493600}
0 commit comments