This repository was archived by the owner on Apr 1, 2024. It is now read-only.
Motivation
This PIP introduces a way to support batched messages in entry filters without having to deserialize the entry, by restricting the messages in one batch to have the same values for selected properties.
We already have a pluggable way to filter entries in the broker, namely PIP-105 (PIP 105: Support pluggable entry filter in Dispatcher, apache/pulsar#12269). But this approach has a drawback: it doesn't support batched messages natively, because the entry filter only knows the main header of the entry and doesn't dig into the payload to deserialize each single message's metadata.
If the developer of an entry filter wants to filter batched messages, they have to deserialize the payload to get each message's properties, which adds memory and CPU overhead.
Let's explain the current entry filters in detail. Skip directly to the "Solution" part if the drawback above is already clear.
Today, when an entry filter receives an entry, it gets an Entry that has:
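```java
import io.netty.buffer.ByteBuf;

// org.apache.bookkeeper.mledger.Entry (Position lives in the same package)
public interface Entry {
    byte[] getData();
    byte[] getDataAndRelease();
    int getLength();
    ByteBuf getDataBuffer();
    Position getPosition();
    long getLedgerId();
    long getEntryId();
    boolean release();
}
```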
The Entry interface doesn't tell you whether this is a batched entry.
You also get FilterContext:
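```java
import lombok.Data;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.MessageMetadata;

@Data
public class FilterContext {
    private Subscription subscription;
    private MessageMetadata msgMetadata;
    private Consumer consumer;
}
```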
And in MessageMetadata, you have:
```protobuf
// differentiate single and batch message metadata
optional int32 num_messages_in_batch = 11 [default = 1];
```
This field tells you whether the entry is batched.
Knowing that, the developer can work out which class deserializes the entry byte array into a list of separate messages.
So currently, given the entry is batched, the filter developer can act on it only by paying the cost of deserializing it.
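To make the limitation concrete, here is a minimal Java sketch of the only decision a filter can make from entry-level metadata alone. `EntryMetadata`, `Decision` and `decide` are hypothetical stand-ins, not the PIP-105 entry filter API; the sketch assumes, as described above, that the per-message properties of a batched entry are only reachable by deserializing the payload.

```java
import java.util.Map;

// Hypothetical model of the decision an entry filter can make today.
class CurrentFilterSketch {
    // Stand-in for the MessageMetadata fields readable without touching the payload
    // (num_messages_in_batch and the entry-level properties).
    record EntryMetadata(int numMessagesInBatch, Map<String, String> properties) {}

    // Hypothetical result type; the real entry filter API defines its own.
    enum Decision { ACCEPT, REJECT, MUST_DESERIALIZE }

    // Accepts only entries whose property `key` equals `wanted`.
    static Decision decide(EntryMetadata meta, String key, String wanted) {
        if (meta.numMessagesInBatch() > 1) {
            // Batched entry: each message's properties sit in its single message
            // metadata inside the payload, so a correct decision needs deserialization.
            return Decision.MUST_DESERIALIZE;
        }
        // Non-batched entry: the properties are in the entry's own metadata.
        return wanted.equals(meta.properties().get(key)) ? Decision.ACCEPT : Decision.REJECT;
    }

    public static void main(String[] args) {
        EntryMetadata single = new EntryMetadata(1, Map.of("region", "eu"));
        EntryMetadata batched = new EntryMetadata(10, Map.of());
        System.out.println(decide(single, "region", "eu"));  // ACCEPT
        System.out.println(decide(batched, "region", "eu")); // MUST_DESERIALIZE
    }
}
```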
Solutions
How can we use entry filters with batched messages without having to deserialize the entry?
A rejected alternative
One alternative is to alter the producers to extract specific properties from each message and place those property values in the message metadata of the batched entry. The filter can then use the values to decide whether to reject or accept the entry.
The problem is that if the messages in the batch have different values for a given property, the filter author can't return a single reject or accept for the entry, since some messages would be rejected and some accepted.
Solution
So the only solution is to change the way messages are batched: collect records into a batch only if they have the same values for the properties configured to be extracted. If a message added to the producer has property values that differ from those of the batched records, it triggers a send of the current batch and starts a new batch with that message.
In summary, this proposal introduces another trigger condition for sending the batch, on top of the current max count, max size, and max delay: when a message is requested to be added to a batch and its property values (for the subset of properties defined in a new configuration) differ from those of the records in the batch (i.e., the first record's property values), it triggers a batch flush (i.e., send and clear).
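The new trigger condition can be sketched as follows. This is a simplified, hypothetical model (`PropertyAwareBatcher`, `Msg` and the `extractor` function are illustrative names, not Pulsar client classes): it implements the count and size triggers plus the proposed property check, and leaves the max-delay timer out, noting where it would call `flush()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of a producer-side batch container, not the Pulsar client code.
class PropertyAwareBatcher {
    record Msg(String id, Map<String, String> properties, int sizeBytes) {}

    private final int maxMessages;
    private final int maxBytes;
    // Projects a message's properties onto the subset that must match within a batch.
    private final UnaryOperator<Map<String, String>> extractor;

    final List<List<String>> sentBatches = new ArrayList<>(); // ids of each "sent" batch
    private final List<Msg> current = new ArrayList<>();
    private int currentBytes = 0;
    private Map<String, String> batchProps = Map.of(); // extracted props of the 1st record

    PropertyAwareBatcher(int maxMessages, int maxBytes, UnaryOperator<Map<String, String>> extractor) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
        this.extractor = extractor;
    }

    void add(Msg msg) {
        Map<String, String> props = extractor.apply(msg.properties());
        boolean fits = current.size() < maxMessages && currentBytes + msg.sizeBytes() <= maxBytes;
        // Existing triggers (count, size) plus the new one: different extracted properties.
        if (!current.isEmpty() && (!fits || !props.equals(batchProps))) {
            flush();
        }
        if (current.isEmpty()) {
            batchProps = props; // the first record defines the batch's properties
        }
        current.add(msg);
        currentBytes += msg.sizeBytes();
    }

    // Send and clear. In a real producer, the max-delay timer would also call this.
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        List<String> ids = new ArrayList<>();
        for (Msg m : current) {
            ids.add(m.id());
        }
        sentBatches.add(ids);
        current.clear();
        currentBytes = 0;
    }
}
```

Because every message is compared against the first record of the open batch, a batch never mixes messages whose extracted property values differ, which is what lets a filter accept or reject the whole entry.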
API Changes
Because we know which property keys and values our entry filter will use, we only need to pick those properties to apply this proposal. We add a producer config to specify these property keys and values; only the properties listed in the config are taken into account.
The type of restrictSameValuesInBatchProperties is Map<String, List<String>>: the map's key is the property key, and the map's value is the list of accepted property values.
If restrictSameValuesInBatchProperties is empty (the default), grouping by properties does not take effect.
Messages whose properties have the same keys and values, among those contained in restrictSameValuesInBatchProperties, will be placed into the same batch.
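A minimal sketch of how the configured map could be applied to one message's properties, assuming the semantics used in the Example section (a property takes part in grouping only if its key is configured and its value is one of the listed values). `RestrictedProperties.extract` is a hypothetical helper, playing the role the implementation assigns to `getProperties(msg)`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the class and method names are illustrative only.
class RestrictedProperties {
    // Keeps a message property only if its key is configured in
    // restrictSameValuesInBatchProperties and its value is one of the listed values.
    static Map<String, String> extract(Map<String, List<String>> restrictSameValuesInBatchProperties,
                                       Map<String, String> messageProperties) {
        Map<String, String> result = new TreeMap<>(); // sorted, so printing is stable
        for (Map.Entry<String, String> e : messageProperties.entrySet()) {
            List<String> allowedValues = restrictSameValuesInBatchProperties.get(e.getKey());
            if (allowedValues != null && allowedValues.contains(e.getValue())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        // With an empty config every message yields an empty map, so no grouping happens.
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> config = Map.of(
                "region", List.of("us", "eu"),
                "version", List.of("1", "2"));
        // Prints {region=eu, version=1}: 'tag' is not configured.
        System.out.println(extract(config, Map.of("region", "eu", "version", "1", "tag", "a")));
        // Prints {region=us}: version '5' is not a listed value.
        System.out.println(extract(config, Map.of("region", "us", "version", "5")));
    }
}
```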
Implementation
When org.apache.pulsar.client.impl.BatchMessageContainerImpl#add is called, we extract the message properties and add them to the metadata:
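```java
// Excerpt of BatchMessageContainerImpl#add; unchanged parts are elided with "..."
public boolean add(MessageImpl<?> msg, SendCallback callback) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
                numMessagesInBatch);
    }
    if (++numMessagesInBatch == 1) {
        try {
            // some properties are common amongst the different messages in the batch, hence we just pick it up from
            // the first message
            messageMetadata.setSequenceId(msg.getSequenceId());
            // new: keep only the properties listed in restrictSameValuesInBatchProperties
            List<KeyValue> filterProperties = getProperties(msg);
            if (!filterProperties.isEmpty()) {
                messageMetadata.addAllProperties(filterProperties); // add the extracted properties here
            }
            // ... rest of the existing first-message initialization
        } catch (Throwable e) {
            // ... existing error handling
        }
    }
    // ... rest of the existing add() logic
}
```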
We also need to add a method hasSameProperties, similar to hasSameSchema. Messages with the same properties can be added to the same batch. Once a message with different properties is added, the producer triggers a flush and sends the batch.
```java
// ProducerImpl#canAddToCurrentBatch
private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
    return batchMessageContainer.haveEnoughSpace(msg) // messageContainer controls the memory
            && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
            && batchMessageContainer.hasSameProperties(msg) // invoke it here
            && batchMessageContainer.hasSameTxn(msg);
}
```
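The following simplified sketch shows one possible shape for `hasSameProperties`, working on plain property maps instead of `MessageImpl`. `BatchContainerSketch` and its fields are hypothetical; only the method name `hasSameProperties` and the rule that the batch properties come from the first message are taken from this proposal.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, heavily simplified stand-in for BatchMessageContainerImpl.
class BatchContainerSketch {
    private final Map<String, List<String>> restrict;       // restrictSameValuesInBatchProperties
    private Map<String, String> batchProperties = Map.of(); // taken from the 1st message
    private int numMessagesInBatch = 0;

    BatchContainerSketch(Map<String, List<String>> restrict) {
        this.restrict = restrict;
    }

    private Map<String, String> extract(Map<String, String> props) {
        Map<String, String> out = new TreeMap<>();
        props.forEach((k, v) -> {
            List<String> allowed = restrict.get(k);
            if (allowed != null && allowed.contains(v)) {
                out.put(k, v);
            }
        });
        return out;
    }

    // Same role as hasSameSchema: an empty batch, or a disabled feature, accepts any message.
    boolean hasSameProperties(Map<String, String> msgProperties) {
        if (restrict.isEmpty() || numMessagesInBatch == 0) {
            return true;
        }
        return batchProperties.equals(extract(msgProperties));
    }

    void add(Map<String, String> msgProperties) {
        if (++numMessagesInBatch == 1) {
            // Like the sequence id, the batch properties come from the first message.
            batchProperties = extract(msgProperties);
        }
    }

    // Called once the batch has been sent.
    void clear() {
        batchProperties = Map.of();
        numMessagesInBatch = 0;
    }
}
```

In `canAddToCurrentBatch`, a `false` result makes the producer flush the current batch before adding the message, just as a schema or transaction mismatch already does.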
In summary, the modifications in this proposal are mainly:
Extract the properties of the first message in the batch and fill them into BatchMessageContainerImpl#messageMetadata.
Additionally, in the ProducerImpl#canAddToCurrentBatch method, check whether the message being sent has the same properties as those in BatchMessageContainerImpl#messageMetadata.
Example
Here is an example that may help to understand this:
Let's set restrictSameValuesInBatchProperties=<region=us,eu; version=1,2>
This means only the key region with values 'us' or 'eu', and the key version with values '1' or '2', will be extracted to the batch metadata properties.
Then we have a producer that sends the messages below in order:
msg1 with properties: <region: eu>
msg2 with properties: <region: eu>
msg3 with properties: <region: eu, version:1, tag:a>
msg4 with properties: <region: eu, version:1>
msg5 with properties: <region: us, version:1>
msg6 with properties: <region: us, version:2>
msg7 with properties: <region: us, version:5>
msg8 with properties: <region: us, version:6>
The properties are extracted as follows:
msg1 and msg2 have the same properties, <region: eu>, so they will be put into the same batch.
msg3 and msg4 have the same properties, <region: eu, version:1>. tag:a in msg3 is ignored because restrictSameValuesInBatchProperties doesn't contain 'tag'. So msg3 and msg4 will be put into the same batch.
msg5 and msg6 have different properties because their values of version differ. So msg5 and msg6 are published in different batches.
msg7 and msg8 have the same properties, <region:us>; version is ignored because its values (5 and 6) don't exist in restrictSameValuesInBatchProperties. So msg7 and msg8 are put into the same batch.
To summarize, the result will be:
| batch | batch meta properties | single meta properties | payload | single meta properties | payload |
|---|---|---|---|---|---|
| batch1 | <region: eu> | <region: eu> | msg1 | <region: eu> | msg2 |
| batch2 | <region: eu, version:1> | <region: eu, version:1, tag:a> | msg3 | <region: eu, version:1> | msg4 |
| batch3 | <region: us, version:1> | <region: us, version:1> | msg5 | | |
| batch4 | <region: us, version:2> | <region: us, version:2> | msg6 | | |
| batch5 | <region: us> | <region: us, version:5> | msg7 | <region: us, version:6> | msg8 |
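The grouping in the table above can be replayed with a short, self-contained sketch. It assumes the extraction rule described in the example and models only the new flush condition; `ExampleReplay` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Replays the example above; class and method names are hypothetical.
class ExampleReplay {
    record Batch(Map<String, String> batchProperties, List<String> messages) {}

    static Map<String, String> extract(Map<String, List<String>> restrict, Map<String, String> props) {
        Map<String, String> out = new TreeMap<>();
        props.forEach((k, v) -> {
            List<String> allowed = restrict.get(k);
            if (allowed != null && allowed.contains(v)) {
                out.put(k, v);
            }
        });
        return out;
    }

    // Starts a new batch whenever the extracted properties change; the count,
    // size and delay triggers are left out to isolate the new condition.
    static List<Batch> replay(Map<String, List<String>> restrict, Map<String, Map<String, String>> msgsInOrder) {
        List<Batch> batches = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> m : msgsInOrder.entrySet()) {
            Map<String, String> props = extract(restrict, m.getValue());
            Batch last = batches.isEmpty() ? null : batches.get(batches.size() - 1);
            if (last != null && last.batchProperties().equals(props)) {
                last.messages().add(m.getKey());
            } else {
                List<String> ids = new ArrayList<>();
                ids.add(m.getKey());
                batches.add(new Batch(props, ids));
            }
        }
        return batches;
    }

    static List<Batch> exampleBatches() {
        Map<String, List<String>> restrict = Map.of(
                "region", List.of("us", "eu"),
                "version", List.of("1", "2"));
        Map<String, Map<String, String>> msgs = new LinkedHashMap<>(); // keeps send order
        msgs.put("msg1", Map.of("region", "eu"));
        msgs.put("msg2", Map.of("region", "eu"));
        msgs.put("msg3", Map.of("region", "eu", "version", "1", "tag", "a"));
        msgs.put("msg4", Map.of("region", "eu", "version", "1"));
        msgs.put("msg5", Map.of("region", "us", "version", "1"));
        msgs.put("msg6", Map.of("region", "us", "version", "2"));
        msgs.put("msg7", Map.of("region", "us", "version", "5"));
        msgs.put("msg8", Map.of("region", "us", "version", "6"));
        return replay(restrict, msgs);
    }

    public static void main(String[] args) {
        // Prints:
        // {region=eu} [msg1, msg2]
        // {region=eu, version=1} [msg3, msg4]
        // {region=us, version=1} [msg5]
        // {region=us, version=2} [msg6]
        // {region=us} [msg7, msg8]
        for (Batch b : exampleBatches()) {
            System.out.println(b.batchProperties() + " " + b.messages());
        }
    }
}
```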
Trade-off
The side effect of this behavior is that it can easily produce tiny batches, perhaps even one record per batch. Once this feature is turned on, there is a good chance users will lose the performance benefits of batching, since the batches will be very small. It depends entirely on the distribution of property values.
Despite this, we should point out that entry filters don't support batched messages at all today. This proposal gives batched messages a real chance to be used with entry filters, and it brings great benefits, especially when you already know the distribution of values.
Rejected Alternatives
Implement an AbstractBatchMessageContainer, say BatchMessagePropertiesBasedContainer, that keeps messages with the same properties in a single hashmap entry, like BatchMessageKeyBasedContainer.
Rejection reason: this would publish messages out of order.
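The ordering problem can be illustrated with a small hypothetical sketch comparing a hash-map style grouping (one bucket per property value, flushed bucket by bucket) with the flush-on-change approach of this proposal. `OrderingSketch` and `Msg` are illustrative names; a `LinkedHashMap` is used so the output is deterministic, and an unordered `HashMap` would only make the reordering less predictable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison of two grouping strategies; not Pulsar client code.
class OrderingSketch {
    record Msg(String id, String region) {}

    // Rejected idea: one bucket per property value, like a key-based container.
    // Buckets are sent one after another, so messages go out bucket by bucket.
    static List<String> sendOrderGroupedByValue(List<Msg> msgs) {
        Map<String, List<String>> buckets = new LinkedHashMap<>();
        for (Msg m : msgs) {
            buckets.computeIfAbsent(m.region(), k -> new ArrayList<>()).add(m.id());
        }
        List<String> order = new ArrayList<>();
        buckets.values().forEach(order::addAll);
        return order;
    }

    // This proposal: a single open batch, flushed when the value changes, so the
    // send order always equals the order in which messages were added.
    static List<String> sendOrderFlushOnChange(List<Msg> msgs) {
        List<List<String>> batches = new ArrayList<>();
        String currentRegion = null;
        for (Msg m : msgs) {
            if (batches.isEmpty() || !m.region().equals(currentRegion)) {
                batches.add(new ArrayList<>()); // the previous batch is flushed here
                currentRegion = m.region();
            }
            batches.get(batches.size() - 1).add(m.id());
        }
        List<String> order = new ArrayList<>();
        batches.forEach(order::addAll);
        return order;
    }

    public static void main(String[] args) {
        List<Msg> msgs = List.of(new Msg("a", "eu"), new Msg("b", "us"), new Msg("c", "eu"));
        System.out.println(sendOrderGroupedByValue(msgs)); // [a, c, b]
        System.out.println(sendOrderFlushOnChange(msgs));  // [a, b, c]
    }
}
```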
Original Issue: apache#16680
discuss mail-thread: https://lists.apache.org/thread/cdw5c2lpj5nwzl2zqyv8mphsqqv9vozj