-
-
Notifications
You must be signed in to change notification settings - Fork 123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka test hangs on "await subscriber.get()" #42
Comments
Maybe unrelated, but there's something strange about the subscribe/unsubscribe implementations in the Kafka backend: https://github.com/encode/broadcaster/blob/master/broadcaster/_backends/kafka.py Essentially when we subscribe to a new channel, we add it to a set of channels, then get the Kafka consumer to subscribe to all those channels (topics). And when we unsubscribe from a specific channel, we unsubscribe, period. I wonder if this can cause issues when subscribing from two or more channels independently. Probably not related to the test though since we only subscribe to a si single "chatroom" channel there. Anyway, one way to narrow it down would be to look if it hangs on getting an item from the internal subscriber queue, or perhaps more specifically waiting for the "getone()" operation on the Kafka consumer to return an item. In the latter case it might be that the topic hasn't actually received the message, ie that the publish() operation didn't work correctly. If I remember correctly Kafka producers typically have an internal buffer that they don't necessarily flush right away (Kafka was designed for high volume streaming use cases). Perhaps we need to force-flush that, or something? |
Some experimentation result mentioned in #44 (comment) |
* group together fixtures for backend testing * Add per-backend setup to allow slow initialization. This is specially important as kafka subscribe can return while the consumer is not ready. See related test setup upstream: - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/test_consumer.py#L433 - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/_testutil.py#L376 - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/_testutil.py#L364 fixes encode#42
We now have tests suite courtesy of #11, but test for Kafka backend hands indefinitely on
await subscriber.get()
, causing tests suite to timeout:broadcaster/tests/test_broadcast.py
Line 43 in 435c35e
Sadly, I'm out of my depth here, and I'm unable to diagnose if this is problem with Kafka backend or test setup.
Help is welcome.
The text was updated successfully, but these errors were encountered: