
Commit 1565147

fix: an expired PersistentTopic has a closed ManagedLedger that throws a ManagedLedgerException (#862)
Fixes #861

### Motivation

See #861 for the background of this investigation. Because the namespace bundle has an unload listener, the KafkaTopicConsumerManager (tcm) for a topic is guaranteed to be removed from the cache after the bundle is unloaded and loaded again. For a single topic, however, registering a close listener does not seem reasonable, because a broker may serve a very large number of topics. Moreover, Pulsar currently has no listener for the close of an individual topic, and there is no need to implement one.
Parent: c31e946 · Commit: 1565147

File tree: 3 files changed, +137 −80 lines

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java (+9)

```diff
@@ -221,6 +221,15 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
             return CompletableFuture.completedFuture(null);
         }
         final ManagedLedger ledger = topic.getManagedLedger();
+
+        if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) {
+            log.error("[{}] Async get cursor for offset {} failed, because current managedLedger has been closed",
+                    requestHandler.ctx.channel(), offset);
+            CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<>();
+            future.completeExceptionally(new Exception("Current managedLedger has been closed."));
+            return future;
+        }
+
         return ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).thenApply(position -> {
             final String cursorName = "kop-consumer-cursor-" + topic.getName()
                     + "-" + position.getLedgerId() + "-" + position.getEntryId()
```

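The core of the fix is a fail-fast guard: if the topic's ManagedLedger is already closed, the method returns an exceptionally completed future instead of invoking `asyncFindPosition` on a closed ledger, which previously surfaced as a ManagedLedgerException. Below is a minimal self-contained sketch of the same pattern; the class and method names are hypothetical stand-ins, not KoP code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a resource like ManagedLedger; not KoP code.
public class FailFastGuardSketch {
    enum State { OPEN, CLOSED }

    private volatile State state = State.OPEN;

    // Check the resource state up front and fail fast via the returned future,
    // so callers handle the error through normal future composition instead of
    // an exception thrown from deep inside the async machinery.
    public CompletableFuture<Long> asyncFindPosition(long offset) {
        if (state == State.CLOSED) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            future.completeExceptionally(new IllegalStateException("Current managedLedger has been closed."));
            return future;
        }
        // Normal path: pretend the position lookup happens asynchronously.
        return CompletableFuture.supplyAsync(() -> offset);
    }

    public void close() {
        state = State.CLOSED;
    }
}
```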
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java (+92 −80)

```diff
@@ -290,6 +290,36 @@ public void handleFetch() {
         });
     }
 
+    private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos) {
+        statsLogger.getPrepareMetadataStats().registerFailedEvent(
+                MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private boolean checkOffsetOutOfRange(KafkaTopicConsumerManager tcm,
+                                          long offset,
+                                          TopicPartition topicPartition,
+                                          long startPrepareMetadataNanos) {
+        // handle offset out-of-range exception
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger();
+        long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger);
+        // TODO: Offset out-of-range checks are still incomplete
+        // We only check the case of `offset > logEndOffset` and `offset < LogStartOffset`
+        // is currently not handled.
+        // Because we found that the operation of obtaining `LogStartOffset`
+        // requires reading from disk,
+        // and such a time-consuming operation is likely to harm the performance of FETCH request.
+        // More discussions please refer to https://github.com/streamnative/kop/pull/531
+        if (offset > logEndOffset) {
+            log.error("Received request for offset {} for partition {}, "
+                    + "but we only have entries less than {}.",
+                    offset, topicPartition, logEndOffset);
+            registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
+            addErrorPartitionResponse(topicPartition, Errors.OFFSET_OUT_OF_RANGE);
+            return true;
+        }
+        return false;
+    }
+
     private void handlePartitionData(final TopicPartition topicPartition,
                                      final FetchRequest.PartitionData partitionData,
                                      final String fullTopicName,
@@ -300,91 +330,73 @@ private void handlePartitionData(final TopicPartition topicPartition,
         // the future that is returned by getTopicConsumerManager is always completed normally
         topicManager.getTopicConsumerManager(fullTopicName).thenAccept(tcm -> {
             if (tcm == null) {
-                statsLogger.getPrepareMetadataStats().registerFailedEvent(
-                        MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
+                registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
                 // remove null future cache
                 KafkaTopicConsumerManagerCache.getInstance().removeAndCloseByTopic(fullTopicName);
                 addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
-                return;
-            }
-
-            // handle offset out-of-range exception
-            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger();
-            long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger);
-            // TODO: Offset out-of-range checks are still incomplete
-            // We only check the case of `offset > logEndOffset` and `offset < LogStartOffset`
-            // is currently not handled.
-            // Because we found that the operation of obtaining `LogStartOffset`
-            // requires reading from disk,
-            // and such a time-consuming operation is likely to harm the performance of FETCH request.
-            // More discussions please refer to https://github.com/streamnative/kop/pull/531
-            if (offset > logEndOffset) {
-                log.error("Received request for offset {} for partition {}, "
-                        + "but we only have entries less than {}.",
-                        offset, topicPartition, logEndOffset);
-                addErrorPartitionResponse(topicPartition, Errors.OFFSET_OUT_OF_RANGE);
-                return;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} .",
-                        topicPartition, offset);
-            }
-
-            final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture =
-                    tcm.removeCursorFuture(offset);
-            if (cursorFuture == null) {
-                // tcm is closed, just return a NONE error because the channel may be still active
-                log.warn("[{}] KafkaTopicConsumerManager is closed, remove TCM of {}",
-                        requestHandler.ctx, fullTopicName);
-                KafkaTopicConsumerManagerCache.getInstance().removeAndCloseByTopic(fullTopicName);
-                addErrorPartitionResponse(topicPartition, Errors.NONE);
-                return;
-            }
-
-            // cursorFuture is never completed exceptionally because ManagedLedgerImpl#asyncFindPosition
-            // is never completed exceptionally.
-            cursorFuture.thenAccept(cursorLongPair -> {
-                if (cursorLongPair == null) {
-                    log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. "
-                            + "Fetch for topic return error.",
-                            offset, topicPartition);
-                    addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
-                    return;
+            } else if (!checkOffsetOutOfRange(tcm, offset, topicPartition, startPrepareMetadataNanos)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} .",
+                            topicPartition, offset);
                 }
 
-                final ManagedCursor cursor = cursorLongPair.getLeft();
-                final AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight());
-
-                statsLogger.getPrepareMetadataStats().registerSuccessfulEvent(
-                        MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
-                long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get());
-                readEntries(cursor, topicPartition, cursorOffset, adjustedMaxBytes)
-                        .whenComplete((entries, throwable) -> {
-                            if (throwable != null) {
-                                tcm.deleteOneCursorAsync(cursorLongPair.getLeft(),
-                                        "cursor.readEntry fail. deleteCursor");
-                                addErrorPartitionResponse(topicPartition, Errors.forException(throwable));
-                                return;
-                            }
-                            if (entries == null) {
-                                addErrorPartitionResponse(topicPartition,
-                                        Errors.forException(new ApiException("Cursor is null")));
-                                return;
-                            }
-                            long readSize = entries.stream().mapToLong(Entry::getLength).sum();
-                            limitBytes.addAndGet(-1 * readSize);
-                            handleEntries(
-                                    entries,
-                                    topicPartition,
-                                    partitionData,
-                                    fullTopicName,
-                                    tcm,
-                                    cursor,
-                                    cursorOffset,
-                                    readCommitted);
-                        });
-            });
+                final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture =
+                        tcm.removeCursorFuture(offset);
+                if (cursorFuture == null) {
+                    // tcm is closed, just return a NONE error because the channel may be still active
+                    log.warn("[{}] KafkaTopicConsumerManager is closed, remove TCM of {}",
+                            requestHandler.ctx, fullTopicName);
+                    registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
+                    KafkaTopicConsumerManagerCache.getInstance().removeAndCloseByTopic(fullTopicName);
+                    addErrorPartitionResponse(topicPartition, Errors.NONE);
+                } else {
+                    cursorFuture.whenComplete((cursorLongPair, ex) -> {
+                        if (ex != null) {
+                            log.error("KafkaTopicConsumerManager.asyncGetCursorByOffset({}) failed for topic {}.",
+                                    offset, topicPartition, ex.getCause());
+                            registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
+                            KafkaTopicConsumerManagerCache.getInstance().removeAndCloseByTopic(fullTopicName);
+                            addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
+                        } else if (cursorLongPair == null) {
+                            log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. "
+                                    + "Fetch for topic return error.",
+                                    offset, topicPartition);
+                            registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
+                            addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
+                        } else {
+                            final ManagedCursor cursor = cursorLongPair.getLeft();
+                            final AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight());
+
+                            statsLogger.getPrepareMetadataStats().registerSuccessfulEvent(
+                                    MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
+                            long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get());
+                            readEntries(cursor, topicPartition, cursorOffset, adjustedMaxBytes)
+                                    .whenComplete((entries, throwable) -> {
+                                        if (throwable != null) {
+                                            tcm.deleteOneCursorAsync(cursorLongPair.getLeft(),
+                                                    "cursor.readEntry fail. deleteCursor");
+                                            addErrorPartitionResponse(topicPartition, Errors.forException(throwable));
+                                        } else if (entries == null) {
+                                            addErrorPartitionResponse(topicPartition,
+                                                    Errors.forException(new ApiException("Cursor is null")));
+                                        } else {
+                                            long readSize = entries.stream().mapToLong(Entry::getLength).sum();
+                                            limitBytes.addAndGet(-1 * readSize);
+                                            handleEntries(
+                                                    entries,
+                                                    topicPartition,
+                                                    partitionData,
+                                                    fullTopicName,
+                                                    tcm,
+                                                    cursor,
+                                                    cursorOffset,
+                                                    readCommitted);
+                                        }
+                                    });
+                        }
+                    });
+                }
+            }
         });
     }
 
```

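Before this change, `cursorFuture` could never complete exceptionally, so `thenAccept` was sufficient. Now that `asyncGetCursorByOffset` may fail when the ManagedLedger is closed, the caller uses `whenComplete`, which observes both outcomes. The following small runnable sketch illustrates the difference; the variable names and message strings are illustrative only, not KoP code.

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteSketch {
    public static void main(String[] args) {
        CompletableFuture<String> cursorFuture = new CompletableFuture<>();
        cursorFuture.completeExceptionally(new IllegalStateException("Current managedLedger has been closed."));

        // thenAccept's callback is skipped entirely on exceptional completion,
        // so error handling placed inside it would never run.
        cursorFuture.thenAccept(cursor -> System.out.println("never printed: " + cursor));

        // whenComplete always runs and receives (result, throwable), letting the
        // caller turn the failure into an error response, as the new code does.
        cursorFuture.whenComplete((cursor, ex) -> {
            if (ex != null) {
                System.out.println("handled failure: " + ex.getMessage());
            } else {
                System.out.println("cursor: " + cursor);
            }
        });
    }
}
```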
tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java (+36)

```diff
@@ -22,6 +22,7 @@
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -36,6 +37,7 @@
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -506,4 +508,38 @@ public void testTopicManagerClose() throws Exception {
         assertTrue(originalTcmList.get(0).isClosed());
         assertTrue(originalTcmList.get(1).isClosed());
     }
+
+    @Test(timeOut = 20000)
+    public void testUnloadTopic() throws Exception {
+        final String topic = "test-unload-topic";
+        final String fullTopicName = "persistent://public/default/" + topic + "-partition-0";
+        final int numPartitions = 1;
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        final int totalMessages = 5;
+        @Cleanup
+        final KafkaProducer<String, String> producer = new KafkaProducer<>(newKafkaProducerProperties());
+        int numMessages = 0;
+        while (numMessages < totalMessages) {
+            producer.send(new ProducerRecord<>(topic, null, "test-value" + numMessages)).get();
+            numMessages++;
+        }
+
+        // We first get KafkaTopicConsumerManager, and then unload the topic,
+        // so that the KafkaTopicConsumerManager becomes invalid
+        CompletableFuture<KafkaTopicConsumerManager> tcm = kafkaTopicManager.getTopicConsumerManager(fullTopicName);
+        KafkaTopicConsumerManager topicConsumerManager = tcm.get();
+        // unload topic
+        admin.topics().unload(fullTopicName);
+
+        // This proves that the ManagedLedger has been closed
+        // and that the newly added code has taken effect.
+        try {
+            topicConsumerManager.removeCursorFuture(totalMessages - 1).get();
+            fail("should have failed");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause().getMessage().contains("Current managedLedger has been closed."));
+        }
+    }
 }
```
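The test asserts exceptional completion with the classic try/fail/catch idiom. Assuming a TestNG version that provides `Assert.expectThrows`, the same assertion could be written more compactly inside the test body; this fragment reuses the test's own variables and is a sketch, not part of the commit.

```java
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

import java.util.concurrent.ExecutionException;

// Equivalent to the try/fail/catch block above; expectThrows returns the
// thrown exception so its cause can be inspected directly.
ExecutionException ex = expectThrows(ExecutionException.class,
        () -> topicConsumerManager.removeCursorFuture(totalMessages - 1).get());
assertTrue(ex.getCause().getMessage().contains("Current managedLedger has been closed."));
```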
