Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-359: Support custom message listener executor for specific subscription #22902

Merged
merged 10 commits into from
Jul 16, 2024

Conversation

AuroraTwinkle
Copy link
Contributor

@AuroraTwinkle AuroraTwinkle commented Jun 13, 2024

Motivation

PIP-359
Implementation PR:#22861

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@asafm
Copy link
Contributor

asafm commented Jun 14, 2024

Why can't this logic be written by the person writing the listener? I think it adds extra complexity to an already complicated client

@AuroraTwinkle
Copy link
Contributor Author

AuroraTwinkle commented Jun 14, 2024

Why can't this logic be written by the person writing the listener? I think it adds extra complexity to an already complicated client

Yeah, the solution you mentioned can also solve this problem. But on the contrary, I think providing a unified builder configuration would be easier to use.

@asafm
Copy link
Contributor

asafm commented Jun 14, 2024

IMO the surface area the client already exposes is way too big. It feels any person that wanted to scratch their own itch just added their feature without holding back. This needs to be maintained.
In OTel they have a contrib repo where people can contribute their plugin . In your case it's contributing a listener implementation

@liangyepianzhou
Copy link
Contributor

IMO the surface area the client already exposes is way too big. It feels any person that wanted to scratch their own itch just added their feature without holding back. This needs to be maintained.
In OTel they have a contrib repo where people can contribute their plugin . In your case it's contributing a listener implementation

You make a very valid point. We do indeed need a plugin library that allows contributors to customize Pulsar according to their own needs without introducing more complexity into Pulsar. Do you have the opportunity to promote this matter? If so, I think we can continue to push forward with this. And take this proposal as the first test case.

Otherwise, it might be better to first promote this optimization, so that those who need it can start using it.

I am very interested in the plugin library you mentioned, and perhaps later we can work together to promote its implementation in Pulsar. However, it would be unwise to block other optimizations from entering before that.

@AuroraTwinkle
Copy link
Contributor Author

AuroraTwinkle commented Jun 15, 2024

IMO the surface area the client already exposes is way too big. It feels any person that wanted to scratch their own itch just added their feature without holding back. This needs to be maintained. In OTel they have a contrib repo where people can contribute their plugin . In your case it's contributing a listener implementation

Great idea! Are you interested in building and promoting a similar contrib repo for Pulsar? This PIP may be a good opportunity to do this, and we can build it together. @asafm @liangyepianzhou

@asafm
Copy link
Contributor

asafm commented Jun 15, 2024

I'm currently working on my free time on pulsar as in my new company they don't even use pulsar.
I can help review a pip to open such a repo. You can read OTel policy on how they handle it and use this as the basis for the contrib repo PIP.
If you are also on free time we can try to work on this together but it'll be a lower pace

@liangyepianzhou
Copy link
Contributor

We also participate in this work in our spare time, and progress is indeed slower. However, if we can achieve something significant together, it would be a very cool thing.

@liangyepianzhou
Copy link
Contributor

liangyepianzhou commented Jun 17, 2024

@asafm Since we all agree that the plugin library is still a long way from being truly realized, it's best that we do not hinder the entry of other optimizations before that time. It would be best to let it continue along its original process, allowing those who need it to use this feature first.We can migrate some functionalities after the plugin library is completed.
What do you think about it?

@asafm
Copy link
Contributor

asafm commented Jun 18, 2024

With the help of @liangyepianzhou I was able to fully figure out all the context required to understand what this PIP is all about.
If I summarize it:
When using a Consumer, you have two options to consume messages:

  • Synchronously, by calling consumer.recieve()
  • Asynchronously, by registering a MessageListener interface, when building the Consumer. When this method is used, you can't also use consumer.receive().

In the asynchronous way, the MessageListener instance is called by the consumer, hence it is doing that with a thread taken from its own internal ExecutorService (i.e. thread pool).
The problem comes when you build and use 2 consumers from the same PulsarClient. It so happens that those 2 consumers will share the same thread pool to call the Message Listeners. One can be slower from the other.

I suggest a different solution, if I may:

in the ConsumerBuilder there is the following method to register a MessageListener :
.messageListener(myMessageListener)

I suggest we expand it to:
.messageListener(myMessageListener, myExecutorService)

This will allow the user to provide a separate executor service (thread pool) to be used when executing myMessageListener for that consumer.

So summarizing my request for changes:

  1. Rewrite the motivation and background knowledge to include all relevant information (see my example above).
  2. Switch to a different API for providing that executor service.

@AuroraTwinkle
Copy link
Contributor Author

With the help of @liangyepianzhou I was able to fully figure out all the context required to understand what this PIP is all about. If I summarize it: When using a Consumer, you have two options to consume messages:

  • Synchronously, by calling consumer.recieve()
  • Asynchronously, by registering a MessageListener interface, when building the Consumer. When this method is used, you can't also use consumer.receive().

In the asynchronous way, the MessageListener instance is called by the consumer, hence it is doing that with a thread taken from its own internal ExecutorService (i.e. thread pool). The problem comes when you build and use 2 consumers from the same PulsarClient. It so happens that those 2 consumers will share the same thread pool to call the Message Listeners. One can be slower from the other.

I suggest a different solution, if I may:

in the ConsumerBuilder there is the following method to register a MessageListener : .messageListener(myMessageListener)

I suggest we expand it to: .messageListener(myMessageListener, myExecutorService)

This will allow the user to provide a separate executor service (thread pool) to be used when executing myMessageListener for that consumer.

So summarizing my request for changes:

  1. Rewrite the motivation and background knowledge to include all relevant information (see my example above).
  2. Switch to a different API for providing that executor service.

Okay, good suggestions! I will update it later as suggested.Thank you all @asafm @liangyepianzhou

@AuroraTwinkle
Copy link
Contributor Author

With the help of @liangyepianzhou I was able to fully figure out all the context required to understand what this PIP is all about. If I summarize it: When using a Consumer, you have two options to consume messages:

  • Synchronously, by calling consumer.recieve()
  • Asynchronously, by registering a MessageListener interface, when building the Consumer. When this method is used, you can't also use consumer.receive().

In the asynchronous way, the MessageListener instance is called by the consumer, hence it is doing that with a thread taken from its own internal ExecutorService (i.e. thread pool). The problem comes when you build and use 2 consumers from the same PulsarClient. It so happens that those 2 consumers will share the same thread pool to call the Message Listeners. One can be slower from the other.

I suggest a different solution, if I may:

in the ConsumerBuilder there is the following method to register a MessageListener : .messageListener(myMessageListener)

I suggest we expand it to: .messageListener(myMessageListener, myExecutorService)

This will allow the user to provide a separate executor service (thread pool) to be used when executing myMessageListener for that consumer.

So summarizing my request for changes:

  1. Rewrite the motivation and background knowledge to include all relevant information (see my example above).
  2. Switch to a different API for providing that executor service.

I have made the changes to the PIP as suggested, PTAL @asafm @liangyepianzhou Thanks!

@AuroraTwinkle
Copy link
Contributor Author

AuroraTwinkle commented Jun 18, 2024

I support the motivation, make MessageListener executor isolate is indeed a good point. But I hold a conservative attitude towards the implementation.

I have explained why we cannot use simply java.util.concurrent.ExecutorService at #22902 (comment), do you agree with this opinion?If not, please provide more detailed suggestions and I will try my best to make it better.

@dao-jun
Copy link
Member

dao-jun commented Jun 18, 2024

I understand you introduce MessageListenerExecutor is for the purpose of keep consume messages in ordering, but the point is the API design.

I had read the implementation PR, there are some points I think are strange:

  1. The MessageListener interface
public interface MessageListenerExecutor {
    void execute(Message<?> message, Runnable runnable);
}

The interface is for the purpose of select an executor by message and execute runnable on it, right?
Why not just like the following pattern? Just expose message and get an executor is OK.

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
}
  1. No default implementations for MessageListenerExecutor

Default implementations is an example, users could follow the pattern to develop their own extensions, and it can be used out of the box.

Others is OK

@dao-jun
Copy link
Member

dao-jun commented Jun 18, 2024

I understand you introduce MessageListenerExecutor is for the purpose of keep consume messages in ordering, but the point is the API design.

I had read the implementation PR, there are some points I think are strange:

  1. The MessageListener interface
public interface MessageListenerExecutor {
    void execute(Message<?> message, Runnable runnable);
}

The interface is for the purpose of select an executor by message and execute runnable on it, right? Why not just like the following pattern? Just expose message and get an executor is OK.

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
}
  1. No default implementations for MessageListenerExecutor

Default implementations is an example, users could follow the pattern to develop their own extensions, and it can be used out of the box.

Others is OK

Or

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
    ExecutorService selectOrdering(Message<?> message);
}

If the subscriptionType is KeyShared, call selectOrdering, others call select;

@AuroraTwinkle
Copy link
Contributor Author

I understand you introduce MessageListenerExecutor is for the purpose of keep consume messages in ordering, but the point is the API design.

I had read the implementation PR, there are some points I think are strange:

  1. The MessageListener interface
public interface MessageListenerExecutor {
    void execute(Message<?> message, Runnable runnable);
}

The interface is for the purpose of select an executor by message and execute runnable on it, right? Why not just like the following pattern? Just expose message and get an executor is OK.

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
}
  1. No default implementations for MessageListenerExecutor

Default implementations is an example, users could follow the pattern to develop their own extensions, and it can be used out of the box.

Others is OK

For the first point, I don't see any better point than now, on the contrary, the interface you mentioned has shortcomings:
The select method returns an ExecutorService instance, which may contain any number of threads, and the order is still not guaranteed.

For the second point, refer to the previous discussion:#22861 (comment)

@AuroraTwinkle
Copy link
Contributor Author

I understand you introduce MessageListenerExecutor is for the purpose of keep consume messages in ordering, but the point is the API design.
I had read the implementation PR, there are some points I think are strange:

  1. The MessageListener interface
public interface MessageListenerExecutor {
    void execute(Message<?> message, Runnable runnable);
}

The interface is for the purpose of select an executor by message and execute runnable on it, right? Why not just like the following pattern? Just expose message and get an executor is OK.

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
}
  1. No default implementations for MessageListenerExecutor

Default implementations is an example, users could follow the pattern to develop their own extensions, and it can be used out of the box.
Others is OK

Or

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
    ExecutorService selectOrdering(Message<?> message);
}

If the subscriptionType is KeyShared, call selectOrdering, others call select;

Also see:#22902 (comment)

@AuroraTwinkle
Copy link
Contributor Author

AuroraTwinkle commented Jun 18, 2024

Copy link

@AuroraTwinkle Please select only one documentation label in your PR description.

pip/pip-359.md Outdated Show resolved Hide resolved
pip/pip-359.md Show resolved Hide resolved
@asafm
Copy link
Contributor

asafm commented Jun 19, 2024

I have reviewed the comments made by @lhotari but I'm sorry I still don't understand.

ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);

This is basically to "spill the guts/internals" of the way the customer is implemented.
This is super hard to undetrstand for a normal user.
I haven't seen anything like it in Kafka.
Have you guys looked at how other messaging are doing it?

The reason I'm asking is that I think it's too complicated.
I asked on the PR comments some questions to help clarify since I lack some background to try to come up with another idea.

You guys move to the vote phase way too soon. I mean you posted your comments 13 hours ago. I don't want to hold anyone back of course, but you're changing quite a substantial public API here. @lhotari I think we need at least one more set of eyes on this solution before we can proceed.

@lhotari
Copy link
Member

lhotari commented Jun 19, 2024

I have reviewed the comments made by @lhotari but I'm sorry I still don't understand.

ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);

This is basically to "spill the guts/internals" of the way the customer is implemented. This is super hard to undetrstand for a normal user. I haven't seen anything like it in Kafka. Have you guys looked at how other messaging are doing it?

@asafm I believe that this concern could be handled by improving the explanation of the concept and abstraction that this exposes. I agree that it's not a good practice to add abstractions that are implementation specific. This abstraction isn't implementation specific, but the problem is that 95% of the users don't need it. In a multi-topic consumer when a MessageListener is used, there might be a requirement to have different priorities how the messages for a specific topic are processed. It is up to the implementer how to implement this logic.

The reason I'm asking is that I think it's too complicated. I asked on the PR comments some questions to help clarify since I lack some background to try to come up with another idea.

That's a valid point. I think that we would need to come up with a way to add extensions to the Pulsar client without cluttering the basic use cases. 95% of Pulsar users will never need this messageListenerExecutor, but we'd like to make it possible to do this for the remaining 5% of users. Perhaps we should prioritize finding the way to add such extensions to the Pulsar client. .

You guys move to the vote phase way too soon. I mean you posted your comments 13 hours ago. I don't want to hold anyone back of course, but you're changing quite a substantial public API here. @lhotari I think we need at least one more set of eyes on this solution before we can proceed.

It's great to see that this PIP has interest. Even if someone is accepted with a vote doesn't mean that we cannot make changes later before releasing the feature. In this case, there's a clear need to solve the multi-topic consumer queuing problem when using a MessageListener. We have to find a way to address that problem.

@lhotari
Copy link
Member

lhotari commented Jun 19, 2024

One possible idea for avoiding cluttering the builder interfaces with more and more configuration options would be to support some type of extension pattern.

In this case, there could be this type of method on the ConsumerBuilder interface:

<E extends ConsumerBuilderExtension> E extensionBuilder(Class<E> customerBuilderExtensionClass);

then these interfaces:

interface MessageListenerExecutorConsumerBuilderExtension<T> extends ConsumerBuilderExtension<T> {
      MessageListenerExecutorConsumerBuilderExtension<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
}
interface ConsumerBuilderExtention {
    ConsumerBuilder<T> parent();
}

then it would be possible to call
.extensionBuilder(MessageListenerExecutorConsumerBuilderExtension.class).messageListenerExecutor(....).parent().build()

Obviously this is more complexity, but it would address the problem of cluttering the main interfaces with configuration options that are only for very specific requirements.

@AuroraTwinkle
Copy link
Contributor Author

AuroraTwinkle commented Jun 19, 2024

One possible idea for avoiding cluttering the builder interfaces with more and more configuration options would be to support some type of extension pattern.

In this case, there could be this type of method on the ConsumerBuilder interface:

<E extends ConsumerBuilderExtension> E extensionBuilder(Class<E> customerBuilderExtensionClass);

then these interfaces:

interface MessageListenerExecutorConsumerBuilderExtension<T> extends ConsumerBuilderExtension<T> {
      MessageListenerExecutorConsumerBuilderExtension<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
}
interface ConsumerBuilderExtention {
    ConsumerBuilder<T> parent();
}

then it would be possible to call .extensionBuilder(MessageListenerExecutorConsumerBuilderExtension.class).messageListenerExecutor(....).parent().build()

Obviously this is more complexity, but it would address the problem of cluttering the main interfaces with configuration options that are only for very specific requirements.

How do you think about this? @asafm @liangyepianzhou

@liangyepianzhou
Copy link
Contributor

Obviously this is more complexity, but it would address the problem of cluttering the main interfaces with configuration options that are only for very specific requirements.

Hi @lhotari, this is a truly innovative concept that showcases a profound level of technical proficiency and expertise. However, as you've mentioned, it does seem overly complex.

I suggest we revert to the initial design approach; I'm convinced that the first iteration of the API design was the most succinct and user-friendly. What are your thoughts? cc @asafm @AuroraTwinkle

  pulsarClient.newConsumer().topic("topic").subscriptionName("subName")
          .messageListener((consumer, msg) -> {}, new ExecutorProvider(10, "your_poolName"))
          .subscribe();

@liangyepianzhou
Copy link
Contributor

The pip is approved with 5 binding votes and 1 non-binding vote and stay more than 72 hours. So merge it.

@liangyepianzhou liangyepianzhou merged commit 5c6602c into apache:master Jul 16, 2024
20 checks passed
liangyepianzhou pushed a commit that referenced this pull request Aug 5, 2024
… specific subscription (#22861)

Co-authored-by: duanlinlin <duanlinllin@xiaohongshu.com>
[PIP-359](#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist
  
  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*. 

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
  
  - Each pull request should address only one issue, not mix up code from multiple issues.
  
  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the 
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays. 

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->
Denovo1998 pushed a commit to Denovo1998/pulsar that referenced this pull request Aug 17, 2024
… specific subscription (apache#22861)

Co-authored-by: duanlinlin <duanlinllin@xiaohongshu.com>
[PIP-359](apache#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist
  
  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*. 

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
  
  - Each pull request should address only one issue, not mix up code from multiple issues.
  
  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the 
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays. 

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->
liangyepianzhou pushed a commit that referenced this pull request Sep 2, 2024
… specific subscription (#22861)

Co-authored-by: duanlinlin <duanlinllin@xiaohongshu.com>
[PIP-359](#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist

  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.

  - Each pull request should address only one issue, not mix up code from multiple issues.

  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays.

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->

(cherry picked from commit 10f4e02)
grssam pushed a commit to grssam/pulsar that referenced this pull request Sep 4, 2024
… specific subscription (apache#22861)

Co-authored-by: duanlinlin <duanlinllin@xiaohongshu.com>
[PIP-359](apache#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist
  
  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*. 

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
  
  - Each pull request should address only one issue, not mix up code from multiple issues.
  
  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the 
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays. 

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 5, 2024
… specific subscription (apache#22861)

Co-authored-by: duanlinlin <duanlinllin@xiaohongshu.com>
[PIP-359](apache#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist

  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.

  - Each pull request should address only one issue, not mix up code from multiple issues.

  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays.

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->

(cherry picked from commit 10f4e02)
(cherry picked from commit c5846bb)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 5, 2024
… specific subscription (apache#22861)

Co-authored-by: duanlinlin <duanlinllin@xiaohongshu.com>
[PIP-359](apache#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist

  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.

  - Each pull request should address only one issue, not mix up code from multiple issues.

  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays.

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->

(cherry picked from commit 10f4e02)
(cherry picked from commit c5846bb)
@lhotari lhotari added this to the 4.0.0 milestone Oct 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs PIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants