
Auto Offset Reset for existing Consumer Group #11

Closed
mikesparr opened this issue Jun 30, 2016 · 4 comments

Comments

@mikesparr

mikesparr commented Jun 30, 2016

Is there an equivalent to the --from-beginning flag in the console consumer?

I'm migrating from the kafka-python client to this one, and in some initial tests it's not behaving as expected. I start a consumer with group testgroup1 consuming topic test, which I've populated with a dozen or so messages. I have auto.offset.reset set to smallest and expect it to replay the topic from the beginning. The first run plays all the messages; when I stop and restart it, it does not.

I assume it resumes from the latest offset, but I'm confused about the purpose of the auto.offset.reset values. I expect earliest | smallest to consume from the beginning, and largest | latest to pick up from the last committed offset.

import confluent_kafka as kafka
import sys

conf_consumer = {'bootstrap.servers': "localhost:9092",
                 'group.id': "testgroup1",
                 'default.topic.config': {'auto.offset.reset': 'smallest',
                                          'offset.store.sync.interval.ms': 5000}
                 }

def consume(topics=['test'], group='testgroup2', conf=None):
    """Connects to topic and listens for incoming messages and prints them"""

    running = True

    if conf is not None and isinstance(conf, dict):
        if group:
            print "Setting consumer group to [{}]".format(group)
            conf['group.id'] = group

        try:
            c = kafka.Consumer(**conf)
        except:
            print "Error creating Consumer with config [{}]".format(conf)

        try:
            c.subscribe(topics)
        except:
            print "Error subscribing to topics [{}]".format(topics)
            c = None

        if c:
            print "Starting to poll topics [{}]...".format(topics)
            while running:
                try:
                    msg = c.poll()
                    if not msg.error():
                        print "Received message [{}]".format(msg.value().decode('utf-8'))
                    elif msg.error().code() == kafka.KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write("{} [{}] reached end at offset {}\n".format(
                                        msg.topic(), msg.partition(), msg.offset()))
                    else:
                        print "Unknown error [{}]. Quitting...".format(msg.error())
                        raise kafka.KafkaException(msg.error())
                except:
                    running = False
                    print "Error polling messages."
            # end while running loop
        else:
            print "Consumer object missing. Nothing to do!"
    else:
        print "Missing configuration or not dictionary. Nothing to do!"

    try:
        c.close() # close connection
    except:
        print "Could not close connection (c). Nothing to do!"


def main():
    print "Consuming content..."
    consume(conf=conf_consumer)


if __name__ == '__main__':
    main()

If I run kafka-console-consumer in a shell with the --from-beginning flag, it replays all messages every time. I checked the librdkafka configuration documentation to see if there's an extra setting I'm missing, like offset.store.sync.interval.ms, thinking the stored offset has to be reset or something.

What am I missing? Is it possible to restart the script and re-consume from the beginning without changing the consumer group.id?

@mikesparr
Author

I wonder if there should be three options for the offset config:

  • earliest (reset the offset and start from the beginning [start over])
  • resume (last committed offset for the group if it exists, otherwise the beginning [where we left off])
  • latest (start at the latest offset for the topic [now onwards])

It seems odd that a configuration parameter whose name implies "reset offset" is simply ignored when an offset exists, forcing you to edit ZooKeeper or use other tools (e.g. Burrow or the CLI) to replay a topic for a given consumer group, or to create another group and leave stale data behind (false lag for the former group if it isn't deleted).

@ewencp
Contributor

ewencp commented Jul 1, 2016

@mikesparr I think you might be misunderstanding the meaning of auto.offset.reset. It sets the policy for resetting offsets when there are no committed offsets. The point of tracking offsets in Kafka is that you can track the progress you've made, even if all your consumer instances fail, and pick up where you left off so you don't reprocess data unnecessarily. This is actually quite important -- if you start up a consumer and find committed offsets for the group, resuming from them is almost always what you want.
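
To make that concrete, here is a small sketch mirroring the configuration from this issue (same assumed broker, group, and topic names as above):

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': 'testgroup1',
        'default.topic.config': {'auto.offset.reset': 'smallest'}}

c = Consumer(**conf)
c.subscribe(['test'])
# First run: the group has no committed offsets, so auto.offset.reset applies
# and consumption starts from the earliest (smallest) offset.
# Later runs: committed offsets exist, so the consumer resumes from them and
# auto.offset.reset is not consulted.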

Generally you want the reset you're talking about to be explicit. In fact, it sounds like what you actually want is to delete the group and start fresh (which you could do with Kafka's kafka-consumer-groups.sh command). You're using an offset reset to the beginning of the topic partitions as a shortcut for that, although you should be careful about doing so if your topic subscription list is dynamic, since you may leave stale offset data in your offsets topic.

If you want to do this from your application, I would suggest passing a flag to your program and using the on_assign callback to seek to the beginning, but I think we're currently missing a seek method. We'll want to add one, at which point you can do this yourself (this is the same approach the console consumer uses to handle the --from-beginning and --offset arguments).

@edenhill
Contributor

edenhill commented Jul 2, 2016

The assign() method actually allows you to set the initial offset for each partition, so try something like this:

    def on_assign(c, ps):
        for p in ps:
            p.offset = -2
        c.assign(ps)

    c.subscribe(['test'], on_assign=on_assign)

There are currently no symbolic names for the special logical offsets (stored, invalid, beginning, end, ...), so you'll need to use internal values for them: -2 for beginning and -1 for end. In the above example I set it to -2 to always start consuming from the earliest/beginning offset, but you can also set it to an absolute offset (e.g., 12352) if you are storing the offsets outside Kafka.
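
For reference, here is a minimal runnable sketch of this workaround wired into a full consumer loop. It reuses the topic and group names from this issue and the raw -2 "beginning" value described above; treat it as a sketch rather than the only way to do it:

    import confluent_kafka as kafka

    conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'testgroup1'}
    c = kafka.Consumer(**conf)

    def start_from_beginning(consumer, partitions):
        # Override any committed offsets: -2 is the logical "beginning" offset,
        # so every assigned partition is rewound before consumption starts.
        for p in partitions:
            p.offset = -2
        consumer.assign(partitions)

    c.subscribe(['test'], on_assign=start_from_beginning)

    while True:
        msg = c.poll(1.0)
        if msg is None or msg.error():
            continue
        print(msg.value())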

@mikesparr
Author

Thanks @ewencp and @edenhill for the detailed responses and the workaround example. I do understand the thinking behind offsets, and I have a special case for wanting to "reuse" the group but reset it. My plan is to have several Python scripts that re-consume a source topic and publish to a sink topic; the sink topic is consumed by Logstash and indexed into Elasticsearch. I want the ability to reindex Elasticsearch, so I can start these scripts via Supervisor from time to time. I don't want to have to change the Supervisor config and restart it, because it runs other apps as well; being able to just run a command to start the script is the goal.
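
For what it's worth, a rough sketch of that source-to-sink bridge (the 'reindexer' group id and the source_topic/sink_topic names are hypothetical placeholders):

from confluent_kafka import Consumer, Producer

consumer_conf = {'bootstrap.servers': 'localhost:9092',
                 'group.id': 'reindexer',
                 'default.topic.config': {'auto.offset.reset': 'smallest'}}
consumer = Consumer(**consumer_conf)

producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(**producer_conf)

consumer.subscribe(['source_topic'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # Re-publish each source record to the sink topic that Logstash consumes.
        producer.produce('sink_topic', value=msg.value())
        producer.poll(0)  # serve delivery callbacks
finally:
    producer.flush()
    consumer.close()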

I'll be looking at Kafka Streams soon to replace the Python app, but I'll keep using Python from time to time for quick solutions. Thanks again!
