Skip to content
Open
2 changes: 1 addition & 1 deletion extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
this.bnConfigManager.setConfig(bucketName, bnConfig);
return undefined;
}
// bucket was deleter or notification conf has been removed, so remove zk node
// bucket was deleted or notification conf has been removed, so remove zk node
this.bnConfigManager.removeConfig(bucketName || bucket);
return undefined;
}
Expand Down
91 changes: 65 additions & 26 deletions extensions/notification/configManager/ZookeeperConfigManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,27 @@ class ZookeeperConfigManager extends BaseConfigManager {
this._setupEventListeners();
}

// https://github.com/alexguan/node-zookeeper-client/blob/master/lib/WatcherManager.js#L13-L39
// watchers should be reapplied only when their event is triggered, otherwise they will be duplicated
// and the lib will keep adding listeners
// (even if the functions are the same, they are js object so they don't match)
// warn early to catch leak more easily instead of waiting event emitter limit at 10 listeners.
_warnZkWatchersLeak(type, path) {
const watcherManager = this._zkClient?.client?.connectionManager?.watcherManager;

const watchers = watcherManager?.[`${type}Watchers`]?.[path];
if (!watchers) {
return;
}

const listeners = watchers.listenerCount('notification');
if (listeners > 0) {
process.emitWarning(`${type}Watchers[${path}] has already ${listeners} listeners`, {
code: 'ZkWatchersLeak',
});
}
}

_errorListener(error, listener) {
this.log.error('ZookeeperConfigManager.emitter.error', {
listener,
Expand All @@ -72,28 +93,29 @@ class ZookeeperConfigManager extends BaseConfigManager {
});
}

_getConfigListener(updatedBucket = '') {
_getConfigListener(bucket) {
this.log.debug('ZookeeperConfigManager.emitter.getConfig', {
event: 'getConfig',
bucket,
});
this._updateLocalStore([bucket]);
}

_listConfigsListener() {
this.log.debug('ZookeeperConfigManager.emitter.listConfigs', {
event: 'listConfigs',
});
this._listBucketsWithConfig((err, buckets) => {
if (err) {
this._emitter.emit('error', err, 'getConfigListener');
this._emitter.emit('error', err, 'listConfigsListener');
return undefined;
}
this.log.debug('bucket config to be updated in map', {
bucket: updatedBucket,
});
const newBuckets = this._getNewBucketNodes(buckets);
this.log.debug('new bucket configs to be added to map', {
buckets: newBuckets,
});
const bucketsToMap = updatedBucket ? [updatedBucket, ...newBuckets] : newBuckets;
this.log.debug('bucket configs to be added/updated to map', {
buckets: bucketsToMap,
});
if (bucketsToMap.length > 0) {
this._updateLocalStore(bucketsToMap);
if (newBuckets.length > 0) {
this._updateLocalStore(newBuckets);
}
return undefined;
});
Expand All @@ -118,6 +140,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
.on('setConfig',
(bucket, config) => this._setConfigListener(bucket, config))
.on('getConfig', bucket => this._getConfigListener(bucket))
.on('listConfigs', () => this._listConfigsListener())
.on('removeConfig', bucket => this._removeConfigListener(bucket));
}

Expand All @@ -132,10 +155,10 @@ class ZookeeperConfigManager extends BaseConfigManager {
return `/${constants.zkConfigParentNode}/${bucket}`;
}

_getConfigDataFromBuffer(data) {
_getConfigDataFromBuffer(data, bucket) {
const { error, result } = safeJsonParse(data);
if (error) {
this.log.error('invalid config', { error, config: data });
this.log.error('invalid config', { error, config: data, bucket });
return undefined;
}
return result;
Expand All @@ -150,6 +173,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
bucket,
zkPath,
});
this._warnZkWatchersLeak('data', zkPath);
return this._zkClient.getData(zkPath, event => {
this.log.debug('zookeeper getData watcher triggered', {
zkPath,
Expand Down Expand Up @@ -211,13 +235,12 @@ class ZookeeperConfigManager extends BaseConfigManager {
return async.waterfall([
next => this._checkNodeExists(zkPath, next),
(exists, next) => {
if (!exists) {
return this._createBucketNotifConfigNode(bucket,
err => next(err));
if (exists) {
return this._zkClient.setData(zkPath, Buffer.from(data), -1, next);
} else {
return this._createBucketNotifConfigNode(bucket, data, next);
}
return next();
},
next => this._zkClient.setData(zkPath, Buffer.from(data), -1, next),
}
], err => {
if (err) {
this.log.error('error saving config', { method, zkPath, data });
Expand Down Expand Up @@ -255,7 +278,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
});
}

_createBucketNotifConfigNode(bucket, cb) {
_createBucketNotifConfigNode(bucket, data, cb) {
const method
= 'ZookeeperConfigManager._createBucketNotifConfigNode';
const zkPath = this._getBucketNodeZkPath(bucket);
Expand All @@ -264,7 +287,10 @@ class ZookeeperConfigManager extends BaseConfigManager {
bucket,
zkPath,
});
return this._zkClient.mkdirp(zkPath, err => {
// mkdirp to ensure parent path exists,
// then atomically create the znode while setting data immediately
// to avoid other watchers to read the znode because data is set at creation
return this._zkClient.mkdirpWithChildDataOnly(zkPath, Buffer.from(data), err => {
if (err) {
this.log.error('Could not pre-create path in zookeeper', {
method,
Expand All @@ -273,7 +299,10 @@ class ZookeeperConfigManager extends BaseConfigManager {
});
return this._callbackHandler(cb, err);
}
return this._callbackHandler(cb);
// if znode is created, run getData to set a watcher on the bucket config
// in case another node becomes leader on the raft and modifies the config
// while the current process keeps running
return this._updateLocalStore([bucket], cb => this._callbackHandler(cb));
});
}

Expand Down Expand Up @@ -319,14 +348,15 @@ class ZookeeperConfigManager extends BaseConfigManager {
const method
= 'ZookeeperConfigManager._listBucketsWithConfig';
const zkPath = `/${constants.zkConfigParentNode}`;
this._warnZkWatchersLeak('child', zkPath);
this._zkClient.getChildren(zkPath, event => {
this.log.debug('zookeeper getChildren watcher triggered', {
zkPath,
method,
event,
});
if (event.type === zookeeper.Event.NODE_CHILDREN_CHANGED) {
this._emitter.emit('getConfig');
this._emitter.emit('listConfigs');
}
}, (error, buckets) => {
if (error) {
Expand All @@ -339,6 +369,11 @@ class ZookeeperConfigManager extends BaseConfigManager {
});
this._callbackHandler(cb, error);
}
this.log.debug('list of buckets', {
zkPath,
method,
buckets,
});
this._callbackHandler(cb, null, buckets);
});
}
Expand All @@ -349,7 +384,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
if (err) {
return next(err);
}
const configObject = this._getConfigDataFromBuffer(data);
const configObject = this._getConfigDataFromBuffer(data, bucket);
if (configObject) {
this._configs.set(bucket, configObject);
}
Expand Down Expand Up @@ -402,16 +437,20 @@ class ZookeeperConfigManager extends BaseConfigManager {
* Remove bucket notification configuration
*
* @param {String} bucket - bucket
* @param {boolean} [emitToZk = true] - whether to emit the event to zookeeper
* @return {boolean} - true if removed
*/
removeConfig(bucket) {
removeConfig(bucket, emitToZk = true) {
try {
this.log.debug('remove config', {
method: 'ZookeeperConfigManager.removeConfig',
bucket,
emitToZk,
});
this._configs.delete(bucket);
this._emitter.emit('removeConfig', bucket);
if (emitToZk) {
this._emitter.emit('removeConfig', bucket);
}
return true;
} catch (err) {
const errMsg
Expand Down
2 changes: 1 addition & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ class BackbeatConsumer extends EventEmitter {
const { topic, partition, offset, key, timestamp } = entry;
this._log.debug('finished processing of consumed entry', {
method: 'BackbeatConsumer.subscribe',
entry: { topic, partition, offset, key, timestamp },
entry: { topic, partition, offset, key: key?.toString?.() || key, timestamp },
groupId: this._groupId,
});
if (err) {
Expand Down
49 changes: 49 additions & 0 deletions lib/clients/ZookeeperManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,55 @@ class ZookeeperManager extends EventEmitter {
return cb();
});
}

/**
* The lib's mkdirp apply data to all the parent znodes in path.
* This helper uses mkdirp without data for parent znodes, and applies data only to the last node.
*
* @method mkdirpWithChildDataOnly
* @param {String} path - The node path.
* @param {Buffer} [data=undefined] - The data buffer.
* @param {Array|Function} [acls=ACL.OPEN_ACL_UNSAFE] - The array of ACL object, or callback if omitted.
* @param {CreateMode|Function} [mode=CreateMode.PERSISTENT] - The creation mode, or callback if omitted.
* @param {Function} callback - The callback function.
* @return {undefined}
*/
mkdirpWithChildDataOnly(path, data, acls, mode, callback) {
// Handle optional parameters (acls and mode)
/* eslint-disable no-param-reassign */
if (typeof acls === 'function') {
callback = acls;
acls = undefined;
mode = undefined;
} else if (typeof mode === 'function') {
callback = mode;
mode = undefined;
}
/* eslint-enable no-param-reassign */

// Remove the empty string
const nodes = path.split('/').slice(1);
nodes.pop(); // Remove the last node
const parentPath = nodes.length > 0 ? `/${nodes.join('/')}` : '';

async.waterfall([
next => {
if (!parentPath) {
return next();
}
return this.client.mkdirp(parentPath, null, acls, mode, err => {
// Ignore NODE_EXISTS error - parent already exists
if (err && err.name === 'NODE_EXISTS') {
return next();
}
return next(err);
});
},
next => this.client.create(path, data, acls, mode, next),
], callback);

return;
}
}

module.exports = ZookeeperManager;
39 changes: 39 additions & 0 deletions tests/unit/clients/ZookeeperManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('ZookeeperManager', () => {
connect: sinon.stub(),
getData: sinon.stub(),
mkdirp: sinon.stub(),
create: sinon.stub(),
close: sinon.stub(),
};

Expand Down Expand Up @@ -88,4 +89,42 @@ describe('ZookeeperManager', () => {
done();
});
});

it('should create parent path without data and child with data using mkdirpWithChildDataOnly', done => {
mockClient.mkdirp.callsArgWith(4, null);
mockClient.create.callsArgWith(4, null);

zkClient = new ZookeeperManager('localhost:2181', {}, log);

const testPath = '/config/bucket1';
const testData = Buffer.from('test-data');

zkClient.mkdirpWithChildDataOnly(testPath, testData, err => {
assert.ifError(err);
sinon.assert.calledOnce(mockClient.mkdirp);
sinon.assert.calledWith(mockClient.mkdirp, '/config', null);
sinon.assert.calledOnce(mockClient.create);
sinon.assert.calledWith(mockClient.create, testPath, testData);
done();
});
});

it('should handle NODE_EXISTS error on parent path in mkdirpWithChildDataOnly', done => {
mockClient.mkdirp.callsArgWith(4, { name: 'NODE_EXISTS' });
mockClient.create.callsArgWith(4, null);

zkClient = new ZookeeperManager('localhost:2181', {}, log);

const testPath = '/config/bucket1';
const testData = Buffer.from('test-data');

zkClient.mkdirpWithChildDataOnly(testPath, testData, err => {
assert.ifError(err);
sinon.assert.calledOnce(mockClient.mkdirp);
sinon.assert.calledWith(mockClient.mkdirp, '/config', null);
sinon.assert.calledOnce(mockClient.create);
sinon.assert.calledWith(mockClient.create, testPath, testData);
done();
});
});
});
Loading
Loading