From 0e418b4c39b9995f19a03e966a7b1b40b84638cc Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 17:42:05 +0100 Subject: [PATCH 01/11] BB-727: Fix comment typo --- extensions/notification/NotificationQueuePopulator.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/notification/NotificationQueuePopulator.js b/extensions/notification/NotificationQueuePopulator.js index d36e22c8a..60edb1362 100644 --- a/extensions/notification/NotificationQueuePopulator.js +++ b/extensions/notification/NotificationQueuePopulator.js @@ -108,7 +108,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { this.bnConfigManager.setConfig(bucketName, bnConfig); return undefined; } - // bucket was deleter or notification conf has been removed, so remove zk node + // bucket was deleted or notification conf has been removed, so remove zk node this.bnConfigManager.removeConfig(bucketName || bucket); return undefined; } From e665d0f1898516a377d0e555ad953d9ac5dd0f2e Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 17:42:47 +0100 Subject: [PATCH 02/11] BB-727: Better print buffer in logs Fix `key: { type: 'Buffer', data: [ 104, 101, 108, 108, 111 ] }` To `key: 'hello'` --- lib/BackbeatConsumer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index c6dbb97c8..b5afd3332 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -505,7 +505,7 @@ class BackbeatConsumer extends EventEmitter { const { topic, partition, offset, key, timestamp } = entry; this._log.debug('finished processing of consumed entry', { method: 'BackbeatConsumer.subscribe', - entry: { topic, partition, offset, key, timestamp }, + entry: { topic, partition, offset, key: key?.toString?.() || key, timestamp }, groupId: this._groupId, }); if (err) { From c8cd7e6bc8eaacfb2eb8d1e9032f24e6cd9ae913 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 17:45:42 +0100 Subject: [PATCH 03/11] BB-727: removeConfig only locally on NODE_DELETED The watcher NODE_DELETED already calls removeConfig with false Because it's already removed from zk if the event is received There is no need to send a request to zk --- .../notification/configManager/ZookeeperConfigManager.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index 603106ab8..1836b4677 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -402,16 +402,20 @@ class ZookeeperConfigManager extends BaseConfigManager { * Remove bucket notification configuration * * @param {String} bucket - bucket + * @param {boolean} [emitToZk = true] - whether to emit the event to zookeeper * @return {boolean} - true if removed */ - removeConfig(bucket) { + removeConfig(bucket, emitToZk = true) { try { this.log.debug('remove config', { method: 'ZookeeperConfigManager.removeConfig', bucket, + emitToZk, }); this._configs.delete(bucket); - this._emitter.emit('removeConfig', bucket); + if (emitToZk) { + this._emitter.emit('removeConfig', bucket); + } return true; } catch (err) { const errMsg From 5361492c4ae7fe91c01fae87c058ec0112091168 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 17:50:20 +0100 Subject: [PATCH 04/11] BB-727: Log bucket for invalid config error This kind of error can happen if data is undefined when the read is triggered too late when the znode is already removed --- .../notification/configManager/ZookeeperConfigManager.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index 1836b4677..428ed10e3 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -132,10 +132,10 @@ class ZookeeperConfigManager extends BaseConfigManager { return `/${constants.zkConfigParentNode}/${bucket}`; } - _getConfigDataFromBuffer(data) { + _getConfigDataFromBuffer(data, bucket) { const { error, result } = safeJsonParse(data); if (error) { - this.log.error('invalid config', { error, config: data }); + this.log.error('invalid config', { error, config: data, bucket }); return undefined; } return result; @@ -349,7 +349,7 @@ class ZookeeperConfigManager extends BaseConfigManager { if (err) { return next(err); } - const configObject = this._getConfigDataFromBuffer(data); + const configObject = this._getConfigDataFromBuffer(data, bucket); if (configObject) { this._configs.set(bucket, configObject); } From cea258a405f94088a09c6bb7db36016e35ff193a Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 18:34:48 +0100 Subject: [PATCH 05/11] BB-727: Check and warn event emitter leak early Otherwise we need a few buckets and bucket operations to trigger the event emitter listener limit warning at 10 listeners --- .../configManager/ZookeeperConfigManager.js | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index 428ed10e3..e2387256e 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -50,6 +50,27 @@ class ZookeeperConfigManager extends BaseConfigManager { this._setupEventListeners(); } + // https://github.com/alexguan/node-zookeeper-client/blob/master/lib/WatcherManager.js#L13-L39 + // watchers should be reapplied only when their event is triggered, otherwise they will be duplicated + // and the lib will keep adding listeners + // (even if the functions are the same, they are js object so they don't match) + // warn early to catch leak more easily instead of waiting event emitter limit at 10 listeners. + _warnZkWatchersLeak(type, path) { + const watcherManager = this._zkClient?.client?.connectionManager?.watcherManager; + + const watchers = watcherManager?.[`${type}Watchers`]?.[path]; + if (!watchers) { + return; + } + + const listeners = watchers.listenerCount('notification'); + if (listeners > 0) { + process.emitWarning(`${type}Watchers[${path}] has already ${listeners} listeners`, { + code: 'ZkWatchersLeak', + }); + } + } + _errorListener(error, listener) { this.log.error('ZookeeperConfigManager.emitter.error', { listener, @@ -150,6 +171,7 @@ class ZookeeperConfigManager extends BaseConfigManager { bucket, zkPath, }); + this._warnZkWatchersLeak('data', zkPath); return this._zkClient.getData(zkPath, event => { this.log.debug('zookeeper getData watcher triggered', { zkPath, @@ -319,6 +341,7 @@ class ZookeeperConfigManager extends BaseConfigManager { const method = 'ZookeeperConfigManager._listBucketsWithConfig'; const zkPath = `/${constants.zkConfigParentNode}`; + this._warnZkWatchersLeak('child', zkPath); this._zkClient.getChildren(zkPath, event => { this.log.debug('zookeeper getChildren watcher triggered', { zkPath, From c663635fd3de2500547d7e69c736795596d04ffe Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 18:37:20 +0100 Subject: [PATCH 06/11] BB-727: Log as debug the list of buckets --- .../notification/configManager/ZookeeperConfigManager.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index e2387256e..7d185dec4 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -362,6 +362,11 @@ class ZookeeperConfigManager extends BaseConfigManager { }); this._callbackHandler(cb, error); } + this.log.debug('list of buckets', { + zkPath, + method, + buckets, + }); this._callbackHandler(cb, null, buckets); }); } From 7b2d4da7290f1137eb2dd4e776117f0582027b65 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Fri, 14 Nov 2025 18:38:32 +0100 Subject: [PATCH 07/11] BB-727: Separate list and get configs The getConfig should not trigger a listing with watcher as it duplicates an already existing watcher. This can have an exponential effect of adding even more watchers when events are triggered. A clear separation prevent this effect: - only NODE_CHILDREN_CHANGED should trigger a new listing to reapply a watcher - getConfig should be contained to a single bucket (discovered by listing, or from watcher NODE_DATA_CHANGED) --- .../configManager/ZookeeperConfigManager.js | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index 7d185dec4..e3dc8b817 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -93,28 +93,29 @@ class ZookeeperConfigManager extends BaseConfigManager { }); } - _getConfigListener(updatedBucket = '') { + _getConfigListener(bucket) { this.log.debug('ZookeeperConfigManager.emitter.getConfig', { event: 'getConfig', + bucket, + }); + this._updateLocalStore([bucket]); + } + + _listConfigsListener() { + this.log.debug('ZookeeperConfigManager.emitter.listConfigs', { + event: 'listConfigs', }); this._listBucketsWithConfig((err, buckets) => { if (err) { - this._emitter.emit('error', err, 'getConfigListener'); + this._emitter.emit('error', err, 'listConfigsListener'); return undefined; } - this.log.debug('bucket config to be updated in map', { - bucket: updatedBucket, - }); const newBuckets = this._getNewBucketNodes(buckets); this.log.debug('new bucket configs to be added to map', { buckets: newBuckets, }); - const bucketsToMap = updatedBucket ? [updatedBucket, ...newBuckets] : newBuckets; - this.log.debug('bucket configs to be added/updated to map', { - buckets: bucketsToMap, - }); - if (bucketsToMap.length > 0) { - this._updateLocalStore(bucketsToMap); + if (newBuckets.length > 0) { + this._updateLocalStore(newBuckets); } return undefined; }); @@ -139,6 +140,7 @@ class ZookeeperConfigManager extends BaseConfigManager { .on('setConfig', (bucket, config) => this._setConfigListener(bucket, config)) .on('getConfig', bucket => this._getConfigListener(bucket)) + .on('listConfigs', () => this._listConfigsListener()) .on('removeConfig', bucket => this._removeConfigListener(bucket)); } @@ -349,7 +351,7 @@ class ZookeeperConfigManager extends BaseConfigManager { event, }); if (event.type === zookeeper.Event.NODE_CHILDREN_CHANGED) { - this._emitter.emit('getConfig'); + this._emitter.emit('listConfigs'); } }, (error, buckets) => { if (error) { From 66d8b1af28a1d2ae133ddf12ef447018748dfed2 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Sat, 15 Nov 2025 00:35:50 +0100 Subject: [PATCH 08/11] BB-727: Watch created config in queuePopulator --- .../configManager/ZookeeperConfigManager.js | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index e3dc8b817..2561a5539 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -237,11 +237,20 @@ class ZookeeperConfigManager extends BaseConfigManager { (exists, next) => { if (!exists) { return this._createBucketNotifConfigNode(bucket, - err => next(err)); + err => next(err, exists)); + } + return next(null, exists); + }, + (exists, next) => this._zkClient.setData(zkPath, Buffer.from(data), -1, err => next(err, exists)), + (exists, next) => { + if (!exists) { + // if znode is created, run getData to set a watcher on the bucket config + // in case another node becomes leader on the raft and modifies the config + // while the current process keeps running + return this._updateLocalStore([bucket], next); } return next(); }, - next => this._zkClient.setData(zkPath, Buffer.from(data), -1, next), ], err => { if (err) { this.log.error('error saving config', { method, zkPath, data }); From 3a1a21cb7807cc27383a00ddde15682fc2b1b700 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Mon, 5 Jan 2026 11:47:02 +0100 Subject: [PATCH 09/11] BB-727: Improve mkdirp to create znode with data Atomic mkdirp to prevent trigger a watcher for znode creation before data is set on the znode --- .../configManager/ZookeeperConfigManager.js | 32 ++++++------ lib/clients/ZookeeperManager.js | 49 +++++++++++++++++++ 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js index 2561a5539..01ff9038e 100644 --- a/extensions/notification/configManager/ZookeeperConfigManager.js +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -235,22 +235,12 @@ class ZookeeperConfigManager extends BaseConfigManager { return async.waterfall([ next => this._checkNodeExists(zkPath, next), (exists, next) => { - if (!exists) { - return this._createBucketNotifConfigNode(bucket, - err => next(err, exists)); - } - return next(null, exists); - }, - (exists, next) => this._zkClient.setData(zkPath, Buffer.from(data), -1, err => next(err, exists)), - (exists, next) => { - if (!exists) { - // if znode is created, run getData to set a watcher on the bucket config - // in case another node becomes leader on the raft and modifies the config - // while the current process keeps running - return this._updateLocalStore([bucket], next); + if (exists) { + return this._zkClient.setData(zkPath, Buffer.from(data), -1, next); + } else { + return this._createBucketNotifConfigNode(bucket, data, next); } - return next(); - }, + } ], err => { if (err) { this.log.error('error saving config', { method, zkPath, data }); @@ -288,7 +278,7 @@ class ZookeeperConfigManager extends BaseConfigManager { }); } - _createBucketNotifConfigNode(bucket, cb) { + _createBucketNotifConfigNode(bucket, data, cb) { const method = 'ZookeeperConfigManager._createBucketNotifConfigNode'; const zkPath = this._getBucketNodeZkPath(bucket); @@ -297,7 +287,10 @@ class ZookeeperConfigManager extends BaseConfigManager { bucket, zkPath, }); - return this._zkClient.mkdirp(zkPath, err => { + // mkdirp to ensure parent path exists, + // then atomically create the znode while setting data immediately + // to avoid other watchers to read the znode because data is set at creation + return this._zkClient.mkdirpWithChildDataOnly(zkPath, Buffer.from(data), err => { if (err) { this.log.error('Could not pre-create path in zookeeper', { method, @@ -306,7 +299,10 @@ class ZookeeperConfigManager extends BaseConfigManager { }); return this._callbackHandler(cb, err); } - return this._callbackHandler(cb); + // if znode is created, run getData to set a watcher on the bucket config + // in case another node becomes leader on the raft and modifies the config + // while the current process keeps running + return this._updateLocalStore([bucket], cb => this._callbackHandler(cb)); }); } diff --git a/lib/clients/ZookeeperManager.js b/lib/clients/ZookeeperManager.js index f35d34469..8a3ba9dc4 100644 --- a/lib/clients/ZookeeperManager.js +++ b/lib/clients/ZookeeperManager.js @@ -319,6 +319,55 @@ class ZookeeperManager extends EventEmitter { return cb(); }); } + + /** + * The lib's mkdirp apply data to all the parent znodes in path. + * This helper uses mkdirp without data for parent znodes, and applies data only to the last node. + * + * @method mkdirpWithChildDataOnly + * @param {String} path - The node path. + * @param {Buffer} [data=undefined] - The data buffer. + * @param {Array|Function} [acls=ACL.OPEN_ACL_UNSAFE] - The array of ACL object, or callback if omitted. + * @param {CreateMode|Function} [mode=CreateMode.PERSISTENT] - The creation mode, or callback if omitted. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + mkdirpWithChildDataOnly(path, data, acls, mode, callback) { + // Handle optional parameters (acls and mode) + /* eslint-disable no-param-reassign */ + if (typeof acls === 'function') { + callback = acls; + acls = undefined; + mode = undefined; + } else if (typeof mode === 'function') { + callback = mode; + mode = undefined; + } + /* eslint-enable no-param-reassign */ + + // Remove the empty string + const nodes = path.split('/').slice(1); + nodes.pop(); // Remove the last node + const parentPath = nodes.length > 0 ? `/${nodes.join('/')}` : ''; + + async.waterfall([ + next => { + if (!parentPath) { + return next(); + } + return this.client.mkdirp(parentPath, null, acls, mode, err => { + // Ignore NODE_EXISTS error - parent already exists + if (err && err.name === 'NODE_EXISTS') { + return next(); + } + return next(err); + }); + }, + next => this.client.create(path, data, acls, mode, next), + ], callback); + + return; + } } module.exports = ZookeeperManager; From bdfdaff475afa4b34a49a5c6a5b824e91d2fd113 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Mon, 5 Jan 2026 15:05:50 +0100 Subject: [PATCH 10/11] BB-727: Update tests to use ZookeeperManager --- .../ZookeeperConfigManager.spec.js | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js b/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js index 75791775c..59dfa2862 100644 --- a/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js +++ b/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js @@ -94,21 +94,40 @@ function checkParentConfigZkNode(manager, cb) { describe('ZookeeperConfigManager', () => { const zk = new ZookeeperMock({ doLog: false }); - const zkClient = zk.createClient(); - const params = { - zkClient, - zkConcurrency: 10, - logger, - }; + const zkMock = zk.createClient(); + let zkClient; + let params; + + before(() => { + // Stub _connect before instantiation to inject mock client + sinon.stub(ZookeeperManager.prototype, '_connect').callsFake(function () { + this.client = zkMock; + }); + + zkClient = new ZookeeperManager('localhost:2181', {}, logger); + + params = { + zkClient, + zkConcurrency: 10, + logger, + }; + }); + + after(() => { + sinon.restore(); + zk._resetState(); + }); describe('Constructor', () => { + let stub; function checkConfigParentNodeStub(cb) { return cb(new Error('error checking config parent node')); } - after(() => { - sinon.restore(); - zk._resetState(); + afterEach(() => { + if (stub) { + stub.restore(); + } }); it('constructor and setup checks', done => { @@ -128,7 +147,7 @@ describe('ZookeeperConfigManager', () => { it('should return error if checkConfigParentNode fails', done => { const manager = new ZookeeperConfigManager(params); - sinon.stub(manager, '_checkConfigurationParentNode') + stub = sinon.stub(manager, '_checkConfigurationParentNode') .callsFake(checkConfigParentNodeStub); manager.setup(err => { assert(err); @@ -250,7 +269,7 @@ describe('ZookeeperConfigManager', () => { sinon.stub(manager, '_updateLocalStore').yields(null); manager.setup(err => { assert.ifError(err); - assert(!(manager._zkClient instanceof ZookeeperManager)); + assert.strictEqual(manager._zkClient, zkClient); done(); }); }); From f4614f2cfb44f72d745508381e972ab9642ae4f3 Mon Sep 17 00:00:00 2001 From: Mickael Bourgois Date: Mon, 5 Jan 2026 15:27:23 +0100 Subject: [PATCH 11/11] BB-727: Unit test mkdirpWithChildDataOnly --- tests/unit/clients/ZookeeperManager.js | 39 ++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/unit/clients/ZookeeperManager.js b/tests/unit/clients/ZookeeperManager.js index 574f86e2d..4d1dbd02c 100644 --- a/tests/unit/clients/ZookeeperManager.js +++ b/tests/unit/clients/ZookeeperManager.js @@ -17,6 +17,7 @@ describe('ZookeeperManager', () => { connect: sinon.stub(), getData: sinon.stub(), mkdirp: sinon.stub(), + create: sinon.stub(), close: sinon.stub(), }; @@ -88,4 +89,42 @@ describe('ZookeeperManager', () => { done(); }); }); + + it('should create parent path without data and child with data using mkdirpWithChildDataOnly', done => { + mockClient.mkdirp.callsArgWith(4, null); + mockClient.create.callsArgWith(4, null); + + zkClient = new ZookeeperManager('localhost:2181', {}, log); + + const testPath = '/config/bucket1'; + const testData = Buffer.from('test-data'); + + zkClient.mkdirpWithChildDataOnly(testPath, testData, err => { + assert.ifError(err); + sinon.assert.calledOnce(mockClient.mkdirp); + sinon.assert.calledWith(mockClient.mkdirp, '/config', null); + sinon.assert.calledOnce(mockClient.create); + sinon.assert.calledWith(mockClient.create, testPath, testData); + done(); + }); + }); + + it('should handle NODE_EXISTS error on parent path in mkdirpWithChildDataOnly', done => { + mockClient.mkdirp.callsArgWith(4, { name: 'NODE_EXISTS' }); + mockClient.create.callsArgWith(4, null); + + zkClient = new ZookeeperManager('localhost:2181', {}, log); + + const testPath = '/config/bucket1'; + const testData = Buffer.from('test-data'); + + zkClient.mkdirpWithChildDataOnly(testPath, testData, err => { + assert.ifError(err); + sinon.assert.calledOnce(mockClient.mkdirp); + sinon.assert.calledWith(mockClient.mkdirp, '/config', null); + sinon.assert.calledOnce(mockClient.create); + sinon.assert.calledWith(mockClient.create, testPath, testData); + done(); + }); + }); });