Skip to content

Commit

Permalink
[fix][broker] Fix compaction subscription acknowledge Marker msg issu…
Browse files Browse the repository at this point in the history
…e. (#16205)
  • Loading branch information
Technoboy- authored Jun 26, 2022
1 parent ce24db1 commit 8e0cd9c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,6 +37,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
Expand Down Expand Up @@ -152,15 +154,13 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
Expand All @@ -175,8 +175,7 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray

entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
Expand Down Expand Up @@ -234,6 +233,12 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
return totalEntries;
}

private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
if (!(subscription instanceof CompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, properties);
}
}

private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
ImmutableList<EntryFilterWithClassLoader> entryFilters) {
for (EntryFilter entryFilter : entryFilters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -44,11 +45,17 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
Expand All @@ -66,13 +73,17 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -1678,4 +1689,63 @@ public void testHealthCheckTopicNotCompacted() {
producer1.close();
producer2.close();
}

@Test(timeOut = 60000)
public void testCompactionWithMarker() throws Exception {
String namespace = "my-property/use/my-ns";
final TopicName dest = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
admin.topics().createNonPartitionedTopic(dest.toString());
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(dest.toString())
.subscriptionName("test-compaction-sub")
.subscriptionType(SubscriptionType.Exclusive)
.readCompacted(true)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(dest.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join();
Assert.assertTrue(topic.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topic.get();
Random random = new Random();
for (int i = 0; i < 100; i++) {
int rad = random.nextInt(3);
ByteBuf marker;
if (rad == 0) {
marker = Markers.newTxnCommitMarker(-1L, 0, i);
} else if (rad == 1) {
marker = Markers.newTxnAbortMarker(-1L, 0, i);
} else {
marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1");
}
persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
//
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
//
}
}, null);
marker.release();
}
producer.send("msg-2".getBytes(StandardCharsets.UTF_8));
admin.topics().triggerCompaction(dest.toString());
Awaitility.await()
.atMost(50, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(() -> {
long ledgerId = admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
Assert.assertNotEquals(ledgerId, -1L);
});
}
}

0 comments on commit 8e0cd9c

Please sign in to comment.