Skip to content

Commit

Permalink
BB-466 Dealing with expired zookeeper sessions
Browse files Browse the repository at this point in the history
Introducing the ZookeeperManager Class, which offers:

* Connection Management: This class efficiently manages connections to the ZooKeeper server. It ensures continuity by handling reconnections whenever the session expires.


* Error and State Management: The class handles errors and meticulously logs state changes. It is essential for debugging.


* Abstraction Layer: It creates an abstraction over the database-specific code, simplifying the integration of application logic. This layer not only facilitates an easier transition to different databases in the future but also aids in simplifying the mocking and testing processes.
  • Loading branch information
nicolas2bert committed Nov 16, 2023
1 parent 55b7a0b commit e9ac8af
Show file tree
Hide file tree
Showing 14 changed files with 676 additions and 83 deletions.
8 changes: 3 additions & 5 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const BucketClient = require('bucketclient').RESTClient;

const BackbeatProducer = require('../../../lib/BackbeatProducer');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const zookeeperHelper = require('../../../lib/clients/zookeeper');
const ZookeeperManager = require('../../../lib/clients/ZookeeperManager');
const KafkaBacklogMetrics = require('../../../lib/KafkaBacklogMetrics');
const { authTypeAssumeRole } = require('../../../lib/constants');
const VaultClientCache = require('../../../lib/clients/VaultClientCache');
Expand Down Expand Up @@ -493,9 +493,7 @@ class LifecycleConductor {
process.nextTick(cb);
return;
}
this._zkClient = zookeeperHelper.createClient(
this.zkConfig.connectionString);
this._zkClient.connect();
this._zkClient = new ZookeeperManager(this.zkConfig.connectionString, null, this.logger);
this._zkClient.once('error', cb);
this._zkClient.once('ready', () => {
// just in case there would be more 'error' events
Expand Down Expand Up @@ -552,7 +550,7 @@ class LifecycleConductor {
// just in case there would be more 'error' events emitted
this._kafkaBacklogMetrics.removeAllListeners('error');
this._kafkaBacklogMetrics.on('error', err => {
this._log.error('error from kafka topic metrics', {
this.logger.error('error from kafka topic metrics', {
error: err.message,
method: 'LifecycleConductor._initKafkaBacklogMetrics',
});
Expand Down
8 changes: 4 additions & 4 deletions extensions/notification/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const errors = require('arsenal').errors;

const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
const NotificationDestination = require('../destination');
const zookeeper = require('../../../lib/clients/zookeeper');
const ZookeeperManager = require('../../../lib/clients/ZookeeperManager');
const configUtil = require('../utils/config');
const messageUtil = require('../utils/message');
const NotificationConfigManager = require('../NotificationConfigManager');
Expand Down Expand Up @@ -72,10 +72,10 @@ class QueueProcessor extends EventEmitter {
`${this.zkConfig.connectionString}${populatorZkPath}`;
this.logger.info('opening zookeeper connection for reading ' +
'bucket notification configuration', { zookeeperUrl });
this.zkClient = zookeeper.createClient(zookeeperUrl, {
this.zkClient = new ZookeeperManager(zookeeperUrl, {
autoCreateNamespace: this.zkConfig.autoCreateNamespace,
});
this.zkClient.connect();
}, this.logger);

this.zkClient.once('error', done);
this.zkClient.once('ready', () => {
// just in case there would be more 'error' events emitted
Expand Down
5 changes: 2 additions & 3 deletions lib/KafkaBacklogMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const zookeeper = require('node-zookeeper-client');
const Logger = require('werelogs').Logger;
const { errors, metrics } = require('arsenal');

const zookeeperHelper = require('./clients/zookeeper');
const ZookeeperManager = require('./clients/ZookeeperManager');
const { readUInt64BE } = require('./util/buffer');
const { promMetricNames } = require('./constants').kafkaBacklogMetrics;

Expand Down Expand Up @@ -58,8 +58,7 @@ class KafkaBacklogMetrics extends EventEmitter {
}

_initZookeeperClient() {
this._zookeeper = zookeeperHelper.createClient(this._zookeeperEndpoint);
this._zookeeper.connect();
this._zookeeper = new ZookeeperManager(this._zookeeperEndpoint, null, this._log);
this._zookeeper.on('error', err => {
this.emit('error', err);
});
Expand Down
18 changes: 4 additions & 14 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'; // eslint-disable-line strict

const async = require('async');
const zookeeper = require('node-zookeeper-client');

const { errors } = require('arsenal');
const { RedisClient } = require('arsenal').metrics;
Expand All @@ -17,6 +16,7 @@ const Healthcheck = require('./Healthcheck');
const routes = require('./routes');
const { getSortedSetKey, getSortedSetMember } =
require('../util/sortedSetHelper');
const ZookeeperManager = require('../clients/ZookeeperManager');

// StatsClient constant defaults
// TODO: This should be moved to constants file
Expand Down Expand Up @@ -182,15 +182,6 @@ class BackbeatAPI {
&& this._crrStatusProducer.isReady();
}

/**
* Check if Zookeeper and Producer are connected
* @return {boolean} true/false
*/
isConnected() {
return this._zkClient.getState().name === 'SYNC_CONNECTED'
&& this._checkProducersReady();
}

/**
* Get Kafka healthcheck
* @param {object} details - route details from lib/api/routes.js
Expand Down Expand Up @@ -1079,13 +1070,12 @@ class BackbeatAPI {
const zookeeperUrl =
`${this._zkConfig.connectionString}${populatorZkPath}`;

const zkClient = zookeeper.createClient(zookeeperUrl, {
const zkClient = new ZookeeperManager(zookeeperUrl, {
autoCreateNamespace: this._zkConfig.autoCreateNamespace,
});
zkClient.connect();
}, this._logger);

zkClient.once('error', cb);
zkClient.once('connected', () => {
zkClient.once('ready', () => {
zkClient.removeAllListeners('error');
this._zkClient = zkClient;
return cb();
Expand Down
2 changes: 1 addition & 1 deletion lib/api/Healthcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Healthcheck {
/**
* @constructor
* @param {object} repConfig - extensions.replication configs
* @param {node-zookeeper-client.Client} zkClient - zookeeper client
* @param {ZookeeperManager} zkClient - zookeeper client manager
* @param {BackbeatProducer} crrProducer - producer for CRR topic
* @param {BackbeatProducer} crrStatusProducer - CRR status producer
* @param {BackbeatProducer} metricProducer - producer for metric
Expand Down
Loading

0 comments on commit e9ac8af

Please sign in to comment.