-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-22: Dead Letter Topic #2508
Conversation
retest this please |
@rdhabalia @merlimat can you help reviewing @codelipenghui 's pull request? |
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@rdhabalia @merlimat please review this. we need to include this for 2.2 release. |
ping @rdhabalia @merlimat |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui Sorry for delay. The change mostly LGTM. Just left few comments.
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
public class NonPersistentRedeliveryTracker implements RedeliveryTracker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this tracker called "non-persistent"? Is it because the counter itself is not stored?
In that case it's a bit confusing since there's the concept of non-persistent topics as well, which I think is orthogonal to the redelivery tracker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because in the discuss of DLQ, has a solution is store the redelivery count in managed ledger, so i think it's possible to implement a PersistentRedeliveryTracker, so i named current implement to NonPersistentRedeliveryTracker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, so then maybe we can rename into InMemoryRedeliveryTracker
to avoid confusion with this be associated with NonPersistentTopic
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
@@ -97,6 +100,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso | |||
this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); | |||
this.topic = topic; | |||
this.messagesToReplay = new ConcurrentLongPairSet(512, 2); | |||
this.redeliveryTracker = new NonPersistentRedeliveryTracker(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above, why NonPersistentRedeliveryTracker
?
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.common.policies.data; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to keep this class in pulsar-client
package instead of pulsar-common
, under the org.apache.pulsar.client.api
package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
this.maxRedeliverCount = maxRedeliverCount; | ||
} | ||
|
||
public DeadLetterPolicy(int maxRedeliverCount, String deadLetterTopic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of providing constructor overloads, we could use interface and builder approach like in other parts of the API. That way the code will long more descriptive.
Eg:
/// This
DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic("my-dql-topic")
.build()
// Instead of
new DeadLetterPolicy(10, "my-dql-topic")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
@@ -1184,6 +1239,40 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) { | |||
MessageIdData.Builder builder = MessageIdData.newBuilder(); | |||
batches.forEach(ids -> { | |||
List<MessageIdData> messageIdDatas = ids.stream().map(messageId -> { | |||
List<MessageImpl<T>> deadLetterMessages = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this new portion of code be factored out in a separate method that is called from here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -338,4 +340,9 @@ | |||
* @return consumer builder. | |||
*/ | |||
ConsumerBuilder<T> intercept(ConsumerInterceptor<T> ...interceptors); | |||
|
|||
/** | |||
* Set dead letter policy for consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please expand the comment, explaining what are the purpose and scope of dead-letter policy (for people who are not familiar with the concept).
Also it would be nice to provide here a quick code example that explain how to enable this feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
CommandMessage.Builder msgBuilder = CommandMessage.newBuilder(); | ||
msgBuilder.setConsumerId(consumerId); | ||
msgBuilder.setMessageId(messageId); | ||
msgBuilder.setRedeliveryCount(redeliveryCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the redeliveryCount
is 0, which is the default in the proto definition, and it's passed when the feature is disabled, we should avoid setting the value on the protobuf builder. If that is omitted, it would save 2 bytes per each message.
@@ -265,7 +265,8 @@ public SendMessageInfo sendMessages(final List<Entry> entries, SendListener list | |||
if (i == (entries.size() - 1)) { | |||
promise = writePromise; | |||
} | |||
ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise); | |||
int redeliveryCount = subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of keeping a separate redelivery tracker, would it be possible to reuse the pendingAcks
maps on the Consumer
class?
pendingAcks
is a map for which the key is a pair longs and the value is a pair of longs as well.
(ledgerId, entryId) --> (batchSize, none)
As you can see, the second long
field in the value is currently unused. We could keep the counter there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After i check the pending acks in consumer, i recommend use redeliveryTracker to track the redelivery count:
First, in a subscription only have one redelivery tracker, pending acks is binding to consumer, we don't need to record redelivery count per consumer and if record it in consumer, will get a different behavior ismax redelivery count per consumer
, but for now we defined the redelivery count per subscription.
Second, i think we need separate pending acks and redeliveryTracker, it's better to implement another redeliveryTracker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First, in a subscription only have one redelivery tracker, pending acks is binding to consumer, we don't need to record redelivery count per consumer and if record it in consumer, will get a different behavior ismax redelivery count per consumer, but for now we defined the redelivery count per subscription.
Sure, what I was thinking was more to store it in consumer pending acks and carrying it over to the new consumer when redelivering.
Second, i think we need separate pending acks and redeliveryTracker, it's better to implement another redeliveryTracker.
Sure, these are separate concepts. Though I think the current name pendingAcks
is a bit misleading. What the map really tracks is "message that were pushed to this consumer and not acked".
My main concern with the new map is to have to potentially accumulate lots of items in the concurrent hash map, irrespective of whether the DLQ is being used or not. Especially, this might be mostly not the case in exclusive/failover subscriptions, where data is typically processed in order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern with the new map is to have to potentially accumulate lots of items in the concurrent hash map,
I'm understand your concern, i think the tracker map is limited by maxUnackMessages, only put element into tracker map when trigger by redeliverUnackedMessages().
irrespective of whether the DLQ is being used or not.
I think we can optimize it next step, i think we can add a field in proto like enableRedeliveryTracker
, broker just enable this mechanism when enableRedeliveryTracker
is true.
Especially, this might be mostly not the case in exclusive/failover subscriptions, where data is typically processed in order.
Yes, i'm agree, i think we can only enable the counter tracking only on Shared subscriptions, mean that just in PersistentDispatcherMultipleConsumers.
@merlimat I'm already address your comment except using pendingAcks to instead RedeliveryTracker. |
@@ -195,6 +195,7 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St | |||
log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions); | |||
} | |||
cursor.asyncDelete(positions, deleteCallback, positions); | |||
dispatcher.getRedeliveryTracker().removeBatch(positions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not clean up the redelivery tracker when doing a "cumulative-ack". It should be also cleaned at line 192, though for cumulative acks there might be a sequence of entries to clean-up and the concurrent hash map might not be a good fit (since we'd need sorted map).
Given that, and can we initially only enable the counter only on Shared subscriptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that, and can we initially only enable the counter only on Shared subscriptions?
I'm agree with your comment(Especially, this might be mostly not the case in exclusive/failover subscriptions, where data is typically processed in order).
This will not clean up the redelivery tracker when doing a "cumulative-ack". It should be also cleaned at line 192, though for cumulative acks there might be a sequence of entries to clean-up and the concurrent hash map might not be a good fit (since we'd need sorted map).
If we just enable the redelivery tracker in share mode, we don't need a sorted map, from client can't do cumulative-ack in share mode.
@merlimat Next step i will start a new PR to optimize the mechanism of only enable RedeliveryTracker by client subscription( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
retest this please |
run java8 tests |
run java8 tests |
run java8 tests |
@codelipenghui it is a great contribution! thank you! |
Reverts: #5881 ### Motivation The `redeliveryCount` was introduced in [PIP 22](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) with this PR #2508. It is an extra field on a message that indicates how many times a message has been redelivered. In the original design, it was only incremented for shared subscriptions when the consumer sent `REDELIVER_UNACKNOWLEDGED_MESSAGES` to the broker. In #5881, this field's logic changed so that it is incremented each time a broker delivers a message to a consumer (after the initial delivery). The problem with this logic is that it counts messages that are sent to a consumer's `receiveQueue`, but not actually received by the client application, as "delivered" messages. This is especially problematic for the DLQ implementation because it relies on the counter to track deliveries, and this eager incrementing of the `redeliveryCount` could lead to fewer retries than an application would like. This PR returns the broker's behavior to the original state before #5881. Note that the DLQ logic is only triggered by messages that hit their ack timeout or are negatively acknowledged. This means that in some cases, a message could be delivered many times to a `receiveQueue` and once to the application and then sent to the DLQ. Given that our DLQ implementation has an intentional preference towards over delivery instead of under delivery, I think this logic should be fixed. One of the consequences of this PR is that the message filter logic for redelivering messages triggers this logic for incrementing `redeliveryCount`. See this code here: https://github.com/apache/pulsar/blob/b1a29b520d34d60e60160e3a7b9b0e26926063ee/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L198-L206 I'll need feedback from someone more familiar with message filtering to understand if this is a problematic change. If it is, I think we might need to revisit the logic in `filterEntriesForConsumer`. ### Modifications * Revert the relevant changes from #5895. I kept the test that was added in the PR and modified the assertion. * Fix test assertion ordering and modify expected value to align with new paradigm. ### Verifying this change This change includes modifications to tests as well as existing test coverage. ### Does this pull request potentially affect one of the following parts: This change is a break in current behavior, so I will send an email to the dev mailing list: https://lists.apache.org/thread/ts9d6zbtlz3y5xtv7p0c3dslk0vljpj2. ### Documentation - [x] `doc-not-needed`
Reverts: apache#5881 ### Motivation The `redeliveryCount` was introduced in [PIP 22](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) with this PR apache#2508. It is an extra field on a message that indicates how many times a message has been redelivered. In the original design, it was only incremented for shared subscriptions when the consumer sent `REDELIVER_UNACKNOWLEDGED_MESSAGES` to the broker. In apache#5881, this field's logic changed so that it is incremented each time a broker delivers a message to a consumer (after the initial delivery). The problem with this logic is that it counts messages that are sent to a consumer's `receiveQueue`, but not actually received by the client application, as "delivered" messages. This is especially problematic for the DLQ implementation because it relies on the counter to track deliveries, and this eager incrementing of the `redeliveryCount` could lead to fewer retries than an application would like. This PR returns the broker's behavior to the original state before apache#5881. Note that the DLQ logic is only triggered by messages that hit their ack timeout or are negatively acknowledged. This means that in some cases, a message could be delivered many times to a `receiveQueue` and once to the application and then sent to the DLQ. Given that our DLQ implementation has an intentional preference towards over delivery instead of under delivery, I think this logic should be fixed. One of the consequences of this PR is that the message filter logic for redelivering messages triggers this logic for incrementing `redeliveryCount`. See this code here: https://github.com/apache/pulsar/blob/b1a29b520d34d60e60160e3a7b9b0e26926063ee/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L198-L206 I'll need feedback from someone more familiar with message filtering to understand if this is a problematic change. If it is, I think we might need to revisit the logic in `filterEntriesForConsumer`. ### Modifications * Revert the relevant changes from apache#5895. I kept the test that was added in the PR and modified the assertion. * Fix test assertion ordering and modify expected value to align with new paradigm. ### Verifying this change This change includes modifications to tests as well as existing test coverage. ### Does this pull request potentially affect one of the following parts: This change is a break in current behavior, so I will send an email to the dev mailing list: https://lists.apache.org/thread/ts9d6zbtlz3y5xtv7p0c3dslk0vljpj2. ### Documentation - [x] `doc-not-needed` (cherry picked from commit 2fd3509)
Reverts: apache#5881 ### Motivation The `redeliveryCount` was introduced in [PIP 22](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) with this PR apache#2508. It is an extra field on a message that indicates how many times a message has been redelivered. In the original design, it was only incremented for shared subscriptions when the consumer sent `REDELIVER_UNACKNOWLEDGED_MESSAGES` to the broker. In apache#5881, this field's logic changed so that it is incremented each time a broker delivers a message to a consumer (after the initial delivery). The problem with this logic is that it counts messages that are sent to a consumer's `receiveQueue`, but not actually received by the client application, as "delivered" messages. This is especially problematic for the DLQ implementation because it relies on the counter to track deliveries, and this eager incrementing of the `redeliveryCount` could lead to fewer retries than an application would like. This PR returns the broker's behavior to the original state before apache#5881. Note that the DLQ logic is only triggered by messages that hit their ack timeout or are negatively acknowledged. This means that in some cases, a message could be delivered many times to a `receiveQueue` and once to the application and then sent to the DLQ. Given that our DLQ implementation has an intentional preference towards over delivery instead of under delivery, I think this logic should be fixed. One of the consequences of this PR is that the message filter logic for redelivering messages triggers this logic for incrementing `redeliveryCount`. See this code here: https://github.com/apache/pulsar/blob/b1a29b520d34d60e60160e3a7b9b0e26926063ee/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L198-L206 I'll need feedback from someone more familiar with message filtering to understand if this is a problematic change. If it is, I think we might need to revisit the logic in `filterEntriesForConsumer`. ### Modifications * Revert the relevant changes from apache#5895. I kept the test that was added in the PR and modified the assertion. * Fix test assertion ordering and modify expected value to align with new paradigm. ### Verifying this change This change includes modifications to tests as well as existing test coverage. ### Does this pull request potentially affect one of the following parts: This change is a break in current behavior, so I will send an email to the dev mailing list: https://lists.apache.org/thread/ts9d6zbtlz3y5xtv7p0c3dslk0vljpj2. ### Documentation - [x] `doc-not-needed`
Motivation
Fixes #189
When consumer got messages from pulsar, It's difficult to ensure every message can be consume success. Pulsar support message redelivery feature by set acknowledge timeout when create a new consumer. This is a good feature guarantee consumer will not lost messages.
But however, some message will redelivery so many times possible, even to the extent that it can be never stop.
So, It's necessary to support a feature to control it by pulsar. Users can use this feature and customize this feature to control the message redelivery behavior. The feature named Dead Letter Topic.
Modifications
Consumer can set maximum number of redeliveries by java client.
Consumer can set the name of Dead Letter Topic by java client, It’s not necessary.
Message exceeding the maximum number of redeliveries should send to Dead Letter Topic and acknowledged automatic.
Result
If consumer enable future of dead letter topic. When Message exceeding the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged automatic.