Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High-level consumer rebalance algorithm having issues #124

Closed
dmitrig01 opened this issue Nov 17, 2014 · 5 comments
Closed

High-level consumer rebalance algorithm having issues #124

dmitrig01 opened this issue Nov 17, 2014 · 5 comments

Comments

@dmitrig01
Copy link

Hey all, I saw a number of issues with the rebalancing algorithm in the high-level consumer. Namely, when several new consumers joined the cluster at the same time, the algorithm had trouble deciding how many partitions each consumer should be responsible for. This resulted in issues wherein partitions would become unassigned, and, more importantly, issues where somehow the consumer would not resolve it's conflicts within Zookeeper but read messages from its partitions in Kafka anyway, so multiple consumers would end up reading messages from the same partition(s). This was not good.

Additionally, I ran into #90, wherein multiple consumers repeatedly try to claim the same partition – I think this is simply due to the fact that the number of retries is not high enough, and all the consumers retry at the same time, creating contention. Using the retry library, I was able to increase the number of retries without more nesting, and I was able to randomize the timeout to hopefully reduce contention.

I also ran into #112, which arises due to the fact that in the rebalance code, the consumer will unassign itself from its partition successfully, but fail to claim a new partition. It will then retry the rebalance code, and try again to unassign itself from its partitions, even though it has already been assigned. I solved this by retrying each part of the rebalance code separately.

Anyway, due to these four issues (unassigned partitions, double-assigned partitions, NODE_EXISTS, and NO_NODE), I rewrote the rebalance code for my purposes and it seems to work quite well. I do not have the time to make it a full pull request, but I am attaching the code here in hopes that it will be useful to you. Perhaps at some point in the future I can make a more formal PR.

Two brief notes: (a) I used promises instead of bluebird for async flow for consistency with the rest of my codebase, (b) in my logic, each consumer only claims one partition from each topic. I simply run the same number of consumers as I have partitions – though this definitely would not be ideal for everyone. Perhaps the logic here can be extended for those who need more partitions than consumers.

var B = require('bluebird'), retry = require('retry');

function retryAndPromise(func) {
    return new B(function(resolve, reject) {
        var operation = retry.operation({
            retries: 10,
            minTimeout: 10,
            maxTimeout: 200,
            randomize: true
        });

        operation.attempt(function() {
            func().then(resolve, function(err) {
                if (!operation.retry(err)) { reject(err); }
            });
        });
    });
}

    function rebalance() {
        if (!self.rebalancing) {
            self.emit('rebalancing');
            self.rebalancing = true;

            var oldTopicPayloads = JSON.parse(JSON.stringify(self.topicPayloads));
            self.topicPayloads = [];

            log.info("HighLevelConsumer " + self.id + " is attempting to rebalance");
            retryAndPromise(function(cb) {
                return new B(function(resolve, reject) {
                    log.info("HighLevelConsumer " + self.id + " stopping data read during rebalance");
                    self.stop(function() { resolve(); });
                });
            }).then(function() {
                return retryAndPromise(function() {
                    log.info("HighLevelConsumer " + self.id + " releasing current partitions during rebalance");
                    return B.map(oldTopicPayloads, function(tp) {
                        return new B(function(resolve, reject) {
                            if (tp.partition !== undefined) {
                                console.log(self.options.groupId, tp.topic, tp.partition);
                                self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, function(err) {
                                    if (err) { reject(err); }
                                    else { resolve(); }
                                });
                            }
                            else { resolve(); }
                        });
                    });
                });
            }).then(function() {
                return retryAndPromise(function() {
                    log.info("HighLevelConsumer " + self.id + " determining the partitions to own during rebalance");

                    var newTopicPayloads = [];
                    return B.map(_.uniq(_.pluck(self.originalTopics, 'topic')), function(topic) {
                        return new B(function(resolve, reject) {
                            self.client.zk.client.getChildren('/consumers/' + self.options.groupId + '/owners/' + topic, function(err, takenKeys) {
                                if (err) { return reject(err); }

                                self.client.zk.client.getData('/brokers/topics/' + topic, function(err, data) {
                                    if (err) { return reject(err); }
                                    var allKeys = Object.keys(JSON.parse(String(data)).partitions);

                                    var availableKeys = _.difference(allKeys, takenKeys);
                                    if (!availableKeys.length) { return resolve(); }
                                    newTopicPayloads.push({
                                        topic: topic,
                                        partition: availableKeys[0],
                                        offset: 0,
                                        maxBytes: self.options.fetchMaxBytes,
                                        metadata: 'm'
                                    });
                                    resolve();
                                });
                            });
                        });
                    }).then(function() {
                        log.info("HighLevelConsumer " + self.id + " attempting to claim partitions " + _.pluck(newTopicPayloads, 'partition'));

                        return B.map(newTopicPayloads, function(tp) {
                            return new B(function(resolve, reject) {
                                if (tp.partition !== undefined) {
                                    // In the case that two processes try to claim the same partition, one will error out.
                                    self.client.zk.addPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function(err) {
                                        if (err) { reject(err); }
                                        else { console.log('Owned partition', self.id, self.options.groupId, tp.topic, tp.partition); resolve(); }
                                    });
                                }
                                else { resolve(); }
                            });
                        });
                    }).then(function() {
                        log.info("HighLevelConsumer " + self.id + " rebalance attempt was successful");
                        self.topicPayloads = newTopicPayloads;
                    });
                });

            }).then(function() {
                self.rebalancing = false;
                self.ready = true;
                return self.emit('ready');
            }, function(err) {
                self.rebalancing = false;
                return self.emit('error', new errors.FailedToRebalanceConsumerError(err.toString()));
            });
        }
    }
@jezzalaycock
Copy link
Contributor

Rebalancing in kafka relies on an assumption that all consumers try at the same time: see below for an extract from the kafka docs.

"Each consumer does the following during rebalancing:

  1. For each topic T that Ci subscribes to
  2. let PT be all partitions producing topic T
  3. let CG be all consumers in the same group as Ci that consume topic T
  4. sort PT (so partitions on the same broker are clustered together)
  5. sort CG
  6. let i be the index position of Ci in CG and let N = size(PT)/size(CG)
  7. assign partitions from i_N to (i+1)_N - 1 to consumer Ci
  8. remove current entries owned by Ci from the partition owner registry
  9. add newly assigned partitions to the partition owner registry
    (we may need to re-try this until the original partition owner releases its ownership)

When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers *within the same group about the same time."

Essentially zk must know about all consumers in a group at the same time to ensure partitions are allocated appropriately. Rebalncing in kafka is a pain - see issues such as https://issues.apache.org/jira/browse/KAFKA-242
Note version 0.9.0 consumer re-design moves all this logic to the broker and will result in the highLevel consumer being far simpler!

I would suggest that the client code manually manage offsets - though this in itself is complicated as you must keep track of rebalances and commits.

@dmitrig01
Copy link
Author

Ah - this is very interesting – I searched for a while but could not find that document on this rebalancing algorithm. However – that still doesn't explain the issue I was seeing: when I ran kafka.tools.ConsumerOffsetChecker, it showed that several partitions were unassigned (several minutes after a rebalance, which should have been plenty of time to allow for the full rebalance), and several consumers were actively consuming messages from the same partitions. Thus I don't actually think this is an issue with tracking offsets – but more an issue with the way rebalancing is happening.

@jezzalaycock
Copy link
Contributor

I have issued a pull request using retry to retry the rebalance. The management of the offsets I do in my client - I've got a multinode zk and broker which has been running 24/7 with no issues. I will try to share my client code with you.

I heavily rely on pm2 to manage restarts as there are some circumstances that I simply want the client to die and re-connect - for instance when I lose connection or brokers die. Using pm2 it will simply retry and then reconnect and consume messages from the last committed offset.

If you want the consumer to co-exist with other consumers (not necessarily a node client) then you must follow the regime set out in the kafka docs.

@dmitrig01
Copy link
Author

Ahh, very interesting. I searched for a while but could not find the documentation on how consumers are supposed to handle rebalance.

I actually ended up writing a Java client that makes an HTTP request to node, and that seems to be a lot more stable.

PM2 sounds interesting - thanks.

@wanming
Copy link

wanming commented Nov 2, 2015

@jezzalaycock Can you share the code to me? thx.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants