Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed retention of keys in compaction #11287

Merged
merged 2 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;

protected boolean isFirstRead = true;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription,
ServiceConfiguration serviceConfig) {
Expand Down Expand Up @@ -159,6 +161,10 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
isKeyHashRangeFiltered = false;
}

if (consumers.isEmpty()) {
isFirstRead = true;
}

consumers.add(consumer);

if (!pickAndScheduleActiveConsumer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
Expand Down Expand Up @@ -123,12 +124,13 @@ public class Consumer {
private boolean preciseDispatcherFlowControl;
private PositionImpl readPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
KeySharedMeta keySharedMeta) {
KeySharedMeta keySharedMeta, MessageId startMessageId) {

this.subscription = subscription;
this.subType = subType;
Expand All @@ -148,6 +150,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.appId = appId;

// Ensure we start from compacted view
this.startMessageId = (readCompacted && startMessageId == null) ? MessageId.earliest : startMessageId;

this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
Expand Down Expand Up @@ -835,5 +841,9 @@ public String getClientAddress() {
return clientAddress;
}

public MessageId getStartMessageId() {
return startMessageId;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
if (!cnx.isActive()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,
}

havePendingRead = false;
isFirstRead = false;

if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
Expand Down Expand Up @@ -338,7 +339,8 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, consumer, topic.getMaxReadPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntry
havePendingRead = false;
}

isFirstRead = false;

if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -197,7 +199,8 @@ protected void readMoreEntries(Consumer consumer) {
havePendingRead = true;

if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
this, consumer);
} else {
streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
readCompacted, initialPosition, keySharedMeta);
readCompacted, initialPosition, keySharedMeta, startMessageId);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
if (!cnx.isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface RawReader {
static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;

public interface CompactedTopic {
CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx);
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
Expand Down Expand Up @@ -81,13 +83,20 @@ public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerI
}

@Override
public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx) {
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
synchronized (this) {
PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition();
PositionImpl cursorPosition;
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
cursorPosition = PositionImpl.earliest;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
}
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, PositionImpl.latest);
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest);
} else {
compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
Expand All @@ -96,35 +105,35 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe
// the cursor just needs to be set to the compaction horizon
if (startPoint == COMPACT_LEDGER_EMPTY) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
callback.readEntriesComplete(Collections.emptyList(), consumer);
return CompletableFuture.completedFuture(null);
}
if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx,
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
PositionImpl.latest);
return CompletableFuture.completedFuture(null);
} else {
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
startPoint + numberOfEntriesToRead);
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
callback.readEntriesComplete(Collections.emptyList(), consumer);
return CompletableFuture.completedFuture(null);
}
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext());
callback.readEntriesComplete(entries, ctx);
callback.readEntriesComplete(entries, consumer);
});
}
}))
.exceptionally((exception) -> {
if (exception.getCause() instanceof NoSuchElementException) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
callback.readEntriesComplete(Collections.emptyList(), consumer);
} else {
callback.readEntriesFailed(new ManagedLedgerException(exception), ctx);
callback.readEntriesFailed(new ManagedLedgerException(exception), consumer);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -299,7 +300,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand All @@ -310,7 +311,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand Down Expand Up @@ -339,7 +340,7 @@ public void testAddRemoveConsumer() throws Exception {
// 2. Add consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand All @@ -363,7 +364,7 @@ public void testAddRemoveConsumer() throws Exception {

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null));
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
Expand All @@ -377,7 +378,7 @@ public void testAddRemoveConsumer() throws Exception {
// 6. Add a consumer which changes active consumer
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName());
Expand Down Expand Up @@ -460,7 +461,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertEquals(1, consumers.size());
Expand All @@ -469,7 +470,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);

// 4. Verify active consumer doesn't change
Expand All @@ -482,7 +483,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null));
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
Expand Down Expand Up @@ -672,7 +673,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer =
new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000,
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null);
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest);
try {
consumer.flowPermits(permit);
} catch (Exception e) {
Expand Down
Loading