Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException #15361

Conversation

congbobo184
Copy link
Contributor

@congbobo184 congbobo184 commented Apr 28, 2022

Fixes: #14878

Motivation

clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more

Modifications

catch Exception then close the topic and reader

Verifying this change

change test

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@congbobo184 congbobo184 added type/bug The PR fixed a bug or issue reported a bug area/transaction doc-not-needed Your PR changes do not impact docs release/2.9.3 release/2.10.1 labels Apr 28, 2022
@congbobo184 congbobo184 added this to the 2.11.0 milestone Apr 28, 2022
@congbobo184 congbobo184 self-assigned this Apr 28, 2022
@@ -486,7 +486,7 @@ public void testTransactionBufferRecoverThrowPulsarClientException() throws Exce
TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
(TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
// mock reader can't read snapshot fail
doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
doThrow(new Exception("test")).when(reader).hasMoreEvents();

PulsarClientException is not a RuntimeException, If use RuntimeException here, we will lose the previous test coverage.

Copy link
Contributor

@gaoran10 gaoran10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@congbobo184 congbobo184 merged commit 0c58810 into apache:master May 10, 2022
codelipenghui pushed a commit that referenced this pull request May 20, 2022
…hrow RuntimeException (#15361)

Fixes: #14878

### Motivation
clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
catch Exception then close the topic and reader

(cherry picked from commit 0c58810)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 23, 2022
…hrow RuntimeException (apache#15361)

Fixes: apache#14878

### Motivation
clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
catch Exception then close the topic and reader

(cherry picked from commit 0c58810)
(cherry picked from commit 810d707)
mattisonchao pushed a commit that referenced this pull request May 25, 2022
…hrow RuntimeException (#15361)

Fixes: #14878

### Motivation
clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
catch Exception then close the topic and reader

(cherry picked from commit 0c58810)
@mattisonchao mattisonchao added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label May 25, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/transaction cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 doc-not-needed Your PR changes do not impact docs release/2.9.3 release/2.10.1 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

The reader of __transaction_buffer_snapshot did not been closed property
5 participants