Skip to content

Commit

Permalink
Fixed retention of keys in compaction (apache#11287)
Browse files Browse the repository at this point in the history
### Motivation

This change fixes few issues in the compaction mechanism, the 

 * When a reader is created, reading from "earliest" message, it should read the compacted data and then continue from the next message. 
 * When the compaction consumer starts, it shouldn't seek to the beginning. This causes 2 issues: 
   * Rescanning of the topic each time the compaction runs
   * Keys that are being dropped from the topic are also getting dropped from the compacted view, while in fact they should be there until explicitly deleted (with an empty message for a key).

The main source of the problem is that when creating a cursor on "earliest" message, the cursor gets automatically adjusted on the earliest message available to read. This confuses the check for the read-compacted because it may think the reader/consumer is already ahead of the compaction horizon.
 
### Modifications

Introduced a "isFirstRead" flag to make sure we double check the start message id and use `MessageId.earliest` instead of the earliest available message to read on the topic. After the first read, the positioning will be fine.
  • Loading branch information
merlimat authored Jul 13, 2021
1 parent 878cc44 commit feb4ff1
Show file tree
Hide file tree
Showing 13 changed files with 328 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;

protected boolean isFirstRead = true;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription,
ServiceConfiguration serviceConfig) {
Expand Down Expand Up @@ -159,6 +161,10 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
isKeyHashRangeFiltered = false;
}

if (consumers.isEmpty()) {
isFirstRead = true;
}

consumers.add(consumer);

if (!pickAndScheduleActiveConsumer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
Expand Down Expand Up @@ -123,12 +124,13 @@ public class Consumer {
private boolean preciseDispatcherFlowControl;
private PositionImpl readPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
KeySharedMeta keySharedMeta) {
KeySharedMeta keySharedMeta, MessageId startMessageId) {

this.subscription = subscription;
this.subType = subType;
Expand All @@ -148,6 +150,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.appId = appId;

// Ensure we start from compacted view
this.startMessageId = (readCompacted && startMessageId == null) ? MessageId.earliest : startMessageId;

this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
Expand Down Expand Up @@ -835,5 +841,9 @@ public String getClientAddress() {
return clientAddress;
}

public MessageId getStartMessageId() {
return startMessageId;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
if (!cnx.isActive()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,
}

havePendingRead = false;
isFirstRead = false;

if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
Expand Down Expand Up @@ -338,7 +339,8 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, consumer, topic.getMaxReadPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntry
havePendingRead = false;
}

isFirstRead = false;

if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -197,7 +199,8 @@ protected void readMoreEntries(Consumer consumer) {
havePendingRead = true;

if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
this, consumer);
} else {
streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
readCompacted, initialPosition, keySharedMeta);
readCompacted, initialPosition, keySharedMeta, startMessageId);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
if (!cnx.isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface RawReader {
static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;

public interface CompactedTopic {
CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx);
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
Expand Down Expand Up @@ -81,13 +83,20 @@ public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerI
}

@Override
public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx) {
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
synchronized (this) {
PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition();
PositionImpl cursorPosition;
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
cursorPosition = PositionImpl.earliest;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
}
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, PositionImpl.latest);
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest);
} else {
compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
Expand All @@ -96,35 +105,35 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe
// the cursor just needs to be set to the compaction horizon
if (startPoint == COMPACT_LEDGER_EMPTY) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
callback.readEntriesComplete(Collections.emptyList(), consumer);
return CompletableFuture.completedFuture(null);
}
if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx,
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
PositionImpl.latest);
return CompletableFuture.completedFuture(null);
} else {
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
startPoint + numberOfEntriesToRead);
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
callback.readEntriesComplete(Collections.emptyList(), consumer);
return CompletableFuture.completedFuture(null);
}
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext());
callback.readEntriesComplete(entries, ctx);
callback.readEntriesComplete(entries, consumer);
});
}
}))
.exceptionally((exception) -> {
if (exception.getCause() instanceof NoSuchElementException) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
callback.readEntriesComplete(Collections.emptyList(), consumer);
} else {
callback.readEntriesFailed(new ManagedLedgerException(exception), ctx);
callback.readEntriesFailed(new ManagedLedgerException(exception), consumer);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -299,7 +300,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand All @@ -310,7 +311,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand Down Expand Up @@ -339,7 +340,7 @@ public void testAddRemoveConsumer() throws Exception {
// 2. Add consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand All @@ -363,7 +364,7 @@ public void testAddRemoveConsumer() throws Exception {

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null));
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
Expand All @@ -377,7 +378,7 @@ public void testAddRemoveConsumer() throws Exception {
// 6. Add a consumer which changes active consumer
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName());
Expand Down Expand Up @@ -460,7 +461,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertEquals(1, consumers.size());
Expand All @@ -469,7 +470,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);

// 4. Verify active consumer doesn't change
Expand All @@ -482,7 +483,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null));
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
Expand Down Expand Up @@ -672,7 +673,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer =
new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000,
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null);
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest);
try {
consumer.flowPermits(permit);
} catch (Exception e) {
Expand Down
Loading

0 comments on commit feb4ff1

Please sign in to comment.