Skip to content

Commit

Permalink
[fix] [log] Do not print warn log when concurrently publishing and sw…
Browse files Browse the repository at this point in the history
…itching ledgers (#23209)

(cherry picked from commit 0a5cb51)
  • Loading branch information
poorbarcode committed Aug 22, 2024
1 parent f5e45b8 commit f7c540e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1679,8 +1679,9 @@ void createNewOpAddEntryForNewLedger() {
if (existsOp.ledger != null) {
existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
} else {
// This scenario should not happen.
log.warn("[{}] An OpAddEntry's ledger is empty.", name);
// It may happen when the following operations execute at the same time, so it is expected.
// - Adding entry.
// - Switching ledger.
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@

import com.carrotsearch.hppc.ObjectSet;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -105,4 +112,67 @@ public void testConsumerListMatchesConsumerSet() throws Exception {
// cleanup.
client.close();
}

@Test(timeOut = 30 * 1000)
public void testConcurrentlyOfPublishAndSwitchLedger() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subscription, MessageId.earliest);
// Make ledger switches faster.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
config.setMaxEntriesPerLedger(2);
config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
// Inject a delay for switching ledgers, so publishing requests will be push in to the pending queue.
AtomicInteger delayTimes = new AtomicInteger();
mockZooKeeper.delay(10, (op, s) -> {
if (op.toString().equals("SET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
return delayTimes.incrementAndGet() == 1;
}
return false;
});
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
.create();
List<CompletableFuture<MessageId>> sendRequests = new ArrayList<>();
List<String> msgsSent = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String msg = i + "";
sendRequests.add(producer.sendAsync(i + ""));
msgsSent.add(msg);
}
// Verify:
// - All messages were sent.
// - The order of messages are correct.
Set<String> msgIds = new LinkedHashSet<>();
MessageIdImpl previousMsgId = null;
for (CompletableFuture<MessageId> msgId : sendRequests) {
Assert.assertNotNull(msgId.join());
MessageIdImpl messageIdImpl = (MessageIdImpl) msgId.join();
if (previousMsgId != null) {
Assert.assertTrue(messageIdImpl.compareTo(previousMsgId) > 0);
}
msgIds.add(String.format("%s:%s", messageIdImpl.getLedgerId(), messageIdImpl.getEntryId()));
previousMsgId = messageIdImpl;
}
Assert.assertEquals(msgIds.size(), 100);
log.info("messages were sent: {}", msgIds.toString());
List<String> msgsReceived = new ArrayList<>();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subscription).subscribe();
while (true) {
Message<String> receivedMsg = consumer.receive(2, TimeUnit.SECONDS);
if (receivedMsg == null) {
break;
}
msgsReceived.add(receivedMsg.getValue());
}
Assert.assertEquals(msgsReceived, msgsSent);

// cleanup.
consumer.close();
producer.close();
admin.topics().delete(topicName);
}
}

0 comments on commit f7c540e

Please sign in to comment.