Skip to content

Commit

Permalink
[fix][broker] Reader stuck after call hasMessageAvailable when enable…
Browse files Browse the repository at this point in the history
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
(cherry picked from commit 1dacca5)
  • Loading branch information
shibd authored and srinath-ctds committed May 16, 2024
1 parent 07b40ee commit 275090a
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.util;

import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.classification.InterfaceStability;

@InterfaceStability.Evolving
public class ManagedLedgerImplUtils {

/**
* Reverse find last valid position one-entry by one-entry.
*/
public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
final Predicate<Entry> predicate,
final PositionImpl startPosition) {
CompletableFuture<Position> future = new CompletableFuture<>();
if (!ledger.isValidPosition(startPosition)) {
future.complete(startPosition);
} else {
internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
}
return future;
}

private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
final Predicate<Entry> predicate,
final PositionImpl position,
final CompletableFuture<Position> future) {
ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
final Position position = entry.getPosition();
try {
if (predicate.test(entry)) {
future.complete(position);
return;
}
PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position);
if (!ledger.isValidPosition(previousPosition)) {
future.complete(previousPosition);
} else {
internalAsyncReverseFindPositionOneByOne(ledger, predicate,
ledger.getPreviousPosition((PositionImpl) position), future);
}
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
entry.release();
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.util;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.annotations.Test;

@Slf4j
public class ManagedLedgerImplUtilsTest extends MockedBookKeeperTestCase {

@Test
public void testGetLastValidPosition() throws Exception {
final int maxEntriesPerLedger = 5;

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig);

String matchEntry = "match-entry";
String noMatchEntry = "nomatch-entry";
Predicate<Entry> predicate = entry -> {
String entryValue = entry.getDataBuffer().toString(UTF_8);
return matchEntry.equals(entryValue);
};

// New ledger will return the last position, regardless of whether the conditions are met or not.
Position position = ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
predicate, (PositionImpl) ledger.getLastConfirmedEntry()).get();
assertEquals(ledger.getLastConfirmedEntry(), position);

for (int i = 0; i < maxEntriesPerLedger - 1; i++) {
ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8));
}
Position lastMatchPosition = ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8));
for (int i = 0; i < maxEntriesPerLedger; i++) {
ledger.addEntry(noMatchEntry.getBytes(StandardCharsets.UTF_8));
}

// Returns last position of entry is "match-entry"
position = ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
predicate, (PositionImpl) ledger.getLastConfirmedEntry()).get();
assertEquals(position, lastMatchPosition);

ledger.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2073,29 +2073,31 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
long requestId = getLastMessageId.getRequestId();

Topic topic = consumer.getSubscription().getTopic();
topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> {
Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}

getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) lastPosition,
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName(),
consumer.readCompacted());
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
return null;
});
topic.checkIfTransactionBufferRecoverCompletely(true)
.thenCompose(__ -> topic.getLastDispatchablePosition())
.thenApply(lastPosition -> {
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}

getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) lastPosition,
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName(),
consumer.readCompacted());
return null;
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
return null;
});
} else {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,13 @@ CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBack

Position getLastPosition();

/**
* Get the last message position that can be dispatch.
*/
default CompletableFuture<Position> getLastDispatchablePosition() {
throw new UnsupportedOperationException("getLastDispatchablePosition is not supported by default");
}

CompletableFuture<MessageId> getLastMessageId();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -162,6 +163,7 @@
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -3204,6 +3206,22 @@ public Position getLastPosition() {
return ledger.getLastConfirmedEntry();
}

@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
PositionImpl maxReadPosition = getMaxReadPosition();
// If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions.
// so return `maxRedPosition` directly.
if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
return CompletableFuture.completedFuture(maxReadPosition);
} else {
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
return !Markers.isServerOnlyMarker(md);
}, maxReadPosition);
}
}

@Override
public CompletableFuture<MessageId> getLastMessageId() {
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -167,6 +168,82 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
"messages don't match.");
}

/**
* Tests replicated subscriptions across two regions and can read successful.
*/
@Test
public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
String topicName = "persistent://" + namespace + "/mytopic";
String subscriptionName = "cluster-subscription";
// this setting can be used to manually run the test with subscription replication disabled
// it shows that subscription replication has no impact in behavior for this test case
boolean replicateSubscriptionState = true;

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();

// create subscription in r1
createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);

@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();

// create subscription in r2
createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);

Set<String> sentMessages = new LinkedHashSet<>();

// send messages in r1
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
int numMessages = 6;
for (int i = 0; i < numMessages; i++) {
String body = "message" + i;
producer.send(body.getBytes(StandardCharsets.UTF_8));
sentMessages.add(body);
}
producer.close();


// consume 3 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
try (Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()) {
readMessages(consumer1, receivedMessages, 3, false);
}

// wait for subscription to be replicated
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

// create a reader in r2
Reader<byte[]> reader = client2.newReader().topic(topicName)
.subscriptionName("new-sub")
.startMessageId(MessageId.earliest)
.create();
int readNum = 0;
while (reader.hasMessageAvailable()) {
Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
assertNotNull(message);
log.info("Receive message: " + new String(message.getValue()) + " msgId: " + message.getMessageId());
readNum++;
}
assertEquals(readNum, numMessages);
}

@Test
public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
Expand Down
Loading

0 comments on commit 275090a

Please sign in to comment.