Skip to content

Commit

Permalink
[fix] [broker] Fix write all compacted out entry into compacted topic (
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled authored and Technoboy- committed Jan 22, 2024
1 parent 74678b6 commit fbf794c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
if (singleMessageMetadata.isCompactedOut()) {
// we may read compacted out message from the compacted topic
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
Unpooled.EMPTY_BUFFER, batchBuffer);
} else if (!singleMessageMetadata.hasPartitionKey()) {
if (retainNullKey) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
Expand All @@ -45,10 +47,15 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
Expand Down Expand Up @@ -160,6 +167,55 @@ public void testCompaction() throws Exception {
compactAndVerify(topic, expected, true);
}

@Test
public void testAllCompactedOut() throws Exception {
String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";
// set retain null key to true
boolean oldRetainNullKey = pulsar.getConfig().isTopicCompactionRetainNullKey();
pulsar.getConfig().setTopicCompactionRetainNullKey(true);
this.restartBroker();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true).topic(topicName).batchingMaxMessages(3).create();

producer.newMessage().key("K1").value("V1").sendAsync();
producer.newMessage().key("K2").value("V2").sendAsync();
producer.newMessage().key("K2").value(null).sendAsync();
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

producer.newMessage().key("K1").value(null).sendAsync();
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.subscriptionName("reader-test")
.topic(topicName)
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();
while (reader.hasMessageAvailable()) {
Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
Assert.assertNotNull(message);
}
// set retain null key back to avoid affecting other tests
pulsar.getConfig().setTopicCompactionRetainNullKey(oldRetainNullKey);
}

@Test
public void testCompactAddCompact() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Expand Down

0 comments on commit fbf794c

Please sign in to comment.