Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit e810e2c

Browse files
Copilothzhangxyz
andauthored
Decouple EagerNode and Search via EagerEngine interface (#18)
* Initial plan * Decouple ClusterNode and Search by introducing EagerEngine interface - Renamed ClusterNode to EagerNode - Created EagerEngine interface with input, output, and meta methods - Updated Search class to implement EagerEngine interface - Changed EagerNode constructor to accept EagerEngine instance - Removed direct coupling between EagerNode and Search - Fixed all non-null assertion usages to properly handle null values Co-authored-by: hzhangxyz <[email protected]> * Update main.ts. --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: hzhangxyz <[email protected]> Co-authored-by: Hao Zhang <[email protected]>
1 parent 4751c00 commit e810e2c

File tree

1 file changed

+94
-50
lines changed

1 file changed

+94
-50
lines changed

main.ts

Lines changed: 94 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,38 @@ interface NodeInfoWithClient {
4646
client: NodeClient;
4747
}
4848

49+
/**
50+
* 积极引擎接口
51+
* 定义引擎必须实现的基本操作
52+
*/
53+
interface EagerEngine {
54+
/**
55+
* 接收数据
56+
* @param {string} data - 输入数据
57+
* @returns {string | null} - 如果接受返回格式化后的数据,否则返回 null
58+
*/
59+
input(data: string): string | null;
60+
61+
/**
62+
* 执行一轮搜索
63+
* @param {function} callback - 处理结果的回调函数
64+
* @returns {number} - 搜索结果数量
65+
*/
66+
output(callback: (result: string) => void): number;
67+
68+
/**
69+
* 获取引擎元数据
70+
* @returns {Object} - 包含输入和输出模式的对象
71+
*/
72+
meta(): { input: string[]; output: string[] };
73+
}
74+
4975
/**
5076
* 搜索引擎类
5177
* 扩展 ATSDS 的 Search 类,提供字符串形式的规则处理
78+
* 实现 EagerEngine 接口
5279
*/
53-
class Search extends Search_ {
80+
class Search extends Search_ implements EagerEngine {
5481
/**
5582
* 构造函数
5683
* @param {number} limit_size - Size of the buffer for storing the final objects (rules/facts) in the knowledge base (default: 1000)
@@ -99,28 +126,27 @@ class Search extends Search_ {
99126
}
100127

101128
/**
102-
* 集群节点类
129+
* 积极节点类
103130
* 管理分布式搜索引擎集群中的单个节点
104131
*/
105-
class ClusterNode {
106-
private id: string;
132+
class EagerNode {
133+
private engine: EagerEngine;
107134
private addr: string;
108-
private engine: Search;
135+
private id: string;
109136
private server: grpc.Server;
110137
private nodes: Map<string, NodeInfoWithClient>;
111138
private data: Set<string>;
112139

113140
/**
114141
* 构造函数
142+
* @param {EagerEngine} engine - 实现 EagerEngine 接口的引擎实例
115143
* @param {string} addr - 节点绑定地址
116144
* @param {string} id - 节点唯一标识符,默认随机生成
117-
* @param {number} limit_size - 搜索引擎的限制大小参数,默认 1000
118-
* @param {number} buffer_size - 搜索引擎的缓冲区大小参数,默认 10000
119145
*/
120-
constructor(addr: string, id: string = randomUUID(), limit_size: number = 1000, buffer_size: number = 10000) {
121-
this.id = id;
146+
constructor(engine: EagerEngine, addr: string, id: string = randomUUID()) {
147+
this.engine = engine;
122148
this.addr = addr;
123-
this.engine = new Search(limit_size, buffer_size);
149+
this.id = id;
124150
this.server = new grpc.Server();
125151
this.nodes = new Map();
126152
this.data = new Set();
@@ -148,11 +174,13 @@ class ClusterNode {
148174
if (data.length > 0) {
149175
for (const id of this.nodes.keys()) {
150176
if (id !== this.id) {
151-
const node = this.nodes.get(id)!;
152-
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
153-
node.client.engine.pushData,
154-
).bind(node.client.engine);
155-
await pushDataAsync({ data });
177+
const node = this.nodes.get(id);
178+
if (node) {
179+
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
180+
node.client.engine.pushData,
181+
).bind(node.client.engine);
182+
await pushDataAsync({ data });
183+
}
156184
}
157185
}
158186
}
@@ -186,11 +214,13 @@ class ClusterNode {
186214
console.log(`Received input: ${formattedLine}`);
187215
for (const id of this.nodes.keys()) {
188216
if (id !== this.id) {
189-
const node = this.nodes.get(id)!;
190-
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
191-
node.client.engine.pushData,
192-
).bind(node.client.engine);
193-
await pushDataAsync({ data: [formattedLine] });
217+
const node = this.nodes.get(id);
218+
if (node) {
219+
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
220+
node.client.engine.pushData,
221+
).bind(node.client.engine);
222+
await pushDataAsync({ data: [formattedLine] });
223+
}
194224
}
195225
}
196226
});
@@ -237,9 +267,9 @@ class ClusterNode {
237267
/**
238268
* 启动节点监听服务
239269
* 创建 gRPC 服务器并注册集群管理和引擎服务
240-
* @returns {ClusterNode} 返回当前节点实例
270+
* @returns {EagerNode} 返回当前节点实例
241271
*/
242-
async listen(): Promise<ClusterNode> {
272+
async listen(): Promise<EagerNode> {
243273
this.server.addService(ClusterService, {
244274
/**
245275
* 处理节点加入请求
@@ -249,11 +279,13 @@ class ClusterNode {
249279
call: grpc.ServerUnaryCall<JoinRequest, JoinResponse>,
250280
callback: grpc.sendUnaryData<JoinResponse>,
251281
) => {
252-
const node = call.request.node!;
253-
const { id, addr } = node;
254-
if (!this.nodes.has(id)) {
255-
this.nodes.set(id, this.nodeInfo(id, addr));
256-
console.log(`Joined node: ${id} at ${addr}`);
282+
const node = call.request.node;
283+
if (node) {
284+
const { id, addr } = node;
285+
if (!this.nodes.has(id)) {
286+
this.nodes.set(id, this.nodeInfo(id, addr));
287+
console.log(`Joined node: ${id} at ${addr}`);
288+
}
257289
}
258290
callback(null, {});
259291
},
@@ -265,11 +297,13 @@ class ClusterNode {
265297
call: grpc.ServerUnaryCall<LeaveRequest, LeaveResponse>,
266298
callback: grpc.sendUnaryData<LeaveResponse>,
267299
) => {
268-
const node = call.request.node!;
269-
const { id, addr } = node;
270-
if (this.nodes.has(id)) {
271-
this.nodes.delete(id);
272-
console.log(`Left node: ${id} at ${addr}`);
300+
const node = call.request.node;
301+
if (node) {
302+
const { id, addr } = node;
303+
if (this.nodes.has(id)) {
304+
this.nodes.delete(id);
305+
console.log(`Left node: ${id} at ${addr}`);
306+
}
273307
}
274308
callback(null, {});
275309
},
@@ -315,11 +349,15 @@ class ClusterNode {
315349
call: grpc.ServerUnaryCall<PushDataRequest, PushDataResponse>,
316350
callback: grpc.sendUnaryData<PushDataResponse>,
317351
) => {
318-
const data = call.request.data!;
319-
for (const item of data) {
320-
const formattedItem = this.engine.input(item)!;
321-
this.data.add(formattedItem);
322-
console.log(`Received data: ${item}`);
352+
const data = call.request.data;
353+
if (data) {
354+
for (const item of data) {
355+
const formattedItem = this.engine.input(item);
356+
if (formattedItem !== null) {
357+
this.data.add(formattedItem);
358+
console.log(`Received data: ${item}`);
359+
}
360+
}
323361
}
324362
callback(null, {});
325363
},
@@ -389,9 +427,11 @@ class ClusterNode {
389427
const dataResponse = await pullAsync({});
390428
if (dataResponse.data) {
391429
for (const item of dataResponse.data) {
392-
const formattedItem = this.engine.input(item)!;
393-
this.data.add(formattedItem);
394-
console.log(`Receiving data: ${item}`);
430+
const formattedItem = this.engine.input(item);
431+
if (formattedItem !== null) {
432+
this.data.add(formattedItem);
433+
console.log(`Receiving data: ${item}`);
434+
}
395435
}
396436
}
397437
console.log(`Joining node ${node.id} at ${node.addr}`);
@@ -405,14 +445,16 @@ class ClusterNode {
405445
async leave(): Promise<void> {
406446
for (const id of this.nodes.keys()) {
407447
if (id !== this.id) {
408-
const node = this.nodes.get(id)!;
409-
const leaveAsync = promisify<LeaveRequest, LeaveResponse>(node.client.cluster.leave).bind(
410-
node.client.cluster,
411-
);
412-
await leaveAsync({ node: { id: this.id, addr: this.addr } });
413-
node.client.cluster.close();
414-
node.client.engine.close();
415-
console.log(`Leaving node ${node.id} at ${node.addr}`);
448+
const node = this.nodes.get(id);
449+
if (node) {
450+
const leaveAsync = promisify<LeaveRequest, LeaveResponse>(node.client.cluster.leave).bind(
451+
node.client.cluster,
452+
);
453+
await leaveAsync({ node: { id: this.id, addr: this.addr } });
454+
node.client.cluster.close();
455+
node.client.engine.close();
456+
console.log(`Leaving node ${node.id} at ${node.addr}`);
457+
}
416458
}
417459
}
418460
}
@@ -464,7 +506,8 @@ if (process.argv.length === 4) {
464506
const listenAddr = addAddressPrefixForPort(process.argv[2], "0.0.0.0");
465507
const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1");
466508
console.log(`Starting node at ${listenAddr} and joining ${joinAddr}`);
467-
const node = new ClusterNode(listenAddr);
509+
const engine = new Search();
510+
const node = new EagerNode(engine, listenAddr);
468511
await node.listen();
469512
await node.join(await node.list(joinAddr));
470513
}
@@ -476,6 +519,7 @@ if (process.argv.length === 4) {
476519
if (process.argv.length === 3) {
477520
const listenAddr = addAddressPrefixForPort(process.argv[2], "0.0.0.0");
478521
console.log(`Starting node at ${listenAddr}`);
479-
const node = new ClusterNode(listenAddr);
522+
const engine = new Search();
523+
const node = new EagerNode(engine, listenAddr);
480524
await node.listen();
481525
}

0 commit comments

Comments
 (0)