diff --git a/lib/highLevelConsumer.js b/lib/highLevelConsumer.js index ab2f636e..cdd475c2 100644 --- a/lib/highLevelConsumer.js +++ b/lib/highLevelConsumer.js @@ -9,8 +9,6 @@ var util = require('util'), Offset = require('./offset'), async = require("async"), errors = require('./errors'), - zk = require('./zookeeper'), - ZookeeperConsumerMappings = zk.ZookeeperConsumerMappings, retry = require('retry'), debug = require('debug')('kafka-node:HighLevelConsumer'); @@ -22,6 +20,7 @@ var DEFAULTS = { autoCommitIntervalMs: 5000, // Fetch message config fetchMaxWaitMs: 100, + rebalanceDelayMs: 0, paused: false, maxNumSegments: 1000, fetchMinBytes: 1, @@ -48,6 +47,7 @@ var HighLevelConsumer = function (client, topics, options) { this.ready = false; this.paused = this.options.paused; this.rebalancing = false; + this.pending = false; this.id = this.options.id || this.options.groupId + '_' + uuid.v4(); this.payloads = this.buildPayloads(topics); this.topicPayloads = this.buildTopicPayloads(topics); @@ -143,8 +143,6 @@ HighLevelConsumer.prototype.connect = function () { function rebalance() { if (!self.rebalancing) { - deregister(); - self.emit('rebalancing'); self.rebalancing = true; @@ -172,45 +170,43 @@ HighLevelConsumer.prototype.connect = function () { return p.topic; }); self.client.refreshMetadata(topicNames, function (err) { - register(); self.rebalancing = false; if (err) { - self.emit('error', err); + self.emit('error', err); } else { - self.emit('rebalanced'); + self.emit('rebalanced'); + } + if (self.pending) { + self.pending = false; + rebalance(); } }); } }); }); + } else { + self.pending = true; } } - // Wait for the consumer to be ready - this.on('registered', function () { - rebalance(); - }); - - function register() { + function registerListeners() { debug("Registered listeners"); // Register for re-balances (broker or consumer changes) self.client.zk.on('consumersChanged', rebalance); self.client.on('brokersChanged', rebalance); } - - function deregister() { - debug("Deregistered listeners"); - // Register for re-balances (broker or consumer changes) - self.client.zk.removeListener('consumersChanged', rebalance); - self.client.removeListener('brokersChanged', rebalance); - } + self.on('registered', function () { + setTimeout(function () { + registerListeners(); + rebalance(); + }, self.options.rebalanceDelayMs); + }); function attachZookeeperErrorListener() { self.client.zk.on('error', function (err) { - self.emit('error', err); - }); + self.emit('error', err); + }); } - attachZookeeperErrorListener(); this.client.on('zkReconnect', function () { @@ -249,7 +245,6 @@ HighLevelConsumer.prototype.connect = function () { }); }); - // 'done' will be emit when a message fetch request complete this.on('done', function (topics) { self.updateOffsets(topics); @@ -443,7 +438,6 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) { }; HighLevelConsumer.prototype.init = function () { - var self = this; if (!self.topicPayloads.length) { @@ -452,11 +446,10 @@ HighLevelConsumer.prototype.init = function () { self.registerConsumer(function (err) { if (err) { - return self.emit('error', new errors.FailedToRegisterConsumerError(err.toString())); + self.emit('error', new errors.FailedToRegisterConsumerError(err.toString())); + return; } - - // Close the - return self.emit('registered'); + self.emit('registered'); }); }; @@ -530,11 +523,13 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) { * @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered */ HighLevelConsumer.prototype.registerConsumer = function (cb) { - this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) { + var zk = this.client.zk; + var groupId = this.options.groupId; + zk.registerConsumer(groupId, this.id, this.payloads, function (err) { if (err) return cb(err); + zk.listConsumers(groupId); cb(); }); - this.client.zk.listConsumers(this.options.groupId); }; HighLevelConsumer.prototype.addTopics = function (topics, cb) { diff --git a/lib/zookeeper.js b/lib/zookeeper.js index b05554b9..8ac0fa93 100644 --- a/lib/zookeeper.js +++ b/lib/zookeeper.js @@ -248,13 +248,13 @@ Zookeeper.prototype.listBrokers = function (cb) { Zookeeper.prototype.listConsumers = function (groupId) { - var that = this; + var self = this; var path = '/consumers/' + groupId + '/ids'; this.client.getChildren( path, function () { - if (!that.closed) { - that.listConsumers(groupId); + if (!self.closed) { + self.listConsumers(groupId); } }, function (error, children) { @@ -262,10 +262,10 @@ Zookeeper.prototype.listConsumers = function (groupId) { debug(error); // Ignore NO_NODE error here #157 if (error.name !== 'NO_NODE') { - that.emit('error', error); + self.emit('error', error); } } else { - that.emit('consumersChanged'); + self.emit('consumersChanged'); } } );