-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-34 Key_Shared subscription core implementation. #4079
PIP-34 Key_Shared subscription core implementation. #4079
Conversation
@@ -110,8 +110,9 @@ message MessageMetadata { | |||
// Additional parameters required by encryption | |||
optional bytes encryption_param = 15; | |||
optional bytes schema_version = 16; | |||
|
|||
optional bool partition_key_b64_encoded = 17 [ default = false ]; | |||
optional bool partition_key_b64_encoded = 17 [ default = false ]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems not aligned? :)
@@ -124,8 +125,9 @@ message SingleMessageMetadata { | |||
// the timestamp that this event occurs. it is typically set by applications. | |||
// if this field is omitted, `publish_time` can be used for the purpose of `event_time`. | |||
optional uint64 event_time = 5 [default = 0]; | |||
|
|||
optional bool partition_key_b64_encoded = 6 [ default = false ]; | |||
optional bool partition_key_b64_encoded = 6 [ default = false ]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alignment issue
|
||
public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) { | ||
super(topic, cursor); | ||
selector = new HashRangeStickyKeyConsumerSelector(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it easy here to get selector type config from pulsar().getBrokerService().config()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, PersistentStickyKeyDispatcherMultipleConsumers is inherited PersistentDispatcherMultipleConsumers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to make this configurable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can set de default selector for broker level, i think It would be more useful for the client to choose a different selector.
If config at broker side, different consumer selector may need different params to create a selector instance, this will add a lot of configuration at broker.conf.
On client site, user can specify a consumer selector definition with selector class name and some properties.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be configured at subscription level via a policy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm already add this task to the tasks tracker, i'll fix it by a separated PR.
1.test redelivery with Key_Shared subscription 2.test none key dispatch with Key_Shared subscription 3.test ordering key dispatch with Key_Shared subscription
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice contribution @codelipenghui ! Overall looks good to me. Left some comments.
@@ -234,6 +236,7 @@ message CommandSubscribe { | |||
Exclusive = 0; | |||
Shared = 1; | |||
Failover = 2; | |||
Key_Shared = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you bump ProtocolVersion
to v15? Since a new subscription type is introduced.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
V14 is added in 2.4 release. It could be reused. @codelipenghui could you add a comments after V14 for this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is not breaking at the protocol level and there's anyway no check in the code when a consumer tries to subscribe if the broker supports the feature or not.
optional bool partition_key_b64_encoded = 6 [ default = false ]; | ||
optional bool partition_key_b64_encoded = 6 [ default = false ]; | ||
// Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. | ||
optional string ordering_key = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer using bytes
for ordering_key. it is a more nature behavior rather using a string because string requires taking care of encoding.
@@ -68,6 +68,12 @@ public MessageId send() throws PulsarClientException { | |||
return this; | |||
} | |||
|
|||
@Override | |||
public TypedMessageBuilder<T> orderingKey(String orderingKey) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer using byte[]
for ordering key. because you have to handle charsets and encoding for String
.
@Override | ||
public Consumer select(String stickyKey) { | ||
if (rangeMap.size() > 0) { | ||
int slot = Math.abs(stickyKey.hashCode() % RANGE_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would avoid using hashCode
here. Use Murmur3_32Hash
.
You can move Hash
interface and its implementations (Murmur3_32Hash
) to pulsar-common. since it can be used both for client and broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
* | ||
* 1.Each consumer serves a fixed range of hash value | ||
* 2.The whole range of hash value could be covered by all the consumers. | ||
* 3.Once a consumer is removed, the left consumers could still serve the whole range. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would suggest you adding more descriptions into the javadoc. It might be worthing using an example to explain how does your algorithm work here.
|
||
public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) { | ||
super(topic, cursor); | ||
selector = new HashRangeStickyKeyConsumerSelector(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to make this configurable.
return SubType.Key_Shared; | ||
} | ||
|
||
private String peekStickyKey(ByteBuf metadataAndPayload) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't seem to handle batched messages. any thoughts on how we can handle that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need to handle batched messages here, for now batched message without key, so this will be treated as NONE_KEY
and all message without key will be send to a consumer, i will make a PR for NONE_KEY
policy setting by client.
For batched messages, we need a key based message batching mechanism, by this way peekStickyKey() still using the same logic.
NONE_KEY
policy and key based message batching mechanism is already in my backlog ISSUE-4077 .
run java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, overall lgtm.
retest this please |
run java8 tests |
run cpp tests |
@codelipenghui can you check the error? You can run |
run java8 tests |
* @param orderingKey the ordering key for the message | ||
* @return the message builder instance | ||
*/ | ||
TypedMessageBuilder<T> orderingKey(byte[] orderingKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is hugely confusing. Now we can have 2 different keys on the message. Users might think the other key doesn't guarantee ordering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have already discussed through the discuss email of PIP-34, orderingKey
is not necessary, if user use partitionKey
for different purposes, orderingKey
can offer a way allow user to ordering message by another identification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's very confusing to have 2 keys. In particular to refer to one of them as the ordering key, while in fact the other is an "ordering key"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, do you have any suggestions? We really need another flag to overwrite the partitionKey
based ordering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO It is commonly seen in a distributed database having two kind of keys, partition key and ordering key (or local key). E.g. Spanner. CDC for such databases is one of the use cases for this. Having the ability to provide an ordering key is necessary.
If it is the java doc making things confusing, we can consider improving the java doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's a bit generic at this point. I don't think it's even possible to do CDC on Spanner. In any case I don't see why would that be strictly required for this feature.
I just used Spanner as an example here. Whether Spanner supports CDC is not the point to discuss. There are many open source Spanner-like NewSQL databases. E.g. TiDB, YugaByte, and many in-house solutions. I know there are already people working on integrations between TiDB and Pulsar, where the ordering key shines there.
As always, I think it's better to add things in the API when there is a concrete need, rather than speculate possible use cases that might not apply.
Why do you think there is no concrete need when people propose a new PIP?
In this case, since the application expect messages in order by conversation_id, using that as the partitioning key will achieve the same identical behavior.
Pulsar is a multiple subscription system. One subscription can use failover subscription, while the other subscription can use key_shared subscription. You can't force the application to choose conversation id as the partition key. As I said, how applications can use these two keys varies from their needs.
Why would you care about routing per user_id if you just care of ordering per partition_id?
Because there are subscriptions required to consume all the events from a particular user_id.
Finally, as mentioned above I think that "ordering-key" is a very misleading name. It really would be a "sub-key", "delivery-key", "dispatch-key" or other name.
I agree that "ordering" can have a different meaning in different context. It can mean - publish-order, log-order, consumer-order, dispatch-order, key-order. However I don't think "sub-key", "delivery-key" or "dispatch-key" is a better name than "ordering key". In some cases, the ordering key is a "sub-key", but it can be a completely different key while in other cases. Same applies to "delivery-key" or "dispatch-key".
IMO "ordering key" is not a bad name. It is a name that people already have some general ideas about it. Also people generally understand what partitions key and ordering key means. Applications can choose how to use them to adopt to their use cases.
However, I am also not particularly strong on the name itself. We could have called it others if there was a better name came up in the PIP discussion email thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you think there is no concrete need when people propose a new PIP?
One thing is the per-key delivery, one other is a CDC pipeline. The 2 don't necessarely have to go together. This is for per-key delivery, then you bring up CDC.
IMO "ordering key" is not a bad name. It is a name that people already have some general ideas about it. Also people generally understand what partitions key and ordering key means
I think it's a very bad name in this context because it's a property on a message that implies something that is misleading. The implementation of this feature also has nothing to do with ordering, rather is give me messages with same keys. Ordering is a derivative property.
In some cases, the ordering key is a "sub-key", but it can be a completely different key while in other cases.
In my view it's a "sub-key" because the routing is done on 2 levels, to partition and to consumers. If you have messages with same "ordering-key" they will be delivered out of order if they have a different partition key.
We could have called it others if there was a better name came up in the PIP discussion email thread.
Just to understand, where is it written that things are set in stone? If people are busy to comments for a few days, one just submits a PR, his buddy approves and merge and that's it? Done? No one can comment on it anymore?
Also, as a curtesy, it would be nice to actively seek comments from people when introducing major features. It's not a race to get PR merged while others are not looking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing is the per-key delivery, one other is a CDC pipeline. The 2 don't necessarely have to go together. This is for per-key delivery, then you bring up CDC.
CDC pipeline is a most typical use case for per-key delivery. The bottleneck of a CDC pipeline is key ordering. You want to ensure key ordering but you also want to scale out. That's why you need a per-key delivery. I am not sure why do you think they are not related.
The implementation of this feature also has nothing to do with ordering, rather is give me messages with same keys. Ordering is a derivative property.
The change includes two parts: 1) adding an ordering key to allow people choose a different key for defining its ordering rather than partition key. That is the change for adding "ordering key". Hence the ordering key is for messages, not for subscription. 2) key_shared subscription is one type of subscription that choose which key to be used for dispatching messages. There can many other different ways for using these two keys.
In my view it's a "sub-key" because the routing is done on 2 levels, to partition and to consumers.
when you are using a term of "sub-key", it means you need both partition-key and sub-key together for deciding the ordering. in my example explained above, "from_user_id" and "conversation_id" might have some application specific relationship. but "conversation_id" is not necessarily a "sub-key" of "from_user_id". Instead, "conversation_id" provides an alternative way for grouping and ordering conversations than using "from_user_id".
To me it is more of a name. I am fine with whether it should be called "sub-key" or "ordering-key". To me it is more a matter for making a good clarification on how ordering_key
is used on javadoc and website.
Just to understand, where is it written that things are set in stone? If people are busy to comments for a few days, one just submits a PR, his buddy approves and merge and that's it? Done? No one can comment on it anymore?
Also, as a curtesy, it would be nice to actively seek comments from people when introducing major features. It's not a race to get PR merged while others are not looking.
You are overreacting to what I said here. I just meant the name was coming up from the discussion thread. I also didn't say it is a final. It can be any other names if there is a better one. In the whole conversation here, what I was trying to do is to show you the use cases I learned and my opinions - why I think "sub-key", "delivery-key" and "dispatch-key" is not better than "ordering-key".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when you are using a term of "sub-key", it means you need both partition-key and sub-key together for deciding the ordering. in my example explained above, "from_user_id" and "conversation_id" might have some application specific relationship. but "conversation_id" is not necessarily a "sub-key" of "from_user_id". Instead, "conversation_id" provides an alternative way for grouping and ordering conversations than using "from_user_id".
On a partition topic the ordering will be on 2 layers. Ordering will be based on the combination of both.
If you say these 2 keys are not related, then, on a partitioned topic, the "ordering-key" will not be able to guarantee the ordering, which sounds a bit weird...
I'm not saying to use any of the examples which just came out of the mind. I'm saying that "orderingKey" can be very confusing to users because it seems to be implying something which is not.
What the application means with the key is up to the application. What it means on the dispatcher is that, within a given partition, all messages with same key will go to same consumer. "Ordering-Key" as a name fails to convey any of the properties while clashing with the partition key that is already used to decide the ordering on the partitions.
To throw more names: "consumeOrderingKey", "dispatchOrderingKey"
You are overreacting to what I said here. I just meant the name was coming up from the discussion thread. I also didn't say it is a final.
Am I? Perhaps it was not intended but your phrase sounded exactly like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
frankly speaking, I am not sure it is a good idea to add "consume" or "dispatch" to the "ordering-key".
in a data system, "partition-key" and "ordering-key" is actually a clean concept. "partition-key" is used for partitioning and "ordering-key" is used for ordering. ideally a partition-key doesn't have any ordering implication. but because pulsar is implementing a partition based on an ordered log, the "ordering" on a "partition-key" is implicitly indicated.
If an application needs end-to-end ordering, it can omit "ordering-key" or choose a sub-key of "partition-key" or another related key. since end-to-end ordering requires producers and consumers have agreements on keys.
If an application doesn't require end-to-end ordering or only needs partial ordering (e.g. in a map-reduce-ish pipeline, an intermediate topic is used for computing partially aggregated state), it can just use a key for partitioning and the other key for ordering.
decoupling the partition-key and ordering-key allows adopting to more use cases in future.
However as I said in previous comment, I don't have any strong opinions about the naming. I would let the original authors and you decide which name to pick.
@codelipenghui @jiazhai This feature should have a flag to enable/disable it. |
@merlimat |
Yes
This feature is not well tested as the rest of the code. Administrators must be able to decide whether they want to offer and support this feature or turn it off. |
Can we add some comment at client side to describe |
No. It's not about the users, it's about the administrator that needs to be able to decide which features to support. |
Do we have any consensus about what features require such flags? Or should this rule apply to all features? IMO we might need to discuss in the dev@ and come out a guideline for any features introduced in future. |
We have added feature flags on all major features. I think the downsides of not having it greater exceed the bothering of adding it. As it happened before, new feature not properly tested can cause production issues. |
Noted. It would be great to update the contributor guide (http://pulsar.apache.org/en/contributing/). So that new contributors will know the guideline for this. |
This make sense to me, i will add a flag to control enable or disable this feature, by default it will be disable. |
Fixes #401 ### Motivation According to apache/pulsar#4079, orderingKey was introduced to let user set message order manually, currently pulsar-client-go do not have related apis exposed to user. We should add orderingKey related apis to pulsar-client-go. ### Modifications - add OrderingKey to ProducerMessage - add OrderingKey() to Message interface - sync OrderingKey to SingleMessageMetadata - tests
Motivation
This is a core implementation for PIP-34 and there is a task tracker ISSUE-4077 for this PIP
Modifications
Verifying this change
Add new unit tests to verifying the hash range selector and Key_Shared mode message consume.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation