Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 39 additions & 74 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,20 @@ class EagerEngineScheduler {
class ClusterManager {
private nodes: Map<string, NodeInfo>;
private localId: string;
private localAddr: string;

constructor(localId: string, localAddr: string) {
constructor(localId: string, _localAddr: string) {
this.nodes = new Map();
this.localId = localId;
this.localAddr = localAddr;
}

getAllOtherNodes(): NodeInfo[] {
return this.getAllNodes().filter((node) => node.id !== this.localId);
}

getAllNodes(): NodeInfo[] {
return Array.from(this.nodes.values());
}

getAllOtherNodes(): NodeInfo[] {
return this.getAllNodes().filter((node) => node.id !== this.localId);
}

getAllNodeInfo(): Node[] {
return this.getAllNodes().map((node) => ({ id: node.id, addr: node.addr }));
}
Expand Down Expand Up @@ -243,13 +241,11 @@ interface NetworkHandlerCallbacks {

class NetworkHandler {
private server: grpc.Server;
private id: string;
private addr: string;
private callbacks: NetworkHandlerCallbacks;

constructor(addr: string, id: string, callbacks: NetworkHandlerCallbacks) {
constructor(addr: string, _id: string, callbacks: NetworkHandlerCallbacks) {
this.server = new grpc.Server();
this.id = id;
this.addr = addr;
this.callbacks = callbacks;
}
Expand Down Expand Up @@ -408,41 +404,32 @@ class EagerNode {
async joinCluster(joinAddr: string): Promise<void> {
const client = this.clusterManager.createNodeClient(joinAddr);

try {
const nodes = await this.networkHandler.callList(client);
const currentNode = { id: this.id, addr: this.addr };
const nodes = await this.networkHandler.callList(client);
const currentNode = { id: this.id, addr: this.addr };

for (const node of nodes) {
if (node.id !== this.id) {
const { client: nodeClient } = this.clusterManager.addNode(node.id, node.addr);
await this.networkHandler.callJoin(currentNode, nodeClient);
for (const node of nodes) {
if (node.id !== this.id) {
const { client: nodeClient } = this.clusterManager.addNode(node.id, node.addr);
await this.networkHandler.callJoin(currentNode, nodeClient);

await this.syncDataWithNode(nodeClient);
await this.syncDataWithNode(nodeClient);

console.log(`Joined node: ${node.id} at ${node.addr}`);
}
console.log(`Joining node: ${node.id} at ${node.addr}`);
}

client.cluster.close();
client.engine.close();
} catch (error) {
console.error(`Failed to join cluster at ${joinAddr}:`, error);
throw error;
}

client.cluster.close();
client.engine.close();
}

private async leaveCluster(): Promise<void> {
const currentNode = { id: this.id, addr: this.addr };
const otherNodes = this.clusterManager.getAllOtherNodes();

for (const node of otherNodes) {
try {
await this.networkHandler.callLeave(currentNode, node.client);
this.clusterManager.removeNode(node.id);
console.log(`Left node: ${node.id}`);
} catch (error) {
console.error(`Failed to leave node ${node.id}:`, error);
}
await this.networkHandler.callLeave(currentNode, node.client);
this.clusterManager.removeNode(node.id);
console.log(`Leaving node: ${node.id}`);
}
}

Expand Down Expand Up @@ -503,28 +490,20 @@ class EagerNode {
}
const otherNodes = this.clusterManager.getAllOtherNodes();
for (const node of otherNodes) {
try {
await this.networkHandler.callPushData(results, node.client);
} catch (error) {
console.error(`Failed to push data to node ${node.id}:`, error);
}
await this.networkHandler.callPushData(results, node.client);
}
}

private async syncDataWithNode(client: NodeClient): Promise<void> {
try {
const localData = this.engineScheduler.getData();
if (localData.length > 0) {
await this.networkHandler.callPushData(localData, client);
}
const localData = this.engineScheduler.getData();
if (localData.length > 0) {
await this.networkHandler.callPushData(localData, client);
}

const remoteData = await this.networkHandler.callPullData(client);
if (remoteData.length > 0) {
this.engineScheduler.addDataBatch(remoteData);
console.log(`Synced ${remoteData.length} data items from remote`);
}
} catch (error) {
console.error("Data sync failed:", error);
const remoteData = await this.networkHandler.callPullData(client);
if (remoteData.length > 0) {
this.engineScheduler.addDataBatch(remoteData);
console.log(`Synced ${remoteData.length} data items from remote`);
}
}

Expand All @@ -545,11 +524,7 @@ class EagerNode {

const otherNodes = this.clusterManager.getAllOtherNodes();
for (const node of otherNodes) {
try {
await this.networkHandler.callPushData([formatted], node.client);
} catch (error) {
console.error(`Failed to push input to node ${node.id}:`, error);
}
await this.networkHandler.callPushData([formatted], node.client);
}
}
});
Expand Down Expand Up @@ -586,10 +561,6 @@ class EagerNode {
console.log("========================\n");
});
}

getNodeInfo(): { id: string; addr: string } {
return { id: this.id, addr: this.addr };
}
}

function isIntegerString(str: string): boolean {
Expand All @@ -614,23 +585,17 @@ async function main() {
const engine = new AtsdsSearchEngine();
const node = new EagerNode(engine, listenAddr);

try {
await node.start();

if (process.argv.length === 4) {
const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1");
console.log(`Joining cluster at ${joinAddr}...`);
await node.joinCluster(joinAddr);
} else {
console.log("Starting as first node in new cluster...");
}
await node.start();

process.stdin.resume();
} catch (error) {
console.error("Failed to start node:", error);
await node.stop();
process.exit(1);
if (process.argv.length === 4) {
const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1");
console.log(`Joining cluster at ${joinAddr}...`);
await node.joinCluster(joinAddr);
} else {
console.log("Starting as first node in new cluster...");
}

process.stdin.resume();
}

if (import.meta.main) {
Expand Down
Loading