@@ -180,22 +180,20 @@ class EagerEngineScheduler {
180180class ClusterManager {
181181 private nodes : Map < string , NodeInfo > ;
182182 private localId : string ;
183- private localAddr : string ;
184183
185- constructor ( localId : string , localAddr : string ) {
184+ constructor ( localId : string , _localAddr : string ) {
186185 this . nodes = new Map ( ) ;
187186 this . localId = localId ;
188- this . localAddr = localAddr ;
189- }
190-
191- getAllOtherNodes ( ) : NodeInfo [ ] {
192- return this . getAllNodes ( ) . filter ( ( node ) => node . id !== this . localId ) ;
193187 }
194188
195189 getAllNodes ( ) : NodeInfo [ ] {
196190 return Array . from ( this . nodes . values ( ) ) ;
197191 }
198192
193+ getAllOtherNodes ( ) : NodeInfo [ ] {
194+ return this . getAllNodes ( ) . filter ( ( node ) => node . id !== this . localId ) ;
195+ }
196+
199197 getAllNodeInfo ( ) : Node [ ] {
200198 return this . getAllNodes ( ) . map ( ( node ) => ( { id : node . id , addr : node . addr } ) ) ;
201199 }
@@ -243,13 +241,11 @@ interface NetworkHandlerCallbacks {
243241
244242class NetworkHandler {
245243 private server : grpc . Server ;
246- private id : string ;
247244 private addr : string ;
248245 private callbacks : NetworkHandlerCallbacks ;
249246
250- constructor ( addr : string , id : string , callbacks : NetworkHandlerCallbacks ) {
247+ constructor ( addr : string , _id : string , callbacks : NetworkHandlerCallbacks ) {
251248 this . server = new grpc . Server ( ) ;
252- this . id = id ;
253249 this . addr = addr ;
254250 this . callbacks = callbacks ;
255251 }
@@ -408,41 +404,32 @@ class EagerNode {
408404 async joinCluster ( joinAddr : string ) : Promise < void > {
409405 const client = this . clusterManager . createNodeClient ( joinAddr ) ;
410406
411- try {
412- const nodes = await this . networkHandler . callList ( client ) ;
413- const currentNode = { id : this . id , addr : this . addr } ;
407+ const nodes = await this . networkHandler . callList ( client ) ;
408+ const currentNode = { id : this . id , addr : this . addr } ;
414409
415- for ( const node of nodes ) {
416- if ( node . id !== this . id ) {
417- const { client : nodeClient } = this . clusterManager . addNode ( node . id , node . addr ) ;
418- await this . networkHandler . callJoin ( currentNode , nodeClient ) ;
410+ for ( const node of nodes ) {
411+ if ( node . id !== this . id ) {
412+ const { client : nodeClient } = this . clusterManager . addNode ( node . id , node . addr ) ;
413+ await this . networkHandler . callJoin ( currentNode , nodeClient ) ;
419414
420- await this . syncDataWithNode ( nodeClient ) ;
415+ await this . syncDataWithNode ( nodeClient ) ;
421416
422- console . log ( `Joined node: ${ node . id } at ${ node . addr } ` ) ;
423- }
417+ console . log ( `Joining node: ${ node . id } at ${ node . addr } ` ) ;
424418 }
425-
426- client . cluster . close ( ) ;
427- client . engine . close ( ) ;
428- } catch ( error ) {
429- console . error ( `Failed to join cluster at ${ joinAddr } :` , error ) ;
430- throw error ;
431419 }
420+
421+ client . cluster . close ( ) ;
422+ client . engine . close ( ) ;
432423 }
433424
434425 private async leaveCluster ( ) : Promise < void > {
435426 const currentNode = { id : this . id , addr : this . addr } ;
436427 const otherNodes = this . clusterManager . getAllOtherNodes ( ) ;
437428
438429 for ( const node of otherNodes ) {
439- try {
440- await this . networkHandler . callLeave ( currentNode , node . client ) ;
441- this . clusterManager . removeNode ( node . id ) ;
442- console . log ( `Left node: ${ node . id } ` ) ;
443- } catch ( error ) {
444- console . error ( `Failed to leave node ${ node . id } :` , error ) ;
445- }
430+ await this . networkHandler . callLeave ( currentNode , node . client ) ;
431+ this . clusterManager . removeNode ( node . id ) ;
432+ console . log ( `Leaving node: ${ node . id } ` ) ;
446433 }
447434 }
448435
@@ -503,28 +490,20 @@ class EagerNode {
503490 }
504491 const otherNodes = this . clusterManager . getAllOtherNodes ( ) ;
505492 for ( const node of otherNodes ) {
506- try {
507- await this . networkHandler . callPushData ( results , node . client ) ;
508- } catch ( error ) {
509- console . error ( `Failed to push data to node ${ node . id } :` , error ) ;
510- }
493+ await this . networkHandler . callPushData ( results , node . client ) ;
511494 }
512495 }
513496
514497 private async syncDataWithNode ( client : NodeClient ) : Promise < void > {
515- try {
516- const localData = this . engineScheduler . getData ( ) ;
517- if ( localData . length > 0 ) {
518- await this . networkHandler . callPushData ( localData , client ) ;
519- }
498+ const localData = this . engineScheduler . getData ( ) ;
499+ if ( localData . length > 0 ) {
500+ await this . networkHandler . callPushData ( localData , client ) ;
501+ }
520502
521- const remoteData = await this . networkHandler . callPullData ( client ) ;
522- if ( remoteData . length > 0 ) {
523- this . engineScheduler . addDataBatch ( remoteData ) ;
524- console . log ( `Synced ${ remoteData . length } data items from remote` ) ;
525- }
526- } catch ( error ) {
527- console . error ( "Data sync failed:" , error ) ;
503+ const remoteData = await this . networkHandler . callPullData ( client ) ;
504+ if ( remoteData . length > 0 ) {
505+ this . engineScheduler . addDataBatch ( remoteData ) ;
506+ console . log ( `Synced ${ remoteData . length } data items from remote` ) ;
528507 }
529508 }
530509
@@ -545,11 +524,7 @@ class EagerNode {
545524
546525 const otherNodes = this . clusterManager . getAllOtherNodes ( ) ;
547526 for ( const node of otherNodes ) {
548- try {
549- await this . networkHandler . callPushData ( [ formatted ] , node . client ) ;
550- } catch ( error ) {
551- console . error ( `Failed to push input to node ${ node . id } :` , error ) ;
552- }
527+ await this . networkHandler . callPushData ( [ formatted ] , node . client ) ;
553528 }
554529 }
555530 } ) ;
@@ -586,10 +561,6 @@ class EagerNode {
586561 console . log ( "========================\n" ) ;
587562 } ) ;
588563 }
589-
590- getNodeInfo ( ) : { id : string ; addr : string } {
591- return { id : this . id , addr : this . addr } ;
592- }
593564}
594565
595566function isIntegerString ( str : string ) : boolean {
@@ -614,23 +585,17 @@ async function main() {
614585 const engine = new AtsdsSearchEngine ( ) ;
615586 const node = new EagerNode ( engine , listenAddr ) ;
616587
617- try {
618- await node . start ( ) ;
619-
620- if ( process . argv . length === 4 ) {
621- const joinAddr = addAddressPrefixForPort ( process . argv [ 3 ] , "127.0.0.1" ) ;
622- console . log ( `Joining cluster at ${ joinAddr } ...` ) ;
623- await node . joinCluster ( joinAddr ) ;
624- } else {
625- console . log ( "Starting as first node in new cluster..." ) ;
626- }
588+ await node . start ( ) ;
627589
628- process . stdin . resume ( ) ;
629- } catch ( error ) {
630- console . error ( "Failed to start node:" , error ) ;
631- await node . stop ( ) ;
632- process . exit ( 1 ) ;
590+ if ( process . argv . length === 4 ) {
591+ const joinAddr = addAddressPrefixForPort ( process . argv [ 3 ] , "127.0.0.1" ) ;
592+ console . log ( `Joining cluster at ${ joinAddr } ...` ) ;
593+ await node . joinCluster ( joinAddr ) ;
594+ } else {
595+ console . log ( "Starting as first node in new cluster..." ) ;
633596 }
597+
598+ process . stdin . resume ( ) ;
634599}
635600
636601if ( import . meta. main ) {
0 commit comments