Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 307b5c9

Browse files
committed Feb 21, 2025
[fix][broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication (apache#23697)
(cherry picked from commit 4ac4f3c)
1 parent 95101f3 commit 307b5c9

File tree

19 files changed

+1552
-47
lines changed

19 files changed

+1552
-47
lines changed
 

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public void startProducer() {
176176
prepareCreateProducer().thenCompose(ignore -> {
177177
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
178178
builderImpl.getConf().setNonPartitionedTopicExpected(true);
179+
builderImpl.getConf().setReplProducer(true);
179180
return producerBuilder.createAsync().thenAccept(producer -> {
180181
setProducerAndTriggerReadEntries(producer);
181182
});

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

+75-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
2222
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
23+
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
24+
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
2325
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
2426
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
2527
import com.google.common.annotations.VisibleForTesting;
@@ -87,6 +89,7 @@ public class Producer {
8789

8890
private final PublisherStatsImpl stats;
8991
private final boolean isRemote;
92+
private final boolean isRemoteOrShadow;
9093
private final String remoteCluster;
9194
private final boolean isNonPersistentTopic;
9295
private final boolean isShadowTopic;
@@ -151,6 +154,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
151154

152155
String replicatorPrefix = serviceConf.getReplicatorPrefix() + ".";
153156
this.isRemote = producerName.startsWith(replicatorPrefix);
157+
this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix());
154158
this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix);
155159

156160
this.isEncrypted = isEncrypted;
@@ -162,6 +166,13 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
162166
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
163167
}
164168

169+
/**
170+
* Difference with "isRemote" is whether the prefix string is end with a dot.
171+
*/
172+
public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) {
173+
return producerName != null && producerName.startsWith(replicatorPrefix);
174+
}
175+
165176
/**
166177
* Producer name for replicator is in format.
167178
* "replicatorPrefix.localCluster" (old)
@@ -270,11 +281,16 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
270281
return true;
271282
}
272283

284+
private boolean isSupportsReplDedupByLidAndEid() {
285+
// Non-Persistent topic does not have ledger id or entry id, so it does not support.
286+
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
287+
}
288+
273289
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
274290
boolean isMarker, Position position) {
275291
MessagePublishContext messagePublishContext =
276292
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
277-
batchSize, isChunked, System.nanoTime(), isMarker, position);
293+
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
278294
if (brokerInterceptor != null) {
279295
brokerInterceptor
280296
.onMessagePublish(this, headersAndPayload, messagePublishContext);
@@ -286,7 +302,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
286302
long batchSize, boolean isChunked, boolean isMarker, Position position) {
287303
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
288304
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
289-
isChunked, System.nanoTime(), isMarker, position);
305+
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
290306
if (brokerInterceptor != null) {
291307
brokerInterceptor
292308
.onMessagePublish(this, headersAndPayload, messagePublishContext);
@@ -382,6 +398,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
382398
private long batchSize;
383399
private boolean chunked;
384400
private boolean isMarker;
401+
private boolean supportsReplDedupByLidAndEid;
385402

386403
private long startTimeNs;
387404

@@ -472,6 +489,11 @@ public long getOriginalSequenceId() {
472489
return originalSequenceId;
473490
}
474491

492+
@Override
493+
public boolean supportsReplDedupByLidAndEid() {
494+
return supportsReplDedupByLidAndEid;
495+
}
496+
475497
@Override
476498
public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
477499
this.originalHighestSequenceId = originalHighestSequenceId;
@@ -539,8 +561,12 @@ public void run() {
539561
// stats
540562
rateIn.recordMultipleEvents(batchSize, msgSize);
541563
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
542-
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
543-
ledgerId, entryId);
564+
if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) {
565+
sendSendReceiptResponseRepl();
566+
} else {
567+
// Repl V1 is the same as normal for this handling.
568+
sendSendReceiptResponseNormal();
569+
}
544570
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
545571
if (this.chunked) {
546572
producer.chunkedMessageRate.recordEvent();
@@ -553,8 +579,46 @@ public void run() {
553579
recycle();
554580
}
555581

582+
private void sendSendReceiptResponseRepl() {
583+
// Case-1: is a repl marker.
584+
boolean isReplMarker = getProperty(MSG_PROP_IS_REPL_MARKER) != null;
585+
if (isReplMarker) {
586+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, Long.MIN_VALUE,
587+
ledgerId, entryId);
588+
589+
return;
590+
}
591+
// Case-2: is a repl message.
592+
Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION);
593+
if (positionPairObj == null || !(positionPairObj instanceof long[])
594+
|| ((long[]) positionPairObj).length < 2) {
595+
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
596+
+ " messages props were are invalid. producer={}. supportsReplDedupByLidAndEid: {},"
597+
+ " sequence-id {}, prop-{}: not in expected format",
598+
producer.topic.getName(), producer.producerName,
599+
supportsReplDedupByLidAndEid(), getSequenceId(),
600+
MSG_PROP_REPL_SOURCE_POSITION);
601+
producer.cnx.getCommandSender().sendSendError(producer.producerId,
602+
Math.max(highestSequenceId, sequenceId),
603+
ServerError.PersistenceError, "Message can not determine whether the message is"
604+
+ " duplicated due to the acquired messages props were are invalid");
605+
return;
606+
}
607+
long[] positionPair = (long[]) positionPairObj;
608+
long replSequenceLId = positionPair[0];
609+
long replSequenceEId = positionPair[1];
610+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId,
611+
replSequenceEId, ledgerId, entryId);
612+
}
613+
614+
private void sendSendReceiptResponseNormal() {
615+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
616+
ledgerId, entryId);
617+
}
618+
556619
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
557-
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
620+
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
621+
boolean supportsReplDedupByLidAndEid) {
558622
MessagePublishContext callback = RECYCLER.get();
559623
callback.producer = producer;
560624
callback.sequenceId = sequenceId;
@@ -566,6 +630,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
566630
callback.originalSequenceId = -1L;
567631
callback.startTimeNs = startTimeNs;
568632
callback.isMarker = isMarker;
633+
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
569634
callback.ledgerId = position == null ? -1 : position.getLedgerId();
570635
callback.entryId = position == null ? -1 : position.getEntryId();
571636
if (callback.propertyMap != null) {
@@ -575,7 +640,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
575640
}
576641

577642
static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
578-
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
643+
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
644+
boolean supportsReplDedupByLidAndEid) {
579645
MessagePublishContext callback = RECYCLER.get();
580646
callback.producer = producer;
581647
callback.sequenceId = lowestSequenceId;
@@ -588,6 +654,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
588654
callback.startTimeNs = startTimeNs;
589655
callback.chunked = chunked;
590656
callback.isMarker = isMarker;
657+
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
591658
callback.ledgerId = position == null ? -1 : position.getLedgerId();
592659
callback.entryId = position == null ? -1 : position.getEntryId();
593660
if (callback.propertyMap != null) {
@@ -815,7 +882,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
815882
}
816883
MessagePublishContext messagePublishContext =
817884
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
818-
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
885+
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
886+
cnx.isClientSupportsReplDedupByLidAndEid());
819887
if (brokerInterceptor != null) {
820888
brokerInterceptor
821889
.onMessagePublish(this, headersAndPayload, messagePublishContext);

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import javax.net.ssl.SSLSession;
6565
import javax.ws.rs.WebApplicationException;
6666
import javax.ws.rs.core.Response;
67+
import lombok.Getter;
6768
import org.apache.bookkeeper.mledger.AsyncCallbacks;
6869
import org.apache.bookkeeper.mledger.Entry;
6970
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -233,6 +234,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
233234

234235
// Flag to manage throttling-rate by atomically enable/disable read-channel.
235236
private volatile boolean autoReadDisabledRateLimiting = false;
237+
@Getter
236238
private FeatureFlags features;
237239

238240
private PulsarCommandSender commandSender;

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

+4
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ default long getEntryTimestamp() {
127127
default void setEntryTimestamp(long entryTimestamp) {
128128

129129
}
130+
131+
/**
 * Reports whether replication de-duplication by ledger id and entry id is supported.
 *
 * <p>This default implementation always answers {@code false}; implementations that support the
 * feature are expected to override it.
 *
 * @return {@code false} unless overridden
 */
default boolean supportsReplDedupByLidAndEid() {
132+
return false;
133+
}
130134
}
131135

132136
CompletableFuture<Void> initialize();

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.SocketAddress;
2424
import java.util.concurrent.CompletableFuture;
2525
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
26+
import org.apache.pulsar.common.api.proto.FeatureFlags;
2627

2728
public interface TransportCnx {
2829

@@ -88,4 +89,11 @@ public interface TransportCnx {
8889
* is null if the connection liveness check is disabled.
8990
*/
9091
CompletableFuture<Boolean> checkConnectionLiveness();
92+
93+
FeatureFlags getFeatures();
94+
95+
default boolean isClientSupportsReplDedupByLidAndEid() {
96+
return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid()
97+
&& getFeatures().isSupportsReplDedupByLidAndEid();
98+
}
9199
}

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
2122
import io.netty.buffer.ByteBuf;
2223
import java.util.List;
2324
import java.util.concurrent.CompletableFuture;
@@ -195,9 +196,15 @@ protected boolean replicateEntries(List<Entry> entries) {
195196
msg.setSchemaInfoForReplicator(schemaFuture.get());
196197
msg.getMessageBuilder().clearTxnidMostBits();
197198
msg.getMessageBuilder().clearTxnidLeastBits();
199+
// Add props for sequence checking.
200+
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
201+
.setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId()));
198202
msgOut.recordEvent(headersAndPayload.readableBytes());
199203
// Increment pending messages for messages produced locally
200204
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
205+
if (log.isDebugEnabled()) {
206+
log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
207+
}
201208
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
202209
atLeastOneMessageSentForReplication = true;
203210
}

0 commit comments

Comments
 (0)
Please sign in to comment.