The reader of __transaction_buffer_snapshot was not closed properly #14878

Closed
codelipenghui opened this issue Mar 25, 2022 · 2 comments · Fixed by #15361
Labels: lifecycle/stale, type/bug

codelipenghui commented Mar 25, 2022

Describe the bug

"subscriptions" : {
    "__compaction" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 3068776,
      "msgOutCounter" : 10729,
      "msgRateRedeliver" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 96,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 96,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1648218295485,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "replicated" : false,
      "durable" : true
    },
    "reader-52dbece9b6" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 4768,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 4768,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "activeConsumerName" : "fa67a",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "fa67a",
        "availablePermits" : 0,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "metadata" : { },
        "connectedSince" : "2022-03-25T22:13:36.114886+08:00",
        "clientVersion" : "2.9.2",
        "address" : "/127.0.0.1:51161"
      } ],
      "isDurable" : false,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "replicated" : false,
      "durable" : false
    }
  }
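
One way to spot this kind of leftover reader in topic stats is to filter on the fields shown above. The sketch below is a heuristic only, not a Pulsar API: the `SubStats` class, `looksLeaked`, and the chosen conditions (non-durable, `reader-` name prefix, a connected consumer, nothing dispatched, non-zero backlog) are assumptions made for illustration. A healthy reader that has just connected can briefly match them, so a hit is a hint to look at the logs, not proof of a leak.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LeakedReaderCheck {

    /** Hypothetical holder for a few of the per-subscription stats fields. */
    static final class SubStats {
        final String name;
        final boolean durable;
        final long msgOutCounter;
        final long msgBacklog;
        final int consumerCount;

        SubStats(String name, boolean durable, long msgOutCounter,
                 long msgBacklog, int consumerCount) {
            this.name = name;
            this.durable = durable;
            this.msgOutCounter = msgOutCounter;
            this.msgBacklog = msgBacklog;
            this.consumerCount = consumerCount;
        }
    }

    /** Heuristic (an assumption, not a broker rule): an idle, still-attached reader. */
    static boolean looksLeaked(SubStats s) {
        return s.name.startsWith("reader-")
                && !s.durable
                && s.consumerCount > 0
                && s.msgOutCounter == 0
                && s.msgBacklog > 0;
    }

    /** Returns the names of all subscriptions that match the heuristic. */
    static List<String> suspects(List<SubStats> subs) {
        List<String> names = new ArrayList<>();
        for (SubStats s : subs) {
            if (looksLeaked(s)) {
                names.add(s.name);
            }
        }
        return names;
    }

    /** The two subscriptions from the stats in this issue, reduced to five fields. */
    static List<SubStats> statsFromIssue() {
        return Arrays.asList(
                new SubStats("__compaction", true, 10729L, 96L, 0),
                new SubStats("reader-52dbece9b6", false, 0L, 4768L, 1));
    }

    public static void main(String[] args) {
        System.out.println(suspects(statsFromIssue())); // prints [reader-52dbece9b6]
    }
}
```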

From the broker logs:

$ cat cluster-a*/logs/* | grep -v "Handle read response:" | grep "52dbece9b6"
22:13:04.659 [pulsar-io-16-2] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:51161] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-52dbece9b6
22:13:35.732 [ForkJoinPool.commonPool-worker-31] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/__transaction_buffer_snapshot][reader-52dbece9b6] Creating non-durable subscription at msg id -1:-1:-1:-1
22:13:36.248 [ForkJoinPool.commonPool-worker-31] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/__transaction_buffer_snapshot][reader-52dbece9b6] Created new subscription for 10
22:13:36.248 [ForkJoinPool.commonPool-worker-31] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:51161] Created subscription on topic persistent://public/default/__transaction_buffer_snapshot / reader-52dbece9b6
22:12:52.577 [pulsar-io-16-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-52dbece9b6] Subscribing to topic on cnx [id: 0x7ecdb373, L:/127.0.0.1:51161 - R:/127.0.0.1:6652], consumerId 10
22:13:28.365 [pulsar-io-16-18] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-52dbece9b6] Failed to subscribe to topic on /127.0.0.1:6652

This looks related to the client-side warning 22:13:28.365 [pulsar-io-16-18] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-52dbece9b6] Failed to subscribe to topic on /127.0.0.1:6652. Note that the broker logged "Created subscription" at 22:13:36.248, after the client had already reported the failure.

For comparison, here are the logs for a reader that was cleaned up normally:

22:20:06.971 [pulsar-io-16-2] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:51161] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-c065e64d8a
22:20:07.001 [ForkJoinPool.commonPool-worker-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Creating non-durable subscription at msg id -1:-1:-1:-1
22:20:07.002 [ForkJoinPool.commonPool-worker-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Created new subscription for 82
22:20:07.003 [ForkJoinPool.commonPool-worker-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:51161] Created subscription on topic persistent://public/default/__transaction_buffer_snapshot / reader-c065e64d8a
22:20:07.378 [pulsar-io-16-2] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/__transaction_buffer_snapshot, name=reader-c065e64d8a}, consumerId=82, consumerName=9a4b0, address=/127.0.0.1:51161}
22:20:07.378 [pulsar-io-16-2] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Successfully closed subscription [NonDurableCursorImpl{ledger=public/default/persistent/__transaction_buffer_snapshot, ackPos=49:4007, readPos=49:4008}]
22:20:07.379 [pulsar-io-16-2] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Successfully closed dispatcher for reader
22:20:06.970 [pulsar-io-16-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Subscribing to topic on cnx [id: 0x7ecdb373, L:/127.0.0.1:51161 - R:/127.0.0.1:6652], consumerId 82
22:20:07.016 [pulsar-io-16-17] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Subscribed to topic on /127.0.0.1:6652 -- consumer: 82
22:20:07.086 [pulsar-transaction-executor-13-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Get topic last message Id
22:20:07.089 [pulsar-io-16-17] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Successfully getLastMessageId 49:4007
22:20:07.373 [pulsar-transaction-executor-13-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Get topic last message Id
22:20:07.378 [pulsar-io-16-17] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][reader-c065e64d8a] Successfully getLastMessageId 49:4007
22:20:07.379 [pulsar-io-16-17] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot] [reader-c065e64d8a] Closed consumer

To Reproduce
I don't have a reliable way to reproduce the issue.

Expected behavior
The reader should be closed properly to avoid leaking reader instances.

Additional context
Latest branch-2.9 (commit f0a2171)

@mattisonchao

Hi @codelipenghui,
Could you provide the full log to help find the root cause?
Thanks!

@github-actions

The issue has had no activity for 30 days; marking it with the Stale label.

congbobo184 added a commit that referenced this issue May 10, 2022
…hrow RuntimeException (#15361)

Fixes: #14878

### Motivation
Clear the unused reader on the topicTransactionBufferSnapshot topic.

When the reader decodes the snapshot, it can throw a RuntimeException rather than a PulsarClientException.

We should catch Exception, then close the reader and the topic.

```
java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
Catch Exception, then close the topic and the reader.

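The idea can be sketched without any Pulsar classes. This is a minimal illustration, not the code from #15361: `SnapshotReader`, `decode`, and `recover` are hypothetical names, and only the reader (not the topic) is closed here. The point it shows is that an unchecked exception thrown while decoding surfaces through the `CompletableFuture` chain, so the cleanup has to run for any `Throwable`, not just for one expected exception type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class ReaderCleanupSketch {

    /** Hypothetical stand-in for a snapshot reader; not the Pulsar Reader API. */
    interface SnapshotReader {
        CompletableFuture<byte[]> readNextAsync();
        CompletableFuture<Void> closeAsync();
    }

    /** Hypothetical decode step that always fails with an unchecked exception. */
    static String decode(byte[] payload) {
        throw new IllegalStateException("Failed at fetching schema info for 0");
    }

    /**
     * Reads one entry and decodes it. The reader is closed on every path:
     * whenComplete runs for success and for any Throwable, including
     * RuntimeExceptions thrown by decode().
     */
    static CompletableFuture<String> recover(SnapshotReader reader) {
        CompletableFuture<String> result = new CompletableFuture<>();
        reader.readNextAsync()
                .thenApply(ReaderCleanupSketch::decode)
                .whenComplete((value, error) ->
                        // Close first, then report the original outcome.
                        reader.closeAsync().whenComplete((ignored, closeError) -> {
                            if (error != null) {
                                result.completeExceptionally(error);
                            } else {
                                result.complete(value);
                            }
                        }));
        return result;
    }

    /** Runs recover() against an in-memory reader; true if it failed and still closed. */
    static boolean demo() {
        AtomicBoolean closed = new AtomicBoolean(false);
        SnapshotReader reader = new SnapshotReader() {
            @Override
            public CompletableFuture<byte[]> readNextAsync() {
                return CompletableFuture.completedFuture(new byte[0]);
            }

            @Override
            public CompletableFuture<Void> closeAsync() {
                closed.set(true);
                return CompletableFuture.completedFuture(null);
            }
        };
        CompletableFuture<String> recovered = recover(reader);
        return recovered.isCompletedExceptionally() && closed.get();
    }

    public static void main(String[] args) {
        System.out.println("failed and closed: " + demo()); // prints failed and closed: true
    }
}
```

If the cleanup were attached only to a handler for one specific exception type, the `IllegalStateException` above would skip it and the reader would stay open.
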
codelipenghui pushed a commit that referenced this issue May 20, 2022
…hrow RuntimeException (#15361)

Fixes: #14878

### Motivation
clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
catch Exception then close the topic and reader

(cherry picked from commit 0c58810)
nicoloboschi pushed a commit to datastax/pulsar that referenced this issue May 23, 2022
…hrow RuntimeException (apache#15361)

Fixes: apache#14878

### Motivation
clear unuse reader in topicTransactionBufferSnapshot topic

When reader decode the Snapshot will throw RuntimeException not PulsarClientException

We should catch the Exception then close the reader and topic

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
Catch Exception, then close the topic and the reader.

(cherry picked from commit 0c58810)
(cherry picked from commit 810d707)
mattisonchao pushed a commit that referenced this issue May 25, 2022
…hrow RuntimeException (#15361)

Fixes: #14878

### Motivation
Clear the unused reader on the topicTransactionBufferSnapshot topic.

When the reader decodes the snapshot, it throws a RuntimeException, not a PulsarClientException.

We should catch Exception, then close the reader and the topic.

```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
	at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
	... 8 more
```

### Modifications
Catch Exception, then close the topic and the reader.
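
For illustration only, here is a minimal, self-contained sketch of why the catch clause matters. It does not use the Pulsar API. `ClientException` and `SnapshotReader` are hypothetical stand-ins for PulsarClientException and the snapshot reader, and the decode failure is simulated with an `IllegalStateException`. The actual change in TopicTransactionBuffer may look different.

```java
final class ReaderCleanupSketch {

    /** Hypothetical stand-in for a checked client exception such as PulsarClientException. */
    static final class ClientException extends Exception {
        ClientException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the snapshot reader; it only tracks whether it was closed. */
    static final class SnapshotReader implements AutoCloseable {
        private boolean closed;

        /** Simulates a decode step that fails with an unchecked exception. */
        String decodeNext() throws ClientException {
            throw new IllegalStateException("Failed at fetching schema info for 0");
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Narrow handler: a RuntimeException escapes, so the reader is never closed. */
    static boolean recoverCatchingClientExceptionOnly(SnapshotReader reader) {
        try {
            reader.decodeNext();
            return true;
        } catch (ClientException e) {
            reader.close();
            return false;
        }
    }

    /** Broad handler: any Exception, checked or unchecked, closes the reader. */
    static boolean recoverCatchingException(SnapshotReader reader) {
        try {
            reader.decodeNext();
            return true;
        } catch (Exception e) {
            reader.close();
            return false;
        }
    }

    public static void main(String[] args) {
        SnapshotReader leaked = new SnapshotReader();
        try {
            recoverCatchingClientExceptionOnly(leaked);
        } catch (RuntimeException e) {
            // The unchecked exception bypassed the narrow catch block.
            System.out.println("narrow catch: closed=" + leaked.isClosed());
        }

        SnapshotReader cleaned = new SnapshotReader();
        recoverCatchingException(cleaned);
        System.out.println("broad catch: closed=" + cleaned.isClosed());
    }
}
```

With the narrow handler the simulated reader stays open after the failure. With the broad handler it is closed, which is the behavior the modification above aims for.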

(cherry picked from commit 0c58810)