diff --git a/config/spark2kafka.properties b/config/spark2kafka.properties
index e2387f3096..780ec52cfd 100644
--- a/config/spark2kafka.properties
+++ b/config/spark2kafka.properties
@@ -34,4 +34,4 @@
 topic.partitions =
 topic.replicas =
 #The rest of the topic can be configured parameters.For example: keyA=valueA,keyB=valueB,keyC=valueC...
-topic.config =
\ No newline at end of file
+topic.config =
diff --git a/etl/src/main/scala/org/astraea/etl/DataType.scala b/etl/src/main/scala/org/astraea/etl/DataType.scala
index 1ce23c04fc..8e100242e7 100644
--- a/etl/src/main/scala/org/astraea/etl/DataType.scala
+++ b/etl/src/main/scala/org/astraea/etl/DataType.scala
@@ -80,7 +80,8 @@ object DataType {
       ByteType,
       IntegerType,
       LongType,
-      ShortType
+      ShortType,
+      TimestampType
     )
   }
 }
diff --git a/etl/src/main/scala/org/astraea/etl/Metadata.scala b/etl/src/main/scala/org/astraea/etl/Metadata.scala
index 0eb7b75dc2..2ec97f1754 100644
--- a/etl/src/main/scala/org/astraea/etl/Metadata.scala
+++ b/etl/src/main/scala/org/astraea/etl/Metadata.scala
@@ -46,18 +46,20 @@ import scala.collection.JavaConverters._
  *   Set deployment model, which will be used in
  *   SparkSession.builder().master(deployment.model).Two settings are currently
  *   supported spark://HOST:PORT and local[*].
+ * @param checkpoint
+ *   Spark checkpoint path.
  */
 case class Metadata private (
-    var deployModel: String,
-    var sourcePath: File,
-    var sinkPath: File,
-    var column: Seq[DataColumn],
-    var kafkaBootstrapServers: String,
-    var topicName: String,
-    var numPartitions: Int,
-    var numReplicas: Short,
-    var topicConfig: Map[String, String],
-    var checkpoint: File
+    deployModel: String,
+    sourcePath: File,
+    sinkPath: File,
+    column: Seq[DataColumn],
+    kafkaBootstrapServers: String,
+    topicName: String,
+    numPartitions: Int,
+    numReplicas: Short,
+    topicConfig: Map[String, String],
+    checkpoint: File
 )
 
 object Metadata {
diff --git a/etl/src/main/scala/org/astraea/etl/Reader.scala b/etl/src/main/scala/org/astraea/etl/Reader.scala
index 431c52a92d..1038a6f219 100644
--- a/etl/src/main/scala/org/astraea/etl/Reader.scala
+++ b/etl/src/main/scala/org/astraea/etl/Reader.scala
@@ -16,9 +16,7 @@
  */
 package org.astraea.etl
 
-import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 import org.astraea.etl.DataType.StringType
 import org.astraea.etl.Reader._
@@ -70,14 +68,16 @@ class Reader[PassedStep <: BuildStep] private (
   def readCSV(
       source: String
   )(implicit ev: PassedStep =:= FullReader): DataFrameOp = {
-    var df = createSpark(deploymentModel).readStream
+    val df = createSpark(deploymentModel).readStream
       .option("cleanSource", "archive")
       .option("sourceArchiveDir", sinkPath)
       .schema(userSchema)
       .csv(source)
-    pk.foreach(str =>
-      df = df.withColumn(str, new Column(AssertNotNull(col(str).expr)))
-    )
+      .filter(row => {
+        val bool = (0 until row.length).exists(i => !row.isNullAt(i))
+        bool
+      })
+
     new DataFrameOp(df)
   }
 
diff --git a/etl/src/test/scala/org/astraea/etl/FileCreator.scala b/etl/src/test/scala/org/astraea/etl/FileCreator.scala
index bfdc5ba74e..3051493484 100644
--- a/etl/src/test/scala/org/astraea/etl/FileCreator.scala
+++ b/etl/src/test/scala/org/astraea/etl/FileCreator.scala
@@ -48,18 +48,9 @@ object FileCreator {
   ): Try[Unit] = {
     val str = sourceDir + "/local_kafka" + "-" + int.toString + ".csv"
     val fileCSV2 = Files.createFile(new File(str).toPath)
-    writeCsvFile(fileCSV2.toAbsolutePath.toString, addPrefix(rows))
+    writeCsvFile(fileCSV2.toAbsolutePath.toString, rows)
   }
 
-  def addPrefix(lls: List[List[String]]): List[List[String]] =
-    lls
-      .foldLeft((1, List.empty[List[String]])) {
-        case ((serial: Int, acc: List[List[String]]), value: List[String]) =>
-          (serial + 1, (serial.toString +: value) +: acc)
-      }
-      ._2
-      .reverse
-
   def writeCsvFile(
       path: String,
       rows: List[List[String]]
diff --git a/etl/src/test/scala/org/astraea/etl/ReaderTest.scala b/etl/src/test/scala/org/astraea/etl/ReaderTest.scala
index 853e32810a..3cb49bf839 100644
--- a/etl/src/test/scala/org/astraea/etl/ReaderTest.scala
+++ b/etl/src/test/scala/org/astraea/etl/ReaderTest.scala
@@ -25,7 +25,7 @@ import org.astraea.etl.FileCreator.{createCSV, generateCSVF, getCSVFile, mkdir}
 import org.astraea.etl.Reader.createSpark
 import org.astraea.it.RequireBrokerCluster
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
-import org.junit.jupiter.api.{Disabled, Test}
+import org.junit.jupiter.api.Test
 
 import java.io._
 import java.nio.file.Files
@@ -35,7 +35,7 @@ import scala.concurrent.duration.Duration
 import scala.util.Random
 
 class ReaderTest extends RequireBrokerCluster {
-  @Test def pkNonNullTest(): Unit = {
+  @Test def skipBlankLineTest(): Unit = {
     val tempPath: String =
       System.getProperty("java.io.tmpdir") + "/createSchemaNullTest" + Random
         .nextInt()
@@ -45,13 +45,15 @@ class ReaderTest extends RequireBrokerCluster {
       System.getProperty("java.io.tmpdir") + "/createSchemaNullTest" + Random
         .nextInt()
     mkdir(tempArchivePath)
+    val dataDir = mkdir(tempPath + "/data")
+    val checkoutDir = mkdir(tempPath + "/checkout")
 
     val columnOne: List[String] =
-      List("A1", "B1", "C1", null)
+      List("A1", "B1", null, "D1")
     val columnTwo: List[String] =
-      List("52", "36", "45", "25")
+      List("52", "36", null, "25")
     val columnThree: List[String] =
-      List("fghgh", "gjgbn", "fgbhjf", "dfjf")
+      List("fghgh", "gjgbn", null, "dfjf")
 
     val row = columnOne
       .zip(columnTwo.zip(columnThree))
@@ -64,7 +66,6 @@ class ReaderTest extends RequireBrokerCluster {
 
     val structType = Reader.createSchema(
       Map(
-        "SerialNumber" -> IntegerType,
         "RecordNumber" -> StringType,
         "Size" -> IntegerType,
         "Type" -> StringType
@@ -81,15 +82,21 @@ class ReaderTest extends RequireBrokerCluster {
       .readCSV(new File(tempPath).getPath)
       .dataFrame()
 
-    assertThrows(
-      classOf[StreamingQueryException],
-      () =>
-        df.writeStream
-          .outputMode(OutputMode.Append())
-          .format("console")
-          .start()
-          .awaitTermination(Duration(5, TimeUnit.SECONDS).toMillis)
-    )
+    df.writeStream
+      .format("csv")
+      .option("path", dataDir.getPath)
+      .option("checkpointLocation", checkoutDir.getPath)
+      .outputMode("append")
+      .start()
+      .awaitTermination(Duration(20, TimeUnit.SECONDS).toMillis)
+
+    val writeFile = getCSVFile(new File(dataDir.getPath)).head
+    val br = new BufferedReader(new FileReader(writeFile))
+
+    assertEquals(br.readLine, "A1,52,fghgh")
+    assertEquals(br.readLine, "B1,36,gjgbn")
+    assertEquals(br.readLine, "D1,25,dfjf")
+
   }
 
   @Test def sparkReadCSVTest(): Unit = {
@@ -104,20 +111,19 @@ class ReaderTest extends RequireBrokerCluster {
 
     val structType = Reader.createSchema(
       Map(
-        "SerialNumber" -> IntegerType,
         "RecordNumber" -> StringType,
         "Size" -> IntegerType,
         "Type" -> StringType
       )
     )
-    assertEquals(structType.length, 4)
+    assertEquals(structType.length, 3)
 
     val csvDF = Reader
       .of()
       .spark("local[2]")
       .schema(structType)
       .sinkPath(sinkDir.getPath)
-      .primaryKeys(Seq("SerialNumber"))
+      .primaryKeys(Seq("RecordNumber"))
       .readCSV(sourceDir.getPath)
     assertTrue(
       csvDF.dataFrame().isStreaming,
@@ -137,10 +143,10 @@ class ReaderTest extends RequireBrokerCluster {
     val writeFile = getCSVFile(new File(dataDir.getPath)).head
     val br = new BufferedReader(new FileReader(writeFile))
 
-    assertEquals(br.readLine, "1,A1,52,fghgh")
-    assertEquals(br.readLine, "2,B1,36,gjgbn")
-    assertEquals(br.readLine, "3,C1,45,fgbhjf")
-    assertEquals(br.readLine, "4,D1,25,dfjf")
+    assertEquals(br.readLine, "A1,52,fghgh")
+    assertEquals(br.readLine, "B1,36,gjgbn")
+    assertEquals(br.readLine, "C1,45,fgbhjf")
+    assertEquals(br.readLine, "D1,25,dfjf")
 
     Files.exists(
       new File(
@@ -181,7 +187,7 @@ class ReaderTest extends RequireBrokerCluster {
       .asScala
       .map(row => (row.getAs[String]("key"), row.getAs[String]("value")))
       .toMap
-    println(resultExchange)
+    assertEquals(1, resultExchange.size)
 
     assertEquals(
       "{\"age\":\"29\",\"name\":\"Michael\"}",
@@ -240,13 +246,12 @@ class ReaderTest extends RequireBrokerCluster {
 
   @Test def jsonToByteTest(): Unit = {
     val spark = createSpark("local[2]")
-    var data = Seq(Row(1, "A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
+    var data = Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
     (0 to 10000).iterator.foreach(_ =>
-      data = data ++ Seq(Row(1, "A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
+      data = data ++ Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
     )
 
     val structType = new StructType()
-      .add("ID", "integer")
       .add("name", "string")
       .add("age", "integer")
       .add("xx", "string")
@@ -256,7 +261,6 @@ class ReaderTest extends RequireBrokerCluster {
       .add("fInt", "integer")
 
     val columns = Seq(
-      DataColumn("ID", isPK = true, dataType = IntegerType),
       DataColumn("name", isPK = false, dataType = StringType),
       DataColumn("age", isPK = false, dataType = IntegerType),
       DataColumn("xx", isPK = false, dataType = StringType),
diff --git a/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala b/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala
index bd3ffd029d..aaf7096078 100644
--- a/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala
+++ b/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala
@@ -32,10 +32,7 @@ import java.util
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
-import scala.collection.convert.ImplicitConversions.{
-  `collection AsScalaIterable`,
-  `collection asJava`
-}
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 import scala.concurrent.duration.Duration
 import scala.util.Random
 
@@ -65,7 +62,7 @@ class Spark2KafkaTest extends RequireBrokerCluster {
 
     val rowData = s2kType(rows)
 
-    rowData.forEach(record => assertEquals(records(record._1), record._2))
+    records.foreach(records => assertEquals(records._2, rowData(records._1)))
   }
 
   @Test
@@ -119,18 +116,16 @@ class Spark2KafkaTest extends RequireBrokerCluster {
       .inclusive(0, 3)
       .map(i =>
         (
-          s"""{"${colNames(1)}":"${rows(
+          s"""{"${colNames.head}":"${rows(
            i
-          ).head}","${colNames(2)}":"${rows(i)(1)}"}""",
-          s"""{"${colNames(3)}":"${rows(
+          ).head}","${colNames(1)}":"${rows(i)(1)}"}""",
+          s"""{"${colNames(2)}":"${rows(
            i
          )(
            2
-          )}","${colNames(1)}":"${rows(
+          )}","${colNames.head}":"${rows(
            i
-          ).head}","${colNames.head}":"${i + 1}","${colNames(2)}":"${rows(i)(
-            1
-          )}"}"""
+          ).head}","${colNames(1)}":"${rows(i)(1)}"}"""
        )
      )
      .toMap
@@ -143,7 +138,7 @@ object Spark2KafkaTest extends RequireBrokerCluster {
   private val source: String = tempPath + "/source"
   private val sinkD: String = tempPath + "/sink"
   private val COL_NAMES =
"ID=integer,FirstName=string,SecondName=string,Age=integer" + "FirstName=string,SecondName=string,Age=integer" @BeforeAll def setup(): Unit = { @@ -162,10 +157,7 @@ object Spark2KafkaTest extends RequireBrokerCluster { sinkDir.getPath, checkoutDir.getPath ) - Spark2Kafka.executor( - Array(myPropDir.toString), - 20 - ) + Spark2Kafka.executor(Array(myPropDir.toString), 20) } private def writeProperties( @@ -192,7 +184,7 @@ object Spark2KafkaTest extends RequireBrokerCluster { properties.setProperty(SINK_PATH, sinkPath) properties.setProperty( COLUMN_NAME, - "ID=integer,FirstName=string,SecondName=string,Age=integer" + "FirstName=string,SecondName=string,Age=integer" ) properties.setProperty(PRIMARY_KEYS, "FirstName=string,SecondName=string") properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers()) @@ -209,11 +201,11 @@ object Spark2KafkaTest extends RequireBrokerCluster { private def rows: List[List[String]] = { val columnOne: List[String] = - List("Michael", "Andy", "Justin", "LuLu") + List("Michael", "Andy", "Justin", "") val columnTwo: List[String] = - List("A.K", "B.C", "C.L", "C.C") + List("A.K", "B.C", "C.L", "") val columnThree: List[String] = - List("29", "30", "19", "18") + List("29", "30", "19", "") columnOne .zip(columnTwo.zip(columnThree))