Skip to content

Commit

Permalink
[fix][txn] Topic transaction buffer recover don't close reader when t…
Browse files Browse the repository at this point in the history
…hrow RuntimeException (#15361)

Fixes: #14878

### Motivation
clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
catch Exception then close the topic and reader

(cherry picked from commit 0c58810)
  • Loading branch information
congbobo184 authored and mattisonchao committed May 25, 2022
1 parent 6bf9b31 commit 6fd31d1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,10 @@ public void run() {
callBack.noNeedToRecover();
return;
}
} catch (PulsarClientException pulsarClientException) {
log.error("[{}]Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
callBack.recoverExceptionally(pulsarClientException);
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), ex);
callBack.recoverExceptionally(ex);
closeReader(reader);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,


@Test(timeOut=30000)
public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
@Cleanup
Producer<byte[]> producer = pulsarClient
Expand Down Expand Up @@ -491,7 +491,14 @@ public void testTransactionBufferRecoverThrowPulsarClientException() throws Exce
field.setAccessible(true);
TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
(TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
// mock reader can't read snapshot fail
// mock reader can't read snapshot fail throw RuntimeException
doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
// check reader close topic
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
transactionBufferSnapshotService, originalTopic, field, producer);
doReturn(true).when(reader).hasMoreEvents();

// mock reader can't read snapshot fail throw PulsarClientException
doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
// check reader close topic
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
Expand Down

0 comments on commit 6fd31d1

Please sign in to comment.