Commit
spark2kafka process blank line (#1227)
wycccccc authored Dec 15, 2022
1 parent 7562073 commit cfa7c5b
Showing 7 changed files with 67 additions and 77 deletions.
2 changes: 1 addition & 1 deletion config/spark2kafka.properties
@@ -34,4 +34,4 @@ topic.partitions =
topic.replicas =

#The rest of the topic can be configured parameters.For example: keyA=valueA,keyB=valueB,keyC=valueC...
topic.config =
topic.config =
3 changes: 2 additions & 1 deletion etl/src/main/scala/org/astraea/etl/DataType.scala
@@ -80,7 +80,8 @@ object DataType {
ByteType,
IntegerType,
LongType,
ShortType
ShortType,
TimestampType
)
}
}
22 changes: 12 additions & 10 deletions etl/src/main/scala/org/astraea/etl/Metadata.scala
@@ -46,18 +46,20 @@ import scala.collection.JavaConverters._
* Set deployment model, which will be used in
* SparkSession.builder().master(deployment.model).Two settings are currently
* supported spark://HOST:PORT and local[*].
* @param checkpoint
* Spark checkpoint path.
*/
case class Metadata private (
var deployModel: String,
var sourcePath: File,
var sinkPath: File,
var column: Seq[DataColumn],
var kafkaBootstrapServers: String,
var topicName: String,
var numPartitions: Int,
var numReplicas: Short,
var topicConfig: Map[String, String],
var checkpoint: File
deployModel: String,
sourcePath: File,
sinkPath: File,
column: Seq[DataColumn],
kafkaBootstrapServers: String,
topicName: String,
numPartitions: Int,
numReplicas: Short,
topicConfig: Map[String, String],
checkpoint: File
)

object Metadata {
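The doc comment above notes that deployModel is handed straight to SparkSession's master URL. As a rough sketch of how a createSpark-style helper wires this up (the builder calls are the standard Spark API; the appName and method shape are assumptions for illustration, not the project's exact code):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: deployModel is either "local[*]" for an in-process
// session or "spark://HOST:PORT" for a standalone cluster master.
def createSpark(deployModel: String): SparkSession =
  SparkSession
    .builder()
    .master(deployModel)      // e.g. "local[2]" in the tests below
    .appName("astraea-etl")   // assumed application name, not from the diff
    .getOrCreate()
```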
14 changes: 7 additions & 7 deletions etl/src/main/scala/org/astraea/etl/Reader.scala
@@ -16,9 +16,7 @@
*/
package org.astraea.etl

import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.astraea.etl.DataType.StringType
import org.astraea.etl.Reader._
@@ -70,14 +68,16 @@ class Reader[PassedStep <: BuildStep] private (
def readCSV(
source: String
)(implicit ev: PassedStep =:= FullReader): DataFrameOp = {
var df = createSpark(deploymentModel).readStream
val df = createSpark(deploymentModel).readStream
.option("cleanSource", "archive")
.option("sourceArchiveDir", sinkPath)
.schema(userSchema)
.csv(source)
pk.foreach(str =>
df = df.withColumn(str, new Column(AssertNotNull(col(str).expr)))
)
.filter(row => {
val bool = (0 until row.length).exists(i => !row.isNullAt(i))
bool
})

new DataFrameOp(df)
}

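The filter added to readCSV above is the core of this commit: a row survives only if at least one of its columns is non-null, so fully blank CSV lines are dropped instead of failing the stream. A self-contained sketch of the same predicate on a batch DataFrame (the session, schema, and sample rows here are illustrative, not taken from the repository):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[2]").appName("blank-line-demo").getOrCreate()

val schema = StructType(Seq(
  StructField("RecordNumber", StringType, nullable = true),
  StructField("Size", StringType, nullable = true),
  StructField("Type", StringType, nullable = true)
))

// A blank CSV line typically parses to a row whose columns are all null.
val rows = Seq(Row("A1", "52", "fghgh"), Row(null, null, null), Row("D1", "25", "dfjf"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Same predicate as readCSV: keep the row if any column is non-null.
val nonBlank = df.filter(row => (0 until row.length).exists(i => !row.isNullAt(i)))

nonBlank.show() // only the "A1" and "D1" rows remain
```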
11 changes: 1 addition & 10 deletions etl/src/test/scala/org/astraea/etl/FileCreator.scala
@@ -48,18 +48,9 @@ object FileCreator {
): Try[Unit] = {
val str = sourceDir + "/local_kafka" + "-" + int.toString + ".csv"
val fileCSV2 = Files.createFile(new File(str).toPath)
writeCsvFile(fileCSV2.toAbsolutePath.toString, addPrefix(rows))
writeCsvFile(fileCSV2.toAbsolutePath.toString, rows)
}

def addPrefix(lls: List[List[String]]): List[List[String]] =
lls
.foldLeft((1, List.empty[List[String]])) {
case ((serial: Int, acc: List[List[String]]), value: List[String]) =>
(serial + 1, (serial.toString +: value) +: acc)
}
._2
.reverse

def writeCsvFile(
path: String,
rows: List[List[String]]
58 changes: 31 additions & 27 deletions etl/src/test/scala/org/astraea/etl/ReaderTest.scala
@@ -25,7 +25,7 @@ import org.astraea.etl.FileCreator.{createCSV, generateCSVF, getCSVFile, mkdir}
import org.astraea.etl.Reader.createSpark
import org.astraea.it.RequireBrokerCluster
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{Disabled, Test}
import org.junit.jupiter.api.Test

import java.io._
import java.nio.file.Files
@@ -35,7 +35,7 @@ import scala.concurrent.duration.Duration
import scala.util.Random

class ReaderTest extends RequireBrokerCluster {
@Test def pkNonNullTest(): Unit = {
@Test def skipBlankLineTest(): Unit = {
val tempPath: String =
System.getProperty("java.io.tmpdir") + "/createSchemaNullTest" + Random
.nextInt()
@@ -45,13 +45,15 @@
System.getProperty("java.io.tmpdir") + "/createSchemaNullTest" + Random
.nextInt()
mkdir(tempArchivePath)
val dataDir = mkdir(tempPath + "/data")
val checkoutDir = mkdir(tempPath + "/checkout")

val columnOne: List[String] =
List("A1", "B1", "C1", null)
List("A1", "B1", null, "D1")
val columnTwo: List[String] =
List("52", "36", "45", "25")
List("52", "36", null, "25")
val columnThree: List[String] =
List("fghgh", "gjgbn", "fgbhjf", "dfjf")
List("fghgh", "gjgbn", null, "dfjf")

val row = columnOne
.zip(columnTwo.zip(columnThree))
@@ -64,7 +66,6 @@

val structType = Reader.createSchema(
Map(
"SerialNumber" -> IntegerType,
"RecordNumber" -> StringType,
"Size" -> IntegerType,
"Type" -> StringType
@@ -81,15 +82,21 @@
.readCSV(new File(tempPath).getPath)
.dataFrame()

assertThrows(
classOf[StreamingQueryException],
() =>
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
.start()
.awaitTermination(Duration(5, TimeUnit.SECONDS).toMillis)
)
df.writeStream
.format("csv")
.option("path", dataDir.getPath)
.option("checkpointLocation", checkoutDir.getPath)
.outputMode("append")
.start()
.awaitTermination(Duration(20, TimeUnit.SECONDS).toMillis)

val writeFile = getCSVFile(new File(dataDir.getPath)).head
val br = new BufferedReader(new FileReader(writeFile))

assertEquals(br.readLine, "A1,52,fghgh")
assertEquals(br.readLine, "B1,36,gjgbn")
assertEquals(br.readLine, "D1,25,dfjf")

}

@Test def sparkReadCSVTest(): Unit = {
@@ -104,20 +111,19 @@

val structType = Reader.createSchema(
Map(
"SerialNumber" -> IntegerType,
"RecordNumber" -> StringType,
"Size" -> IntegerType,
"Type" -> StringType
)
)

assertEquals(structType.length, 4)
assertEquals(structType.length, 3)
val csvDF = Reader
.of()
.spark("local[2]")
.schema(structType)
.sinkPath(sinkDir.getPath)
.primaryKeys(Seq("SerialNumber"))
.primaryKeys(Seq("RecordNumber"))
.readCSV(sourceDir.getPath)
assertTrue(
csvDF.dataFrame().isStreaming,
@@ -137,10 +143,10 @@
val writeFile = getCSVFile(new File(dataDir.getPath)).head
val br = new BufferedReader(new FileReader(writeFile))

assertEquals(br.readLine, "1,A1,52,fghgh")
assertEquals(br.readLine, "2,B1,36,gjgbn")
assertEquals(br.readLine, "3,C1,45,fgbhjf")
assertEquals(br.readLine, "4,D1,25,dfjf")
assertEquals(br.readLine, "A1,52,fghgh")
assertEquals(br.readLine, "B1,36,gjgbn")
assertEquals(br.readLine, "C1,45,fgbhjf")
assertEquals(br.readLine, "D1,25,dfjf")

Files.exists(
new File(
@@ -181,7 +187,7 @@
.asScala
.map(row => (row.getAs[String]("key"), row.getAs[String]("value")))
.toMap
println(resultExchange)

assertEquals(1, resultExchange.size)
assertEquals(
"{\"age\":\"29\",\"name\":\"Michael\"}",
@@ -240,13 +246,12 @@
@Test def jsonToByteTest(): Unit = {
val spark = createSpark("local[2]")

var data = Seq(Row(1, "A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
var data = Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
(0 to 10000).iterator.foreach(_ =>
data = data ++ Seq(Row(1, "A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
data = data ++ Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
)

val structType = new StructType()
.add("ID", "integer")
.add("name", "string")
.add("age", "integer")
.add("xx", "string")
@@ -256,7 +261,6 @@
.add("fInt", "integer")

val columns = Seq(
DataColumn("ID", isPK = true, dataType = IntegerType),
DataColumn("name", isPK = false, dataType = StringType),
DataColumn("age", isPK = false, dataType = IntegerType),
DataColumn("xx", isPK = false, dataType = StringType),
34 changes: 13 additions & 21 deletions etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala
@@ -32,10 +32,7 @@ import java.util
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.{
`collection AsScalaIterable`,
`collection asJava`
}
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.concurrent.duration.Duration
import scala.util.Random

@@ -65,7 +62,7 @@ class Spark2KafkaTest extends RequireBrokerCluster {

val rowData = s2kType(rows)

rowData.forEach(record => assertEquals(records(record._1), record._2))
records.foreach(records => assertEquals(records._2, rowData(records._1)))
}

@Test
@@ -119,18 +116,16 @@
.inclusive(0, 3)
.map(i =>
(
s"""{"${colNames(1)}":"${rows(
s"""{"${colNames.head}":"${rows(
i
).head}","${colNames(2)}":"${rows(i)(1)}"}""",
s"""{"${colNames(3)}":"${rows(
).head}","${colNames(1)}":"${rows(i)(1)}"}""",
s"""{"${colNames(2)}":"${rows(
i
)(
2
)}","${colNames(1)}":"${rows(
)}","${colNames.head}":"${rows(
i
).head}","${colNames.head}":"${i + 1}","${colNames(2)}":"${rows(i)(
1
)}"}"""
).head}","${colNames(1)}":"${rows(i)(1)}"}"""
)
)
.toMap
@@ -143,7 +138,7 @@
private val source: String = tempPath + "/source"
private val sinkD: String = tempPath + "/sink"
private val COL_NAMES =
"ID=integer,FirstName=string,SecondName=string,Age=integer"
"FirstName=string,SecondName=string,Age=integer"

@BeforeAll
def setup(): Unit = {
@@ -162,10 +157,7 @@
sinkDir.getPath,
checkoutDir.getPath
)
Spark2Kafka.executor(
Array(myPropDir.toString),
20
)
Spark2Kafka.executor(Array(myPropDir.toString), 20)
}

private def writeProperties(
Expand All @@ -192,7 +184,7 @@ object Spark2KafkaTest extends RequireBrokerCluster {
properties.setProperty(SINK_PATH, sinkPath)
properties.setProperty(
COLUMN_NAME,
"ID=integer,FirstName=string,SecondName=string,Age=integer"
"FirstName=string,SecondName=string,Age=integer"
)
properties.setProperty(PRIMARY_KEYS, "FirstName=string,SecondName=string")
properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers())
@@ -209,11 +201,11 @@

private def rows: List[List[String]] = {
val columnOne: List[String] =
List("Michael", "Andy", "Justin", "LuLu")
List("Michael", "Andy", "Justin", "")
val columnTwo: List[String] =
List("A.K", "B.C", "C.L", "C.C")
List("A.K", "B.C", "C.L", "")
val columnThree: List[String] =
List("29", "30", "19", "18")
List("29", "30", "19", "")

columnOne
.zip(columnTwo.zip(columnThree))
