Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix NPE when reset Replicator's cursor by position. #20597

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4111,31 +4111,43 @@ private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosit
return;
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
PersistentSubscription sub = null;
PersistentReplicator repl = null;

if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
} else {
sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
}

CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);

PersistentReplicator finalRepl = repl;
PersistentSubscription finalSub = sub;

batchSizeFuture.thenAccept(bi -> {
PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
boolean issued;
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
issued = repl.expireMessages(position);
issued = finalRepl.expireMessages(position);
} else {
issued = sub.expireMessages(position);
issued = finalSub.expireMessages(position);
}

if (issued) {
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position,
topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -43,6 +44,7 @@
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
private final PersistentTopic topic;
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
Expand All @@ -57,9 +59,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");

public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor,
PersistentSubscription subscription) {
this.topicName = topicName;
public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
@Nullable PersistentSubscription subscription) {
Comment on lines -60 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR modifies the public API (even it's in the pulsar-broker module), we should not cherry-pick it to release branches unless we make it compatible. @poorbarcode @Technoboy-

this.topic = topic;
this.topicName = topic.getName();
this.cursor = cursor;
this.subName = subscriptionName;
this.subscription = subscription;
Expand Down Expand Up @@ -98,11 +101,12 @@ public boolean expireMessages(int messageTTLInSeconds) {

public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition();
if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond "
+ "current topic's last position {}", topicName, subName, messagePosition,
subscription.getTopic().getLastPosition());
+ "current topic's last position {}", topicName, subName, messagePosition,
topicLastPosition);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
brokerService, replicationClient);
this.topic = localTopic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName,
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
Expand Down Expand Up @@ -230,7 +231,11 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
});
assertTrue(ex.get());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
Expand Down Expand Up @@ -407,7 +412,11 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
Expand Down Expand Up @@ -444,15 +453,16 @@ void testMessageExpiryWithPosition() throws Exception {
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

PersistentSubscription subscription = mock(PersistentSubscription.class);
Topic topic = mock(Topic.class);
PersistentTopic topic = mock(PersistentTopic.class);
when(subscription.getTopic()).thenReturn(topic);
when(topic.getName()).thenReturn("topicname");

for (int i = 0; i < totalEntries; i++) {
positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
}
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));

PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic,
cursor.getName(), cursor, subscription));
assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1));
boolean issued;
Expand Down Expand Up @@ -491,7 +501,7 @@ void testMessageExpiryWithPosition() throws Exception {
clearInvocations(monitor);

ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname",
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic,
cursor.getName(), mockCursor, subscription));
// Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to
// expire message shouldn't issue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,36 @@ public void testReplicatorClearBacklog() throws Exception {
assertEquals(status.getReplicationBacklog(), 0);
}


@Test(timeOut = 30000)
public void testResetReplicatorSubscriptionPosition() throws Exception {
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);

// Produce from cluster1 and consume from the rest
for (int i = 0; i < 10; i++) {
producer1.produce(2);
}

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();

PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keys().get(0)));

MessageId id = topic.getLastMessageId().get();
admin1.topics().expireMessages(dest.getPartitionedTopicName(),
replicator.getCursor().getName(),
id,false);

replicator.updateRates();

ReplicatorStats status = replicator.getStats();
assertEquals(status.getReplicationBacklog(), 0);
}

@Test(timeOut = 30000)
public void testResetCursorNotFail() throws Exception {

Expand Down