consumer.receive will only process events at interval of ~2 seconds. How to increase rate? #25977
We have changed our test consumer to use the async version similar to this sample and found that it works as expected, receiving hundreds of events within seconds instead of processing 1 event every ~2 seconds. To me this does indicate a limitation with the sync version.
Thank you @mattmazzola for the detailed bug report and video showing the issue. I just had a few clarification questions:
I will update you as soon as I have information on this. Thanks
Yes
No

The creation of clients was using this snippet:

producer = EventHubProducerClient.from_connection_string(publish_subscribe_connection_string)
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=publish_subscribe_connection_string,
    consumer_group=event_hub_consumer_group,
    eventhub_name=event_hub_name
)

I have not tested this.
Thank you for the info @mattmazzola. Can you also share the version of uamqp that is installed in your environment?
Hi @mattmazzola - A few more follow-up questions:
Yes
I also haven't tested this, but I might get around to it when I test version 5.9 of the library per the suggestion above. Our team was delayed by attempting various changes to diagnose this issue, writing the testing tools to isolate the problem, and producing the video. Now that we have a solution using the async version, we are trying to catch up. It is unlikely we would get to this quickly.
@mattmazzola - No problem. Appreciate you bringing this bug to our attention and apologies for the inconvenience! A few last questions to help us get to the bottom of this:
Thank you!
The dotnet testing tool uses a slightly different testing pattern than the python one since it has more advanced features with processes and cancellation tokens. The dotnet sequence is this:
This means the consumer can use default/latest and it would still receive the events. The dotnet tool is actually available to see here: The python testing tool sequence is reversed since the subscribe methods block, which means we can't start processing and then also execute other code without much more advanced threading or something (a rough sketch of such a threaded approach follows this comment).
The 10 parameter in the video was for limiting the size of sent batches, not for limiting receiving. For receive it's one event at a time using the on_event callback.
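As an illustration of the blocking behavior described above (this is not the author's test tool, and the connection values are placeholders), the sync receive call can be run in a background thread so the main thread stays free to do other work, and then stopped by closing the client:

import threading
import time

from azure.eventhub import EventHubConsumerClient

consumer = EventHubConsumerClient.from_connection_string(
    conn_str="<connection-string>",
    consumer_group="$Default",
    eventhub_name="<event-hub-name>",
)

def on_event(partition_context, event):
    print("Received:", event.body_as_str())

# receive() blocks until the client is closed, so run it in its own thread.
worker = threading.Thread(
    target=consumer.receive,
    kwargs={"on_event": on_event, "starting_position": "@latest"},
    daemon=True,
)
worker.start()

# ... the main thread is free to publish events, run assertions, etc. ...
time.sleep(30)

consumer.close()  # unblocks receive() and ends the worker thread
worker.join()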
Hi, I wanted to provide some new information from our usage and also ask if there is any status update from the team maintaining this library.

Background

1. Sync consumer

Originally we were using this style of consumption:

with self._consumer:
    self._consumer.receive(
        on_event=process_deserialized_event,
        max_wait_time=None,
        starting_position="@latest"
    )

This has the ~2 second delay issue because it takes significant time to rotate through each of the 32 partitions and receive events from the 1 partition that has them.

2. Async consumer

We changed to use the async version of the library and this seemed to resolve the issue:

async with self._consumer:
    await self._consumer.receive(
        on_event=process_raw_event,
        max_wait_time=None,
        starting_position="@latest"
    )

Producer Issues

1. Basic sync producer

We had started using this basic pattern for publishing events:

self._producer = EventHubProducerClient.from_connection_string(publish_subscribe_connection_string)
...
self._producer.send_batch(event_batch)

We noticed a similar issue where the producer would not publish events immediately. It would also take around ~2 sec between publishes. (We suspect a similar issue with partitions but are not sure.)

2. Buffered mode, very small max_wait_time

We changed the producer to use buffered mode with a very small max_wait_time:

self.producer = EventHubProducerClient.from_connection_string(
    publish_subscribe_connection_string,
    buffered_mode=True,
    max_wait_time=0.001,
    on_success=(lambda _, __: None),
    on_error=(lambda _, __, e: _LOGGER.error("Error in producer: " + str(e))),
)

This unblocked us, but as we continued to add more features and other computations we saw significant slowdowns in the publishing of events.

3. Separate processes

We ended up having to move the EH consumer and producer into their own processes that only transfer events to and from queues shared between the processes. This allows the consumer and producer to run at different rates (a rough sketch of what such a consumer process might look like follows this comment):

self.__outgoing_event_queue: Queue[EventData] = Queue()
self.__incoming_event_queue: Queue[EventData] = Queue()
self.consumer_process = MessageConsumer(
    queue=self.__incoming_event_queue,
    publish_subscribe_connection_string=publish_subscribe_connection_string,
    consumer_group=consumer_group,
    event_hub_name=event_hub_name,
)
self.consumer_process.start()
self.producer_process = MessageProducer(
    queue=self.__outgoing_event_queue,
    publish_subscribe_connection_string=publish_subscribe_connection_string,
    event_hub_name=event_hub_name,
)
self.producer_process.start()

Feedback

I think the amount of work we had to do to troubleshoot this library and get it to work correctly for our usage is too high. I think most people trying to work with the library might give up or use an alternative communication stack if they didn't have other reasons to use EventHub. I wanted to know if the team maintaining this library can provide any status update on the investigation into this issue.

Suggestion for Documentation

I think there could be updates to the documentation to warn users about these potential problems and inform them about the workarounds in the examples above so they don't have to discover and solve the same issues we did. Also, there may be people using the library who have an existing performance issue due to conditions similar to ours but are not yet aware of it, and the documentation update could at least alert them to the known problems.

Mention: @tupini07 so he also gets updates. Interested to hear your thoughts.
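A minimal sketch of what a consumer process like the MessageConsumer referenced above might look like. This is an assumption for illustration only; the class body, the use of body_as_str(), and the daemon flag are not from the author's actual implementation:

from multiprocessing import Process, Queue

from azure.eventhub import EventHubConsumerClient


class MessageConsumer(Process):
    def __init__(self, queue, publish_subscribe_connection_string, consumer_group, event_hub_name):
        super().__init__(daemon=True)
        self._queue = queue
        self._conn_str = publish_subscribe_connection_string
        self._consumer_group = consumer_group
        self._event_hub_name = event_hub_name

    def run(self):
        # Create the client inside run() so it lives entirely in the child process.
        consumer = EventHubConsumerClient.from_connection_string(
            conn_str=self._conn_str,
            consumer_group=self._consumer_group,
            eventhub_name=self._event_hub_name,
        )

        def on_event(partition_context, event):
            # Hand the event body to the parent process. (The snippet above types the
            # queue as Queue[EventData]; raw EventData would need to be picklable to
            # cross the process boundary, so the body string is used in this sketch.)
            self._queue.put(event.body_as_str())

        with consumer:
            consumer.receive(on_event=on_event, starting_position="@latest")


# Usage mirrors the snippet above:
# incoming = Queue()
# MessageConsumer(incoming, "<connection-string>", "$Default", "<event-hub-name>").start()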
Hi Matt, Thanks for the detailed feedback and apologies for the late reply. I had a chance to look back at your video and try a couple of things on my side to try to explain, remedy or even repro the scenarios described.

Consumer

I noticed in the video that all the events were being consumed from the same partition ( #7 ). Scrubbing the video back a little I also saw that the 100 events were sent using
Here's what was happening:
If an EH with a large number of partitions is going to be used and only one partition will be populated, it is better to tie the consumer to that one particular partition so that there is no spinning around the other empty ones, especially if the sync consumer is being used (see the sketch after this comment). If over time that EH will be populated evenly amongst the partitions, then it will speed through as there will be events in every partition.

Producer

This scenario is a little bit more confusing. Simply sending, whether a single event or a batch, is a matter of handing the events over to the service and the service deciding how to route them between partitions etc. The producer client doesn't have to go in sequence populating partitions like its consumer counterpart. Whether an EH has 1 partition or 32, it shouldn't affect the sending speed as the bulk of the work is not done by it. On my side I wasn't able to reproduce this at all; in fact I managed to consistently send 3 million events in less than 3 seconds as a batch. The buffered producer is a little bit more CPU intensive by design as it has multiple queues that store events per partition, which are cleared using background threads when certain criteria are met. It also favors using
A few questions:

With that said, we have just released a new version of the SDK, which uses a new AMQP stack written in Python. We also have increased throughput and some other perf gains. We would really like to see you use it and get any sort of feedback, as it would help us a lot in improving this new stack.
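A minimal sketch of the partition-pinning suggestion above (the connection values are placeholders, and partition "7" is just the partition seen in the video):

from azure.eventhub import EventHubConsumerClient

consumer = EventHubConsumerClient.from_connection_string(
    conn_str="<connection-string>",
    consumer_group="$Default",
    eventhub_name="<event-hub-name>",
)

def on_event(partition_context, event):
    print("Received from partition", partition_context.partition_id, event.body_as_str())

with consumer:
    # partition_id restricts receiving to a single partition, so the sync client
    # does not spend time iterating over the other (possibly empty) partitions.
    consumer.receive(
        on_event=on_event,
        partition_id="7",
        starting_position="@latest",
    )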
Thanks for looking into this more and providing an update. Unfortunately, our team has moved on from the project that used Event Hub and we won't be able to invest much time in this anymore. I will try to answer some of the questions.
I wanted to give more background on our setup, which may explain our design choices here. Our situation may have been rarer than I originally thought. We have different sets of events that needed guaranteed ordering, so each different set (we call it a game) was put in a different partition (a generic sketch of this routing pattern follows this comment). The server was written in Java and had the responsibility to observe/consume an unbounded number of sets/games [0, N]. For this, we maxed the standard-tier partitions at 32 to have the most parallelism. This created a type of asymmetry/mismatch: the design choice of maxing out the EH partitions for the server consumer means the other clients, which usually only care about 1 or 2 partitions, suffered performance issues.
I can't remember, although I believe we tried every combination the SDK supported and the issue persisted before we went to our own custom threading model.
I don't think we tried this since we always had 32 partitions
Yes, there were significant activities.
Cool! If we ever go back to this project or work with EventHub again we'll try it out. Given we likely won't be active on the thread anymore and you have released a new version of the SDK, perhaps this issue can be closed. I won't be able to confirm if the issue is actually resolved, but I assume people could re-open it if they observe it.
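As a generic illustration of the per-game ordering described above (not necessarily how the project actually assigned games to partitions; the names and values are placeholders), events created with the same partition_key are routed to the same partition, which preserves their relative order:

from azure.eventhub import EventData, EventHubProducerClient

producer = EventHubProducerClient.from_connection_string(
    conn_str="<connection-string>",
    eventhub_name="<event-hub-name>",
)

game_id = "game-42"  # hypothetical identifier for one set/game

with producer:
    # All events added to a batch created with the same partition_key land on the
    # same partition, so their order is preserved for consumers of that partition.
    batch = producer.create_batch(partition_key=game_id)
    batch.add(EventData("move 1"))
    batch.add(EventData("move 2"))
    producer.send_batch(batch)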
Thank you @mattmazzola and again apologies for the late response. I might ping you on Teams to learn a bit more about the design so that we can try to emulate it on our stress and performance frameworks that have been implemented since that time. I'll go ahead and close the issue, but if it arises again we can use this going forward.
Package: azure-eventhub
Package version: "^5.10.0"
Operating system: Windows 11
Python version: 3.10.6
Describe the bug
I am trying to diagnose a latency issue when subscribing to EventHub using the Python SDK. It is only processing received events at an interval of ~2 seconds (which is very suspicious, as it looks like some default configuration value).
I am using the consumer.receive method and seem to have followed the samples very closely. I have created two similar testing utilities, and it seems that for the same EventHub the dotnet sdk works as expected, sending and receiving hundreds of events in a few seconds. The python sdk can publish/send large amounts of events in 1 or 2 seconds, but when receiving it takes 1 or 2 seconds PER EVENT, which makes it unusable for practical use. At this rate it would take so long to process all the events that they would expire before being processed.
I created a video to make it more clear
Video
Given the dotnet works and python doesn't for the same EventHub and Consumer Group, I see two results:
Hopefully someone can help identify option 1 since that is much easier to fix.
To Reproduce
Steps to reproduce the behavior: receive events with the azure-eventhub library using the synchronous receive pattern.
Expected behavior
Expected to receive ALL events available to the consumer group in a very short amount of time (hundreds of events should be able to be processed within a few seconds).
Screenshots
If applicable, add screenshots to help explain your problem.
See the video
Additional context
N/A