PIP-359: Support custom message listener executor for specific subscription

Implementation PR: #22861

Background knowledge

From the user's perspective, current Pulsar client versions offer two main ways to consume messages with a Pulsar Consumer:

  1. Pull mode, by calling consumer.receive() (or consumer.receiveAsync()):
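import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {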
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscribe();
        // Block until a message arrives, then acknowledge it.
        while (true) {
            Message<Long> message = consumer.receive();
            consumer.acknowledge(message);
        }

    }
}
  2. Push mode, by registering a MessageListener when building the Consumer. A consumer built this way cannot also call consumer.receive() (or consumer.receiveAsync()). In push mode the consumer itself invokes the MessageListener, using a thread taken from an internal ExecutorService (i.e. thread pool). The problem arises when multiple Consumers are built from the same PulsarClient: they share the same thread pool for their message listener callbacks, so one slow listener can delay the others.
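import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {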
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription2")
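                // The listener's consumer parameter is named "c" because a lambda
                // parameter cannot reuse the name of the enclosing local "consumer".
                .messageListener((c, message) -> {
                    // process message
                    c.acknowledgeAsync(message);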
                })
                .subscribe();
    }
}
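
The interference described above can be reproduced without a broker. The following is a minimal sketch using only java.util.concurrent, not Pulsar client code: a single-threaded executor stands in for the shared listener pool, and the class and method names (SharedListenerPoolDemo, fastCallbackDelayMillis) are hypothetical. It measures how long a fast callback waits to start while a slow callback is already running, first on a shared pool and then on a dedicated one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SharedListenerPoolDemo {

    /**
     * Returns how long (in ms) a fast callback waits before it starts
     * while a slow callback is already running.
     */
    static long fastCallbackDelayMillis(boolean sharePool, long slowWorkMillis) {
        // Assumption: one thread models the pool that runs listener callbacks.
        ExecutorService slowPool = Executors.newSingleThreadExecutor();
        ExecutorService fastPool = sharePool ? slowPool : Executors.newSingleThreadExecutor();
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch fastStarted = new CountDownLatch(1);
        long[] startedAt = new long[1];
        try {
            // Callback of the slow consumer: occupies its thread for slowWorkMillis.
            slowPool.execute(() -> {
                slowStarted.countDown();
                sleepQuietly(slowWorkMillis);
            });
            slowStarted.await();

            // Callback of the fast consumer, submitted while the slow one is running.
            long submittedAt = System.nanoTime();
            fastPool.execute(() -> {
                startedAt[0] = System.nanoTime();
                fastStarted.countDown();
            });
            fastStarted.await();
            return TimeUnit.NANOSECONDS.toMillis(startedAt[0] - submittedAt);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            slowPool.shutdown();
            fastPool.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool delay (ms):    " + fastCallbackDelayMillis(true, 300));
        System.out.println("dedicated pool delay (ms): " + fastCallbackDelayMillis(false, 300));
    }
}
```

On a shared pool the fast callback has to wait for roughly the full duration of the slow one, while on a dedicated pool it starts almost immediately. Exact numbers depend on the machine.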

Motivation

As described in Background knowledge, when messages are consumed asynchronously through a registered MessageListener, different consumers (for example, different consumer groups) can affect each other, which leads to unnecessary consumption delays. To address this, this PIP proposes support for a dedicated message listener executor at the consumer level.

Goals

  1. Improve message listener isolation between consumers, and solve the consumption delays caused by different consumers of the same PulsarClient instance affecting each other.

In Scope

If this PIP is accepted, it will help Pulsar solve the problem of different consumers from the same PulsarClient affecting each other in the asynchronous consumption mode (MessageListener).

Out of Scope

This PIP will not build the plugin library mentioned in the PR. A new PIP will be opened in the future for that.

Detailed Design

Design & Implementation Details

  1. Add an interface MessageListenerExecutor, responsible for executing message listener callback tasks. Users can provide their own implementation to decide on which thread each message listener task runs. For example, in the situation described in the Motivation section, users can back the implementation with an independent thread pool so that each consumer's message listener tasks run on threads separate from those of other consumers. The caller is responsible for the life cycle of the executor, and it is used only for this specific consumer.
    public interface MessageListenerExecutor {

        /**
         * Select a thread for the given message (if necessary; for example, with the
         * Key_Shared subscription type a thread may need to be selected by the
         * message's order key to preserve ordering) and execute the runnable on it.
         *
         * @param message  the message
         * @param runnable the runnable to execute
         */
        void execute(Message<?> message, Runnable runnable);
    }
  2. Add an optional config messageListenerExecutor in ConsumerBuilder, then users can pass their implementations.
    ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
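
Because the caller owns the executor's life cycle, an implementation will usually need a way to shut its threads down once the consumer is closed. The sketch below shows one possible shape under stated assumptions: it is not part of the proposal, ListenerExecutorStandIn is a local stand-in for the proposed MessageListenerExecutor (with a type parameter M in place of Message<?>, so the example compiles without the Pulsar client on the classpath), and all class names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DedicatedListenerExecutorDemo {

    /** Local stand-in for the proposed MessageListenerExecutor; M replaces Message<?>. */
    interface ListenerExecutorStandIn<M> {
        void execute(M message, Runnable runnable);
    }

    /** A pool dedicated to one consumer; the caller creates it and closes it. */
    static final class DedicatedListenerExecutor<M> implements ListenerExecutorStandIn<M>, AutoCloseable {
        private final ExecutorService pool;

        DedicatedListenerExecutor(int threads) {
            this.pool = Executors.newFixedThreadPool(threads);
        }

        @Override
        public void execute(M message, Runnable runnable) {
            // No ordering requirement here, so the message is not used to pick a thread.
            pool.execute(runnable);
        }

        boolean isTerminated() {
            return pool.isTerminated();
        }

        /** Lets queued callbacks finish, then releases the threads. */
        @Override
        public void close() {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Submits the given number of callbacks, closes the executor, and returns how many ran. */
    static int runAndClose(int callbacks) {
        AtomicInteger handled = new AtomicInteger();
        DedicatedListenerExecutor<String> executor = new DedicatedListenerExecutor<>(2);
        try {
            for (int i = 0; i < callbacks; i++) {
                executor.execute("msg-" + i, handled::incrementAndGet);
            }
        } finally {
            // In a real application this would happen after the consumer is closed.
            executor.close();
        }
        if (!executor.isTerminated()) {
            throw new IllegalStateException("listener pool did not terminate");
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks handled: " + runAndClose(20));
    }
}
```

With the real client, such a class would implement the proposed MessageListenerExecutor directly and be closed by the application after the consumer that uses it has been closed.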

Why an interface like MessageListenerExecutor is needed

Some people may wonder why this PIP defines a new interface, MessageListenerExecutor, instead of simply using java.util.concurrent.ExecutorService.

The reason is as follows.

For ordered consumption, messages with the same key or from the same partition must be processed by the same thread to preserve their order. A plain java.util.concurrent.ExecutorService cannot make such a guarantee, as illustrated in the figure accompanying this section, because the user does not control which of its threads executes a given task.
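
To make the ordering argument concrete, here is a minimal sketch of key-based thread selection built only on java.util.concurrent. It is an illustration under assumptions, not the Pulsar implementation: KeyAffinityDemo and its methods are hypothetical names, and hashing the key bytes with Arrays.hashCode is just one possible mapping. Each key is always routed to the same single-threaded executor, so tasks for one key run in submission order. A multi-threaded ExecutorService gives no such guarantee, since tasks for the same key may run concurrently on different threads.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class KeyAffinityDemo {
    // One single-threaded executor per slot; a key always maps to the same slot.
    private final ExecutorService[] workers;

    KeyAffinityDemo(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Runs the task on the worker chosen by hashing the key bytes. */
    void execute(byte[] key, Runnable task) {
        int slot = Math.floorMod(Arrays.hashCode(key), workers.length);
        workers[slot].execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    void shutdownAndAwait() {
        try {
            for (ExecutorService worker : workers) {
                worker.shutdown();
            }
            for (ExecutorService worker : workers) {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Submits perKey numbered tasks for every key and returns the order in which each key ran them. */
    static Map<String, List<Integer>> processedOrder(int threads, List<String> keys, int perKey) {
        KeyAffinityDemo executor = new KeyAffinityDemo(threads);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        for (int seq = 0; seq < perKey; seq++) {
            for (String key : keys) {
                final int sequence = seq;
                executor.execute(key.getBytes(StandardCharsets.UTF_8), () ->
                        seen.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<Integer>()))
                                .add(sequence));
            }
        }
        executor.shutdownAndAwait();
        return seen;
    }

    public static void main(String[] args) {
        // Every key should list its sequence numbers in ascending order.
        System.out.println(processedOrder(4, List.of("a", "b", "c"), 5));
    }
}
```

The execute(byte[] key, Runnable task) method has the same shape as the proposed execute(Message<?> message, Runnable runnable), with the key taken from the message.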

Interface implementation suggestions

When implementing the MessageListenerExecutor interface, you should consider the following points.

  1. If you need to preserve the order of message processing, select the thread by the message's order key or by msg.getTopicName() (the partition topic name), so that messages with the same order key (or from the same partition) are processed on the same thread.

Usage Example

private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException {
    // for example: key_shared
    MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName);
    Consumer<Long> keySharedconsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // once the executor is set, the message listener runs in it
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(keySharedExecutor)
                    .subscribe();


    // for example: partition_ordered
    MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderdMessageListenerExecutor(subscriptionName);
    Consumer<Long> partitionOrderedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // once the executor is set, the message listener runs in it
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(partitionOrderedExecutor)
                    .subscribe();

    // for example: out-of-order
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Consumer<Long> outOfOrderConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // this executor does not select threads by message, so no processing order is guaranteed
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor((message, runnable) -> executorService.execute(runnable))
                    .subscribe();
}

private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");

    return (message, runnable) -> {
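        // Prefer the ordering key, since Key_Shared dispatch uses it over the
        // message key when both are set; fall back to the message key.
        byte[] key = "".getBytes(StandardCharsets.UTF_8);
        if (message.hasOrderingKey()) {
            key = message.getOrderingKey();
        } else if (message.hasKey()) {
            key = message.getKeyBytes();
        }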
        // Select a thread by message key to execute the runnable, so that
        // message listener tasks with the same order key are executed by
        // the same thread.
        ExecutorService executorService = executorProvider.getExecutor(key);
        // executorService is a SingleThreadExecutor
        executorService.execute(runnable);
    };
}

private static MessageListenerExecutor getPartitionOrderdMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");

    return (message, runnable) -> {
        // Select a thread by partition topic name to execute the runnable, so
        // that message listener tasks from the same partition topic are
        // executed by the same thread.
        ExecutorService executorService =
                executorProvider.getExecutor(message.getTopicName().getBytes(StandardCharsets.UTF_8));
        // executorService is a SingleThreadExecutor
        executorService.execute(runnable);
    };
}

Public-facing Changes

Public API

  1. Add an optional config messageListenerExecutor in ConsumerBuilder
ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);

Backward & Forward Compatibility

Upgrading and reverting work as usual; no special steps are required.

Links