Spark UDFs to deserialize Avro messages with schemas stored in Schema Registry. More details about Schema Registry can be found on the official website.
It is intended to be used together with the native Spark Kafka reader:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
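The snippet above performs a one-off batch read. For continuous consumption, the same options also work with Spark Structured Streaming by swapping read for readStream (standard Spark API; streamDf is just an illustrative name):

val streamDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()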
val utils = ConfluentSparkAvroUtils("http://schema-registry.my-company.com:8081")
val keyDeserializer = utils.deserializerForSubject("topic1-key")
val valueDeserializer = utils.deserializerForSubject("topic1-value")
df.select(
  keyDeserializer(col("key")).alias("key"),
  valueDeserializer(col("value")).alias("value")
).show(10)
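For context, a deserializer UDF of this kind can be built on Confluent's own KafkaAvroDeserializer. The sketch below is a simplified, hypothetical illustration and not this library's actual implementation: it returns the decoded record as a JSON-style string, whereas the real UDFs map Avro records onto proper Spark SQL types.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.functions.udf

// Simplified sketch: wrap Confluent's KafkaAvroDeserializer in a Spark UDF.
// A production version would cache the client and deserializer per executor
// instead of constructing them for every value.
def simpleAvroDeserializer(registryUrl: String) = udf { bytes: Array[Byte] =>
  val client = new CachedSchemaRegistryClient(registryUrl, 128)
  val avroDeserializer = new KafkaAvroDeserializer(client)
  // The topic argument is not needed to resolve the schema: the writer
  // schema id is embedded in the Confluent wire format of the message.
  Option(bytes).map(b => avroDeserializer.deserialize("", b).toString).orNull
}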
The same sample code can also read data encrypted with AES-256 using KMS, except that it expects the encrypted data to use a specific format: [magic byte (value 2 or 3) | encrypted AES-256 key | encrypted Avro data].
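The framing of the encrypted key inside that envelope is not specified here; the sketch below assumes a hypothetical 4-byte big-endian length prefix in front of the encrypted AES-256 key, purely to illustrate how such an envelope could be split. Adjust it to whatever framing your producer actually uses.

import java.nio.ByteBuffer

// Illustrative parser for the encrypted envelope described above:
// [magic byte (2 or 3) | encrypted AES-256 key | encrypted Avro data].
// ASSUMPTION: the encrypted key is preceded by a 4-byte big-endian length;
// the real framing may differ.
case class EncryptedEnvelope(magicByte: Byte, encryptedKey: Array[Byte], payload: Array[Byte])

def parseEnvelope(message: Array[Byte]): EncryptedEnvelope = {
  val buf = ByteBuffer.wrap(message)
  val magic = buf.get()
  require(magic == 2 || magic == 3, s"Unexpected magic byte: $magic")
  val keyLength = buf.getInt()              // assumed length prefix
  val encryptedKey = new Array[Byte](keyLength)
  buf.get(encryptedKey)
  val payload = new Array[Byte](buf.remaining())
  buf.get(payload)
  EncryptedEnvelope(magic, encryptedKey, payload)
}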
The tool is designed to be used with Spark >= 2.0.2.
sbt assembly
ls -l target/scala-2.11/confluent-spark-avro-assembly-1.0.jar
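The assembled jar can then be added to your Spark classpath, for example via the standard --jars flag (the jar path is taken from the build output above):

spark-shell --jars target/scala-2.11/confluent-spark-avro-assembly-1.0.jar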
We haven't added unit tests, but you can test the UDFs against a live Kafka broker and Schema Registry with the following command:
sbt "project confluent-spark-avro" "run kafka.host:9092 http://schema-registry.host:8081 kafka.topic"
TODO: [ ] Spark UDFs to serialize messages.
The project is licensed under the Apache 2 license.