-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SinkContext: ability to seek/pause/resume consumer for a topic #10498
Conversation
...ons/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
Outdated
Show resolved
Hide resolved
@dlg99 can you also please explain why this is needed for the Pulsar IO Sink and Kafka Sink compatibility layer? |
@jerrypeng kafka-connect-adaptor implements some of the kafka interfaces. For the sink it is SinkTaskContext https://kafka.apache.org/23/javadoc/org/apache/kafka/connect/sink/SinkTaskContext.html - it has offset/pause/resume methods that need seek/pause/resume from the Sink context. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few comments, overall the approach is good to me.
either this approach or the proposal from @jerrypeng works for me.
...ons/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
Outdated
Show resolved
Hide resolved
...ns/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
Outdated
Show resolved
Hide resolved
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a simpler approach to referencing consumers created in PulsarSource from Context. Please see my comment.
@jerrypeng @eolivelli I did the changes that @jerrypeng requested, also figured that |
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
Outdated
Show resolved
Hide resolved
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
Outdated
Show resolved
Hide resolved
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if (inputConsumers == null) { | ||
throw new PulsarClientException("Getting consumer is not supported"); | ||
} | ||
for (int i = 0; i < 2; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this for loop needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two attempts to get the consumer:
- Try to get the consumer.
- If not found, reprocess MultiTopicsConsumers in case new consumers appeared (happens on repartition or a new topic that matches provided pattern if the pattern is used),
- give it another try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem like the right way to do this. Retrying twice seems arbitrary. I would rather let the user implement how many retries they want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerrypeng this is not arbitrary.
I have no access to callbacks/events when the MultiTopicsConsumers adds another partition or a topic (for pattern-based subscription). MultiTopicsConsumers do have that internally.
So if the check of previously cached topic/partition -> consumer map finds nothing it means that it:
- either doesn't exist
- or the topic/partition (and consumer) got added after the initial map was built.
In that case it makes sense to check MultiTopicsConsumers once to update the map and try again.
I don't think blocking and polling is justified, at least I don't have such usecase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dlg99 if partitions / topics are added dynamically because of topic pattern match or increases in partition count, how would the function know when that happened and why would trying to the consumers twice help with that? There is an inherent race condition there. If can always argue there can be a scenario that topics/partitions got created after you called this method. If the goal of the method is to get new consumers for new topics / partitions then this API is not appropriate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We talked offline about reasons it is needed.
I expressed the retry more clearly and added comments. Please take a look.
@jerrypeng do you have any additional feedback or is it good to go? |
Motivation
Allow sink to rewind a topic to given offset and pause/resume consumer for given topic
This is needed for #9927 https://github.com/apache/pulsar/pull/9927/files#r595722189
Modifications
SinkContext API:
New methods to seek/pause/resume
Matching implementations in ContextImpl
Added ExtendedSourceContext interface (not public) to provide access to the PulsarSource's consumers.
Updated ContextImpl and PulsarSource's implementations to provide this functionality.
Verifying this change
(Please pick either of the following options)
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesSinkContext added new methods, default implementation provided.
Documentation