Skip to content

A simpler kafka consumer #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Dec 17, 2014
Merged

A simpler kafka consumer #234

merged 33 commits into from
Dec 17, 2014

Conversation

dpkp
Copy link
Owner

@dpkp dpkp commented Sep 11, 2014

kafka = KafkaConsumer('topic1')
for m in kafka:
    print m

kafka = KafkaConsumer('topic1', 'topic2',
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

for m in kafka:
    process_message(m)
    kafka.task_done(m)

@dpkp
Copy link
Owner Author

dpkp commented Sep 11, 2014

more details in docstring. general approach is to stick to the Java consumer configuration parameters and try to be as pythonic as possible otherwise. still needs a few more tweaks, but interested in feedback on approach / interface.

```
# more advanced consumer -- multiple topics w/ auto commit offset management
kafka = KafkaConsumer('topic1', 'topic2',
group_id='my_consumer_group',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather take an actual dictionary of formal kafka properties here. This would enable us to (maybe, someday?) share consumer props with a java process.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disagree. **kwargs seems to be the generally accepted pythonic interface. Plus if you wanted to load a dictionary of configs from a file or elsewhere, you could convert it to kwargs easily:

config = dict()
config.update(load_parameters_from_file())
kafka = KafkaConsumer('topic1', 'topic2', **config)

Unless there is something special about java properties (not really a dict?), I don't see the need to worry about this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok; I'd like to add a function for converting formal java prop names to kwarg names then. This can be done at a later time.

@wizzat
Copy link
Collaborator

wizzat commented Sep 11, 2014

I really like the approach and interface. Just to be clear: we are sacrificing the current ability to commit every N messages? There's probably more, but it's getting kinda late. :)

@dpkp
Copy link
Owner Author

dpkp commented Sep 11, 2014

also didn't see a config for auto commit by # of messages in the java api. It would be relatively easy to support. Perhaps auto_commit_interval_messages ?

@wizzat
Copy link
Collaborator

wizzat commented Sep 12, 2014

I'm not 100% positive that we actually need it, but it should be pretty easy to add. That seems like a reasonable name.

@dpkp dpkp added the consumer label Sep 14, 2014
@dpkp
Copy link
Owner Author

dpkp commented Sep 16, 2014

integration tests failing on python 3.3 and 3.4 -- will investigate

"""
Configuration settings can be passed to constructor,
otherwise defaults will be used:
client_id='kafka.consumer.kafka',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should be able to dynamically construct this format string from the actual defaults. That would be Much Better(tm)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsure how to do this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I was thinking you could pass the defaults in as a dictionary and .format() them. However, that primarily serves the purpose of making sure runtime docs are good. Reading the docs at the source code level would suffer. :-/

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do """default={key_name}""".format(**DEFAULT_CONSUMER_CONFIG) , but when i try to examine from shell via help(KafkaConsumer) i see empty docstring... does this work for you? testing generically via python shell also doesn't work. have you gotten something like this to work before?

@dpkp dpkp mentioned this pull request Nov 6, 2014
@wizzat
Copy link
Collaborator

wizzat commented Dec 12, 2014

Seems worth merging to me. I really like the interface a lot more.

@dpkp
Copy link
Owner Author

dpkp commented Dec 13, 2014

thanks -- in the spirit of not merging my own PR, i'll leave it to you or mumrah

@wizzat
Copy link
Collaborator

wizzat commented Dec 15, 2014

I can't merge your PR, because there's a merge conflict.

@dpkp
Copy link
Owner Author

dpkp commented Dec 15, 2014

strange -- it's rebased to master/HEAD . manual steps yield no merge conflicts. boo on github.

Dana Powers added 8 commits December 15, 2014 12:42
```
kafka = KafkaConsumer('topic1')
for m in kafka:
    print m

kafka = KafkaConsumer('topic1', 'topic2',
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

for m in kafka:
    process_message(m)
    kafka.task_done(m)
```
…tion list; add more type checks to set_topic_and_partitions
Dana Powers added 25 commits December 15, 2014 12:42
Log connection failures w/ traceback in kafka/client.py
@dpkp
Copy link
Owner Author

dpkp commented Dec 15, 2014

doh -- had to rebase against the offset PR that just landed. merge button should work now.

wizzat added a commit that referenced this pull request Dec 17, 2014
@wizzat wizzat merged commit 9c5216a into dpkp:master Dec 17, 2014
@dpkp dpkp deleted the high_level_consumer branch June 10, 2015 00:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants