[feature request] C++ and Python support batch index acknowledgment #87

Closed
1 of 2 tasks
shibd opened this issue Aug 19, 2022 · 1 comment · Fixed by #151
shibd commented Aug 19, 2022

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Batch index acknowledgment can improve the performance of individual acknowledgments.

Refer to PIP 54: Support acknowledgment at batch index level

Solution

Refer to the Java client PR: apache/pulsar#6052

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@github-actions
The issue had no activity for 30 days, mark with Stale label.

@BewareMyPower BewareMyPower transferred this issue from apache/pulsar Oct 28, 2022
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Nov 7, 2022
Master issue: apache#87

### Motivation

To support batch index acknowledgment, we must provide a method to get
the batch size of a batched message ID.

### Modifications

Instead of adding another overloaded constructor to `MessageId`, this PR
adds a `MessageIdBuilder` class to construct the `MessageId` in a more
elegant way.

The original constructor is counterintuitive because the partition
index is the 1st argument.

https://github.com/apache/pulsar-client-cpp/blob/74ef1a01f5c7a4604d251de6d040c433f9bbf56b/include/pulsar/MessageId.h#L47

Therefore, this PR marks it as deprecated and replaces all invocations of
it with `MessageIdBuilder`.

To verify `MessageId::batchSize()`, the following tests are
modified:
- `BatchMessageTest.testBatchSizeInBytes`: the batch size is always 2
  because of the `batchingMaxAllowedSizeInBytes` config.
- `MessageChunkingTest.testEndToEnd`: the batch size field is not set
  (default: 0) because batching is disabled.
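
The builder approach described above can be sketched roughly as follows. This is a self-contained toy, not the library's code: `ToyMessageId`, `ToyMessageIdBuilder`, and `makeExampleId` are hypothetical names, and the `-1` defaults are assumptions made for illustration. Only the batch size default of 0 comes from the text above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a message ID; NOT the real pulsar::MessageId.
struct ToyMessageId {
    int32_t partition = -1;   // assumed default for this sketch
    int64_t ledgerId = -1;    // assumed default for this sketch
    int64_t entryId = -1;     // assumed default for this sketch
    int32_t batchIndex = -1;  // assumed default for this sketch
    int32_t batchSize = 0;    // 0 when batching is disabled (per the text above)
};

// Hypothetical builder: every setter names the field it sets, so call sites
// do not depend on remembering a positional argument order.
class ToyMessageIdBuilder {
   public:
    ToyMessageIdBuilder& partition(int32_t v) { id_.partition = v; return *this; }
    ToyMessageIdBuilder& ledgerId(int64_t v) { id_.ledgerId = v; return *this; }
    ToyMessageIdBuilder& entryId(int64_t v) { id_.entryId = v; return *this; }
    ToyMessageIdBuilder& batchIndex(int32_t v) { id_.batchIndex = v; return *this; }
    ToyMessageIdBuilder& batchSize(int32_t v) { id_.batchSize = v; return *this; }

    // Returns a copy of the ID assembled so far.
    ToyMessageId build() const { return id_; }

   private:
    ToyMessageId id_;
};

// Example: the third message (batch index 2) of a 5-message batch.
inline ToyMessageId makeExampleId() {
    return ToyMessageIdBuilder()
        .ledgerId(10)
        .entryId(3)
        .partition(0)
        .batchIndex(2)
        .batchSize(5)
        .build();
}
```

Because the setters are named, the order of the calls does not matter, which avoids the confusion of a constructor whose first positional argument is the partition index.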
RobertIndie pushed a commit that referenced this issue Nov 14, 2022
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Dec 20, 2022
Fixes apache#87

### Modifications

- Add a consumer configuration `setBatchIndexAckEnabled` to enable the
  batch index ACK. When it's enabled, pass the original `MessageId`
  to the ACK grouping tracker instead of the `MessageIdImpl` that
  truncates the batch index.
- Since a `BatchedMessageIdImpl` can now be accepted in the ACK
  grouping tracker, fix the comparison logic.
- Support passing a `BitSet` in `Commands::newAck` and get the internal
  `BitSet` from `MessageId` in `Commands::newMultiMessageAck`.
- Skip the acknowledged batch indexes when receiving batched messages in
  `ConsumerImpl::receiveIndividualMessagesFromBatch`.
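
The bit set bookkeeping behind these changes might look roughly like the sketch below. It is an illustration only, not the client's implementation: `ToyBatchAckSet` and its methods are hypothetical, and the convention that a set bit means "still pending" is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-batch ACK state; NOT the client's real BitSet class.
// Assumed convention: true = still pending, false = acknowledged.
class ToyBatchAckSet {
   public:
    explicit ToyBatchAckSet(std::size_t batchSize) : pending_(batchSize, true) {}

    // Individual ACK: clear only the bit of the given batch index.
    void ackIndividual(std::size_t index) {
        if (index < pending_.size()) pending_[index] = false;
    }

    // Cumulative ACK: clear every bit up to and including the given index.
    void ackCumulative(std::size_t index) {
        for (std::size_t i = 0; i <= index && i < pending_.size(); ++i) {
            pending_[i] = false;
        }
    }

    bool isAcked(std::size_t index) const {
        return index < pending_.size() && !pending_[index];
    }

    // True once every message in the batch has been acknowledged.
    bool allAcked() const {
        for (bool p : pending_) {
            if (p) return false;
        }
        return true;
    }

    // Batch indexes that should still be delivered when the batch is
    // received again; acknowledged indexes are skipped.
    std::vector<std::size_t> indexesToDeliver() const {
        std::vector<std::size_t> out;
        for (std::size_t i = 0; i < pending_.size(); ++i) {
            if (pending_[i]) out.push_back(i);
        }
        return out;
    }

   private:
    std::vector<bool> pending_;
};
```

For a batch of four messages, acknowledging index 2 individually leaves indexes 0, 1, and 3 to be delivered on redelivery; a later cumulative ACK at index 1 followed by an individual ACK at index 3 marks the whole batch as acknowledged.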

### Verifications

Modify `BitSetTest.testSet` to verify the `BitSet::get` method added in
this commit.

Add `AcknowledgeTest.testBatchIndexAck` to test batch index ACK for all
types of acknowledgment:
- Individual ACK for a single message
- Individual ACK for a list of messages
- Cumulative ACK

Add `AcknowledgeTest.testMixedCumulativeAck` to test that the new
comparison logic between `BatchedMessageIdImpl` and `MessageIdImpl`
works for cumulative ACK.
BewareMyPower added a commit that referenced this issue Jan 17, 2023