spark2kafka process blank line #1227

Merged (10 commits), Dec 15, 2022
2 changes: 1 addition & 1 deletion config/spark2kafka.properties
@@ -34,4 +34,4 @@ topic.partitions =
topic.replicas =

#The rest of the topic can be configured parameters.For example: keyA=valueA,keyB=valueB,keyC=valueC...
topic.config =
topic.config =
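
Note: per the comment above, topic.config takes a comma-separated list of key=value pairs. A minimal sketch of how such a string could be turned into a Map (the helper name is hypothetical, not the project's actual parser):

// Hypothetical helper: "keyA=valueA,keyB=valueB" -> Map("keyA" -> "valueA", "keyB" -> "valueB").
def parseTopicConfig(raw: String): Map[String, String] =
  raw
    .split(",")
    .filter(_.nonEmpty)
    .map { pair =>
      val Array(key, value) = pair.split("=", 2) // assumes every pair contains '='
      key.trim -> value.trim
    }
    .toMap

// parseTopicConfig("cleanup.policy=compact,retention.ms=86400000")
//   == Map("cleanup.policy" -> "compact", "retention.ms" -> "86400000")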
3 changes: 2 additions & 1 deletion etl/src/main/scala/org/astraea/etl/DataType.scala
@@ -80,7 +80,8 @@ object DataType {
ByteType,
IntegerType,
LongType,
ShortType
ShortType,
TimestampType
)
}
}
22 changes: 12 additions & 10 deletions etl/src/main/scala/org/astraea/etl/Metadata.scala
@@ -46,18 +46,20 @@ import scala.collection.JavaConverters._
* Set deployment model, which will be used in
* SparkSession.builder().master(deployment.model).Two settings are currently
* supported spark://HOST:PORT and local[*].
* @param checkpoint
* Spark checkpoint path.
*/
case class Metadata private (
var deployModel: String,
var sourcePath: File,
var sinkPath: File,
var column: Seq[DataColumn],
var kafkaBootstrapServers: String,
var topicName: String,
var numPartitions: Int,
var numReplicas: Short,
var topicConfig: Map[String, String],
var checkpoint: File
deployModel: String,
sourcePath: File,
sinkPath: File,
column: Seq[DataColumn],
kafkaBootstrapServers: String,
topicName: String,
numPartitions: Int,
numReplicas: Short,
topicConfig: Map[String, String],
checkpoint: File
)

object Metadata {
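Aside: the scaladoc above says deployModel is handed to SparkSession.builder().master(...). A minimal sketch of that wiring, as a simplified stand-in for the project's Reader.createSpark (the appName is made up):

import org.apache.spark.sql.SparkSession

// Sketch: build a session from the configured deployment model,
// e.g. "local[*]" for local runs or "spark://HOST:PORT" for a standalone cluster.
def createSpark(deployModel: String): SparkSession =
  SparkSession
    .builder()
    .master(deployModel)
    .appName("astraea-etl") // hypothetical app name
    .getOrCreate()
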
14 changes: 7 additions & 7 deletions etl/src/main/scala/org/astraea/etl/Reader.scala
@@ -16,9 +16,7 @@
*/
package org.astraea.etl

import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.astraea.etl.DataType.StringType
import org.astraea.etl.Reader._
@@ -70,14 +68,16 @@ class Reader[PassedStep <: BuildStep] private (
def readCSV(
source: String
)(implicit ev: PassedStep =:= FullReader): DataFrameOp = {
var df = createSpark(deploymentModel).readStream
val df = createSpark(deploymentModel).readStream
.option("cleanSource", "archive")
.option("sourceArchiveDir", sinkPath)
.schema(userSchema)
.csv(source)
pk.foreach(str =>
df = df.withColumn(str, new Column(AssertNotNull(col(str).expr)))
)
.filter(row => {
val bool = (0 until row.length).exists(i => !row.isNullAt(i))
bool
})

new DataFrameOp(df)
}

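Note on the change above: the new filter keeps a row only when at least one column is non-null, which is what drops fully blank CSV lines. A standalone, non-streaming sketch of the same predicate (the sample data and schema here are made up):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = StructType(
  Seq(
    StructField("RecordNumber", StringType, nullable = true),
    StructField("Size", StringType, nullable = true)
  )
)
// One fully blank row (all nulls) mixed in with real data.
val rows = Seq(Row("A1", "52"), Row(null, null), Row("B1", "36"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Same predicate as the diff: keep a row if any of its columns is non-null.
val nonBlank = df.filter(row => (0 until row.length).exists(i => !row.isNullAt(i)))
// nonBlank.count() == 2: only the all-null row is removed.
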
11 changes: 1 addition & 10 deletions etl/src/test/scala/org/astraea/etl/FileCreator.scala
@@ -48,18 +48,9 @@ object FileCreator {
): Try[Unit] = {
val str = sourceDir + "/local_kafka" + "-" + int.toString + ".csv"
val fileCSV2 = Files.createFile(new File(str).toPath)
writeCsvFile(fileCSV2.toAbsolutePath.toString, addPrefix(rows))
writeCsvFile(fileCSV2.toAbsolutePath.toString, rows)
}

def addPrefix(lls: List[List[String]]): List[List[String]] =
lls
.foldLeft((1, List.empty[List[String]])) {
case ((serial: Int, acc: List[List[String]]), value: List[String]) =>
(serial + 1, (serial.toString +: value) +: acc)
}
._2
.reverse

def writeCsvFile(
path: String,
rows: List[List[String]]
58 changes: 31 additions & 27 deletions etl/src/test/scala/org/astraea/etl/ReaderTest.scala
@@ -25,7 +25,7 @@ import org.astraea.etl.FileCreator.{createCSV, generateCSVF, getCSVFile, mkdir}
import org.astraea.etl.Reader.createSpark
import org.astraea.it.RequireBrokerCluster
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{Disabled, Test}
import org.junit.jupiter.api.Test

import java.io._
import java.nio.file.Files
@@ -35,7 +35,7 @@ import scala.concurrent.duration.Duration
import scala.util.Random

class ReaderTest extends RequireBrokerCluster {
@Test def pkNonNullTest(): Unit = {
@Test def skipBlankLineTest(): Unit = {
val tempPath: String =
System.getProperty("java.io.tmpdir") + "/createSchemaNullTest" + Random
.nextInt()
@@ -45,13 +45,15 @@ class ReaderTest extends RequireBrokerCluster {
System.getProperty("java.io.tmpdir") + "/createSchemaNullTest" + Random
.nextInt()
mkdir(tempArchivePath)
val dataDir = mkdir(tempPath + "/data")
val checkoutDir = mkdir(tempPath + "/checkout")

val columnOne: List[String] =
List("A1", "B1", "C1", null)
List("A1", "B1", null, "D1")
val columnTwo: List[String] =
List("52", "36", "45", "25")
List("52", "36", null, "25")
val columnThree: List[String] =
List("fghgh", "gjgbn", "fgbhjf", "dfjf")
List("fghgh", "gjgbn", null, "dfjf")

val row = columnOne
.zip(columnTwo.zip(columnThree))
@@ -64,7 +66,6 @@

val structType = Reader.createSchema(
Map(
"SerialNumber" -> IntegerType,
"RecordNumber" -> StringType,
"Size" -> IntegerType,
"Type" -> StringType
@@ -81,15 +82,21 @@
.readCSV(new File(tempPath).getPath)
.dataFrame()

assertThrows(
classOf[StreamingQueryException],
() =>
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
.start()
.awaitTermination(Duration(5, TimeUnit.SECONDS).toMillis)
)
df.writeStream
.format("csv")
.option("path", dataDir.getPath)
.option("checkpointLocation", checkoutDir.getPath)
.outputMode("append")
.start()
.awaitTermination(Duration(20, TimeUnit.SECONDS).toMillis)

val writeFile = getCSVFile(new File(dataDir.getPath)).head
val br = new BufferedReader(new FileReader(writeFile))

assertEquals(br.readLine, "A1,52,fghgh")
assertEquals(br.readLine, "B1,36,gjgbn")
assertEquals(br.readLine, "D1,25,dfjf")

}

@Test def sparkReadCSVTest(): Unit = {
@@ -104,20 +111,19 @@

val structType = Reader.createSchema(
Map(
"SerialNumber" -> IntegerType,
"RecordNumber" -> StringType,
"Size" -> IntegerType,
"Type" -> StringType
)
)

assertEquals(structType.length, 4)
assertEquals(structType.length, 3)
val csvDF = Reader
.of()
.spark("local[2]")
.schema(structType)
.sinkPath(sinkDir.getPath)
.primaryKeys(Seq("SerialNumber"))
.primaryKeys(Seq("RecordNumber"))
.readCSV(sourceDir.getPath)
assertTrue(
csvDF.dataFrame().isStreaming,
@@ -137,10 +143,10 @@
val writeFile = getCSVFile(new File(dataDir.getPath)).head
val br = new BufferedReader(new FileReader(writeFile))

assertEquals(br.readLine, "1,A1,52,fghgh")
assertEquals(br.readLine, "2,B1,36,gjgbn")
assertEquals(br.readLine, "3,C1,45,fgbhjf")
assertEquals(br.readLine, "4,D1,25,dfjf")
assertEquals(br.readLine, "A1,52,fghgh")
assertEquals(br.readLine, "B1,36,gjgbn")
assertEquals(br.readLine, "C1,45,fgbhjf")
assertEquals(br.readLine, "D1,25,dfjf")

Files.exists(
new File(
@@ -181,7 +187,7 @@
.asScala
.map(row => (row.getAs[String]("key"), row.getAs[String]("value")))
.toMap
println(resultExchange)

assertEquals(1, resultExchange.size)
assertEquals(
"{\"age\":\"29\",\"name\":\"Michael\"}",
@@ -240,13 +246,12 @@
@Test def jsonToByteTest(): Unit = {
val spark = createSpark("local[2]")

var data = Seq(Row(1, "A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
var data = Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
(0 to 10000).iterator.foreach(_ =>
data = data ++ Seq(Row(1, "A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
data = data ++ Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5))
)

val structType = new StructType()
.add("ID", "integer")
.add("name", "string")
.add("age", "integer")
.add("xx", "string")
Expand All @@ -256,7 +261,6 @@ class ReaderTest extends RequireBrokerCluster {
.add("fInt", "integer")

val columns = Seq(
DataColumn("ID", isPK = true, dataType = IntegerType),
DataColumn("name", isPK = false, dataType = StringType),
DataColumn("age", isPK = false, dataType = IntegerType),
DataColumn("xx", isPK = false, dataType = StringType),
34 changes: 13 additions & 21 deletions etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala
@@ -32,10 +32,7 @@ import java.util
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.{
`collection AsScalaIterable`,
`collection asJava`
}
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.concurrent.duration.Duration
import scala.util.Random

@@ -65,7 +62,7 @@ class Spark2KafkaTest extends RequireBrokerCluster {

val rowData = s2kType(rows)

rowData.forEach(record => assertEquals(records(record._1), record._2))
records.foreach(records => assertEquals(records._2, rowData(records._1)))
}

@Test
@@ -119,18 +116,16 @@
.inclusive(0, 3)
.map(i =>
(
s"""{"${colNames(1)}":"${rows(
s"""{"${colNames.head}":"${rows(
i
).head}","${colNames(2)}":"${rows(i)(1)}"}""",
s"""{"${colNames(3)}":"${rows(
).head}","${colNames(1)}":"${rows(i)(1)}"}""",
s"""{"${colNames(2)}":"${rows(
i
)(
2
)}","${colNames(1)}":"${rows(
)}","${colNames.head}":"${rows(
i
).head}","${colNames.head}":"${i + 1}","${colNames(2)}":"${rows(i)(
1
)}"}"""
).head}","${colNames(1)}":"${rows(i)(1)}"}"""
)
)
.toMap
Expand All @@ -143,7 +138,7 @@ object Spark2KafkaTest extends RequireBrokerCluster {
private val source: String = tempPath + "/source"
private val sinkD: String = tempPath + "/sink"
private val COL_NAMES =
"ID=integer,FirstName=string,SecondName=string,Age=integer"
"FirstName=string,SecondName=string,Age=integer"

@BeforeAll
def setup(): Unit = {
@@ -162,10 +157,7 @@
sinkDir.getPath,
checkoutDir.getPath
)
Spark2Kafka.executor(
Array(myPropDir.toString),
20
)
Spark2Kafka.executor(Array(myPropDir.toString), 20)
}

private def writeProperties(
Expand All @@ -192,7 +184,7 @@ object Spark2KafkaTest extends RequireBrokerCluster {
properties.setProperty(SINK_PATH, sinkPath)
properties.setProperty(
COLUMN_NAME,
"ID=integer,FirstName=string,SecondName=string,Age=integer"
"FirstName=string,SecondName=string,Age=integer"
)
properties.setProperty(PRIMARY_KEYS, "FirstName=string,SecondName=string")
properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers())
@@ -209,11 +201,11 @@

private def rows: List[List[String]] = {
val columnOne: List[String] =
List("Michael", "Andy", "Justin", "LuLu")
List("Michael", "Andy", "Justin", "")
val columnTwo: List[String] =
List("A.K", "B.C", "C.L", "C.C")
List("A.K", "B.C", "C.L", "")
val columnThree: List[String] =
List("29", "30", "19", "18")
List("29", "30", "19", "")

columnOne
.zip(columnTwo.zip(columnThree))