[SparkConnector] Add proper time column support for Spark connector segment writer #14556

Merged: 4 commits, Dec 3, 2024
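
The change lets the segment writer treat a designated numeric time column as a Pinot DateTime field and use its minimum and maximum values ({startTime}, {endTime}) in generated segment names. A minimal usage sketch follows, assuming a DataFrame `df` and the data source name "pinot"; the option keys and values are taken from this PR's example and tests, the rest is illustrative:

df.write
  .format("pinot")                        // data source name assumed for illustration
  .option("timeColumnName", "ts")         // must be numeric (INT/LONG) for startTime/endTime tracking
  .option("timeFormat", "EPOCH|SECONDS")
  .option("timeGranularity", "1:SECONDS")
  .option("segmentNameFormat", "{table}_{startTime}_{endTime}_{partitionId:03}")
  .save("myPath")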
@@ -62,8 +62,29 @@ class PinotDataWriter[InternalRow](
private[pinot] val savePath: String = writeOptions.savePath
private[pinot] val bufferedRecordReader: PinotBufferedRecordReader = new PinotBufferedRecordReader()

private val timeColumnName = writeOptions.timeColumnName
private val timeColumnIndex = if (timeColumnName != null) writeSchema.fieldIndex(timeColumnName) else -1

private var isTimeColumnNumeric = false
if (timeColumnIndex > -1) {
isTimeColumnNumeric = writeSchema.fields(timeColumnIndex).dataType match {
case org.apache.spark.sql.types.IntegerType => true
case org.apache.spark.sql.types.LongType => true
case _ => false
}
}
private var startTime = Long.MaxValue
private var endTime = 0L

override def write(record: catalyst.InternalRow): Unit = {
bufferedRecordReader.write(internalRowToGenericRow(record))

// Tracking startTime and endTime for segment name generation purposes
if (timeColumnIndex > -1 && isTimeColumnNumeric) {
val time = record.getLong(timeColumnIndex)
startTime = Math.min(startTime, time)
endTime = Math.max(endTime, time)
}
}

override def commit(): WriterCommitMessage = {
@@ -77,7 +98,10 @@ class PinotDataWriter[InternalRow](
/** This method is used to generate the segment name based on the format
* provided in the write options (segmentNameFormat).
* The format can contain variables like {partitionId}.
* Currently supported variables are `partitionId`, `table`
* Currently supported variables are `partitionId`, `table`, `startTime` and `endTime`
*
* `startTime` and `endTime` are the minimum and maximum values of the time column in the records
* and they are only available if the time column is numeric.
*
* It also supports the following, python inspired format specifier for digit formatting:
* `{partitionId:05}`
@@ -88,12 +112,15 @@
* "{partitionId:05}_{table}" -> "00012_airlineStats"
* "{table}_{partitionId}" -> "airlineStats_12"
* "{table}_20240805" -> "airlineStats_20240805"
* "{table}_{startTime}_{endTime}_{partitionId:03}" -> "airlineStats_1234567890_1234567891_012"
*/
private[pinot] def getSegmentName: String = {
val format = writeOptions.segmentNameFormat
val variables = Map(
"partitionId" -> partitionId,
"table" -> tableName,
"startTime" -> startTime,
Review comment (Contributor): nit: you can also update javadocs to show the new variables for segment name generation. But do call out that it only works for numeric time columns.
"endTime" -> endTime,
)

val pattern = Pattern.compile("\\{(\\w+)(?::(\\d+))?}")
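
For reference, a self-contained sketch of the substitution rules documented above (variable map as in the code; this illustrates the documented behavior and is not necessarily the connector's exact implementation):

import java.util.regex.Pattern

def expandSegmentName(format: String, variables: Map[String, Any]): String = {
  val pattern = Pattern.compile("\\{(\\w+)(?::(\\d+))?}")
  val matcher = pattern.matcher(format)
  val sb = new StringBuffer
  while (matcher.find()) {
    val value = variables(matcher.group(1))       // e.g. "table", "partitionId", "startTime", "endTime"
    val width = Option(matcher.group(2))          // optional zero-pad width, e.g. "05"
    val rendered = width match {
      case Some(w) => ("%0" + w.toInt + "d").format(value.toString.toLong)
      case None    => value.toString              // assumes values contain no regex replacement characters
    }
    matcher.appendReplacement(sb, rendered)
  }
  matcher.appendTail(sb)
  sb.toString
}

// expandSegmentName("{table}_{startTime}_{endTime}_{partitionId:03}",
//   Map("table" -> "airlineStats", "startTime" -> 1234567890L,
//       "endTime" -> 1234567891L, "partitionId" -> 12))
// => "airlineStats_1234567890_1234567891_012"
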
@@ -145,11 +172,14 @@ class PinotDataWriter[InternalRow](
indexingConfig: IndexingConfig,
outputDir: File,
): SegmentGeneratorConfig = {
val segmentsValidationAndRetentionConfig = new SegmentsValidationAndRetentionConfig()
segmentsValidationAndRetentionConfig.setTimeColumnName(timeColumnName)

// Mostly dummy tableConfig, sufficient for segment generation purposes
val tableConfig = new TableConfig(
tableName,
"OFFLINE",
new SegmentsValidationAndRetentionConfig(),
segmentsValidationAndRetentionConfig,
new TenantConfig(null, null, null),
indexingConfig,
new TableCustomConfig(null),
@@ -192,6 +222,8 @@ class PinotDataWriter[InternalRow](
gr.putValue(field.name, record.getBoolean(idx))
case org.apache.spark.sql.types.ByteType =>
gr.putValue(field.name, record.getByte(idx))
case org.apache.spark.sql.types.BinaryType =>
gr.putValue(field.name, record.getBinary(idx))
case org.apache.spark.sql.types.ShortType =>
gr.putValue(field.name, record.getShort(idx))
case org.apache.spark.sql.types.ArrayType(elementType, _) =>
@@ -210,6 +242,8 @@
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Boolean]))
case org.apache.spark.sql.types.ByteType =>
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Byte]))
case org.apache.spark.sql.types.BinaryType =>
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Array[Byte]]))
case org.apache.spark.sql.types.ShortType =>
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Short]))
case _ =>
@@ -28,7 +28,8 @@ class PinotWrite(
) extends Write with BatchWrite {
private[pinot] val writeOptions: PinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(logicalWriteInfo.options())
private[pinot] val writeSchema: StructType = logicalWriteInfo.schema()
private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(
writeSchema, writeOptions.tableName, writeOptions.timeColumnName, writeOptions.timeFormat, writeOptions.timeGranularity)

override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): DataWriterFactory = {
// capture the values to allow lambda serialization
@@ -18,26 +18,32 @@
*/
package org.apache.pinot.connector.spark.v3.datasource

import org.apache.pinot.spi.data.FieldSpec
import org.apache.pinot.spi.data.Schema
import org.apache.pinot.spi.data.{FieldSpec, Schema}
import org.apache.pinot.spi.data.Schema.SchemaBuilder
import org.apache.spark.sql.types._


object SparkToPinotTypeTranslator {
// TODO: incorporate time column
def translate(sparkSchema: StructType, tableName: String): Schema = {
def translate(sparkSchema: StructType,
tableName: String,
timeColumn: String,
timeFormat: String,
timeGranularity: String): Schema = {
val schemaBuilder = new SchemaBuilder
schemaBuilder.setSchemaName(tableName)
for (field <- sparkSchema.fields) {
val fieldName = field.name
val sparkType = field.dataType
val pinotType = translateType(sparkType)

if (pinotType != null) {
if (sparkType.isInstanceOf[ArrayType]) {
schemaBuilder.addMultiValueDimension(fieldName, pinotType)
} else {
schemaBuilder.addSingleValueDimension(fieldName, pinotType)
(fieldName, sparkType) match {
case (`timeColumn`, _) =>
schemaBuilder.addDateTime(fieldName, pinotType, timeFormat, timeGranularity);
case (_, _: ArrayType) =>
schemaBuilder.addMultiValueDimension(fieldName, pinotType)
case _ =>
schemaBuilder.addSingleValueDimension(fieldName, pinotType)
}
}
else throw new UnsupportedOperationException("Unsupported data type: " + sparkType)
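
To make the translator change concrete, a minimal sketch using the new five-argument signature shown above (column names and option values are illustrative):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Spark schema with a numeric epoch column "ts" designated as the time column.
val sparkSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("ts", LongType, nullable = false)))

// "ts" becomes a Pinot DATE_TIME field with the given format and granularity;
// "name" stays a single-value STRING dimension.
val pinotSchema = SparkToPinotTypeTranslator.translate(
  sparkSchema, "airlineStats", "ts", "EPOCH|SECONDS", "1:SECONDS")
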
@@ -55,6 +55,8 @@ object ExampleSparkPinotConnectorWriteTest extends Logging {
.option("noDictionaryColumns", "airport,state")
.option("bloomFilterColumns", "airport")
.option("timeColumnName", "ts")
.option("timeFormat", "EPOCH|SECONDS")
.option("timeGranularity", "1:SECONDS")
.save("myPath")
}
}
@@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource

import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, BinaryType}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.scalatest.matchers.should.Matchers
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -55,6 +55,8 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
tableName = "testTable",
savePath = "/tmp/pinot",
timeColumnName = "ts",
timeFormat = "EPOCH|SECONDS",
timeGranularity = "1:SECONDS",
segmentNameFormat = "{table}_{partitionId:03}",
invertedIndexColumns = Array("name"),
noDictionaryColumns = Array("age"),
@@ -63,14 +65,18 @@
)
val writeSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
StructField("age", IntegerType, nullable = false),
StructField("ts", LongType, nullable = false),
StructField("bin", BinaryType, nullable = false),
))

val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
val pinotSchema = SparkToPinotTypeTranslator.translate(
writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
writeOptions.timeFormat, writeOptions.timeGranularity)
val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema)

val record1 = new TestInternalRow(Array[Any]("Alice", 30))
val record2 = new TestInternalRow(Array[Any]("Bob", 25))
val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes))
val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes))

writer.write(record1)
writer.write(record2)
@@ -92,34 +98,42 @@
tableName = "testTable",
savePath = tmpDir.getAbsolutePath,
timeColumnName = "ts",
segmentNameFormat = "{table}_{partitionId:03}",
timeFormat = "EPOCH|SECONDS",
timeGranularity = "1:SECONDS",
segmentNameFormat = "{table}_{startTime}_{endTime}_{partitionId:03}",
invertedIndexColumns = Array("name"),
noDictionaryColumns = Array("age"),
bloomFilterColumns = Array("name"),
rangeIndexColumns = Array()
)
val writeSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
StructField("age", IntegerType, nullable = false),
StructField("ts", LongType, nullable = false),
StructField("bin", BinaryType, nullable = false),
))
val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
val pinotSchema = SparkToPinotTypeTranslator.translate(
writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
writeOptions.timeFormat, writeOptions.timeGranularity)
val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema)
val record1 = new TestInternalRow(Array[Any]("Alice", 30))
val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes))
writer.write(record1)
val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes))
writer.write(record2)

val commitMessage: WriterCommitMessage = writer.commit()
commitMessage shouldBe a[SuccessWriterCommitMessage]

// Verify that the segment is created and stored in the target location
val fs = FileSystem.get(new URI(writeOptions.savePath), new org.apache.hadoop.conf.Configuration())
val segmentPath = new Path(writeOptions.savePath + "/testTable_000.tar.gz")
val segmentPath = new Path(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz")
fs.exists(segmentPath) shouldBe true

// Verify the contents of the segment tar file
TarCompressionUtils.untar(
new File(writeOptions.savePath + "/testTable_000.tar.gz"),
new File(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz"),
new File(writeOptions.savePath))
val untarDir = Paths.get(writeOptions.savePath + "/testTable_000/v3/")
val untarDir = Paths.get(writeOptions.savePath + "/testTable_1234567890_1234567891_000/v3/")
Files.exists(untarDir) shouldBe true

val segmentFiles = Files.list(untarDir).toArray.map(_.toString)
@@ -132,34 +146,46 @@
val metadataSrc = Source.fromFile(untarDir + "/metadata.properties")
val metadataContent = metadataSrc.getLines.mkString("\n")
metadataSrc.close()
metadataContent should include ("segment.name = testTable_000")

metadataContent should include ("segment.name = testTable_1234567890_1234567891_000")
metadataContent should include ("segment.time.column.name = ts")
metadataContent should include ("segment.start.time = 1234567890")
metadataContent should include ("segment.end.time = 1234567891")
}

test("getSegmentName should format segment name correctly with custom format") {
val testCases = Seq(
("{table}_{partitionId}", "airlineStats_12"),
("{partitionId:05}_{table}", "00012_airlineStats"),
("{table}_20240805", "airlineStats_20240805")
("{table}_20240805", "airlineStats_20240805"),
("{table}_{startTime}_{endTime}_{partitionId:03}", "airlineStats_1234567890_1234567891_012"),
)

testCases.foreach { case (format, expected) =>
val writeOptions = PinotDataSourceWriteOptions(
tableName = "airlineStats",
savePath = "/tmp/pinot",
timeColumnName = "ts",
timeFormat = "EPOCH|SECONDS",
timeGranularity = "1:SECONDS",
segmentNameFormat = format,
invertedIndexColumns = Array("name"),
noDictionaryColumns = Array("age"),
bloomFilterColumns = Array("name"),
rangeIndexColumns = Array()
rangeIndexColumns = Array(),
)
val writeSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
StructField("age", IntegerType, nullable = false),
StructField("ts", LongType, nullable = false),
))

val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
val pinotSchema = SparkToPinotTypeTranslator.translate(
writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
writeOptions.timeFormat, writeOptions.timeGranularity)
val writer = new PinotDataWriter[InternalRow](12, 0, writeOptions, writeSchema, pinotSchema)
writer.write(new TestInternalRow(Array[Any]("Alice", 30, 1234567890L)))
writer.write(new TestInternalRow(Array[Any]("Bob", 25, 1234567891L)))

val segmentName = writer.getSegmentName

@@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource

import org.apache.pinot.spi.data.Schema
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
Expand All @@ -35,6 +35,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
"segmentNameFormat" -> "my_segment_format",
"path" -> "/path/to/save",
"timeColumnName" -> "timeCol",
"timeFormat" -> "EPOCH|SECONDS",
"timeGranularity" -> "1:SECONDS",
"invertedIndexColumns" -> "col1,col2",
"noDictionaryColumns" -> "col3,col4",
"bloomFilterColumns" -> "col5,col6",
@@ -43,7 +45,8 @@

val schema = StructType(Seq(
StructField("col1", StringType),
StructField("col2", StringType)
StructField("col2", StringType),
StructField("timeCol", LongType),
))

val logicalWriteInfo = new TestLogicalWriteInfo(new CaseInsensitiveStringMap(options.asJava), schema)
@@ -54,6 +57,8 @@
pinotWrite.writeOptions.segmentNameFormat shouldEqual "my_segment_format"
pinotWrite.writeOptions.savePath shouldEqual "/path/to/save"
pinotWrite.writeOptions.timeColumnName shouldEqual "timeCol"
pinotWrite.writeOptions.timeFormat shouldEqual "EPOCH|SECONDS"
pinotWrite.writeOptions.timeGranularity shouldEqual "1:SECONDS"
pinotWrite.writeOptions.invertedIndexColumns shouldEqual Array("col1", "col2")
pinotWrite.writeOptions.noDictionaryColumns shouldEqual Array("col3", "col4")
pinotWrite.writeOptions.bloomFilterColumns shouldEqual Array("col5", "col6")
@@ -68,7 +73,15 @@
| "dimensionFieldSpecs": [
| {"name": "col1", "dataType": "STRING"},
| {"name": "col2", "dataType": "STRING"}
| ]
| ],
| "dateTimeFieldSpecs" : [ {
| "name" : "timeCol",
| "dataType" : "LONG",
| "fieldType" : "DATE_TIME",
| "notNull" : false,
| "format" : "EPOCH|SECONDS",
| "granularity" : "1:SECONDS"
| } ]
|}
|""".stripMargin)
pinotWrite.pinotSchema shouldEqual expectedPinotSchema
@@ -19,6 +19,7 @@
package org.apache.pinot.connector.spark.v3.datasource

import org.apache.pinot.spi.data.FieldSpec
import org.apache.pinot.spi.data.FieldSpec.FieldType
import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite

@@ -40,11 +41,25 @@
for ((sparkType, expectedPinotType) <- typeMappings) {
val fieldName = s"${sparkType.simpleString}Field"
val sparkSchema = StructType(Array(StructField(fieldName, sparkType)))
val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table")
val pinotSchema = SparkToPinotTypeTranslator.translate(
sparkSchema, "table", null, null, null)
assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType)
}
}

test("Translate time column") {
val sparkSchema = StructType(Array(StructField("timeField", LongType)))
val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table", "timeField",
"EPOCH|SECONDS", "1:SECONDS")

val dateTimeField = pinotSchema.getDateTimeSpec("timeField")

assert(dateTimeField != null)
assert(dateTimeField.getFieldType == FieldType.DATE_TIME)
assert(dateTimeField.getFormat == "EPOCH|SECONDS")
assert(dateTimeField.getGranularity == "1:SECONDS")
}

test("Translate multi value data types") {
val arrayTypeMappings = List(
(ArrayType(StringType), FieldSpec.DataType.STRING),
@@ -61,7 +76,8 @@
for ((sparkArrayType, expectedPinotType) <- arrayTypeMappings) {
val fieldName = s"${sparkArrayType.simpleString}Field"
val sparkSchema = StructType(Array(StructField(fieldName, sparkArrayType)))
val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table")
val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table",
null, null, null)
assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType)
assert(!pinotSchema.getFieldSpecFor(fieldName).isSingleValueField)
}