ConsumerBuilderImpl#subscribe stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 #21971

Closed
2 tasks done
Shawyeok opened this issue Jan 25, 2024 · 0 comments · Fixed by #21985
Labels
type/bug The PR fixed a bug or issue reported a bug

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

In summary, jackson-annotations:2.12.0 or later is now required for pulsar-client 3.0.x, and this also applies to versions 3.1.x and 3.2.x.

My colleague experienced an issue where the caller thread was blocked at the ConsumerBuilder#subscribe method. Below is the stack trace:

"main" #1 prio=5 os_prio=0 cpu=18400.66ms elapsed=241.20s tid=0x00007f0fd0017800 nid=0x10 waiting on condition  [0x00007f0fd7f17000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.2/Native Method)
	- parking to wait for  <0x00000000f10134d8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.2/LockSupport.java:194)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.2/CompletableFuture.java:1796)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.2/ForkJoinPool.java:3128)
	at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.2/CompletableFuture.java:1823)
	at java.util.concurrent.CompletableFuture.get(java.base@11.0.2/CompletableFuture.java:1998)
	at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)

The worst part is the absence of any error message; the main thread simply gets blocked indefinitely. After investigating, I identified the root cause mentioned above.

A full example to reproduce this issue can be found here: https://github.com/Shawyeok/pulsarClientSubscribeStuck

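Below is the stack trace that was never surfaced to the caller: a NoClassDefFoundError caused by a ClassNotFoundException for com.fasterxml.jackson.annotation.JsonKey.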

Stacktrace
java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonKey
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.hasAsKey(JacksonAnnotationIntrospector.java:1129)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasAsKey(AnnotationIntrospectorPair.java:619)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addFields(POJOPropertiesCollector.java:501)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:426)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:272)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:258)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:391)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:174)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1501)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1449)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:550)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:828)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:308)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1572)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter._writeValueAndClose(ObjectWriter.java:1273)
	at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter.writeValueAsString(ObjectWriter.java:1140)
	at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.init(ConsumerStatsRecorderImpl.java:113)
	at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.<init>(ConsumerStatsRecorderImpl.java:105)
	at org.apache.pulsar.client.impl.ConsumerImpl.<init>(ConsumerImpl.java:294)
	at org.apache.pulsar.client.impl.ConsumerImpl.newConsumerImpl(ConsumerImpl.java:252)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.createInternalConsumer(MultiTopicsConsumerImpl.java:1123)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$doSubscribeTopicPartitions$43(MultiTopicsConsumerImpl.java:1044)
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doSubscribeTopicPartitions(MultiTopicsConsumerImpl.java:1056)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$subscribeTopicPartitions$42(MultiTopicsConsumerImpl.java:1001)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.subscribeTopicPartitions(MultiTopicsConsumerImpl.java:999)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.subscribeAsync(MultiTopicsConsumerImpl.java:991)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$createPartitionedConsumer$39(MultiTopicsConsumerImpl.java:957)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.createPartitionedConsumer(MultiTopicsConsumerImpl.java:957)
	at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$doSingleTopicSubscribeAsync$5(PulsarClientImpl.java:527)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$getPartitionedTopicMetadata$8(BinaryProtoLookupService.java:239)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:669)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:144)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.pulsar.shade.io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonKey
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 85 more
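
Since the failure is silent, it may help to check for the missing class at application startup. The following is only a sketch, not part of pulsar-client: the class name `JacksonAnnotationsCheck` and the helper `isClassPresent` are hypothetical. It relies on the unshaded `com.fasterxml.jackson.annotation.JsonKey` class from the trace above, which is absent from jackson-annotations versions before 2.12.0.

```java
public class JacksonAnnotationsCheck {

    // Hypothetical helper: reports whether a class can be located on the
    // application classpath without running its static initializers.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, JacksonAnnotationsCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the NoClassDefFoundError from the trace.
        String jsonKey = "com.fasterxml.jackson.annotation.JsonKey";
        if (!isClassPresent(jsonKey)) {
            System.err.println(jsonKey + " is not on the classpath; "
                    + "jackson-annotations is probably older than 2.12.0");
        }
    }
}
```

Running such a check before building the consumer turns the indefinite hang into an explicit message, at least for this particular missing class.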

Solution

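The NoClassDefFoundError (caused by the ClassNotFoundException) is thrown from the doSubscribeTopicPartitions call, which runs inside a whenComplete callback. whenComplete captures the error in the stage it returns, but that stage is discarded, so the error is never logged and subscribeResult never completes. The relevant code can be found here: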

private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
                                      boolean createIfDoesNotExist) {
    client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> {
        if (null == cause) {
            doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
        } else {
            subscribeResult.completeExceptionally(cause);
        }
    });
}
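
The behavior can be reproduced without Pulsar. The sketch below is a stand-alone illustration using only java.util.concurrent; `WhenCompleteSwallowDemo`, `failingStep`, and `staysPending` are hypothetical names, and `failingStep` merely stands in for doSubscribeTopicPartitions throwing before it touches the result future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WhenCompleteSwallowDemo {

    // Stand-in for doSubscribeTopicPartitions: fails before completing the result future.
    static void failingStep(CompletableFuture<Void> subscribeResult) {
        throw new NoClassDefFoundError("com/fasterxml/jackson/annotation/JsonKey");
    }

    // Same shape as subscribeTopicPartitions: the stage returned by whenComplete is discarded,
    // so the error thrown inside the callback is captured there and never observed.
    static CompletableFuture<Void> subscribe() {
        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
        CompletableFuture.completedFuture("schema").whenComplete((schema, cause) -> {
            if (cause == null) {
                failingStep(subscribeResult);
            } else {
                subscribeResult.completeExceptionally(cause);
            }
        });
        return subscribeResult;
    }

    // Returns true if the future is still incomplete after waiting for the given time.
    static boolean staysPending(CompletableFuture<?> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (Exception e) {
            return false; // completed exceptionally, or the wait was interrupted
        }
    }

    public static void main(String[] args) {
        System.out.println("still pending: " + staysPending(subscribe(), 200));
    }
}
```

A caller that waits with a plain get(), as ConsumerBuilderImpl#subscribe does in the first stack trace, would block forever on such a future.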

To address this issue, we could replace whenComplete with a combination of thenAccept and exceptionally, so that an exception thrown by doSubscribeTopicPartitions also reaches subscribeResult. Here's how it could be rewritten:

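client.preProcessSchemaBeforeSubscribe(client, schema, topicName)
      .thenAccept(schema -> {
          doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
      }).exceptionally(cause -> {
          // Also runs when thenAccept's callback throws; cause may be wrapped in a CompletionException.
          subscribeResult.completeExceptionally(cause);
          // exceptionally takes a Function, so the lambda must return a value.
          return null;
      });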
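
The effect of this rewrite can be checked in isolation. The following sketch assumes the same hypothetical stand-in as before (`failingStep` in place of doSubscribeTopicPartitions) and uses only java.util.concurrent; unwrapping the CompletionException is an optional choice made here, not something the proposal above requires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ThenAcceptExceptionallyDemo {

    // Stand-in for doSubscribeTopicPartitions: fails before completing the result future.
    static void failingStep(CompletableFuture<Void> subscribeResult) {
        throw new NoClassDefFoundError("com/fasterxml/jackson/annotation/JsonKey");
    }

    static CompletableFuture<Void> subscribe() {
        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
        CompletableFuture.completedFuture("schema")
                .thenAccept(schema -> failingStep(subscribeResult))
                .exceptionally(cause -> {
                    // A throw inside thenAccept arrives wrapped in a CompletionException.
                    Throwable root = (cause instanceof CompletionException && cause.getCause() != null)
                            ? cause.getCause() : cause;
                    subscribeResult.completeExceptionally(root);
                    return null;
                });
        return subscribeResult;
    }

    // Returns the failure the future completed with, or null if it completed normally.
    static Throwable failureOf(CompletableFuture<Void> future) {
        try {
            future.get(1, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException | TimeoutException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("failed with: " + failureOf(subscribe()));
    }
}
```

With this shape the caller sees the NoClassDefFoundError instead of waiting indefinitely.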

Alternatives

No response

Anything else?

Here are related discussions:

Are you willing to submit a PR?

  • I'm willing to submit a PR!