Introduce batch message container framework and support key based batching container #4435
Related to #4077.
/cc @wolfstudy please help review the changes.
 *
 * @param ex cause
 */
void handleException(Exception ex);
A better name might be `discard`? This method basically releases the payload and the container when an exception occurs, no?
 *
 * @return message batch size in bytes
 */
long getCurrentBatchSizeBytes();
nit: `getCurrentBatchSize` is probably good enough, because `size` is typically in bytes.
}

@Override
public List<OpSendMsg> createOpSendMsgs() throws IOException {
nit: It seems that we have to create one list for each batch even when using the normal batching mechanism. Is it possible to avoid this overhead when key based batching is not used?
Good point. I think we can add a method `isMultiBatch`: if `isMultiBatch` is true, call `createOpSendMsgs()`; otherwise call `createOpSendMsg()`.
return msg.getKey();
}

private class KeyBasedPart {
Can we make the class `static`? Also, is `KeyedBatch` a better name?
return result;
}

private String peekKey(MessageImpl<?> msg) {
`peek` sounds like an action that retrieves a key from a queue. Why not just call it `getKey`?
if (part == null) {
part = new KeyBasedPart();
part.addMsg(msg, callback);
batches.putIfAbsent(key, part);
I think there is a race condition here. If concurrent puts happen for the same key, one `putIfAbsent` will be a no-op, so the message added to that part is never stored in the map. Hence there is data loss.
Before a message is added to the container, the producer already holds a lock (`synchronized` semantics). `BatchMessageKeyBasedContainer` and `BatchMessageContainerImpl` are not thread-safe; to make them thread-safe, we would also need to make some other fields thread-safe (e.g. `numMessagesInBatch`, `currentBatchSizeBytes`). Is it necessary for us to do so?
@codelipenghui If it is not thread-safe, why do we need to use `ConcurrentHashMap`? Can't we just use a `HashMap`?
A good practice: if you are using a `ConcurrentMap`, it is worth making the class thread-safe. Otherwise the code will be hard to maintain, since people will be confused about why a `ConcurrentMap` is used here.
Yes, we can just use a HashMap.
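To make the concern in this thread concrete, the sketch below replays the problematic interleaving deterministically and then shows one possible alternative: a plain `HashMap` guarded by a lock, with `computeIfAbsent` so the message is always added to the batch that is actually stored. `KeyedBatchSketch` and `KeyedAddSketch` are hypothetical names, and the `synchronized` methods stand in for the producer-level locking described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-key batch; only the payloads are kept.
final class KeyedBatchSketch {
    final List<String> messages = new ArrayList<>();
    void addMsg(String msg) { messages.add(msg); }
}

final class KeyedAddSketch {
    private final Map<String, KeyedBatchSketch> batches = new HashMap<>();

    // The lock here plays the role of the producer's synchronized send path.
    synchronized void add(String key, String msg) {
        // The message always lands in the batch that the map holds.
        batches.computeIfAbsent(key, k -> new KeyedBatchSketch()).addMsg(msg);
    }

    synchronized int count(String key) {
        KeyedBatchSketch batch = batches.get(key);
        return batch == null ? 0 : batch.messages.size();
    }

    // Replays, in a single thread, the interleaving the reviewer describes:
    // two callers both see no batch for the key and each build their own.
    static int lossyInterleaving() {
        Map<String, KeyedBatchSketch> map = new ConcurrentHashMap<>();
        KeyedBatchSketch first = new KeyedBatchSketch();
        first.addMsg("m1");
        KeyedBatchSketch second = new KeyedBatchSketch();
        second.addMsg("m2");
        map.putIfAbsent("k", first);
        map.putIfAbsent("k", second); // no-op: "m2" is never stored
        return map.get("k").messages.size();
    }
}
```

In the replayed interleaving only one of the two messages survives, whereas two calls to `add` with the same key keep both.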
@Override
public void add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
Suggested change:
- log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
+ log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
@@ -223,6 +224,13 @@ private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData c
return this;
}

@Override
public ProducerBuilder<T> batchingContainerBuilder(BatchMessageContainerBuilder batchingContainerBuilder) {
I am not sure if `batchingContainerBuilder` is the best name. Maybe `MessageBatcher`?
I've also struggled with this. If we name it `messageBatcher`, it looks like the user provides a message batcher, but in fact they provide the ability to build one: for a partitioned topic, the producers of the individual partitions cannot share the same message batcher, so I named it `batchingContainerBuilder`. That name seems a bit complicated; how about `batcherBuilder`?
Batcher sounds like a better name.
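The reason for passing a builder rather than a batcher instance can be sketched as follows: each partition's producer asks the builder for its own batcher, so no state is shared between partitions. All names here (`BatcherSketch`, `BatcherBuilderSketch`, `ListBatcherSketch`, `PartitionedProducerSketch`) are hypothetical and do not claim to match the API that was finally merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal batcher: it only collects payloads.
interface BatcherSketch {
    void add(String msg);
    int size();
}

// Hypothetical builder: every call must return a fresh batcher.
interface BatcherBuilderSketch {
    BatcherSketch build();
}

final class ListBatcherSketch implements BatcherSketch {
    private final List<String> messages = new ArrayList<>();
    public void add(String msg) { messages.add(msg); }
    public int size() { return messages.size(); }
}

final class PartitionedProducerSketch {
    final List<BatcherSketch> perPartition = new ArrayList<>();

    PartitionedProducerSketch(int partitions, BatcherBuilderSketch builder) {
        for (int i = 0; i < partitions; i++) {
            perPartition.add(builder.build()); // one independent batcher per partition
        }
    }

    void send(int partition, String msg) {
        perPartition.get(partition).add(msg);
    }
}
```

A caller could construct it as `new PartitionedProducerSketch(2, ListBatcherSketch::new)`; a message sent to partition 0 then does not appear in partition 1's batcher.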
 * batch message container builder
 * @return the producer builder instance
 */
ProducerBuilder<T> batchingContainerBuilder(BatchMessageContainerBuilder batchingContainerBuilder);
Add a comment that this is only used when batching is enabled.
@sijie I've addressed all your comments, PTAL.
+1. lgtm.
Thanks for the work.
### Motivation

Support key based batching for C++ client. This is a catch up work on #4435.

In addition, currently the implementation of `BatchMessageContainer` is coupling to `ProducerImpl` tightly. The batch message container registers a timer to producer's executor and the timeout callback is also producer's method. Even its `add` method could call `sendMessage` to send batch to producer's pending queue. These should be producer's work.

### Modifications

- Add a `MessageAndCallbackBatch` to store a `MessageImpl` of serialized single messages and a callback list.
- Add a `BatchMessageContainerBase` to provide interface methods and methods like update/clear message number/bytes, create `OpSendMsg`.
- Let `ProducerImpl` manage the batch timer and determine whether to create `OpSendMsg` from `BatchMessageContainerBase` and send.
- Make `BatchMessageContainer` inherit `BatchMessageContainerBase`, it only manages a `MessageAndCallbackBatch`.
- Add a `BatchMessageKeyBasedContainer` that inherits `BatchMessageContainerBase`, it manages a map of message key and `MessageAndCallbackBatch`.
- Add a producer config to change batching type.
- Add some units tests for key based batching and a unit test for key shared subscription.

### Verifying this change

This change added tests and can be verified as follows:

- Current tests affected by default batching type, like `BatchMessageTest.*` and `BasicEndToEndTest.*`
- Newly added tests: `KeyBasedBatchingTest.*` and `KeySharedConsumerTest.testKeyBasedBatching`

* Change ClientConnection::getMaxMessageSize() to static method
* Refactor the BatchMessageContainer
* Add batching type to producer config
* Fix testBigMessageSizeBatching error
* Add key based batching container
* Make batching type config work
* Add unit tests for key based batching and key shared subscription
* Fix flush test error possibly caused by timeout overflow
* Make BatchMessageKeyBasedContainer work for a single batch
Motivation
Introduce a batch message container framework to support multiple ways of batching messages.
Currently, Pulsar supports only one basic batch message container. With the batch message container framework, other types of batch message containers can be implemented quickly, and users can even customize their own.
Add a new batch message container named `BatchMessageKeyBasedContainer` to support message batching in Key_Shared subscription mode.
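As a rough illustration of what key based batching means, the sketch below groups pending messages by key so that every resulting batch contains messages of a single key only, which is what allows a batch to be routed as a unit by key. It is a simplified model under that assumption: `KeyGroupingSketch` and the `{key, payload}` array representation are invented for the example and are not the container implemented in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyGroupingSketch {
    // Each message is modelled as a two-element array: {key, payload}.
    static Map<String, List<String>> groupByKey(List<String[]> messages) {
        // LinkedHashMap keeps the keys in first-seen order.
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (String[] message : messages) {
            String key = message[0];
            String payload = message[1];
            batches.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
        }
        return batches;
    }
}
```

For the input `{k1, a}`, `{k2, b}`, `{k1, c}` this yields two batches: `k1` with payloads `a` and `c`, and `k2` with payload `b`.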