Question on EPH vs ECC #8873

Closed
kwontae opened this issue May 12, 2020 · 16 comments
Labels: customer-reported, Docs, Event Hubs, question


@kwontae

kwontae commented May 12, 2020

I have a question regarding the differences between EPH and EventHubConsumerClient.
I see that the latest official documentation of the Azure Event Hubs SDK for Node.js (version 5) recommends EventHubConsumerClient, with no mention of EPH. Is ECC (EventHubConsumerClient) a wrapper around EPH, or is it meant for a different use case?

@ghost added the needs-triage, customer-reported, and question labels May 12, 2020
@ramya-rao-a
Contributor

EventHubConsumerClient replaces EPH.
Our migration guide has details around this.

We could make this clearer by calling it out in the readme itself; we will make a note of that.

Please go through the above migration guide and let us know if it helps you migrate from EPH to EventHubConsumerClient.

We are open to hearing any feedback you have on making this simpler.

@ghost removed the needs-triage label May 12, 2020
@ramya-rao-a added this to the [2020] June milestone May 12, 2020
@ramya-rao-a self-assigned this May 12, 2020
@kwontae
Author

kwontae commented May 19, 2020

Will do! Thank you

@kwontae
Author

kwontae commented May 19, 2020

Another question: I've found that with the consumer client, it takes a few minutes before the messages are consumed. I followed the example at the very bottom of the migration guide, and it seems that in the subscribing & processing events section it takes a while until the events are processed. Any thoughts?

@ramya-rao-a assigned chradek and unassigned ramya-rao-a May 19, 2020
@ramya-rao-a
Contributor

@chradek should be able to answer your question.

Meanwhile, can you share which overload of the subscribe() method you used: the one that takes a partitionId, or the one that gives you events from all partitions? How many partitions do you have?

@kwontae
Author

kwontae commented May 19, 2020

We are using the subscribe method that gives events from all partitions. We have 32 partitions and currently just a single processor application. From my experience with the Python SDK, I know that load balancing partitions takes a few seconds, but I never came across this cold-start time before events start arriving.

@kwontae
Author

kwontae commented May 19, 2020

We are also not setting a batch size and I think the default is set to 1?
@chradek
Contributor

chradek commented May 19, 2020

@kwontae Are you using a CheckpointStore when instantiating your EventHubConsumerClient? The EventHubConsumerClient should attempt to claim all partitions as fast as it can if no CheckpointStore is provided; otherwise, it will claim one partition at a time, every ~10 seconds.

We are investigating how to expose better control over how quickly an EventHubConsumerClient ramps up when claiming partitions in the case where a CheckpointStore is provided. I believe Python may let you adjust some of these options already.

@kwontae
Author

kwontae commented May 19, 2020

Yes, we are using a CheckpointStore.

@kwontae
Author

kwontae commented May 19, 2020

So the theory is that because we are using a CheckpointStore and the messages are being pushed to 32 partitions, it takes about 320 seconds to claim them all?

@chradek
Contributor

chradek commented May 19, 2020

@kwontae Correct. Currently, if you had a single EventHubConsumerClient and 32 partitions, it should take ~320 seconds to claim all partitions. You should start receiving events sooner than that, though: as soon as a partition is claimed (every ~10 seconds), you can start receiving events from that partition.
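To put numbers on this, here is a rough JavaScript model of the ramp-up. It assumes a single consumer claims partitions in the listed order at a fixed ~10-second interval once a CheckpointStore is provided; the real load balancer's order and timing may differ, and `claimSchedule` is a hypothetical helper, not an SDK API.

```javascript
// Rough model (an assumption, not the SDK's load balancer): without a CheckpointStore
// every partition is claimed immediately; with one, a single consumer claims one
// partition per load-balancing interval, in the order the ids are listed.
function claimSchedule(partitionIds, { hasCheckpointStore, intervalSeconds = 10 }) {
  const claimedAtSeconds = new Map();
  partitionIds.forEach((id, index) => {
    claimedAtSeconds.set(id, hasCheckpointStore ? (index + 1) * intervalSeconds : 0);
  });
  return claimedAtSeconds;
}

// 32 partitions, one EventHubConsumerClient, CheckpointStore provided.
const ids = Array.from({ length: 32 }, (_, i) => String(i));
const withStore = claimSchedule(ids, { hasCheckpointStore: true });
console.log(withStore.get("0")); // 10  (first partition can start receiving)
console.log(withStore.get("31")); // 320 (last partition claimed)
```

Under this model, events sent to a partition cannot be received before that partition's claim time, so traffic that lands on a partition claimed late may not show up for several minutes.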

@kwontae
Author

kwontae commented May 20, 2020

Hmm, yeah, that's what I'm expecting as well, but it doesn't seem to be the case here... So, a few more things:

  1. Do I need to have a promise like in the example, so that it closes after a certain timeout?
  2. processEvents is triggered when there is an incoming event to be ingested, so the events in the example's for (const event of events) loop will never be empty when it is triggered?
  3. Do we not have to set the startPosition if we are using the checkpointStore?

@kwontae
Author

kwontae commented May 20, 2020

I'm getting this error when I run the example without the promise statement.
[Screenshot of the error]

@kwontae
Author

kwontae commented May 20, 2020

I see that the issue occurs when it claims a partition that has no events, which makes sense: if the events array is empty, there is no event with a sequence number, so checkpointing throws an error. My understanding was that processEvents is only triggered when there is an event available for consumption after a partition is claimed.

@chradek
Copy link
Contributor

chradek commented May 20, 2020

@kwontae

Do I need to have a promise like in the example so it closes after a certain timeout?

I think you're referring to this piece:

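// after 30 seconds, stop processing
// (runs at the end of the sample's async main function, where `subscription`
// is the value returned by consumerClient.subscribe(...))
await new Promise((resolve) => {
  setTimeout(async () => {
    await subscription.close();
    await consumerClient.close();
    resolve();
  }, 30000);
});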

No, you don't need to close the subscription/client unless you don't want to receive any new messages. For the purpose of the sample, that's just in place to show that you can stop receiving messages (and to let the sample exit).
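If the goal is a long-running consumer, a minimal sketch is to drop the fixed timeout and close only when the process is told to stop. `stopReceiving` and `closeOnSignal` are hypothetical helpers; the only SDK calls assumed are the `subscription.close()` and `consumerClient.close()` methods used in the sample above.

```javascript
// Hypothetical helper: stop receiving the way the sample does, subscription first, then client.
async function stopReceiving(subscription, consumerClient) {
  await subscription.close();
  await consumerClient.close();
}

// Hypothetical helper: keep receiving indefinitely and only stop when the process
// receives the given signal (Ctrl+C sends SIGINT).
function closeOnSignal(subscription, consumerClient, signal = "SIGINT") {
  return new Promise((resolve, reject) => {
    process.once(signal, () => {
      stopReceiving(subscription, consumerClient).then(resolve, reject);
    });
  });
}

// Usage inside the sample's async main(), in place of the 30-second timeout:
//   const subscription = consumerClient.subscribe({ processEvents, processError });
//   await closeOnSignal(subscription, consumerClient);
```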

processEvents is triggered when there is an incoming event to be ingested, so the events in the example's for (const event of events) loop will never be empty when it is triggered?

The events array can be empty if no events were received within maxWaitTimeInSeconds.
So, for example, let's say you were receiving events from partitions 0, 1, and 2, and that partitions 0 and 1 are continuously getting new messages but partition 2 is not. You'll still see processEvents called for partition 2 every 60 seconds (the default maxWaitTimeInSeconds) even though there are no events. You can think of this as a sort of heartbeat that lets you know that the consumer client is still trying to receive events from the partition, even if no events are available.

You can increase the maxWaitTimeInSeconds to a very large value if you'd rather see processEvents called with an empty array less often: client.subscribe(handlers, { maxWaitTimeInSeconds: 3600 });
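The heartbeat behavior can be pictured with a simplified JavaScript model of a single partition. It assumes processEvents fires once `maxBatchSize` events are available or `maxWaitTimeInSeconds` has elapsed, whichever comes first; this is an approximation, not the SDK's actual receive loop, and `simulateProcessEventsCalls` is hypothetical.

```javascript
// Simplified model of when processEvents fires for ONE partition (an assumption, not
// the SDK's real receive loop): a call happens as soon as maxBatchSize events are
// available, or when maxWaitTimeInSeconds elapses, even if no events arrived.
function simulateProcessEventsCalls(eventTimes, { maxBatchSize, maxWaitTimeInSeconds, durationSeconds }) {
  const times = [...eventTimes].sort((a, b) => a - b); // arrival times in seconds
  const calls = [];
  let next = 0; // index of the first event not yet handed to processEvents
  let start = 0; // when the current wait began
  while (start < durationSeconds) {
    const deadline = start + maxWaitTimeInSeconds;
    const events = [];
    while (next < times.length && times[next] < deadline && events.length < maxBatchSize) {
      events.push(times[next]);
      next += 1;
    }
    // A full batch is delivered when its last event arrives; otherwise at the deadline.
    const calledAt =
      events.length === maxBatchSize ? Math.max(start, events[events.length - 1]) : deadline;
    if (calledAt > durationSeconds) break;
    calls.push({ calledAt, events });
    start = calledAt;
  }
  return calls;
}

// A partition with no traffic for one hour (partition 2 in the example above).
const perMinute = simulateProcessEventsCalls([], { maxBatchSize: 10, maxWaitTimeInSeconds: 60, durationSeconds: 3600 });
const perHour = simulateProcessEventsCalls([], { maxBatchSize: 10, maxWaitTimeInSeconds: 3600, durationSeconds: 3600 });
console.log(perMinute.length, perMinute.every((call) => call.events.length === 0)); // 60 true
console.log(perHour.length); // 1
```

With no traffic, the model makes 60 empty calls per hour at a 60-second wait but only one at 3600 seconds, which is the trade-off described above.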

Do we not have to set the startPosition if we are using the checkpointStore?

The startPosition is used when a checkpoint does not already exist for a given partition. By default, the consumer client uses the latest start position, which means it will receive any events that are sent to Event Hubs after the subscribe call has started listening for events from the partition.

Since you're using the Storage Blob CheckpointStore, when the client starts receiving messages from a partition, it first checks whether blob storage has a checkpoint for that partition. If it does, it starts receiving events where the checkpoint left off. If the partition does not have a checkpoint, it will receive any new events that are sent to that partition (unless you specify a different startPosition).
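The decision described above can be summarized as a small lookup. This is a hypothetical sketch: `resolveStart`, the shape of the `checkpoints` object, and the `"latest"`/`"earliest"` strings stand in for what the SDK and the blob CheckpointStore actually store and accept.

```javascript
// Hypothetical model: `checkpoints` maps partitionId -> last checkpoint loaded from the store.
function resolveStart(partitionId, checkpoints, startPosition = "latest") {
  const checkpoint = checkpoints[partitionId];
  if (checkpoint !== undefined) {
    // A checkpoint exists: pick up where it left off; startPosition is ignored.
    return { from: "checkpoint", sequenceNumber: checkpoint.sequenceNumber };
  }
  // No checkpoint yet: use startPosition ("latest" = only events sent from now on).
  return { from: "startPosition", position: startPosition };
}

const stored = { "0": { sequenceNumber: 41 } };
console.log(resolveStart("0", stored)); // { from: 'checkpoint', sequenceNumber: 41 }
console.log(resolveStart("1", stored)); // { from: 'startPosition', position: 'latest' }
```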

@kwontae
Author

kwontae commented May 20, 2020

So, in the example of using this with a checkpoint store, I see that the example has a try/catch block where we call updateCheckpoint(events[events.length - 1]). This would be an issue if processEvents is called with an empty events array. We should probably have an if/else check to catch this edge case so that updateCheckpoint isn't called for an empty events array. Shouldn't this be captured in the docs?
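A guarded version of the sample's handler might look like the sketch below. It assumes the `processEvents(events, context)` signature and the `context.updateCheckpoint(event)` call from the migration guide sample; the logging is a placeholder for real processing.

```javascript
// processEvents handler that tolerates empty batches (no events within maxWaitTimeInSeconds).
async function processEvents(events, context) {
  if (events.length === 0) {
    // Heartbeat call: nothing was received, so there is nothing to checkpoint.
    return;
  }
  for (const event of events) {
    // Placeholder for real processing.
    console.log(`Received event from partition ${context.partitionId}: ${event.body}`);
  }
  try {
    // Checkpoint the last event of the batch once the whole batch has been handled.
    await context.updateCheckpoint(events[events.length - 1]);
  } catch (err) {
    console.log(`Error when checkpointing on partition ${context.partitionId}: `, err);
  }
}
```

It would be passed to `consumerClient.subscribe({ processEvents, processError })` together with a `processError` handler.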

@chradek
Copy link
Contributor

chradek commented Jun 8, 2020

@kwontae
As part of #9340, we've updated the sample and the documentation for processEvents to make it clearer that events can be an empty array, and why that can happen.

Thanks for pointing out how we could improve our docs! I'll close this issue since I believe we've addressed the original questions, but feel free to comment or open a new issue if there's more you want to discuss.

@chradek closed this as completed Jun 8, 2020
@github-actions bot locked and limited conversation to collaborators Apr 12, 2023