Added controlled thread shutdown to example.py #1268
Added better control over threads/processes, making them shut down cleanly before the program terminates. Switched from the iterator to poll() style to do it. Should solve this issue: #1130
example.py (outdated)
-            print (message)
-
+        while not self.stop_event.is_set():
+            message = consumer.poll(timeout_ms=0, max_records=1)
I admit that I am not a huge fan of the poll interface. We support it because that is how the Java client works, but I think using the iterator is much more pythonic. Perhaps something like:
    consumer = KafkaConsumer(...., consumer_timeout_ms=1000)
    while not self.stop_event.is_set():
        for message in consumer:
            print(message)
            if self.stop_event.is_set():
                break
?
It will not work. It needs the wakeup method.
I agree the iterator looks more pythonic, and believe me, I've tried to use it instead. But it simply isn't controllable as currently implemented. The main problem is that it blocks indefinitely if it is called when no new messages are arriving. The body of that for loop never executes because the iterator is still waiting for the consumer to return a value for message, so neither the if statement nor the while loop that checks the event ever runs.
consumer_timeout_ms is not a great solution either, because it just statically sets the lifetime of the KafkaConsumer object. The consumer closes on its own after that length of time, even if you're actively pulling messages through it with the iterator. You can set consumer_timeout_ms to a longer value, but then you're back to the same problem: the iterator call blocks until that timer runs out.
poll() is the only way I've found to put clean consumer closure under programmer control. It lets you use the same consumer as long as you like without reinitializing it, and then lets you shut it down very quickly as well. My code, which calls poll() with max_records=1 and loops if there are no new messages, is the closest you can currently get to a controllable iterator that lets the programmer's code end the request loop at will.
If I've somehow missed a command to shut down the iterator's internal request loop, please let me know.
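For illustration, the poll()-based shutdown described in this comment might look like the sketch below. It is not the code from the pull request: `StubConsumer` is a hypothetical stand-in for `KafkaConsumer` so that the example runs without a broker, and it mimics only the shape of `poll()` (a dict mapping a partition to a list of records, empty when nothing arrives in time). The 50 ms timeout is likewise an assumption, chosen to avoid the busy loop that `timeout_ms=0` would cause.

```python
import queue
import threading
import time


class StubConsumer:
    """Hypothetical stand-in for KafkaConsumer (no broker needed)."""

    def __init__(self, records):
        self._records = queue.Queue()
        for record in records:
            self._records.put(record)
        self.closed = False

    def poll(self, timeout_ms=0, max_records=1):
        # Return a dict of partition -> list of records, or an empty
        # dict if nothing arrives within timeout_ms.
        batch = []
        try:
            batch.append(self._records.get(timeout=timeout_ms / 1000.0))
            while len(batch) < max_records:
                batch.append(self._records.get_nowait())
        except queue.Empty:
            pass
        return {"my-topic-0": batch} if batch else {}

    def close(self):
        self.closed = True


class Consumer(threading.Thread):
    def __init__(self, consumer):
        super().__init__()
        self.stop_event = threading.Event()
        self.consumer = consumer
        self.seen = []

    def stop(self):
        self.stop_event.set()

    def run(self):
        # poll() returns after at most timeout_ms, so the stop flag is
        # re-checked regularly even when the topic is idle.
        while not self.stop_event.is_set():
            batch = self.consumer.poll(timeout_ms=50, max_records=1)
            for records in batch.values():
                for message in records:
                    self.seen.append(message)
        self.consumer.close()


worker = Consumer(StubConsumer(["a", "b", "c"]))
worker.start()
time.sleep(0.5)  # let the worker drain the stub, then sit idle
worker.stop()
worker.join()
print(worker.seen, worker.consumer.closed)
```

The worker exits within roughly one poll timeout of `stop()` being called, which is the property the comment is after.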
consumer_timeout_ms should simply raise StopIteration to signal the end of the for iteration. It should be fine to iterate again on the same consumer. There is no auto close unless I'm missing something?
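The behaviour described here (a timeout ends the for loop, and the same consumer can then be iterated again) can be sketched as follows. `StubIterConsumer` is a hypothetical stand-in written for this example, not kafka-python code. It models only the claim in this reply: `__next__` raises StopIteration when no message arrives within `consumer_timeout_ms`, and nothing is closed. The `feed` method and the `consume` helper are likewise made-up names.

```python
import queue
import threading
import time


class StubIterConsumer:
    """Hypothetical stand-in that mimics only the iterator timeout."""

    def __init__(self, consumer_timeout_ms):
        self._timeout_s = consumer_timeout_ms / 1000.0
        self._records = queue.Queue()

    def feed(self, record):
        self._records.put(record)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self._records.get(timeout=self._timeout_s)
        except queue.Empty:
            # No message within the timeout: end this for loop only.
            raise StopIteration from None


def consume(consumer, stop_event, handle):
    # The outer loop re-enters the iterator after every timeout, so the
    # stop flag is checked at least once per consumer_timeout_ms.
    while not stop_event.is_set():
        for message in consumer:
            handle(message)
            if stop_event.is_set():
                break


consumer = StubIterConsumer(consumer_timeout_ms=50)
stop_event = threading.Event()
seen = []
thread = threading.Thread(target=consume, args=(consumer, stop_event, seen.append))

consumer.feed("first")
thread.start()
time.sleep(0.3)          # several timeouts elapse while the stub is idle
consumer.feed("second")  # still delivered: the outer loop iterates again
time.sleep(0.3)
stop_event.set()
thread.join()
print(seen)
```

The second message arrives after the iterator has already timed out several times, which shows why the outer while loop is needed in this pattern.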
When I ran it, I got a "Closing connection" line in the log, and then another "Closing connection" later when calling consumer.close(). Even if I'm misreading that, it still requires you to decide ahead of time when you're going to shut down the request loop. If you want to iterate indefinitely until an event signals a stop, you'll need to somehow have another thread come in and set consumer_timeout_ms=1, because the thread the iterator runs in is blocked by the iterator.
I played around with your code above and got it to work. I was wrong about consumer_timeout_ms closing the consumer. Thanks for your patience with me. I still think it's not ideal to need to nest two loops and to check the stop_event twice, but it does work. I haven't seen the iterator used with an outer while loop like this anywhere else. I think showing this structure in more of the examples would prevent a lot of headaches. I'll edit the pull request real quick...
Updated the message request method: dpkp showed a way to get threading to work with the iterator, which is more pythonic.
No problemo. I agree that it is not ideal. The Java consumer handles this with a consumer.wakeup() method that you can call from a different thread; it causes the consumer to raise a WakeupException from a long-blocking poll(). We could implement something similar here, but it hasn't been a high priority. See #1148 for tracking that feature.
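To make the wakeup idea concrete, here is a rough Python sketch of the pattern. Everything in it is hypothetical: `WakeableStubConsumer`, its `feed` method, and this `WakeupException` class are invented for illustration and are not part of kafka-python. It also simplifies the Java behaviour, since the wakeup marker is queued behind any pending records instead of interrupting the poll immediately.

```python
import queue
import threading


class WakeupException(Exception):
    """Hypothetical Python analogue of the Java client's WakeupException."""


class WakeableStubConsumer:
    """Hypothetical consumer whose blocking poll() can be interrupted."""

    _WAKEUP = object()

    def __init__(self):
        self._records = queue.Queue()

    def feed(self, record):
        self._records.put(record)

    def wakeup(self):
        # Safe to call from another thread: enqueue a marker that
        # unblocks a poll() waiting with no timeout.
        self._records.put(self._WAKEUP)

    def poll(self):
        item = self._records.get()  # blocks until something arrives
        if item is self._WAKEUP:
            raise WakeupException()
        return item


def consume(consumer, handle):
    # No stop flag and no timeout: the loop ends only via wakeup().
    try:
        while True:
            handle(consumer.poll())
    except WakeupException:
        pass


consumer = WakeableStubConsumer()
seen = []
thread = threading.Thread(target=consume, args=(consumer, seen.append))
consumer.feed("x")
thread.start()
consumer.wakeup()  # called from the main thread
thread.join(timeout=5)
print(seen, thread.is_alive())
```

With this shape, a single loop and no timeout tuning are enough, which is the appeal over the nested loop discussed earlier in the thread.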
Thanks for the PR!