From 9a612301f12a7f34828d25ca92cf40ee640ef2e7 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Thu, 30 Aug 2018 15:18:47 +0800 Subject: [PATCH 01/11] Add redelivery_count to PulsarApi.proto. --- .../pulsar/common/api/proto/PulsarApi.java | 57 +++++++++++++++++++ pulsar-common/src/main/proto/PulsarApi.proto | 1 + 2 files changed, 58 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index edc7e9b118468..f2dbf2a67f21c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -13542,6 +13542,10 @@ public interface CommandMessageOrBuilder // required .pulsar.proto.MessageIdData message_id = 2; boolean hasMessageId(); org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId(); + + // optional uint64 redelivery_count = 3 [default = 0]; + boolean hasRedeliveryCount(); + long getRedeliveryCount(); } public static final class CommandMessage extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -13598,9 +13602,20 @@ public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId() return messageId_; } + // optional uint64 redelivery_count = 3 [default = 0]; + public static final int REDELIVERY_COUNT_FIELD_NUMBER = 3; + private long redeliveryCount_; + public boolean hasRedeliveryCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getRedeliveryCount() { + return redeliveryCount_; + } + private void initFields() { consumerId_ = 0L; messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + redeliveryCount_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13637,6 +13652,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeMessage(2, messageId_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt64(3, redeliveryCount_); + } } private int memoizedSerializedSize = -1; @@ -13653,6 +13671,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeMessageSize(2, messageId_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(3, redeliveryCount_); + } memoizedSerializedSize = size; return size; } @@ -13770,6 +13792,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x00000002); + redeliveryCount_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -13811,6 +13835,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage buildPartial( to_bitField0_ |= 0x00000002; } result.messageId_ = messageId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.redeliveryCount_ = redeliveryCount_; result.bitField0_ = to_bitField0_; return result; } @@ -13823,6 +13851,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandMes if (other.hasMessageId()) { mergeMessageId(other.getMessageId()); } + if (other.hasRedeliveryCount()) { + setRedeliveryCount(other.getRedeliveryCount()); + } return this; } @@ -13879,6 +13910,11 @@ public Builder mergeFrom( subBuilder.recycle(); break; } + case 24: { + bitField0_ |= 0x00000004; + redeliveryCount_ = input.readUInt64(); + break; + } } } } @@ -13949,6 +13985,27 @@ public Builder clearMessageId() { return this; } + // optional uint64 redelivery_count = 3 [default = 0]; + private long redeliveryCount_ ; + public boolean hasRedeliveryCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getRedeliveryCount() { + return redeliveryCount_; + } + public Builder setRedeliveryCount(long value) { + bitField0_ |= 0x00000004; + redeliveryCount_ = value; + + return this; + } + public Builder clearRedeliveryCount() { + bitField0_ = (bitField0_ & ~0x00000004); + redeliveryCount_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandMessage) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 779a27bc1f027..b81f713246882 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -335,6 +335,7 @@ message CommandSendError { message CommandMessage { required uint64 consumer_id = 1; required MessageIdData message_id = 2; + optional uint64 redelivery_count = 3 [default = 0]; } message CommandAck { From 7c0092a9af8e9ef4950ffb2870f067e28cdd9466 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Fri, 31 Aug 2018 17:14:52 +0800 Subject: [PATCH 02/11] Implement Redelivery Tracker in broker. --- .../pulsar/broker/service/Consumer.java | 3 +- .../pulsar/broker/service/Dispatcher.java | 2 + .../broker/service/RedeliveryTracker.java | 36 +++++++++++ .../service/RedeliveryTrackerDisabled.java | 55 +++++++++++++++++ ...PersistentDispatcherMultipleConsumers.java | 9 +++ ...sistentDispatcherSingleActiveConsumer.java | 9 +++ .../NonPersistentRedeliveryTracker.java | 59 +++++++++++++++++++ ...PersistentDispatcherMultipleConsumers.java | 14 ++++- ...sistentDispatcherSingleActiveConsumer.java | 11 ++++ .../persistent/PersistentSubscription.java | 1 + .../apache/pulsar/common/api/Commands.java | 3 +- 11 files changed, 199 insertions(+), 3 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 883bf5553e307..5752034a79a76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -265,7 +265,8 @@ public SendMessageInfo sendMessages(final List entries, SendListener list if (i == (entries.size() - 1)) { promise = writePromise; } - ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise); + int redeliveryCount = subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())); + ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), promise); messageId.recycle(); messageIdBuilder.recycle(); entry.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index 0d7b7d6e55dab..43f65cd73b985 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -70,4 +70,6 @@ public interface Dispatcher { void addUnAckedMessages(int unAckMessages); + RedeliveryTracker getRedeliveryTracker(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java new file mode 100644 index 0000000000000..0f2e54a542824 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.mledger.Position; + +import java.util.List; + +public interface RedeliveryTracker { + + int incrementAndGetRedeliveryCount(Position position); + + int getRedeliveryCount(Position position); + + void remove(Position position); + + void removeBatch(List positions); + + void clear(); +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java new file mode 100644 index 0000000000000..521417f53544a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.mledger.Position; + +import java.util.List; + +public class RedeliveryTrackerDisabled implements RedeliveryTracker { + + public static final RedeliveryTrackerDisabled REDELIVERY_TRACKER_DISABLED = new RedeliveryTrackerDisabled(); + + private RedeliveryTrackerDisabled() {} + + @Override + public int incrementAndGetRedeliveryCount(Position position) { + return 0; + } + + @Override + public int getRedeliveryCount(Position position) { + return 0; + } + + @Override + public void remove(Position position) { + // no-op + } + + @Override + public void removeBatch(List positions) { + // no-op + } + + @Override + public void clear() { + // no-op + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index f40564bf69c0f..2067a80d8efc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -31,6 +31,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.utils.CopyOnWriteArrayList; @@ -54,6 +56,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher private volatile int totalAvailablePermits = 0; private final ServiceConfiguration serviceConfig; + private final RedeliveryTracker redeliveryTracker; public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) { this.topic = topic; @@ -61,6 +64,7 @@ public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscr this.name = topic.getName() + " / " + subscription.getName(); this.msgDrop = new Rate(); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); + this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } @Override @@ -178,6 +182,11 @@ public SubType getType() { return SubType.Shared; } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + @Override public void sendMessages(List entries) { Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 2083cc7864fe7..787fb00a940ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -29,6 +29,8 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -40,6 +42,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD private final Rate msgDrop; private final Subscription subscription; private final ServiceConfiguration serviceConfig; + private final RedeliveryTracker redeliveryTracker; public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, NonPersistentTopic topic, Subscription subscription) { @@ -48,6 +51,7 @@ public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int this.subscription = subscription; this.msgDrop = new Rate(); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); + this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } @Override @@ -117,6 +121,11 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { // No-op } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + @Override protected void scheduleReadOnActiveConsumer() { // No-op diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java new file mode 100644 index 0000000000000..d875979af8e43 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.RedeliveryTracker; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class NonPersistentRedeliveryTracker implements RedeliveryTracker { + + private ConcurrentHashMap trackerCache = new ConcurrentHashMap<>(16); + + @Override + public int incrementAndGetRedeliveryCount(Position position) { + trackerCache.putIfAbsent(position, new AtomicInteger(0)); + return trackerCache.get(position).incrementAndGet(); + } + + @Override + public int getRedeliveryCount(Position position) { + return trackerCache.getOrDefault(position, new AtomicInteger(0)).get(); + } + + @Override + public void remove(Position position) { + trackerCache.remove(position); + } + + @Override + public void removeBatch(List positions) { + if (positions != null) { + positions.forEach(this::remove); + } + } + + @Override + public void clear() { + trackerCache.clear(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 17c5db7a6a819..a0a13ac0684a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -44,6 +44,8 @@ import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentRedeliveryTracker; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -69,6 +71,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu private CompletableFuture closeFuture = null; private ConcurrentLongPairSet messagesToReplay; + private final RedeliveryTracker redeliveryTracker; private boolean havePendingRead = false; private boolean havePendingReplayRead = false; @@ -97,6 +100,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; this.messagesToReplay = new ConcurrentLongPairSet(512, 2); + this.redeliveryTracker = new NonPersistentRedeliveryTracker(); this.readBatchSize = MaxReadBatchSize; this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); @@ -556,7 +560,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { - positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId())); + positions.forEach(position -> { + messagesToReplay.add(position.getLedgerId(), position.getEntryId()); + redeliveryTracker.incrementAndGetRedeliveryCount(position); + }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); } @@ -624,5 +631,10 @@ public DispatchRateLimiter getDispatchRateLimiter() { return dispatchRateLimiter; } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 9ab7b877c0815..a74f8036c3870 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -39,6 +39,8 @@ import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentRedeliveryTracker; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -62,6 +64,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private final ServiceConfiguration serviceConfig; private ScheduledFuture readOnActiveConsumerTask = null; + private final RedeliveryTracker redeliveryTracker; + public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic) { super(subscriptionType, partitionIndex, topic.getName()); @@ -72,6 +76,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.readBatchSize = MaxReadBatchSize; this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.dispatchRateLimiter = null; + this.redeliveryTracker = new NonPersistentRedeliveryTracker(); } protected void scheduleReadOnActiveConsumer() { @@ -307,6 +312,7 @@ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consu @Override public void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { // We cannot redeliver single messages to single consumers to preserve ordering. + positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount); redeliverUnacknowledgedMessages(consumer); } @@ -485,5 +491,10 @@ public void addUnAckedMessages(int unAckMessages) { // No-op } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dac9f055d704c..669c6d41d457b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -195,6 +195,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map Date: Fri, 31 Aug 2018 18:30:48 +0800 Subject: [PATCH 03/11] Add DeadLetterPolicy. --- .../policies/data/DeadLetterPolicy.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java new file mode 100644 index 0000000000000..1e05d822cf3e8 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +public class DeadLetterPolicy { + + private final int maxRedeliverCount; + + private String deadLetterTopic; + + + public DeadLetterPolicy(int maxRedeliverCount) { + this.maxRedeliverCount = maxRedeliverCount; + } + + public DeadLetterPolicy(int maxRedeliverCount, String deadLetterTopic) { + this.maxRedeliverCount = maxRedeliverCount; + this.deadLetterTopic = deadLetterTopic; + } + + public int getMaxRedeliverCount() { + return maxRedeliverCount; + } + + public String getDeadLetterTopic() { + return deadLetterTopic; + } +} From dde79b7870a5521bdff8698a1c8375cb254d59e9 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Mon, 3 Sep 2018 17:43:07 +0800 Subject: [PATCH 04/11] Implement dead letter topic in client. --- .../pulsar/client/impl/RawReaderImpl.java | 2 +- .../client/api/DeadLetterTopicTest.java | 115 ++++++++++++++++++ .../impl/CompactedOutBatchMessageTest.java | 2 +- .../pulsar/client/api/ConsumerBuilder.java | 7 ++ .../apache/pulsar/client/impl/ClientCnx.java | 2 +- .../client/impl/ConsumerBuilderImpl.java | 9 +- .../pulsar/client/impl/ConsumerImpl.java | 78 +++++++++++- .../impl/conf/ConsumerConfigurationData.java | 3 + .../pulsar/common/api/proto/PulsarApi.java | 30 ++--- .../policies/data/DeadLetterPolicy.java | 1 + pulsar-common/src/main/proto/PulsarApi.proto | 2 +- 11 files changed, 226 insertions(+), 25 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index ae1a4dba17257..b428be1d74817 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -178,7 +178,7 @@ public CompletableFuture closeAsync() { } @Override - void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { + void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription, messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java new file mode 100644 index 0000000000000..e62ec7749b55e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.common.policies.data.DeadLetterPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertNull; + +public class DeadLetterTopicTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testDeadLetterTopic() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 2; + + final int sendMessages = 100; + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + producer.close(); + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(3, TimeUnit.SECONDS) + .deadLetterPolicy(new DeadLetterPolicy(maxRedeliveryCount)) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java index 85dbb245b9013..de1b88a21f965 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java @@ -73,7 +73,7 @@ public void testCompactedOutMessages() throws Exception { = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) .subscriptionName("my-subscriber-name").subscribe()) { // shove it in the sideways - consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer, + consumer.receiveIndividualMessagesFromBatch(metadata, 0, batchBuffer, MessageIdData.newBuilder().setLedgerId(1234) .setEntryId(567).build(), consumer.cnx()); Message m = consumer.receive(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 8657859103c51..45a61e7b0e836 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import org.apache.pulsar.common.policies.data.DeadLetterPolicy; + import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -330,4 +332,9 @@ public interface ConsumerBuilder extends Cloneable { * Set subscriptionInitialPosition for the consumer */ ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition); + + /** + * Set dead letter policy for consumer + */ + ConsumerBuilder deadLetterPolicy(DeadLetterPolicy deadLetterPolicy); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 7287194f62c8d..9306a8248bdb3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -284,7 +284,7 @@ protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayloa } ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId()); if (consumer != null) { - consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this); + consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), headersAndPayload, this); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f0067f7f10f2f..4b50492a7d26b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.policies.data.DeadLetterPolicy; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; @@ -242,7 +243,13 @@ public ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPositio return this; } - public ConsumerConfigurationData getConf() { + @Override + public ConsumerBuilder deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) { + conf.setDeadLetterPolicy(deadLetterPolicy); + return this; + } + + public ConsumerConfigurationData getConf() { return conf; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fe37b69a50963..2406b44f2bd57 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -54,11 +55,13 @@ import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -81,6 +84,7 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.DeadLetterPolicy; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,6 +137,12 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final String topicNameWithoutPartition; + private ConcurrentHashMap> possibleSendToDeadLetterTopicMessages; + + private DeadLetterPolicy deadLetterPolicy; + + private Producer deadLetterProducer; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -205,6 +215,23 @@ enum SubscriptionMode { NonPersistentAcknowledgmentGroupingTracker.of(); } + if (conf.getDeadLetterPolicy() != null) { + possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>(); + if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) { + this.deadLetterPolicy = new DeadLetterPolicy(conf.getDeadLetterPolicy().getMaxRedeliverCount(), conf.getDeadLetterPolicy().getDeadLetterTopic()); + } else { + this.deadLetterPolicy = new DeadLetterPolicy(conf.getDeadLetterPolicy().getMaxRedeliverCount(), String.format("%s-%s-DLQ", topic, subscription)); + } + try { + deadLetterProducer = client.newProducer(schema) + .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .blockIfQueueFull(false) + .create(); + } catch (Exception e) { + log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); + } + } + topicNameWithoutPartition = topicName.getPartitionedTopicName(); grabCnx(); @@ -233,6 +260,9 @@ public CompletableFuture unsubscribeAsync() { cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> { cnx.removeConsumer(consumerId); unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } client.cleanupConsumer(ConsumerImpl.this); log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription); unsubscribeFuture.complete(null); @@ -439,9 +469,11 @@ private CompletableFuture sendAcknowledge(MessageId messageId, AckType ack stats.incrementNumAcksSent(batchMessageId.getBatchSize()); unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); + possibleSendToDeadLetterTopicMessages.remove(batchMessageId); } else { // increment counter by 1 for non-batch msg unAckedMessageTracker.remove(msgId); + possibleSendToDeadLetterTopicMessages.remove(msgId); stats.incrementNumAcksSent(1); } } else if (ackType == AckType.Cumulative) { @@ -468,7 +500,9 @@ public void connectionOpened(final ClientCnx cnx) { synchronized (this) { currentSize = incomingMessages.size(); startMessageId = clearReceiverQueue(); - unAckedMessageTracker.clear(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } } boolean isDurable = subscriptionMode == SubscriptionMode.Durable; @@ -612,6 +646,9 @@ public void connectionFailed(PulsarClientException exception) { public CompletableFuture closeAsync() { if (getState() == State.Closing || getState() == State.Closed) { unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } return CompletableFuture.completedFuture(null); } @@ -619,6 +656,9 @@ public CompletableFuture closeAsync() { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); setState(State.Closed); unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } client.cleanupConsumer(this); return CompletableFuture.completedFuture(null); } @@ -640,6 +680,9 @@ public CompletableFuture closeAsync() { log.info("[{}] [{}] Closed consumer", topic, subscription); setState(State.Closed); unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } closeFuture.complete(null); client.cleanupConsumer(this); // fail all pending-receive futures to notify application @@ -686,7 +729,7 @@ void activeConsumerChanged(boolean isActive) { }); } - void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { + void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); @@ -745,6 +788,7 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { final MessageImpl message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema); + uncompressedPayload.release(); msgMetadata.recycle(); @@ -754,6 +798,9 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); + if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message)); + } if (!pendingReceives.isEmpty()) { notifyPendingReceivedCallback(message, null); } else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) { @@ -780,7 +827,7 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC }); } else { // handle batch message enqueuing; uncompressed payload has all messages in batch - receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, cnx); + receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx); } uncompressedPayload.release(); msgMetadata.recycle(); @@ -869,7 +916,7 @@ private void triggerZeroQueueSizeListener(final Message message) { }); } - void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload, + void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { int batchSize = msgMetadata.getNumMessagesInBatch(); @@ -878,7 +925,10 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc getPartitionIndex()); BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); unAckedMessageTracker.add(batchMessage); - + List possibleToDeadLetter = null; + if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleToDeadLetter = new ArrayList<>(); + } int skippedMessages = 0; try { for (int i = 0; i < batchSize; ++i) { @@ -918,6 +968,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc final MessageImpl message = new MessageImpl<>(batchMessageIdImpl, msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload, createEncryptionContext(msgMetadata), cnx, schema); + if (possibleToDeadLetter != null) { + possibleToDeadLetter.add(message); + } lock.readLock().lock(); try { if (pendingReceives.isEmpty()) { @@ -935,6 +988,10 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); } + if (possibleToDeadLetter != null) { + possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter); + } + if (log.isDebugEnabled()) { log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", subscription, consumerName, incomingMessages.size(), incomingMessages.remainingCapacity()); @@ -1172,6 +1229,17 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { + List messages = possibleSendToDeadLetterTopicMessages.get(messageId); + if (messages != null) { + try { + for (MessageImpl message : messages) { + deadLetterProducer.send(message); + } + acknowledge(messageId); + } catch (Exception e) { + log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId); + } + } // attempt to remove message from batchMessageAckTracker builder.setPartition(messageId.getPartitionIndex()); builder.setLedgerId(messageId.getLedgerId()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 47ede41de6000..45eb89697102f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.policies.data.DeadLetterPolicy; @Data public class ConsumerConfigurationData implements Serializable, Cloneable { @@ -82,6 +83,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private int patternAutoDiscoveryPeriod = 1; + private DeadLetterPolicy deadLetterPolicy; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index f2dbf2a67f21c..c6ff6c1668b75 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -13543,9 +13543,9 @@ public interface CommandMessageOrBuilder boolean hasMessageId(); org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId(); - // optional uint64 redelivery_count = 3 [default = 0]; + // optional uint32 redelivery_count = 3 [default = 0]; boolean hasRedeliveryCount(); - long getRedeliveryCount(); + int getRedeliveryCount(); } public static final class CommandMessage extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -13602,20 +13602,20 @@ public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId() return messageId_; } - // optional uint64 redelivery_count = 3 [default = 0]; + // optional uint32 redelivery_count = 3 [default = 0]; public static final int REDELIVERY_COUNT_FIELD_NUMBER = 3; - private long redeliveryCount_; + private int redeliveryCount_; public boolean hasRedeliveryCount() { return ((bitField0_ & 0x00000004) == 0x00000004); } - public long getRedeliveryCount() { + public int getRedeliveryCount() { return redeliveryCount_; } private void initFields() { consumerId_ = 0L; messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); - redeliveryCount_ = 0L; + redeliveryCount_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13653,7 +13653,7 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr output.writeMessage(2, messageId_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeUInt64(3, redeliveryCount_); + output.writeUInt32(3, redeliveryCount_); } } @@ -13673,7 +13673,7 @@ public int getSerializedSize() { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream - .computeUInt64Size(3, redeliveryCount_); + .computeUInt32Size(3, redeliveryCount_); } memoizedSerializedSize = size; return size; @@ -13792,7 +13792,7 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x00000002); - redeliveryCount_ = 0L; + redeliveryCount_ = 0; bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -13912,7 +13912,7 @@ public Builder mergeFrom( } case 24: { bitField0_ |= 0x00000004; - redeliveryCount_ = input.readUInt64(); + redeliveryCount_ = input.readUInt32(); break; } } @@ -13985,15 +13985,15 @@ public Builder clearMessageId() { return this; } - // optional uint64 redelivery_count = 3 [default = 0]; - private long redeliveryCount_ ; + // optional uint32 redelivery_count = 3 [default = 0]; + private int redeliveryCount_ ; public boolean hasRedeliveryCount() { return ((bitField0_ & 0x00000004) == 0x00000004); } - public long getRedeliveryCount() { + public int getRedeliveryCount() { return redeliveryCount_; } - public Builder setRedeliveryCount(long value) { + public Builder setRedeliveryCount(int value) { bitField0_ |= 0x00000004; redeliveryCount_ = value; @@ -14001,7 +14001,7 @@ public Builder setRedeliveryCount(long value) { } public Builder clearRedeliveryCount() { bitField0_ = (bitField0_ & ~0x00000004); - redeliveryCount_ = 0L; + redeliveryCount_ = 0; return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java index 1e05d822cf3e8..f9c1f512fceda 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java @@ -41,4 +41,5 @@ public int getMaxRedeliverCount() { public String getDeadLetterTopic() { return deadLetterTopic; } + } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b81f713246882..c50f0728e4961 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -335,7 +335,7 @@ message CommandSendError { message CommandMessage { required uint64 consumer_id = 1; required MessageIdData message_id = 2; - optional uint64 redelivery_count = 3 [default = 0]; + optional uint32 redelivery_count = 3 [default = 0]; } message CommandAck { From 3c6a0c81a736fa56d24230ddd38932b4f26c6ee8 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Mon, 3 Sep 2018 20:48:02 +0800 Subject: [PATCH 05/11] Add UT to dead letter topic. --- .../client/api/DeadLetterTopicTest.java | 56 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 27 ++++++--- 2 files changed, 74 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index e62ec7749b55e..7d4e0e13b957f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -64,6 +64,8 @@ public void testDeadLetterTopic() throws Exception { producer.close(); + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -112,4 +114,58 @@ public void testDeadLetterTopic() throws Exception { checkConsumer.close(); } + + @Test + public void testDeadLetterTopicByCustomTopicName() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 100; + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + producer.close(); + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(3, TimeUnit.SECONDS) + .receiverQueueSize(100) + .deadLetterPolicy(new DeadLetterPolicy(maxRedeliveryCount, "persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + deadLetterConsumer.close(); + consumer.close(); + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + checkConsumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2406b44f2bd57..4acb2195e04b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -62,6 +62,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -137,7 +138,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final String topicNameWithoutPartition; - private ConcurrentHashMap> possibleSendToDeadLetterTopicMessages; + private ConcurrentHashMap>> possibleSendToDeadLetterTopicMessages; private DeadLetterPolicy deadLetterPolicy; @@ -223,7 +224,8 @@ enum SubscriptionMode { this.deadLetterPolicy = new DeadLetterPolicy(conf.getDeadLetterPolicy().getMaxRedeliverCount(), String.format("%s-%s-DLQ", topic, subscription)); } try { - deadLetterProducer = client.newProducer(schema) + PulsarClient deadLetterClient = PulsarClient.builder().serviceUrl(client.getConfiguration().getServiceUrl()).build(); + deadLetterProducer = deadLetterClient.newProducer(schema) .topic(this.deadLetterPolicy.getDeadLetterTopic()) .blockIfQueueFull(false) .create(); @@ -469,11 +471,15 @@ private CompletableFuture sendAcknowledge(MessageId messageId, AckType ack stats.incrementNumAcksSent(batchMessageId.getBatchSize()); unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); - possibleSendToDeadLetterTopicMessages.remove(batchMessageId); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(batchMessageId); + } } else { // increment counter by 1 for non-batch msg unAckedMessageTracker.remove(msgId); - possibleSendToDeadLetterTopicMessages.remove(msgId); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(msgId); + } stats.incrementNumAcksSent(1); } } else if (ackType == AckType.Cumulative) { @@ -925,7 +931,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv getPartitionIndex()); BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); unAckedMessageTracker.add(batchMessage); - List possibleToDeadLetter = null; + List> possibleToDeadLetter = null; if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { possibleToDeadLetter = new ArrayList<>(); } @@ -1229,15 +1235,18 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { - List messages = possibleSendToDeadLetterTopicMessages.get(messageId); + List> messages = possibleSendToDeadLetterTopicMessages.get(messageId); if (messages != null) { try { - for (MessageImpl message : messages) { - deadLetterProducer.send(message); + for (MessageImpl message : messages) { + deadLetterProducer.newMessage() + .value(message.getValue()) + .properties(message.getProperties()) + .send(); } acknowledge(messageId); } catch (Exception e) { - log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId); + log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); } } // attempt to remove message from batchMessageAckTracker From 08a012dd0cece01b8f597cc184c56251efdaa4ce Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Mon, 3 Sep 2018 21:04:57 +0800 Subject: [PATCH 06/11] Fix bug in dead letter topic with multi topic subscription. --- .../client/api/DeadLetterTopicTest.java | 76 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 8 +- 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 7d4e0e13b957f..dcd1089dab990 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -115,6 +115,82 @@ public void testDeadLetterTopic() throws Exception { checkConsumer.close(); } + @Test + public void testDeadLetterTopicWithMultiTopic() throws Exception { + final String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1"; + final String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2"; + + final int maxRedeliveryCount = 2; + + int sendMessages = 100; + + Producer producer1 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic1) + .create(); + + Producer producer2 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic2) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer1.send(String.format("Hello Pulsar [%d]", i).getBytes()); + producer2.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + sendMessages = sendMessages * 2; + + producer1.close(); + producer2.close(); + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(3, TimeUnit.SECONDS) + .deadLetterPolicy(new DeadLetterPolicy(maxRedeliveryCount)) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + @Test public void testDeadLetterTopicByCustomTopicName() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4acb2195e04b3..5d0c2cb2007b4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1235,7 +1235,13 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { - List> messages = possibleSendToDeadLetterTopicMessages.get(messageId); + List> messages = null; + if (messageId instanceof BatchMessageIdImpl) { + messages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), + getPartitionIndex())); + } else { + messages = possibleSendToDeadLetterTopicMessages.get(messageId); + } if (messages != null) { try { for (MessageImpl message : messages) { From 793fc1f3592d0e625113b0687151c60b3f4df9c2 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Mon, 3 Sep 2018 21:11:40 +0800 Subject: [PATCH 07/11] Fix bug in send dead letter message then acknowledge. --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5d0c2cb2007b4..75a1d34885f97 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -472,7 +472,8 @@ private CompletableFuture sendAcknowledge(MessageId messageId, AckType ack unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); if (possibleSendToDeadLetterTopicMessages != null) { - possibleSendToDeadLetterTopicMessages.remove(batchMessageId); + possibleSendToDeadLetterTopicMessages.remove(new MessageIdImpl(batchMessageId.getLedgerId(), + batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); } } else { // increment counter by 1 for non-batch msg From 50ad8f7109a2c27cabe70ade614b763fac698a99 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Tue, 4 Sep 2018 10:51:00 +0800 Subject: [PATCH 08/11] Fix bugs in dead letter topic. --- .../apache/pulsar/client/impl/ConsumerImpl.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 75a1d34885f97..d1a2883232f87 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -805,7 +805,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message)); } if (!pendingReceives.isEmpty()) { @@ -995,7 +995,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); } - if (possibleToDeadLetter != null) { + if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) { possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter); } @@ -1237,11 +1237,13 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { List> messages = null; - if (messageId instanceof BatchMessageIdImpl) { - messages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), - getPartitionIndex())); - } else { - messages = possibleSendToDeadLetterTopicMessages.get(messageId); + if (possibleSendToDeadLetterTopicMessages != null) { + if (messageId instanceof BatchMessageIdImpl) { + messages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), + getPartitionIndex())); + } else { + messages = possibleSendToDeadLetterTopicMessages.get(messageId); + } } if (messages != null) { try { From b3a17e7cba700833bb7d7c86361e7228ced3b9c8 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Tue, 4 Sep 2018 17:52:42 +0800 Subject: [PATCH 09/11] Do not instance dead letter producer in new consumerImpl(). --- .../pulsar/client/impl/ConsumerImpl.java | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index d1a2883232f87..b76c3b525e773 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -223,15 +223,6 @@ enum SubscriptionMode { } else { this.deadLetterPolicy = new DeadLetterPolicy(conf.getDeadLetterPolicy().getMaxRedeliverCount(), String.format("%s-%s-DLQ", topic, subscription)); } - try { - PulsarClient deadLetterClient = PulsarClient.builder().serviceUrl(client.getConfiguration().getServiceUrl()).build(); - deadLetterProducer = deadLetterClient.newProducer(schema) - .topic(this.deadLetterPolicy.getDeadLetterTopic()) - .blockIfQueueFull(false) - .create(); - } catch (Exception e) { - log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); - } } topicNameWithoutPartition = topicName.getPartitionedTopicName(); @@ -1236,26 +1227,38 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { - List> messages = null; + List> deadLetterMessages = null; if (possibleSendToDeadLetterTopicMessages != null) { if (messageId instanceof BatchMessageIdImpl) { - messages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), + deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex())); } else { - messages = possibleSendToDeadLetterTopicMessages.get(messageId); + deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId); } } - if (messages != null) { - try { - for (MessageImpl message : messages) { - deadLetterProducer.newMessage() - .value(message.getValue()) - .properties(message.getProperties()) - .send(); + if (deadLetterMessages != null) { + if (deadLetterProducer == null) { + try { + deadLetterProducer = client.newProducer(schema) + .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .blockIfQueueFull(false) + .create(); + } catch (Exception e) { + log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); + } + } + if (deadLetterProducer != null) { + try { + for (MessageImpl message : deadLetterMessages) { + deadLetterProducer.newMessage() + .value(message.getValue()) + .properties(message.getProperties()) + .send(); + } + acknowledge(messageId); + } catch (Exception e) { + log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); } - acknowledge(messageId); - } catch (Exception e) { - log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); } } // attempt to remove message from batchMessageAckTracker From 903c4a0d20fa24b0221965258407e104527fb1e9 Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Tue, 11 Sep 2018 14:36:06 +0800 Subject: [PATCH 10/11] optimization of dead letter topic. --- ...er.java => InMemoryRedeliveryTracker.java} | 5 +- ...PersistentDispatcherMultipleConsumers.java | 4 +- ...sistentDispatcherSingleActiveConsumer.java | 4 +- .../client/api/DeadLetterTopicTest.java | 10 ++- .../pulsar/client/api/ConsumerBuilder.java | 21 ++++- .../pulsar/client/api}/DeadLetterPolicy.java | 27 ++---- .../client/impl/ConsumerBuilderImpl.java | 2 +- .../pulsar/client/impl/ConsumerImpl.java | 88 ++++++++++--------- .../impl/conf/ConsumerConfigurationData.java | 2 +- .../apache/pulsar/common/api/Commands.java | 4 +- 10 files changed, 91 insertions(+), 76 deletions(-) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/service/{nonpersistent/NonPersistentRedeliveryTracker.java => InMemoryRedeliveryTracker.java} (90%) rename {pulsar-common/src/main/java/org/apache/pulsar/common/policies/data => pulsar-client/src/main/java/org/apache/pulsar/client/api}/DeadLetterPolicy.java (61%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java similarity index 90% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index d875979af8e43..99b38ccf2fb76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -16,16 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service.nonpersistent; +package org.apache.pulsar.broker.service; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.broker.service.RedeliveryTracker; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -public class NonPersistentRedeliveryTracker implements RedeliveryTracker { +public class InMemoryRedeliveryTracker implements RedeliveryTracker { private ConcurrentHashMap trackerCache = new ConcurrentHashMap<>(16); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a0a13ac0684a7..5198a139ec536 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -45,7 +45,7 @@ import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.RedeliveryTracker; -import org.apache.pulsar.broker.service.nonpersistent.NonPersistentRedeliveryTracker; +import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -100,7 +100,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; this.messagesToReplay = new ConcurrentLongPairSet(512, 2); - this.redeliveryTracker = new NonPersistentRedeliveryTracker(); + this.redeliveryTracker = new InMemoryRedeliveryTracker(); this.readBatchSize = MaxReadBatchSize; this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index a74f8036c3870..53b8894d4e148 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -40,7 +40,7 @@ import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.RedeliveryTracker; -import org.apache.pulsar.broker.service.nonpersistent.NonPersistentRedeliveryTracker; +import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -76,7 +76,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.readBatchSize = MaxReadBatchSize; this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.dispatchRateLimiter = null; - this.redeliveryTracker = new NonPersistentRedeliveryTracker(); + this.redeliveryTracker = new InMemoryRedeliveryTracker(); } protected void scheduleReadOnActiveConsumer() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index dcd1089dab990..114152af07dc5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.api; -import org.apache.pulsar.common.policies.data.DeadLetterPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -71,7 +70,7 @@ public void testDeadLetterTopic() throws Exception { .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) .ackTimeout(3, TimeUnit.SECONDS) - .deadLetterPolicy(new DeadLetterPolicy(maxRedeliveryCount)) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) .receiverQueueSize(100) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -147,7 +146,7 @@ public void testDeadLetterTopicWithMultiTopic() throws Exception { .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) .ackTimeout(3, TimeUnit.SECONDS) - .deadLetterPolicy(new DeadLetterPolicy(maxRedeliveryCount)) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) .receiverQueueSize(100) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -209,7 +208,10 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception { .subscriptionType(SubscriptionType.Shared) .ackTimeout(3, TimeUnit.SECONDS) .receiverQueueSize(100) - .deadLetterPolicy(new DeadLetterPolicy(maxRedeliveryCount, "persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ") + .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 7860248fe91d5..d94175846ad34 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.client.api; -import org.apache.pulsar.common.policies.data.DeadLetterPolicy; - import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -343,6 +341,25 @@ public interface ConsumerBuilder extends Cloneable { /** * Set dead letter policy for consumer + * + * By default some message will redelivery so many times possible, even to the extent that it can be never stop. + * By using dead letter mechanism messages will has the max redelivery count, when message exceeding the maximum + * number of redeliveries, message will send to the Dead Letter Topic and acknowledged automatic. + * + * You can enable the dead letter mechanism by setting dead letter policy. + * example: + *
+     * client.newConsumer()
+     *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
+     *          .subscribe();
+     * 
+ * Default dead letter topic name is {TopicName}-{Subscription}-DLQ. + * To setting a custom dead letter topic name + *
+     * client.newConsumer()
+     *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build())
+     *          .subscribe();
+     * 
*/ ConsumerBuilder deadLetterPolicy(DeadLetterPolicy deadLetterPolicy); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java similarity index 61% rename from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java index f9c1f512fceda..52a2a23b7fc1f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java @@ -16,30 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.policies.data; +package org.apache.pulsar.client.api; +import lombok.Builder; +import lombok.Data; + +@Builder +@Data public class DeadLetterPolicy { - private final int maxRedeliverCount; + private int maxRedeliverCount; private String deadLetterTopic; - - public DeadLetterPolicy(int maxRedeliverCount) { - this.maxRedeliverCount = maxRedeliverCount; - } - - public DeadLetterPolicy(int maxRedeliverCount, String deadLetterTopic) { - this.maxRedeliverCount = maxRedeliverCount; - this.deadLetterTopic = deadLetterTopic; - } - - public int getMaxRedeliverCount() { - return maxRedeliverCount; - } - - public String getDeadLetterTopic() { - return deadLetterTopic; - } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index c3e8c4c044a57..103bb5e847aec 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -44,7 +44,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.policies.data.DeadLetterPolicy; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 17b25511e8971..b7f9918d92f29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -27,14 +27,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; import java.io.IOException; import java.util.ArrayList; -import static java.util.Base64.getEncoder; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -62,7 +60,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -85,7 +82,7 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.DeadLetterPolicy; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,9 +216,15 @@ enum SubscriptionMode { if (conf.getDeadLetterPolicy() != null) { possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>(); if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) { - this.deadLetterPolicy = new DeadLetterPolicy(conf.getDeadLetterPolicy().getMaxRedeliverCount(), conf.getDeadLetterPolicy().getDeadLetterTopic()); + this.deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) + .deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic()) + .build(); } else { - this.deadLetterPolicy = new DeadLetterPolicy(conf.getDeadLetterPolicy().getMaxRedeliverCount(), String.format("%s-%s-DLQ", topic, subscription)); + this.deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) + .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription)) + .build(); } } @@ -1239,40 +1242,8 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { - List> deadLetterMessages = null; - if (possibleSendToDeadLetterTopicMessages != null) { - if (messageId instanceof BatchMessageIdImpl) { - deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), - getPartitionIndex())); - } else { - deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId); - } - } - if (deadLetterMessages != null) { - if (deadLetterProducer == null) { - try { - deadLetterProducer = client.newProducer(schema) - .topic(this.deadLetterPolicy.getDeadLetterTopic()) - .blockIfQueueFull(false) - .create(); - } catch (Exception e) { - log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); - } - } - if (deadLetterProducer != null) { - try { - for (MessageImpl message : deadLetterMessages) { - deadLetterProducer.newMessage() - .value(message.getValue()) - .properties(message.getProperties()) - .send(); - } - acknowledge(messageId); - } catch (Exception e) { - log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); - } - } - } + // process message possible to dead letter topic + processPossibleToDLQ(messageId); // attempt to remove message from batchMessageAckTracker builder.setPartition(messageId.getPartitionIndex()); builder.setLedgerId(messageId.getLedgerId()); @@ -1301,6 +1272,43 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { } } + private void processPossibleToDLQ(MessageIdImpl messageId) { + List> deadLetterMessages = null; + if (possibleSendToDeadLetterTopicMessages != null) { + if (messageId instanceof BatchMessageIdImpl) { + deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), + getPartitionIndex())); + } else { + deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId); + } + } + if (deadLetterMessages != null) { + if (deadLetterProducer == null) { + try { + deadLetterProducer = client.newProducer(schema) + .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .blockIfQueueFull(false) + .create(); + } catch (Exception e) { + log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); + } + } + if (deadLetterProducer != null) { + try { + for (MessageImpl message : deadLetterMessages) { + deadLetterProducer.newMessage() + .value(message.getValue()) + .properties(message.getProperties()) + .send(); + } + acknowledge(messageId); + } catch (Exception e) { + log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); + } + } + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 45eb89697102f..a0fd493494c20 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -38,7 +38,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.common.policies.data.DeadLetterPolicy; +import org.apache.pulsar.client.api.DeadLetterPolicy; @Data public class ConsumerConfigurationData implements Serializable, Cloneable { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 44c24df8a05a5..c94482d0227fa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -286,7 +286,9 @@ public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, i CommandMessage.Builder msgBuilder = CommandMessage.newBuilder(); msgBuilder.setConsumerId(consumerId); msgBuilder.setMessageId(messageId); - msgBuilder.setRedeliveryCount(redeliveryCount); + if (redeliveryCount > 0) { + msgBuilder.setRedeliveryCount(redeliveryCount); + } CommandMessage msg = msgBuilder.build(); BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); BaseCommand cmd = cmdBuilder.setType(Type.MESSAGE).setMessage(msg).build(); From df07259f45ad3f665ca26398bf69cf4e849795ee Mon Sep 17 00:00:00 2001 From: codelipenghui <309994536@qq.com> Date: Thu, 13 Sep 2018 14:55:43 +0800 Subject: [PATCH 11/11] Disable RedeliveryTracker in PersistentDispatcherSingleActiveConsumer --- .../persistent/PersistentDispatcherSingleActiveConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 53b8894d4e148..89dfb473313f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -40,7 +40,7 @@ import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.RedeliveryTracker; -import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker; +import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -76,7 +76,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.readBatchSize = MaxReadBatchSize; this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.dispatchRateLimiter = null; - this.redeliveryTracker = new InMemoryRedeliveryTracker(); + this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } protected void scheduleReadOnActiveConsumer() {