
Compiling Avro as part of other build causes kafka (de)serialisation failures #31650

Closed
briancullen opened this issue Mar 7, 2023 · 5 comments
Labels
area/kafka area/kotlin kind/bug Something isn't working triage/invalid This doesn't seem right


@briancullen

Describe the bug

We are currently using Quarkus 2.4.1.Final for this project and are trying to upgrade to the latest version, but we are running into issues with the changes in how Avro is handled.

We are using Gradle with a number of subprojects. Because the services defined by these subprojects tend to communicate over Kafka, we separated the Avro schemas (along with some related Java classes) into their own subproject, which the consumers and producers then add as an implementation dependency. Unfortunately, when we try this on newer versions of Quarkus, specifically those that print an error message requiring us to import the quarkus-confluent-registry-avro extension, things break. This seems to start after Quarkus 2.8.
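
For illustration, here is a rough sketch of that layout. The module names and the use of the Gradle Kotlin DSL are assumptions made for the example, not details taken from the real project:

    // settings.gradle.kts (hypothetical module names)
    rootProject.name = "example-services"
    include("shared-avro", "producer-service", "consumer-service")

    // producer-service/build.gradle.kts (Quarkus plugin and platform BOM omitted).
    // The Avro-generated classes only reach the service as a normal
    // implementation dependency on the shared subproject.
    dependencies {
        implementation(project(":shared-avro"))
        implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")
        implementation("io.quarkus:quarkus-confluent-registry-avro")
    }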

Expected behavior

If the Avro files have been compiled as part of a separate build that is included as an implementation dependency, then you should be able to send and receive messages over Kafka using the generated classes.

Actual behavior

The error we are encountering is different from the one I managed to create in the reproducer. My original error (and the one I can still get in the real repository) is as follows. Note that I have changed the package of the TransactionEvent class.

       io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    	[Exception 0] java.lang.ClassCastException: class org.acme.kafka.TransactionEvent cannot be cast to class org.apache.avro.specific.SpecificRecord (org.acme.kafka.TransactionEvent is in unnamed module of loader 'app'; org.apache.avro.specific.SpecificRecord is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4642b71d)
    	[Exception 1] it.kahoot.exception.KafkaDeserializationFailure: Unable to deserialise message on kahoot.marketplace.transactions.v1
    	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.performInnerSubscription(UniOnFailureFlatMap.java:94)
    	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.dispatch(UniOnFailureFlatMap.java:83)
    	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.onFailure(UniOnFailureFlatMap.java:60)
    	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:31)
    	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    	at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap.subscribe(UniOnFailureFlatMap.java:31)
    	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    	at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:60)
    	at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
    	at io.smallrye.mutiny.groups.UniAwait.indefinitely(UniAwait.java:46)
    	at io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.decorateDeserialization(DeserializationFailureHandler.java:93)
    	at io.smallrye.reactive.messaging.kafka.EventDeserializationFailureConfiguration_ProducerMethod_marketplaceUserPurchasesEventFailureHandler_28c0809e750e7edf6bbc6fa9604cf28a96f673a3_ClientProxy.decorateDeserialization(Unknown Source)
    	at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.wrapDeserialize(DeserializerWrapper.java:94)
    	at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.deserialize(DeserializerWrapper.java:74)
    	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1439)
    	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135)
    	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671)
    	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507)
    	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733)
    	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684)
    	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304)
    	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    	at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$poll$4(ReactiveKafkaConsumer.java:141)
    	at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$runOnPollingThread$0(ReactiveKafkaConsumer.java:108)
    	at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
    	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
    	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    	at java.base/java.lang.Thread.run(Thread.java:833)
    	Suppressed: it.kahoot.exception.KafkaDeserializationFailure: Unable to deserialise message on kahoot.marketplace.transactions.v1
    		at it.kahoot.marketplace.event.deserialization.EventDeserializationFailureHandler.handleDeserializationFailure(EventDeserializationHandlers.kt:41)
    		at io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.lambda$decorateDeserialization$0(DeserializationFailureHandler.java:90)
    		at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
    		at io.smallrye.mutiny.groups.UniOnFailure.lambda$recoverWithItem$8(UniOnFailure.java:190)
    		at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.performInnerSubscription(UniOnFailureFlatMap.java:92)
    		... 34 more
    	Caused by: java.lang.ClassCastException: class org.acme.kafka.TransactionEvent cannot be cast to class org.apache.avro.specific.SpecificRecord (org.acme.kafka.TransactionEvent is in unnamed module of loader 'app'; org.apache.avro.specific.SpecificRecord is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4642b71d)
    		at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getSpecificReaderSchema(AbstractKafkaAvroDeserializer.java:250)
    		at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:227)
    		at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:182)
    		at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:347)
    		at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:100)
    		at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
    		at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    		at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    		at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.lambda$deserialize$1(DeserializerWrapper.java:74)
    		at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
    		at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
    		... 31 more
    Caused by: [CIRCULAR REFERENCE: java.lang.ClassCastException: class org.acme.kafka.TransactionEvent cannot be cast to class org.apache.avro.specific.SpecificRecord (org.acme.kafka.TransactionEvent is in unnamed module of loader 'app'; org.apache.avro.specific.SpecificRecord is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4642b71d)]

The reproducer gives the error below. Although it is different, it is still related to the (de)serialisation process, and the only difference from the working setup seems to be that the classes are compiled in a separate subproject.

2023-03-07 10:25:50,213 ERROR [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-0) SRMSG18260: Unable to recover from the serialization failure (topic: movies), configure a SerializationFailureHandler to recover from errors.: java.lang.RuntimeException: com.fasterxml.jackson.databind.JsonMappingException: Not an array: {"type":"record","name":"Movie","namespace":"org.acme.kafka.quarkus","fields":[{"name":"title","type":{"type":"string","avro.java.string":"String"}},{"name":"year","type":"int"}]} (through reference chain: org.acme.kafka.quarkus.Movie["schema"]->org.apache.avro.Schema$RecordSchema["elementType"])
	at io.quarkus.kafka.client.serialization.ObjectMapperSerializer.serialize(ObjectMapperSerializer.java:47)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.lambda$serialize$1(SerializerWrapper.java:56)
	at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.wrapSerialize(SerializerWrapper.java:81)
	at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.serialize(SerializerWrapper.java:56)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:952)
	at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.lambda$send$4(ReactiveKafkaProducer.java:150)
	at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
	at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:47)
	at io.smallrye.mutiny.operators.uni.UniMemoizeOp.subscribe(UniMemoizeOp.java:73)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an array: {"type":"record","name":"Movie","namespace":"org.acme.kafka.quarkus","fields":[{"name":"title","type":{"type":"string","avro.java.string":"String"}},{"name":"year","type":"int"}]} (through reference chain: org.acme.kafka.quarkus.Movie["schema"]->org.apache.avro.Schema$RecordSchema["elementType"])
	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
	at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:782)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:733)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:774)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4624)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3828)
	at io.quarkus.kafka.client.serialization.ObjectMapperSerializer.serialize(ObjectMapperSerializer.java:44)
	... 19 more
Caused by: org.apache.avro.AvroRuntimeException: Not an array: {"type":"record","name":"Movie","namespace":"org.acme.kafka.quarkus","fields":[{"name":"title","type":{"type":"string","avro.java.string":"String"}},{"name":"year","type":"int"}]}
	at org.apache.avro.Schema.getElementType(Schema.java:370)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:689)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:774)
	... 28 more

How to Reproduce?

https://github.com/briancullen/quakus-avro-sample

Steps to Reproduce:

  1. Check out one of the working branches (either confluent/working or apicurio/working).
  2. Start the consumer with quarkusDev
  3. Start the producer with quarkusDev
  4. Execute scripts/run_consumer.sh in one terminal
  5. Execute scripts/emit_movies.sh
  6. Check the consumer to see the messages have arrived successfully.
  7. Switch to one of the failing branches (either confluent/failing or apicurio/failing).
  8. Do a clean build and remove any unstaged files.
  9. Repeat steps 2 - 5
  10. Check the logs for the producer and there should be errors.

Output of uname -a or ver

Linux host 5.19.0-35-generic #36~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Fri Feb 17 15:17:25 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Output of java -version

openjdk version "17.0.2" 2022-01-18
OpenJDK Runtime Environment (build 17.0.2+8-86)
OpenJDK 64-Bit Server VM (build 17.0.2+8-86, mixed mode, sharing)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.8

Build tool (ie. output of mvnw --version or gradlew --version)

------------------------------------------------------------
Gradle 7.2
------------------------------------------------------------

Build time:   2021-08-17 09:59:03 UTC
Revision:     a773786b58bb28710e3dc96c4d1a7063628952ad

Kotlin:       1.5.21
Groovy:       3.0.8
Ant:          Apache Ant(TM) version 1.10.9 compiled on September 27 2020
JVM:          17.0.2 (Oracle Corporation 17.0.2+8-86)
OS:           Linux 5.19.0-35-generic amd64

Additional information

No response

briancullen added the kind/bug (Something isn't working) label on Mar 7, 2023
@quarkus-bot

quarkus-bot (bot) commented Mar 7, 2023

/cc @alesj (kafka), @cescoffier (kafka), @evanchooly (kotlin), @geoand (kotlin), @ozangunalp (kafka)

@briancullen
Author

I notice that this got labelled as Kotlin. Yes, one of the stack traces is from a Kotlin service, but we are also having the same issue with Java services, so the language does not appear to be an important factor.

@joschne

joschne commented Sep 2, 2023

I have the same issue. @briancullen did you find a solution?
BTW: This repo https://github.com/briancullen/quakus-avro-sample has only 1 branch (master). Maybe you forgot to push the other branches (confluent/working, etc.)?

@ozangunalp
Contributor

The second stack trace happens because the producer and consumer apps can't discover that the payload type is an Avro-generated type, so they fall back to using an auto-generated JSON serde. That is because the shared-avro module doesn't include a Jandex index: https://quarkus.io/guides/gradle-tooling#multi-module-gradle. The easiest way to fix this is to add an empty src/main/resources/META-INF/beans.xml file to the shared-avro module. This way Quarkus will discover that the payload is an Avro type and that the Confluent Avro serde is present on the classpath, so it will auto-configure the Kafka serializer/deserializer.
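
For reference, a minimal sketch of that workaround, assuming the shared module is named shared-avro as described above. The Jandex Gradle plugin shown as an alternative is the one mentioned in the linked guide; the plugin version here is an assumption and should be checked against that guide:

    // shared-avro/build.gradle.kts (sketch, Gradle Kotlin DSL assumed)
    //
    // Option 1 (what is recommended above): no build script change at all,
    // just commit an empty marker file so Quarkus indexes this module:
    //   shared-avro/src/main/resources/META-INF/beans.xml
    //
    // Option 2 (from the linked multi-module guide): generate a Jandex index
    // at build time instead of relying on the empty beans.xml marker.
    plugins {
        `java-library`
        id("org.kordamp.gradle.jandex") version "1.1.0" // version is an assumption
    }

Either way, the goal is the same: give the shared module a bean archive / Jandex index so that Quarkus build steps can detect the generated Avro classes and pick the Avro serde automatically.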

I cannot reproduce the first stack trace; I reckon it happens because of an incompatible Confluent serde dependency.
I've pushed a working config here: https://github.com/ozangunalp/quarkus-multi-module-avro-sample

@joschne, unless you are still experiencing an issue, I am going to close this one.

Thanks!

@cescoffier
Member

See last comment.
