PIP-184: Topic specific consumer priorityLevel #16481

Closed
nahguam opened this issue Jul 8, 2022 · 1 comment · Fixed by #16715
nahguam commented Jul 8, 2022

Motivation

The Pulsar Java consumer supports setting a priority level for priority message
dispatch in shared subscription consumers and priority assignment in failover
subscription consumers. See the ConsumerBuilder.html#priorityLevel(int) Javadoc
for a detailed functional description. The Pulsar Java consumer also supports
consuming from multiple topics. However, it is not possible to set a different
priority level for different topics in the same Consumer instance.

This behaviour is desirable in some use cases. For example, a consumer
processing region specific topics might wish to configure region stickiness. A
multi-region application might consume from topics events-us-east-1 and
events-eu-west-1. Consumers in all regions should be configured to consume all
topics to ensure data completeness. However, to ensure low latency, the
us-east-1 consumer would need to give the us-east-1 topic a higher priority
(that is, a lower priorityLevel value, since 0 is the highest priority).
Similarly, the eu-west-1 consumer would need to give the eu-west-1 topic a
higher priority.

Without the ability to configure different priority levels for different
topics, the developer would have to create multiple consumer instances and
manage consumption from them in application code. This feature lets
application code stay the same whether or not per-topic priority levels are
required.

Goal

Update the Java client API to allow the configuration of different priority
levels for different topics.

Do so in such a way that supports the addition of other topic specific
configuration options or overrides in the future.

Issues will be created to track feature parity in the other client
implementations for this PIP.

API Changes

In pulsar-client-api, update ConsumerBuilder to include four new methods, two keyed by topic name and two keyed by topic pattern:

public interface ConsumerBuilder<T> extends Cloneable {
    ...

    TopicConsumerBuilder<T> topicConfiguration(String topicName);
    
    ConsumerBuilder<T> topicConfiguration(String topicName, 
            java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);

    TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern);
    
    ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern, 
            java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
}

Create a new interface:

public interface TopicConsumerBuilder<T> {
    TopicConsumerBuilder<T> priorityLevel(int priorityLevel);

    ConsumerBuilder<T> build();
}
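
To illustrate how the nested builder could hand control back to its parent, here is a minimal, self-contained sketch. The classes NestedBuilderSketch, SketchConsumerBuilder, SketchTopicBuilder and TopicOverride are hypothetical stand-ins invented for this example, not the Pulsar implementation; they model only priorityLevel, topicConfiguration and build, and only the pattern-keyed overloads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical stand-ins for illustration only; these are not the Pulsar classes.
final class NestedBuilderSketch {

    /** One recorded per-topic override. */
    static final class TopicOverride {
        final Pattern pattern;
        final int priorityLevel;

        TopicOverride(Pattern pattern, int priorityLevel) {
            this.pattern = pattern;
            this.priorityLevel = priorityLevel;
        }
    }

    /** Models the parent builder (ConsumerBuilder in the proposal). */
    static final class SketchConsumerBuilder {
        int priorityLevel;
        final List<TopicOverride> overrides = new ArrayList<>();

        SketchConsumerBuilder priorityLevel(int level) {
            this.priorityLevel = level;
            return this;
        }

        // Fluent form: returns the nested builder; its build() returns to the parent.
        SketchTopicBuilder topicConfiguration(Pattern pattern) {
            return new SketchTopicBuilder(this, pattern);
        }

        // Lambda form: configures a nested builder, records it, and stays on the parent.
        SketchConsumerBuilder topicConfiguration(Pattern pattern,
                Consumer<SketchTopicBuilder> builderConsumer) {
            SketchTopicBuilder nested = topicConfiguration(pattern);
            builderConsumer.accept(nested);
            return nested.build();
        }
    }

    /** Models the nested builder (TopicConsumerBuilder in the proposal). */
    static final class SketchTopicBuilder {
        private final SketchConsumerBuilder parent;
        private final Pattern pattern;
        private int priorityLevel;

        SketchTopicBuilder(SketchConsumerBuilder parent, Pattern pattern) {
            this.parent = parent;
            this.pattern = pattern;
        }

        SketchTopicBuilder priorityLevel(int level) {
            this.priorityLevel = level;
            return this;
        }

        // Records the override on the parent and hands the parent back.
        SketchConsumerBuilder build() {
            parent.overrides.add(new TopicOverride(pattern, priorityLevel));
            return parent;
        }
    }

    private NestedBuilderSketch() {
    }
}
```

In this sketch the lambda overload calls build() itself, so a caller's lambda should only set options on the nested builder.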

In pulsar-client-original, update ConsumerConfigurationData to include a new field:

@Data
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
    ...

    private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
}

Create a topic configuration class:

@Data
public class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    private TopicNameMatcher topicNameMatcher;
    private int priorityLevel;

    public interface TopicNameMatcher extends Serializable {
        boolean matches(String topicName);
    }
}
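
The proposal does not show any TopicNameMatcher implementations. As a rough sketch, one matcher per API flavour (concrete name and pattern) might look like the following. The holder class TopicNameMatchers and the factory names exact and pattern are assumptions made for this example, and the sketch compares raw strings, ignoring any normalisation of fully qualified topic names that a real client would need.

```java
import java.io.Serializable;
import java.util.regex.Pattern;

// Illustrative sketch; the holder class and factory names are assumptions, not Pulsar API.
final class TopicNameMatchers {

    /** Mirrors the TopicNameMatcher interface from the proposal. */
    interface TopicNameMatcher extends Serializable {
        boolean matches(String topicName);
    }

    /** Matches one concrete topic name by plain string equality (no regex semantics). */
    static TopicNameMatcher exact(String expectedName) {
        return topicName -> expectedName.equals(topicName);
    }

    /** Matches any topic whose whole name matches the given regular expression. */
    static TopicNameMatcher pattern(Pattern topicsPattern) {
        return topicName -> topicsPattern.matcher(topicName).matches();
    }

    private TopicNameMatchers() {
    }
}
```

Because the interface extends Serializable and both String and Pattern are serializable, lambdas written this way stay serializable along with the configuration object that holds them.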

Then, in ConsumerImpl the appropriate topic configuration can be selected
based on the topic being subscribed to. Since the topic configuration is
effectively keyed by a topic name or pattern, the user can define multiple
topic configurations that match the same concrete topic name. In this case
the first matching configuration, in the order the configurations were added,
should be selected.

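TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName,
        ConsumerConfigurationData<?> conf) {
    // Configurations are checked in insertion order, so the first match wins.
    return conf.getTopicConfigurations().stream()
        .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName))
        .findFirst()
        // No match: derive a configuration for this topic from the consumer-level conf.
        .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
}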
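
The first-match rule can be tried out in isolation. The following sketch uses plain java.util.function.Predicate rules instead of the Pulsar classes; FirstMatchSketch, Rule, resolvePriority and exampleRules are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical, self-contained illustration of first-match-wins selection.
final class FirstMatchSketch {

    /** A matcher paired with the priority level it assigns. */
    static final class Rule {
        final Predicate<String> matcher;
        final int priorityLevel;

        Rule(Predicate<String> matcher, int priorityLevel) {
            this.matcher = matcher;
            this.priorityLevel = priorityLevel;
        }
    }

    /**
     * Returns the priority level of the first rule that accepts the topic name,
     * or defaultPriority when no rule matches. Rules are checked in list order.
     */
    static int resolvePriority(String topicName, List<Rule> rules, int defaultPriority) {
        return rules.stream()
                .filter(rule -> rule.matcher.test(topicName))
                .map(rule -> rule.priorityLevel)
                .findFirst()
                .orElse(defaultPriority);
    }

    /** Two overlapping rules: the more specific one is registered first. */
    static List<Rule> exampleRules() {
        Pattern usEast = Pattern.compile(".*us-east-1");
        Pattern allEvents = Pattern.compile("events.*");
        List<Rule> rules = new ArrayList<>();
        rules.add(new Rule(name -> usEast.matcher(name).matches(), 0));
        rules.add(new Rule(name -> allEvents.matcher(name).matches(), 2));
        return rules;
    }

    private FirstMatchSketch() {
    }
}
```

With these rules, events-us-east-1 matches both patterns and takes the priority of the rule registered first, so under this selection rule users would want to register more specific configurations before broader ones.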

Example usage:

pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicConfiguration(Pattern.compile(".*us-east-1"), b -> b.priorityLevel(0))
    .subscribe();

or

pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicConfiguration(Pattern.compile(".*us-east-1"))
        .priorityLevel(0)
        .build()
    .subscribe();

Rejected Alternatives

  • Extend the existing ConsumerBuilder rather than introducing a nested, topic specific builder class.

Rejection reason: Does not provide a clear API to discover and extend other topic specific configuration options and overrides.

public interface ConsumerBuilder<T> extends Cloneable {
    ...

    ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
}

Example usage:

pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicPriorityLevel(".*us-east-1", 0)
    .subscribe();

  • Provide a configurator interface to configure options and overrides at runtime.

Rejection reason: Not compatible with ConsumerBuilder.loadConf.

@Data
class TopicConsumerConfigurationData {
    private int priorityLevel;
}

interface TopicConsumerConfigurator extends Serializable {
    void configure(String topicName, TopicConsumerConfigurationData topicConf);
}

public interface ConsumerBuilder<T> extends Cloneable {
    ...

    ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
}

Example usage:

pulsarClient.newConsumer()
    .topicsPattern("events.*")
    .priorityLevel(1)
    .topicConfigurator((topicName, topicConf) -> {
        if (topicName.endsWith("us-east-1")) {
            topicConf.setPriorityLevel(0);
        }
    })
    .subscribe();
nahguam pushed a commit to nahguam/pulsar that referenced this issue Jul 19, 2022
@Technoboy- Technoboy- added this to the 2.11.0 milestone Jul 27, 2022
@mattisonchao mattisonchao removed this from the 2.11.0 milestone Jul 27, 2022

nahguam commented Jul 29, 2022

I've updated the design to keep the concrete topic names separate from the topic patterns in the API, to avoid unwanted regex behaviour caused by characters such as hyphens, illegal character ranges, etc.
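
As a small illustration of the kind of problem this avoids, the sketch below shows what can happen when a concrete name is compiled as a java.util.regex.Pattern. The class TopicNameAsRegexSketch, its methods and the sample strings are hypothetical and chosen only to trigger the behaviour; they are not taken from the Pulsar code base.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical illustration of why names and patterns are kept apart.
final class TopicNameAsRegexSketch {

    /** Treats the configured name as a regular expression. */
    static boolean matchesAsRegex(String nameUsedAsRegex, String candidate) {
        return Pattern.compile(nameUsedAsRegex).matcher(candidate).matches();
    }

    /** Compares the configured name literally. */
    static boolean matchesLiterally(String name, String candidate) {
        return name.equals(candidate);
    }

    /** Returns true if the string cannot be compiled as a regular expression. */
    static boolean isInvalidRegex(String value) {
        try {
            Pattern.compile(value);
            return false;
        } catch (PatternSyntaxException e) {
            return true;
        }
    }

    private TopicNameAsRegexSketch() {
    }
}
```

For example, the name events.v1 used as a regex also matches eventsXv1 because the dot matches any character, while a string such as events[z-a] fails to compile at all because the range is reversed. A literal comparison has neither problem.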
