Skip to content

Commit 69393b3

Browse files
committed
[fix] [log] Do not print warn log when concurrently publishing and switching ledgers (#23209)
(cherry picked from commit 0a5cb51)
1 parent a3e7d76 commit 69393b3

File tree

2 files changed

+73
-2
lines changed

2 files changed

+73
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1680,8 +1680,9 @@ void createNewOpAddEntryForNewLedger() {
16801680
if (existsOp.ledger != null) {
16811681
existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
16821682
} else {
1683-
// This scenario should not happen.
1684-
log.warn("[{}] An OpAddEntry's ledger is empty.", name);
1683+
// It may happen when the following operations execute at the same time, so it is expected.
1684+
// - Adding entry.
1685+
// - Switching ledger.
16851686
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
16861687
}
16871688
existsOp.setLedger(currentLedger);

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java

+70
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,19 @@
2020

2121
import com.carrotsearch.hppc.ObjectSet;
2222
import java.time.Duration;
23+
import java.util.ArrayList;
24+
import java.util.LinkedHashSet;
2325
import java.util.List;
26+
import java.util.Set;
27+
import java.util.concurrent.CompletableFuture;
2428
import java.util.concurrent.TimeUnit;
2529
import java.util.concurrent.atomic.AtomicInteger;
2630
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
2732
import org.apache.pulsar.broker.BrokerTestUtil;
2833
import org.apache.pulsar.broker.service.Dispatcher;
34+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
35+
import org.apache.pulsar.client.impl.MessageIdImpl;
2936
import org.apache.pulsar.common.naming.TopicName;
3037
import org.awaitility.Awaitility;
3138
import org.awaitility.reflect.WhiteboxImpl;
@@ -105,4 +112,67 @@ public void testConsumerListMatchesConsumerSet() throws Exception {
105112
// cleanup.
106113
client.close();
107114
}
115+
116+
@Test(timeOut = 30 * 1000)
117+
public void testConcurrentlyOfPublishAndSwitchLedger() throws Exception {
118+
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
119+
final String subscription = "s1";
120+
admin.topics().createNonPartitionedTopic(topicName);
121+
admin.topics().createSubscription(topicName, subscription, MessageId.earliest);
122+
// Make ledger switches faster.
123+
PersistentTopic persistentTopic =
124+
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
125+
ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
126+
config.setMaxEntriesPerLedger(2);
127+
config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
128+
// Inject a delay for switching ledgers, so publishing requests will be push in to the pending queue.
129+
AtomicInteger delayTimes = new AtomicInteger();
130+
mockZooKeeper.delay(10, (op, s) -> {
131+
if (op.toString().equals("SET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
132+
return delayTimes.incrementAndGet() == 1;
133+
}
134+
return false;
135+
});
136+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
137+
.create();
138+
List<CompletableFuture<MessageId>> sendRequests = new ArrayList<>();
139+
List<String> msgsSent = new ArrayList<>();
140+
for (int i = 0; i < 100; i++) {
141+
String msg = i + "";
142+
sendRequests.add(producer.sendAsync(i + ""));
143+
msgsSent.add(msg);
144+
}
145+
// Verify:
146+
// - All messages were sent.
147+
// - The order of messages are correct.
148+
Set<String> msgIds = new LinkedHashSet<>();
149+
MessageIdImpl previousMsgId = null;
150+
for (CompletableFuture<MessageId> msgId : sendRequests) {
151+
Assert.assertNotNull(msgId.join());
152+
MessageIdImpl messageIdImpl = (MessageIdImpl) msgId.join();
153+
if (previousMsgId != null) {
154+
Assert.assertTrue(messageIdImpl.compareTo(previousMsgId) > 0);
155+
}
156+
msgIds.add(String.format("%s:%s", messageIdImpl.getLedgerId(), messageIdImpl.getEntryId()));
157+
previousMsgId = messageIdImpl;
158+
}
159+
Assert.assertEquals(msgIds.size(), 100);
160+
log.info("messages were sent: {}", msgIds.toString());
161+
List<String> msgsReceived = new ArrayList<>();
162+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
163+
.subscriptionName(subscription).subscribe();
164+
while (true) {
165+
Message<String> receivedMsg = consumer.receive(2, TimeUnit.SECONDS);
166+
if (receivedMsg == null) {
167+
break;
168+
}
169+
msgsReceived.add(receivedMsg.getValue());
170+
}
171+
Assert.assertEquals(msgsReceived, msgsSent);
172+
173+
// cleanup.
174+
consumer.close();
175+
producer.close();
176+
admin.topics().delete(topicName);
177+
}
108178
}

0 commit comments

Comments
 (0)