Feature/deprecation fixes #261

Merged: 4 commits, Dec 22, 2021

The commits fix deprecation warnings across the build, the examples, and the test suite: ScalaTest is bumped to 3.1.4 (and scalatestplus-mockito to 3.1.2.0) and the specs are migrated to the ScalaTest 3.1 modular style (FlatSpec becomes AnyFlatSpec), symbol-literal column aliases are replaced with plain strings, charset-implicit commons-io calls gain an explicit Charset.defaultCharset argument, Array-to-Seq vararg call sites get explicit .toIndexedSeq conversions for Scala 2.13 compatibility, and the streaming example gains an explicit processing-time trigger.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
<avro.version>1.8.2</avro.version>

<!--Tests-->
<scalatestplus.mockito.version>3.1.0.0</scalatestplus.mockito.version>
<scalatest.version>3.1.0</scalatest.version>
<scalatestplus.mockito.version>3.1.2.0</scalatestplus.mockito.version>
<scalatest.version>3.1.4</scalatest.version>
<scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>

<scala.maven.plugin.version>3.3.2</scala.maven.plugin.version>
Expand Down
AvroSchemaUtils (za.co.absa.abris.avro.parsing.utils): pass an explicit charset to IOUtils.readLines and convert DataFrame.columns to an immutable Seq.

@@ -24,6 +24,7 @@
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.functions.struct

+import java.nio.charset.Charset
 import scala.collection.JavaConverters._

 /**
@@ -53,7 +54,7 @@ object AvroSchemaUtils {
     }
     val hdfs = FileSystem.get(new Configuration())
     val stream = hdfs.open(new Path(path))
-    try IOUtils.readLines(stream).asScala.mkString("\n") finally stream.close()
+    try IOUtils.readLines(stream, Charset.defaultCharset).asScala.mkString("\n") finally stream.close()
   }

   def toAvroSchema(
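For context, the no-charset IOUtils.readLines overload has been deprecated since commons-io 2.5 because it picks up the JVM default charset silently; passing Charset.defaultCharset keeps the old behaviour while silencing the warning. A minimal, self-contained sketch of the pattern (the ByteArrayInputStream stands in for the HDFS stream):

    import java.io.ByteArrayInputStream
    import java.nio.charset.Charset
    import org.apache.commons.io.IOUtils

    import scala.collection.JavaConverters._

    val stream = new ByteArrayInputStream("line1\nline2".getBytes(Charset.defaultCharset))
    // Deprecated: IOUtils.readLines(stream) -- charset resolved implicitly.
    // Preferred: name the charset, even if it is still the platform default.
    val content =
      try IOUtils.readLines(stream, Charset.defaultCharset).asScala.mkString("\n")
      finally stream.close()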
@@ -94,7 +95,7 @@
     recordName: String,
     nameSpace: String
   ): Schema =
-    toAvroSchema(dataFrame, dataFrame.columns, recordName, nameSpace)
+    toAvroSchema(dataFrame, dataFrame.columns.toIndexedSeq, recordName, nameSpace)

   def wrapSchema(schema: Schema, name: String, namespace: String): Schema = {
     SchemaBuilder.record(name)
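The .toIndexedSeq here is a Scala 2.13 compatibility fix: DataFrame.columns returns an Array[String], and under 2.13 scala.Seq means immutable.Seq, so the implicit Array-to-Seq conversion copies the array and is itself deprecated. Making the copy explicit compiles warning-free on both 2.12 and 2.13. A pure-Scala sketch (describe is a hypothetical stand-in for toAvroSchema's Seq[String] parameter):

    // A method that, like toAvroSchema, expects an immutable Seq of column names.
    def describe(columns: Seq[String]): String = columns.mkString(", ")

    val columns: Array[String] = Array("id", "name") // what DataFrame.columns returns
    // describe(columns) would compile on 2.13 only via a deprecated copying implicit;
    // the explicit conversion is warning-free on both 2.12 and 2.13.
    describe(columns.toIndexedSeq)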
ConfluentKafkaAvroReader (za.co.absa.abris.examples): replace a symbol-literal column alias with a string and add an explicit processing-time trigger to the console sink.

@@ -18,10 +18,13 @@ package za.co.absa.abris.examples

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.Trigger
 import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
 import za.co.absa.abris.avro.registry.SchemaSubject
 import za.co.absa.abris.config.AbrisConfig

+import scala.concurrent.duration.DurationInt
+
 object ConfluentKafkaAvroReader {

   val kafkaTopicName = "test_topic"
@@ -56,13 +59,14 @@
       .usingSchemaRegistry("http://localhost:8081")

     import za.co.absa.abris.avro.functions.from_avro
-    val deserialized = dataFrame.select(from_avro(col("value"), abrisConfig) as 'data)
+    val deserialized = dataFrame.select(from_avro(col("value"), abrisConfig) as "data")

     deserialized.printSchema()

     deserialized
       .writeStream
       .format("console")
+      .trigger(Trigger.ProcessingTime(5.seconds))
       .option("truncate", "false")
       .start()
       .awaitTermination()
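Two deprecation-driven changes meet here: symbol literals such as 'data are deprecated in Scala 2.13 and removed in Scala 3, so column aliases become plain strings, and the DurationInt import enables the 5.seconds syntax accepted by Trigger.ProcessingTime's Duration overload, avoiding string parsing. A hedged sketch of both, assuming a streaming DataFrame df read from Kafka:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.Trigger

    import scala.concurrent.duration.DurationInt

    def writeToConsole(df: DataFrame): Unit =
      df.select(col("value") as "data")             // was: as 'data (deprecated symbol literal)
        .writeStream
        .format("console")
        .trigger(Trigger.ProcessingTime(5.seconds)) // Duration overload, type-checked interval
        .start()
        .awaitTermination()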
ConfluentKafkaAvroWriter (za.co.absa.abris.examples): build the all-columns struct from Column objects instead of the String varargs overload, and drop another symbol literal.

@@ -17,7 +17,7 @@
 package za.co.absa.abris.examples

 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.functions.struct
+import org.apache.spark.sql.functions.{col, struct}
 import org.apache.spark.sql.{DataFrame, Encoder, Row, SparkSession}
 import za.co.absa.abris.avro.format.SparkAvroConversions
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
@@ -49,7 +49,7 @@ object ConfluentKafkaAvroWriter {
     val schemaString = ComplexRecordsGenerator.usedAvroSchema

     // to serialize all columns in dataFrame we need to put them in a spark struct
-    val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
+    val allColumns = struct(dataFrame.columns.map(col).toIndexedSeq: _*)

     val abrisConfig = AbrisConfig
       .toConfluentAvro
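The old call spread dataFrame.columns.tail, an Array[String], into a String vararg, which again leans on the deprecated Array-to-Seq copying implicit under Scala 2.13; mapping every name through col and converting explicitly sidesteps the warning. A runnable sketch (the local SparkSession and toy frame are assumptions for illustration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, struct}

    val spark = SparkSession.builder.master("local[*]").appName("struct-example").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    // was: struct(df.columns.head, df.columns.tail: _*) -- Array spread into a String vararg
    val allColumns = struct(df.columns.map(col).toIndexedSeq: _*)
    df.select(allColumns as "record").show()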
@@ -59,7 +59,7 @@

     import za.co.absa.abris.avro.functions.to_avro

-    val avroFrame = dataFrame.select(to_avro(allColumns, abrisConfig) as 'value)
+    val avroFrame = dataFrame.select(to_avro(allColumns, abrisConfig) as "value")

     avroFrame
       .write
SparkAvroConversionsSpec (za.co.absa.abris.avro.format): migrate to the ScalaTest 3.1 modular style.

@@ -19,14 +19,15 @@ package za.co.absa.abris.avro.format
 import org.apache.avro.Schema.Type
 import org.apache.avro.SchemaBuilder
 import org.apache.spark.sql.types._
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.examples.data.generation.TestSchemas

 import scala.collection.JavaConverters._
 import scala.collection._

-class SparkAvroConversionsSpec extends FlatSpec with Matchers {
+class SparkAvroConversionsSpec extends AnyFlatSpec with Matchers {

   // scalastyle:off magic.number
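This is the mechanical FlatSpec-to-AnyFlatSpec migration required by ScalaTest 3.1: the monolithic org.scalatest.FlatSpec and org.scalatest.Matchers were deprecated in favour of style-specific modules, while the test DSL itself is unchanged. The same rewrite repeats in the remaining specs below. A minimal sketch (ExampleSpec is a hypothetical spec, not part of this PR):

    import org.scalatest.flatspec.AnyFlatSpec
    import org.scalatest.matchers.should.Matchers

    // was: class ExampleSpec extends FlatSpec with Matchers
    class ExampleSpec extends AnyFlatSpec with Matchers {
      "the migrated style" should "keep the existing assertion DSL" in {
        (1 + 1) shouldBe 2
      }
    }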
SchemaManagerFactorySpec (za.co.absa.abris.avro.read.confluent): same ScalaTest migration.

@@ -16,13 +16,14 @@

 package za.co.absa.abris.avro.read.confluent

-import org.scalatest.{BeforeAndAfterEach, FlatSpec}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.flatspec.AnyFlatSpec
 import za.co.absa.abris.avro.registry.{AbrisRegistryClient, ConfluentMockRegistryClient, ConfluentRegistryClient, TestRegistryClient}
 import za.co.absa.abris.config.AbrisConfig

 import scala.reflect.runtime.{universe => ru}

-class SchemaManagerFactorySpec extends FlatSpec with BeforeAndAfterEach {
+class SchemaManagerFactorySpec extends AnyFlatSpec with BeforeAndAfterEach {

   private val schemaRegistryConfig1 = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "http://dummy")
schemaManagerSpec (za.co.absa.abris.avro.read.confluent): same ScalaTest migration.

@@ -16,12 +16,13 @@

 package za.co.absa.abris.avro.read.confluent

-import org.scalatest.{BeforeAndAfter, FlatSpec}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.avro.registry.{ConfluentMockRegistryClient, LatestVersion, NumVersion, SchemaSubject}
 import za.co.absa.abris.config.AbrisConfig

-class schemaManagerSpec extends FlatSpec with BeforeAndAfter {
+class schemaManagerSpec extends AnyFlatSpec with BeforeAndAfter {

   private val schema = AvroSchemaUtils.parse(
     "{\"type\": \"record\", \"name\": \"Blah\", \"fields\": [{ \"name\": \"name\", \"type\": \"string\" }]}")
SchemaSubjectSpec (za.co.absa.abris.avro.registry): same ScalaTest migration.

@@ -16,10 +16,11 @@

 package za.co.absa.abris.avro.registry

-import org.scalatest.{BeforeAndAfter, FlatSpec}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils

-class SchemaSubjectSpec extends FlatSpec with BeforeAndAfter {
+class SchemaSubjectSpec extends AnyFlatSpec with BeforeAndAfter {

   private val schema = AvroSchemaUtils.parse(
     """{
SchemaLoaderSpec (za.co.absa.abris.avro.schemas): ScalaTest migration, regrouped imports, and an explicit charset for FileUtils.write.

@@ -16,14 +16,15 @@

 package za.co.absa.abris.avro.schemas

-import java.io.File
-
 import org.apache.commons.io.FileUtils
-import org.scalatest.FlatSpec
+import org.scalatest.flatspec.AnyFlatSpec
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.examples.data.generation.TestSchemas

+import java.io.File
+import java.nio.charset.Charset
+
 class SchemaLoaderSpec extends AnyFlatSpec {

   private val testDir = new File("testDirSchemaLoader")
@@ -44,7 +45,7 @@

   private def writeIntoFS(schema: String, name: String): File = {
     val destination = new File(testDir, name)
-    FileUtils.write(destination, schema)
+    FileUtils.write(destination, schema, Charset.defaultCharset)
     destination
   }
 }
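Like IOUtils.readLines above, the two-argument FileUtils.write(File, CharSequence) has been deprecated since commons-io 2.5 in favour of the overload that names its charset. A one-line sketch (the file name and content are illustrative):

    import java.io.File
    import java.nio.charset.Charset
    import org.apache.commons.io.FileUtils

    // was: FileUtils.write(destination, schema) -- implicit platform charset, deprecated
    FileUtils.write(new File("schema.avsc"), "{\"type\": \"string\"}", Charset.defaultCharset)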
AvroDataToCatalystSpec (za.co.absa.abris.avro.sql): replace a symbol-literal column reference with col(...).

@@ -17,6 +17,7 @@
 package za.co.absa.abris.avro.sql

 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.matchers.should.Matchers
@@ -48,7 +49,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
       "basic.auth.user.info" -> sensitiveData
     ))

-    val column = from_avro('avroBytes, fromAvroConfig)
+    val column = from_avro(col("avroBytes"), fromAvroConfig)
     column.expr.toString() should not include sensitiveData
  }