[fix][broker] fix replicated subscriptions for transactional messages (apache#22452)

(cherry picked from commit 9fd1b61)
(cherry picked from commit e300fbd)
thetumbled authored and srinath-ctds committed May 16, 2024

1 parent 7042b52 commit 99c2a9d
Showing 11 changed files with 343 additions and 35 deletions.
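At a glance, the change works like this: the replicated-subscriptions controller used to key its "is there anything new to snapshot?" check on the last data-message publish time; it now keys it on a timestamp that the topic bumps whenever its max read position moves forward, which the transaction buffer reports through a new MaxReadPositionCallBack and which marker messages never bump. The snippet below is a condensed, self-contained sketch of that pattern for orientation only; TopicSketch and the long positions are illustrative stand-ins, not the broker's actual classes.

import java.time.Clock;

// Illustrative stand-in for TopicTransactionBuffer.MaxReadPositionCallBack.
interface MaxReadPositionCallBack {
    void maxReadPositionMovedForward(long oldPosition, long newPosition);
}

// Illustrative stand-in for PersistentTopic: it owns the timestamp and hands the
// callback to its transaction buffer, which invokes it for non-marker publishes.
class TopicSketch {
    private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;

    final MaxReadPositionCallBack maxReadPositionCallBack =
            (oldPosition, newPosition) ->
                    lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis();

    long getLastMaxReadPositionMovedForwardTimestamp() {
        return lastMaxReadPositionMovedForwardTimestamp;
    }
}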
@@ -124,6 +124,7 @@
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -254,9 +255,13 @@ protected TopicStatsHelper initialValue() {

@Getter
protected final TransactionBuffer transactionBuffer;
@Getter
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack =
(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp();

// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
private volatile long lastDataMessagePublishedTimestamp = 0;
// Record the last time the max read position was moved forward by a non-marker message.
@Getter
private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;
@Getter
private final ExecutorService orderedExecutor;

@@ -352,7 +357,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true);
if (ledger instanceof ShadowManagedLedgerImpl) {
shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource());
} else {
@@ -653,6 +658,10 @@ private void decrementPendingWriteOpsAndCheck() {
}
}

private void updateMaxReadPositionMovedForwardTimestamp() {
lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis();
}

@Override
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
@@ -661,12 +670,9 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);

if (!publishContext.isMarkerMessage()) {
lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
}

// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(),
publishContext.isMarkerMessage());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
@@ -3837,10 +3843,6 @@ private CompletableFuture<Void> transactionBufferCleanupAndClose() {
return transactionBuffer.clearSnapshot().thenCompose(__ -> transactionBuffer.closeAsync());
}

public long getLastDataMessagePublishedTimestamp() {
return lastDataMessagePublishedTimestamp;
}

public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}
@@ -206,8 +206,8 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
private void startNewSnapshot() {
cleanupTimedOutSnapshots();

if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastDataMessagePublishedTimestamp() == 0) {
if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
@@ -149,8 +149,9 @@ public interface TransactionBuffer {
/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
* @param isMarkerMessage whether the message is a marker message.
*/
void syncMaxReadPositionForNormalPublish(PositionImpl position);
void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage);

/**
* Get the can read max position.
@@ -33,6 +33,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -213,11 +214,17 @@ public TransactionBufferReader newReader(long sequenceId) throws
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
private final Topic topic;
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack;

public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
this.topic = topic;
if (topic instanceof PersistentTopic) {
this.maxReadPositionCallBack = ((PersistentTopic) topic).getMaxReadPositionCallBack();
} else {
this.maxReadPositionCallBack = null;
}
}

@Override
@@ -369,8 +376,10 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
}

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}

@Override
@@ -102,6 +102,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen

private final AbortedTxnProcessor snapshotAbortedTxnProcessor;

private final MaxReadPositionCallBack maxReadPositionCallBack;
private final AbortedTxnProcessor.SnapshotType snapshotType;

public TopicTransactionBuffer(PersistentTopic topic) {
@@ -120,6 +121,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}

@@ -175,7 +177,7 @@ public void handleTxnEntry(Entry entry) {
if (Markers.isTxnAbortMarker(msgMetadata)) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
}
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
} else {
handleTransactionMessage(txnID, position);
}
@@ -290,7 +292,8 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
// max read position is less than first ongoing transaction message position
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition);
updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition),
false);
}
}

@@ -314,7 +317,7 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
@@ -361,7 +364,7 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
txnAbortedCounter.increment();
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}

void updateMaxReadPosition(TxnID txnID) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
/**
* Remove the specified transaction from the ongoing transaction list and update the max read position.
* @param txnID the transaction to remove.
*/
void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
} else {
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
}
if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
this.changeMaxReadPositionCount.getAndIncrement();
}

/**
* Update the max read position. If the new position is greater than the current max read position,
* the callback is triggered, unless disableCallback is true.
* Currently, the callback is only used to update lastMaxReadPositionMovedForwardTimestamp.
* For non-transactional publishing, some marker messages are sent to the topic; in that case the
* callback does not need to be triggered.
* @param newPosition the new max read position.
* @param disableCallback whether to disable the callback.
*/
void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
this.maxReadPosition = newPosition;
if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
if (!checkIfNoSnapshot()) {
this.changeMaxReadPositionCount.getAndIncrement();
}
if (!disableCallback) {
maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition, this.maxReadPosition);
}
}
}

@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}

/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
* @param isMarkerMessage whether the message is a marker message; in that case the callback that
* updates lastMaxReadPositionMovedForwardTimestamp does not need to be triggered.
*/
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
// When there are no ongoing transactions, lastAddConfirmed is the max readable position, because the callback
// runs on the same thread and at that point lastAddConfirmed does not contain any transaction messages.
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
this.maxReadPosition = position;
updateMaxReadPosition(position, isMarkerMessage);
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
changeMaxReadPositionCount.incrementAndGet();
updateMaxReadPosition(position, isMarkerMessage);
}
}
}
@@ -674,6 +704,18 @@ private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> rea
}
}

/**
* A functional interface invoked when the max read position moves forward.
*/
public interface MaxReadPositionCallBack {
/**
* Callback invoked when the max read position moves forward.
* @param oldPosition the old max read position.
* @param newPosition the new max read position.
*/
void maxReadPositionMovedForward(PositionImpl oldPosition, PositionImpl newPosition);
}

static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
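Each TransactionBuffer implementation touched by this commit (TopicTransactionBuffer and InMemTransactionBuffer above, TransactionBufferDisable below) follows the same contract for the new isMarkerMessage parameter: non-marker publishes notify the callback, markers never do. The minimal, hypothetical buffer below illustrates that contract, reusing the illustrative MaxReadPositionCallBack stand-in from the sketch near the top of this page and a plain long in place of PositionImpl; the forward-movement check mirrors TopicTransactionBuffer, whereas the in-memory and disabled buffers simply forward every non-marker publish with a null old position.

// Hypothetical buffer sketch; not the broker's TopicTransactionBuffer. The callback can
// be null when the owning topic is not a PersistentTopic, as in the in-memory and
// disabled buffers, so it is guarded before use.
class TransactionBufferSketch {
    private final MaxReadPositionCallBack callback;
    private long maxReadPosition = 0;

    TransactionBufferSketch(MaxReadPositionCallBack callback) {
        this.callback = callback;
    }

    void syncMaxReadPositionForNormalPublish(long position, boolean isMarkerMessage) {
        // Markers advance the position silently; only real data notifies the topic.
        updateMaxReadPosition(position, /* disableCallback = */ isMarkerMessage);
    }

    private void updateMaxReadPosition(long newPosition, boolean disableCallback) {
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        if (previous < newPosition && !disableCallback && callback != null) {
            callback.maxReadPositionMovedForward(previous, newPosition);
        }
    }
}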
@@ -27,6 +27,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
@@ -42,8 +43,14 @@
public class TransactionBufferDisable implements TransactionBuffer {

private final Topic topic;
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack;
public TransactionBufferDisable(Topic topic) {
this.topic = topic;
if (topic instanceof PersistentTopic) {
this.maxReadPositionCallBack = ((PersistentTopic) topic).getMaxReadPositionCallBack();
} else {
this.maxReadPositionCallBack = null;
}
}

@Override
@@ -91,8 +98,10 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
}

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}

@Override
@@ -291,6 +291,8 @@ public void testPublishMessage() throws Exception {
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp();

/*
* MessageMetadata.Builder messageMetadata = MessageMetadata.newBuilder();
* messageMetadata.setPublishTime(System.currentTimeMillis()); messageMetadata.setProducerName("producer-name");
@@ -315,10 +317,10 @@ public void setMetadataFromEntryData(ByteBuf entryData) {
assertEquals(entryData.array(), payload.array());
}
};

topic.publishMessage(payload, publishContext);

assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
}

@Test
@@ -26,6 +26,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -40,8 +42,10 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -728,6 +732,21 @@ public void testReplicatedSubscriptionRestApi3() throws Exception {
consumer4.close();
}

/**
* Before sending messages, wait for the transaction buffer recovery to complete; otherwise the
* max read position will not move forward when the message is sent, lastMaxReadPositionMovedForwardTimestamp
* will not be updated, and replication will not be triggered.
* @param pulsarService the broker that owns the topic.
* @param topicName the topic whose transaction buffer must finish recovering.
* @throws Exception
*/
private void waitTBRecoverComplete(PulsarService pulsarService, String topicName) throws Exception {
TopicTransactionBufferState buffer = (TopicTransactionBufferState) ((PersistentTopic) pulsarService.getBrokerService()
.getTopic(topicName, false).get().get()).getTransactionBuffer();
Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
stateField.setAccessible(true);
Awaitility.await().until(() -> !stateField.get(buffer).toString().equals("Initializing"));
}

/**
* Tests replicated subscriptions when replicator producer is closed
*/
@@ -755,6 +774,9 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
.subscribe();

// send one message to trigger replication
if (config1.isTransactionCoordinatorEnabled()) {
waitTBRecoverComplete(pulsar1, topicName);
}
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
@@ -916,6 +938,9 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception {
.statsInterval(0, TimeUnit.SECONDS).build();

Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
if (config1.isTransactionCoordinatorEnabled()) {
waitTBRecoverComplete(pulsar1, topicName);
}
producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K1").value("V2").send();
producer.close();
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TransactionalReplicateSubscriptionTest extends ReplicatorTestBase {
@Override
@BeforeClass(timeOut = 300000)
public void setup() throws Exception {
super.setup();
admin1.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
createTransactionCoordinatorAssign(16, pulsar1);
}

@Override
@AfterClass(alwaysRun = true, timeOut = 300000)
public void cleanup() throws Exception {
super.cleanup();
}

/**
* Enable the transaction coordinator for cluster1.
*/
@Override
public void setConfig1DefaultValue(){
super.setConfig1DefaultValue();
config1.setTransactionCoordinatorEnabled(true);
}

protected void createTransactionCoordinatorAssign(int numPartitionsOfTC, PulsarService pulsarService) throws MetadataStoreException {
pulsarService.getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources()
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(numPartitionsOfTC));
}

/**
* Test replicated subscriptions with transactional messages.
* @throws Exception
*/
@Test
public void testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
final String subscriptionName = "s1";
final boolean isReplicatedSubscription = true;
final int messagesCount = 20;
final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
final Set<String> receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>());
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.topics().createNonPartitionedTopic(topicName);
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, isReplicatedSubscription);
final PersistentTopic topic1 =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// Send messages.
// Wait for the topic to be created on cluster2.
// Wait for the snapshot to be created.
final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).enableTransaction(true).build();
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
Consumer<String> consumer1 = client1.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
Transaction txn1 = client1.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
for (int i = 0; i < messagesCount / 2; i++) {
String msg = i + "";
producer1.newMessage(txn1).value(msg).send();
sentMessages.add(msg);
}
txn1.commit().get();
Awaitility.await().untilAsserted(() -> {
ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
assertTrue(replicators != null && replicators.size() == 1, "Replicator should have started");
assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected");
assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
"One snapshot should be finished");
});
final PersistentTopic topic2 =
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> {
assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
"Replicated subscription controller should created");
});
Transaction txn2 = client1.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
for (int i = messagesCount / 2; i < messagesCount; i++) {
String msg = i + "";
producer1.newMessage(txn2).value(msg).send();
sentMessages.add(msg);
}
txn2.commit().get();

// Consume half of the messages and wait for the subscription to be created on cluster2.
for (int i = 0; i < messagesCount / 2; i++){
Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
if (message == null) {
fail("Should not receive null.");
}
receivedMessages.add(message.getValue());
consumer1.acknowledge(message);
}
Awaitility.await().untilAsserted(() -> {
assertNotNull(topic2.getSubscriptions().get(subscriptionName), "Subscription should be created");
});

// Switch client to cluster2.
// Since cluster1 did not crash, all messages will be replicated to cluster2.
consumer1.close();
final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
final Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();

// Verify all messages will be consumed.
Awaitility.await().untilAsserted(() -> {
while (true) {
Message message = consumer2.receive(2, TimeUnit.SECONDS);
if (message != null) {
receivedMessages.add(message.getValue().toString());
consumer2.acknowledge(message);
} else {
break;
}
}
assertEquals(receivedMessages.size(), sentMessages.size());
});

consumer2.close();
producer1.close();
client1.close();
client2.close();
}
}
@@ -19,11 +19,14 @@
package org.apache.pulsar.broker.transaction;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -38,7 +41,9 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -180,6 +185,37 @@ private void produceTest(boolean endAction) throws Exception {
log.info("produce and {} test finished.", endAction ? "commit" : "abort");
}

@Test
public void testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish() throws Exception {
final String topic = NAMESPACE1 + "/testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish";
PulsarClient pulsarClient = this.pulsarClient;
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();
PersistentTopic persistentTopic = getTopic(topic);
long lastMaxReadPositionMovedForwardTimestamp = persistentTopic.getLastMaxReadPositionMovedForwardTimestamp();

// A transactional publish does not update lastMaxReadPositionMovedForwardTimestamp.
producer.newMessage(txn).value("hello world".getBytes()).send();
assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() == lastMaxReadPositionMovedForwardTimestamp);

// Committing the transaction updates lastMaxReadPositionMovedForwardTimestamp.
txn.commit().get();
assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
}

private PersistentTopic getTopic(String topic) throws ExecutionException, InterruptedException {
Optional<Topic> optionalTopic = getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, true).get();
return (PersistentTopic) optionalTopic.get();
}

private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boolean isFinished) {
futureList.forEach(messageIdFuture -> {
try {
@@ -1076,7 +1076,7 @@ public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exce
});
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);

buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1), false);
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);

}
