diff --git a/main.ts b/main.ts index 252149d..50a1c3c 100644 --- a/main.ts +++ b/main.ts @@ -180,22 +180,20 @@ class EagerEngineScheduler { class ClusterManager { private nodes: Map; private localId: string; - private localAddr: string; - constructor(localId: string, localAddr: string) { + constructor(localId: string, _localAddr: string) { this.nodes = new Map(); this.localId = localId; - this.localAddr = localAddr; - } - - getAllOtherNodes(): NodeInfo[] { - return this.getAllNodes().filter((node) => node.id !== this.localId); } getAllNodes(): NodeInfo[] { return Array.from(this.nodes.values()); } + getAllOtherNodes(): NodeInfo[] { + return this.getAllNodes().filter((node) => node.id !== this.localId); + } + getAllNodeInfo(): Node[] { return this.getAllNodes().map((node) => ({ id: node.id, addr: node.addr })); } @@ -243,13 +241,11 @@ interface NetworkHandlerCallbacks { class NetworkHandler { private server: grpc.Server; - private id: string; private addr: string; private callbacks: NetworkHandlerCallbacks; - constructor(addr: string, id: string, callbacks: NetworkHandlerCallbacks) { + constructor(addr: string, _id: string, callbacks: NetworkHandlerCallbacks) { this.server = new grpc.Server(); - this.id = id; this.addr = addr; this.callbacks = callbacks; } @@ -408,27 +404,22 @@ class EagerNode { async joinCluster(joinAddr: string): Promise { const client = this.clusterManager.createNodeClient(joinAddr); - try { - const nodes = await this.networkHandler.callList(client); - const currentNode = { id: this.id, addr: this.addr }; + const nodes = await this.networkHandler.callList(client); + const currentNode = { id: this.id, addr: this.addr }; - for (const node of nodes) { - if (node.id !== this.id) { - const { client: nodeClient } = this.clusterManager.addNode(node.id, node.addr); - await this.networkHandler.callJoin(currentNode, nodeClient); + for (const node of nodes) { + if (node.id !== this.id) { + const { client: nodeClient } = this.clusterManager.addNode(node.id, node.addr); + await this.networkHandler.callJoin(currentNode, nodeClient); - await this.syncDataWithNode(nodeClient); + await this.syncDataWithNode(nodeClient); - console.log(`Joined node: ${node.id} at ${node.addr}`); - } + console.log(`Joining node: ${node.id} at ${node.addr}`); } - - client.cluster.close(); - client.engine.close(); - } catch (error) { - console.error(`Failed to join cluster at ${joinAddr}:`, error); - throw error; } + + client.cluster.close(); + client.engine.close(); } private async leaveCluster(): Promise { @@ -436,13 +427,9 @@ class EagerNode { const otherNodes = this.clusterManager.getAllOtherNodes(); for (const node of otherNodes) { - try { - await this.networkHandler.callLeave(currentNode, node.client); - this.clusterManager.removeNode(node.id); - console.log(`Left node: ${node.id}`); - } catch (error) { - console.error(`Failed to leave node ${node.id}:`, error); - } + await this.networkHandler.callLeave(currentNode, node.client); + this.clusterManager.removeNode(node.id); + console.log(`Leaving node: ${node.id}`); } } @@ -503,28 +490,20 @@ class EagerNode { } const otherNodes = this.clusterManager.getAllOtherNodes(); for (const node of otherNodes) { - try { - await this.networkHandler.callPushData(results, node.client); - } catch (error) { - console.error(`Failed to push data to node ${node.id}:`, error); - } + await this.networkHandler.callPushData(results, node.client); } } private async syncDataWithNode(client: NodeClient): Promise { - try { - const localData = this.engineScheduler.getData(); - if (localData.length > 0) { - await this.networkHandler.callPushData(localData, client); - } + const localData = this.engineScheduler.getData(); + if (localData.length > 0) { + await this.networkHandler.callPushData(localData, client); + } - const remoteData = await this.networkHandler.callPullData(client); - if (remoteData.length > 0) { - this.engineScheduler.addDataBatch(remoteData); - console.log(`Synced ${remoteData.length} data items from remote`); - } - } catch (error) { - console.error("Data sync failed:", error); + const remoteData = await this.networkHandler.callPullData(client); + if (remoteData.length > 0) { + this.engineScheduler.addDataBatch(remoteData); + console.log(`Synced ${remoteData.length} data items from remote`); } } @@ -545,11 +524,7 @@ class EagerNode { const otherNodes = this.clusterManager.getAllOtherNodes(); for (const node of otherNodes) { - try { - await this.networkHandler.callPushData([formatted], node.client); - } catch (error) { - console.error(`Failed to push input to node ${node.id}:`, error); - } + await this.networkHandler.callPushData([formatted], node.client); } } }); @@ -586,10 +561,6 @@ class EagerNode { console.log("========================\n"); }); } - - getNodeInfo(): { id: string; addr: string } { - return { id: this.id, addr: this.addr }; - } } function isIntegerString(str: string): boolean { @@ -614,23 +585,17 @@ async function main() { const engine = new AtsdsSearchEngine(); const node = new EagerNode(engine, listenAddr); - try { - await node.start(); - - if (process.argv.length === 4) { - const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1"); - console.log(`Joining cluster at ${joinAddr}...`); - await node.joinCluster(joinAddr); - } else { - console.log("Starting as first node in new cluster..."); - } + await node.start(); - process.stdin.resume(); - } catch (error) { - console.error("Failed to start node:", error); - await node.stop(); - process.exit(1); + if (process.argv.length === 4) { + const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1"); + console.log(`Joining cluster at ${joinAddr}...`); + await node.joinCluster(joinAddr); + } else { + console.log("Starting as first node in new cluster..."); } + + process.stdin.resume(); } if (import.meta.main) {