PIP-247: Notifications for partitions update #19596

Open
AnonHxy opened this issue Feb 22, 2023 · 3 comments
AnonHxy commented Feb 22, 2023

Discussion thread: https://lists.apache.org/thread/bcry0cz4z7kzot8pc4nhbktfv44xrk2y
Vote thread: https://lists.apache.org/thread/rgkxfw9w6frv89lxmjh9kytz8jmnrd3r

Motivation

The Pulsar client polls brokers at a fixed interval to check for partition updates when it publishes or subscribes to partitioned topics with autoUpdatePartitions set to true. This causes unnecessary load on both clients and brokers, since most of the time the number of partitions does not change. Polling also delays partition updates by up to the interval specified by autoUpdatePartitionsInterval.

This PIP introduces a notification mechanism for partition updates, much like PIP-145 for regex subscriptions (#14505). Unlike PIP-145, the poll task will not run when this feature is enabled.

Goal

This PIP proposes the following change to improve performance:

  • Clients will be able to register with brokers as observers of partition updates for the topics they are concerned with. Brokers will send an event to these clients whenever the number of partitions changes.

To keep new clients compatible with older brokers, a new feature flag will be introduced for this feature. Only new clients will be able to register with brokers as observers.

API Changes

Protocol Changes

  • Clients can register as partition update observers by sending the command CommandWatchPartitionUpdate:
message TopicWithPartition {
    required string topic        = 1;
    required uint32 partition    = 2;
}
message CommandWatchPartitionUpdate {
    required uint64 request_id     = 1;
    required uint64 watcher_id     = 2;
    repeated string topics         = 3;
}
  • Brokers will respond with a success message containing the watcher ID and the concerned topics with their partition counts:
message CommandWatchPartitionUpdateSuccess {
    required uint64 request_id     = 1;
    required uint64 watcher_id     = 2;
    repeated TopicWithPartition topic_with_partition     = 3;
}
  • When the partition count of a concerned topic changes, the broker sends the watcher all of its concerned topics with their partition counts. The version increases monotonically with every update.
message CommandPartitionUpdate {
    required uint64 version        = 1;
    required uint64 watcher_id     = 2;
    repeated TopicWithPartition topic_with_partition     = 3;
}
  • Once the client's call to PartitionsUpdateListener completes, it reports the outcome to the broker with CommandPartitionUpdateResult:
message CommandPartitionUpdateResult {
    required uint64 version        = 1;
    required bool success          = 2;
    required uint64 watcher_id     = 3;
}
  • Clients will send CommandWatchPartitionUpdateClose to unregister observers when closing:
message CommandWatchPartitionUpdateClose {
    required uint64 request_id     = 1;
    required uint64 watcher_id     = 2;
}
  • When a client connects to a broker, it is notified whether the broker supports partition update watchers. If not, the client will not send CommandWatchPartitionUpdate and will continue to rely on polling.
message FeatureFlags {
  optional bool supports_auth_refresh = 1 [default = false];
  optional bool supports_broker_entry_metadata = 2 [default = false];
  optional bool supports_partial_producer = 3 [default = false];
  optional bool supports_topic_watchers = 4 [default = false];
  optional bool supports_partition_update_watchers = 5 [default = false];
}

message CommandConnected {
    required string server_version = 1;
    optional int32 protocol_version = 2 [default = 0];
    optional int32 max_message_size = 3;
    optional FeatureFlags feature_flags = 4;
}
  • Configuration Changes

    • client side
      In case this feature impacts users' systems in an unwanted way, a new producer configuration property ProducerConfigurationData#enableNotificationForPartitionUpdate and a new consumer configuration property ConsumerConfigurationData#enableNotificationForPartitionUpdate will be added, both with default value true. Setting the property to false will disable the feature.
    • broker side
      A new broker configuration property enableNotificationForPartitionUpdate will be added with default value true. Setting this to false will disable the feature.
      A new broker configuration property notificationForPartitionUpdateTimeoutSeconds will be added with default value 10.
  • Interface Changes
    A new client-side listener will be introduced to deliver partition update notifications.

public interface PartitionsUpdateListener {
    CompletableFuture<Void> onTopicPartitionsCountChanged(List<String> topics, List<Integer> partitions);
}

We do not reuse the old listener org.apache.pulsar.client.impl.PartitionsChangedListener because it takes only a single topics parameter. Behavior is unchanged when this feature is not enabled. PartitionsChangedListener will be marked deprecated:

public interface PartitionsChangedListener {
    CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended);
}
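
As a rough illustration of how the proposed listener might be implemented, here is a minimal sketch. The class names PartitionsListenerSketch and RecordingListener are hypothetical, and the interface is copied from the proposal only so that the example compiles on its own. The sketch assumes that the two lists are parallel (the i-th count belongs to the i-th topic), which the proposal implies but does not state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class PartitionsListenerSketch {
    // Copy of the interface proposed in this PIP, repeated so the sketch is self-contained.
    interface PartitionsUpdateListener {
        CompletableFuture<Void> onTopicPartitionsCountChanged(List<String> topics, List<Integer> partitions);
    }

    // Hypothetical listener that remembers the latest partition count per topic.
    static final class RecordingListener implements PartitionsUpdateListener {
        final Map<String, Integer> latest = new HashMap<>();

        @Override
        public CompletableFuture<Void> onTopicPartitionsCountChanged(List<String> topics,
                                                                     List<Integer> partitions) {
            // Assumption: topics.get(i) pairs with partitions.get(i).
            if (topics.size() != partitions.size()) {
                return CompletableFuture.failedFuture(
                        new IllegalArgumentException("topics and partitions must have the same length"));
            }
            for (int i = 0; i < topics.size(); i++) {
                latest.put(topics.get(i), partitions.get(i));
            }
            return CompletableFuture.completedFuture(null);
        }
    }
}
```

A failed future here would correspond to the client reporting success = false in CommandPartitionUpdateResult, although how the real client maps one to the other is not specified in this proposal.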

Implementation

Notifications

If this feature is enabled in the client, then when PartitionedProducerImpl or MultiTopicsConsumerImpl initializes, the client will create a watcher, org.apache.pulsar.client.impl.PartitionUpdateWatcher, and send CommandWatchPartitionUpdate to the broker to register as an observer.

A new class, org.apache.pulsar.PartitonUpdateWatcherService, will keep track of watchers and listen for changes in the metadata. Whenever a topic's partition count changes, it checks whether any watchers should be notified and sends each of them an update for all of its concerned topics through the ServerCnx. The request is then recorded in a map, PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version, long/*timestamp*/>>. A timer checks inFlightUpdate for updates that have timed out. For a watcher whose update has timed out, the broker queries the partition counts of all its concerned topics and resends the update.

The client acknowledges an update by sending CommandPartitionUpdateResult to the broker once it finishes updating. The broker handles a CommandPartitionUpdateResult request as follows:
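
The in-flight bookkeeping and timeout check described above could look roughly like the following. This is only a sketch under assumptions: the names InFlightTimeoutSketch, Tracker, recordSent, and findTimedOut are hypothetical, timestamps are passed in explicitly instead of being read from a clock, and the timeout would come from notificationForPartitionUpdateTimeoutSeconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InFlightTimeoutSketch {
    // One pending notification: the version sent and the time it was sent.
    static final class InFlight {
        final long version;
        final long sentAtMillis;

        InFlight(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    static final class Tracker {
        // watcherId -> pending update, mirroring the proposed inFlightUpdate map.
        private final Map<Long, InFlight> inFlightUpdate = new ConcurrentHashMap<>();
        private final long timeoutMillis;

        Tracker(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        // Called right after an update is sent to a watcher; a newer version overwrites an older one.
        void recordSent(long watcherId, long version, long nowMillis) {
            inFlightUpdate.put(watcherId, new InFlight(version, nowMillis));
        }

        // Called by the timer: returns watchers whose update has gone unacknowledged for the full timeout.
        List<Long> findTimedOut(long nowMillis) {
            List<Long> expired = new ArrayList<>();
            inFlightUpdate.forEach((watcherId, pending) -> {
                if (nowMillis - pending.sentAtMillis >= timeoutMillis) {
                    expired.add(watcherId);
                }
            });
            return expired;
        }
    }
}
```

For each watcher ID returned by findTimedOut, the broker would re-query the partition counts and resend, then call recordSent again with the new version and timestamp.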

  • If CommandPartitionUpdateResult#version < PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, broker ignores this ack.
  • If CommandPartitionUpdateResult#version == PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version
    • If CommandPartitionUpdateResult#success is true, broker just removes the watcherID from inFlightUpdate.
    • If CommandPartitionUpdateResult#success is false, broker removes the watcherID from inFlightUpdate, queries the partition counts of all the concerned topics, and resends the update.
  • If CommandPartitionUpdateResult#version > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this should not happen.
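
The version comparison above can be sketched as a small decision function. The names AckHandlingSketch, Broker, Action, and onResult are hypothetical, and the map stores only the version (not the timestamp) to keep the example short. Treating an ack for a watcher with nothing in flight as ignorable is an assumption; the proposal does not cover that case.

```java
import java.util.HashMap;
import java.util.Map;

final class AckHandlingSketch {
    // What the broker decides to do with an incoming CommandPartitionUpdateResult.
    enum Action { IGNORE, CLEARED, RESEND, UNEXPECTED }

    static final class Broker {
        // watcherId -> version of the update currently awaiting an ack.
        final Map<Long, Long> inFlightVersion = new HashMap<>();

        Action onResult(long watcherId, long ackVersion, boolean success) {
            Long pending = inFlightVersion.get(watcherId);
            if (pending == null) {
                return Action.IGNORE; // assumption: nothing in flight, so there is nothing to do
            }
            if (ackVersion < pending) {
                return Action.IGNORE; // stale ack for an update that has been superseded
            }
            if (ackVersion > pending) {
                return Action.UNEXPECTED; // the proposal states this should not happen
            }
            // Versions match: the entry is removed in both the success and the failure case.
            inFlightVersion.remove(watcherId);
            return success ? Action.CLEARED : Action.RESEND;
        }
    }
}
```

On RESEND the broker would query the current partition counts and send a fresh CommandPartitionUpdate, which puts a new entry into the map.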

To prevent memory leaks, all watchers will be removed from the PartitonUpdateWatcherService when the ServerCnx's channel becomes inactive.

The poll task will not start if this feature is enabled.

Edge cases

  • Broker restarts or crashes
    The client will reconnect to another broker, which responds with CommandWatchPartitionUpdateSuccess containing the partition counts of the watcher's concerned topics. PartitionsUpdateListener will be called when the connection opens.
  • Client acks fail or time out
    The broker will resend the partition counts of the watcher's concerned topics when the client's ack reports a failure or times out.
  • Partition updates before client acks
    CommandPartitionUpdate#version increases monotonically with every update. If the partitions are updated again before the client acks, a greater version will be put into PartitonUpdateWatcherService#inFlightUpdate. The ack for the earlier update will be ignored because its version is less than the current version.
  • Endless retries
    As in PIP-145, the watcher will be removed from the broker when channelInactive is invoked, so endless retries should not happen.
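
To illustrate why removing watchers on channelInactive stops retries, here is a minimal sketch of the cleanup. The names WatcherCleanupSketch, Registry, register, markInFlight, and onChannelInactive are hypothetical, and a plain string stands in for the ServerCnx connection.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class WatcherCleanupSketch {
    static final class Registry {
        // connectionId -> watcher IDs registered over that connection (stand-in for ServerCnx).
        private final Map<String, Set<Long>> watchersByConnection = new ConcurrentHashMap<>();
        // watcherId -> version awaiting an ack (stand-in for inFlightUpdate).
        private final Map<Long, Long> inFlight = new ConcurrentHashMap<>();

        void register(String connectionId, long watcherId) {
            watchersByConnection
                    .computeIfAbsent(connectionId, k -> ConcurrentHashMap.newKeySet())
                    .add(watcherId);
        }

        void markInFlight(long watcherId, long version) {
            inFlight.put(watcherId, version);
        }

        boolean hasPending(long watcherId) {
            return inFlight.containsKey(watcherId);
        }

        // Drops every watcher of the connection together with its pending update,
        // so the timeout timer has nothing left to retry. Returns the number of watchers removed.
        int onChannelInactive(String connectionId) {
            Set<Long> removed = watchersByConnection.remove(connectionId);
            if (removed == null) {
                return 0;
            }
            for (Long watcherId : removed) {
                inFlight.remove(watcherId);
            }
            return removed.size();
        }
    }
}
```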

Compatibility

  • Old clients with new servers
    Old clients will just use the poll task to fetch partitions at a fixed interval, much like new clients with this feature disabled.

  • New clients with old servers
    When new clients connect to an old broker, supports_partition_update_watchers will be false by default, so new clients will use the poll task to fetch partitions.
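
The compatibility rules above reduce to a simple decision on the client side. The sketch below is illustrative only: UpdateModeSketch, Mode, and choose are hypothetical names, and it assumes autoUpdatePartitions is true (otherwise the client does not track partition changes at all).

```java
final class UpdateModeSketch {
    enum Mode { WATCHER, POLLING }

    // clientEnableNotification: the client-side enableNotificationForPartitionUpdate setting.
    // brokerSupportsWatchers: FeatureFlags.supports_partition_update_watchers from CommandConnected,
    // which is false when an old broker does not set it.
    static Mode choose(boolean clientEnableNotification, boolean brokerSupportsWatchers) {
        return (clientEnableNotification && brokerSupportsWatchers) ? Mode.WATCHER : Mode.POLLING;
    }
}
```

An old client never evaluates this logic and always polls, which matches the first compatibility case.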

Alternatives

No response

Anything else?

No response

@AnonHxy AnonHxy self-assigned this Feb 22, 2023
@AnonHxy AnonHxy changed the title PIP-247: Notifications for partitons update PIP-247: Notifications for partitions update Feb 22, 2023
@eolivelli
Contributor

can't we enable this by default in case we detect that the connected Broker supports it ?
I can't find any reason for not using this mechanism if it is available.

Maybe we can set the default to "true" and allow users to disable it in case it impacts their systems in an unwanted way.

Maybe it would be useful to have a way to disable the mechanism on the broker side as well.

@AnonHxy
Contributor Author

AnonHxy commented Feb 22, 2023

Thanks for your great suggestion, @eolivelli.
I agree with you. It's more reasonable to add supports_partition_update_watchers to FeatureFlags to detect whether the connected broker supports this feature, and to add a new broker configuration property enableNotificationForPartitionUpdate with default value true, much like PIP-145. I have updated the description.

@github-actions

github-actions bot commented Apr 8, 2023

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Apr 8, 2023