From dd0db02e53a9675cadda51057019720727ed4db1 Mon Sep 17 00:00:00 2001 From: Caner Balci Date: Wed, 20 Nov 2024 13:53:17 -0800 Subject: [PATCH 1/4] [SparkConnector] Add proper time column support for Spark connector segment writer --- .../spark/v3/datasource/PinotDataWriter.scala | 32 +++++++++- .../spark/v3/datasource/PinotWrite.scala | 3 +- .../SparkToPinotTypeTranslator.scala | 22 ++++--- .../ExampleSparkPinotConnectorWriteTest.scala | 2 + .../v3/datasource/PinotDataWriterTest.scala | 60 +++++++++++++------ .../SparkToPinotTypeTranslatorTest.scala | 20 ++++++- .../common/PinotDataSourceWriteOptions.scala | 16 +++++ .../PinotDataSourceWriteOptionsTest.scala | 4 ++ 8 files changed, 130 insertions(+), 29 deletions(-) diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala index 5a6a2a5ae38c..551100794be4 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala @@ -62,8 +62,29 @@ class PinotDataWriter[InternalRow]( private[pinot] val savePath: String = writeOptions.savePath private[pinot] val bufferedRecordReader: PinotBufferedRecordReader = new PinotBufferedRecordReader() + private val timeColumnName = writeOptions.timeColumnName + private val timeColumnIndex = if (timeColumnName != null) writeSchema.fieldIndex(timeColumnName) else -1 + + private var isTimeColumnNumeric = false + if (timeColumnIndex > -1) { + isTimeColumnNumeric = writeSchema.fields(timeColumnIndex).dataType match { + case org.apache.spark.sql.types.IntegerType => true + case org.apache.spark.sql.types.LongType => true + case _ => false + } + } + private var startTime = Long.MaxValue + private var endTime = 0L + override def write(record: catalyst.InternalRow): Unit = { bufferedRecordReader.write(internalRowToGenericRow(record)) + + // Tracking startTime and endTime for segment name generation purposes + if (timeColumnIndex > -1 && isTimeColumnNumeric) { + val time = record.getLong(timeColumnIndex) + startTime = Math.min(startTime, time) + endTime = Math.max(endTime, time) + } } override def commit(): WriterCommitMessage = { @@ -94,6 +115,8 @@ class PinotDataWriter[InternalRow]( val variables = Map( "partitionId" -> partitionId, "table" -> tableName, + "startTime" -> startTime, + "endTime" -> endTime, ) val pattern = Pattern.compile("\\{(\\w+)(?::(\\d+))?}") @@ -145,11 +168,14 @@ class PinotDataWriter[InternalRow]( indexingConfig: IndexingConfig, outputDir: File, ): SegmentGeneratorConfig = { + val segmentsValidationAndRetentionConfig = new SegmentsValidationAndRetentionConfig() + segmentsValidationAndRetentionConfig.setTimeColumnName(timeColumnName) + // Mostly dummy tableConfig, sufficient for segment generation purposes val tableConfig = new TableConfig( tableName, "OFFLINE", - new SegmentsValidationAndRetentionConfig(), + segmentsValidationAndRetentionConfig, new TenantConfig(null, null, null), indexingConfig, new TableCustomConfig(null), @@ -192,6 +218,8 @@ class PinotDataWriter[InternalRow]( gr.putValue(field.name, record.getBoolean(idx)) case org.apache.spark.sql.types.ByteType => gr.putValue(field.name, record.getByte(idx)) + case org.apache.spark.sql.types.BinaryType => + gr.putValue(field.name, 
record.getBinary(idx)) case org.apache.spark.sql.types.ShortType => gr.putValue(field.name, record.getShort(idx)) case org.apache.spark.sql.types.ArrayType(elementType, _) => @@ -210,6 +238,8 @@ class PinotDataWriter[InternalRow]( gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Boolean])) case org.apache.spark.sql.types.ByteType => gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Byte])) + case org.apache.spark.sql.types.BinaryType => + gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Array[Byte]])) case org.apache.spark.sql.types.ShortType => gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Short])) case _ => diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala index 0c0e44168604..678da35f5ec9 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala @@ -28,7 +28,8 @@ class PinotWrite( ) extends Write with BatchWrite { private[pinot] val writeOptions: PinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(logicalWriteInfo.options()) private[pinot] val writeSchema: StructType = logicalWriteInfo.schema() - private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName) + private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate( + writeSchema, writeOptions.tableName, writeOptions.timeColumnName, writeOptions.timeFormat, writeOptions.timeGranularity) override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): DataWriterFactory = { // capture the values to allow lambda serialization diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala index 5584a0233420..a26f923f9450 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala @@ -18,26 +18,32 @@ */ package org.apache.pinot.connector.spark.v3.datasource -import org.apache.pinot.spi.data.FieldSpec -import org.apache.pinot.spi.data.Schema +import org.apache.pinot.spi.data.{FieldSpec, Schema} import org.apache.pinot.spi.data.Schema.SchemaBuilder import org.apache.spark.sql.types._ object SparkToPinotTypeTranslator { - // TODO: incorporate time column - def translate(sparkSchema: StructType, tableName: String): Schema = { + def translate(sparkSchema: StructType, + tableName: String, + timeColumn: String, + timeFormat: String, + timeGranularity: String): Schema = { val schemaBuilder = new SchemaBuilder schemaBuilder.setSchemaName(tableName) for (field <- sparkSchema.fields) { val fieldName = field.name val sparkType = field.dataType val pinotType = translateType(sparkType) + if (pinotType != null) { - if (sparkType.isInstanceOf[ArrayType]) { - schemaBuilder.addMultiValueDimension(fieldName, pinotType) - } else { - 
schemaBuilder.addSingleValueDimension(fieldName, pinotType) + (fieldName, sparkType) match { + case (`timeColumn`, _) => + schemaBuilder.addDateTime(fieldName, pinotType, timeFormat, timeGranularity); + case (_, _: ArrayType) => + schemaBuilder.addMultiValueDimension(fieldName, pinotType) + case _ => + schemaBuilder.addSingleValueDimension(fieldName, pinotType) } } else throw new UnsupportedOperationException("Unsupported data type: " + sparkType) diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala index 45343a8999ee..64582c9df8b1 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala @@ -55,6 +55,8 @@ object ExampleSparkPinotConnectorWriteTest extends Logging { .option("noDictionaryColumns", "airport,state") .option("bloomFilterColumns", "airport") .option("timeColumnName", "ts") + .option("timeFormat", "EPOCH|SECONDS") + .option("timeGranularity", "1:SECONDS") .save("myPath") } } diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala index 33149ef952c2..165378545607 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala @@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, BinaryType} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.scalatest.matchers.should.Matchers import org.apache.hadoop.fs.{FileSystem, Path} @@ -55,6 +55,8 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter tableName = "testTable", savePath = "/tmp/pinot", timeColumnName = "ts", + timeFormat = "EPOCH|SECONDS", + timeGranularity = "1:SECONDS", segmentNameFormat = "{table}_{partitionId:03}", invertedIndexColumns = Array("name"), noDictionaryColumns = Array("age"), @@ -63,14 +65,18 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter ) val writeSchema = StructType(Seq( StructField("name", StringType, nullable = false), - StructField("age", IntegerType, nullable = false) + StructField("age", IntegerType, nullable = false), + StructField("ts", LongType, nullable = false), + StructField("bin", BinaryType, nullable = false), )) - val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName) + val pinotSchema = SparkToPinotTypeTranslator.translate( + writeSchema, writeOptions.tableName, writeOptions.timeColumnName, + writeOptions.timeFormat, 
writeOptions.timeGranularity) val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema) - val record1 = new TestInternalRow(Array[Any]("Alice", 30)) - val record2 = new TestInternalRow(Array[Any]("Bob", 25)) + val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes)) + val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes)) writer.write(record1) writer.write(record2) @@ -92,7 +98,9 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter tableName = "testTable", savePath = tmpDir.getAbsolutePath, timeColumnName = "ts", - segmentNameFormat = "{table}_{partitionId:03}", + timeFormat = "EPOCH|SECONDS", + timeGranularity = "1:SECONDS", + segmentNameFormat = "{table}_{startTime}_{endTime}_{partitionId:03}", invertedIndexColumns = Array("name"), noDictionaryColumns = Array("age"), bloomFilterColumns = Array("name"), @@ -100,26 +108,32 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter ) val writeSchema = StructType(Seq( StructField("name", StringType, nullable = false), - StructField("age", IntegerType, nullable = false) + StructField("age", IntegerType, nullable = false), + StructField("ts", LongType, nullable = false), + StructField("bin", BinaryType, nullable = false), )) - val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName) + val pinotSchema = SparkToPinotTypeTranslator.translate( + writeSchema, writeOptions.tableName, writeOptions.timeColumnName, + writeOptions.timeFormat, writeOptions.timeGranularity) val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema) - val record1 = new TestInternalRow(Array[Any]("Alice", 30)) + val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes)) writer.write(record1) + val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes)) + writer.write(record2) val commitMessage: WriterCommitMessage = writer.commit() commitMessage shouldBe a[SuccessWriterCommitMessage] // Verify that the segment is created and stored in the target location val fs = FileSystem.get(new URI(writeOptions.savePath), new org.apache.hadoop.conf.Configuration()) - val segmentPath = new Path(writeOptions.savePath + "/testTable_000.tar.gz") + val segmentPath = new Path(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz") fs.exists(segmentPath) shouldBe true // Verify the contents of the segment tar file TarCompressionUtils.untar( - new File(writeOptions.savePath + "/testTable_000.tar.gz"), + new File(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz"), new File(writeOptions.savePath)) - val untarDir = Paths.get(writeOptions.savePath + "/testTable_000/v3/") + val untarDir = Paths.get(writeOptions.savePath + "/testTable_1234567890_1234567891_000/v3/") Files.exists(untarDir) shouldBe true val segmentFiles = Files.list(untarDir).toArray.map(_.toString) @@ -132,14 +146,19 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter val metadataSrc = Source.fromFile(untarDir + "/metadata.properties") val metadataContent = metadataSrc.getLines.mkString("\n") metadataSrc.close() - metadataContent should include ("segment.name = testTable_000") + + metadataContent should include ("segment.name = testTable_1234567890_1234567891_000") + metadataContent should include ("segment.time.column.name = ts") + metadataContent should include ("segment.start.time = 
1234567890") + metadataContent should include ("segment.end.time = 1234567891") } test("getSegmentName should format segment name correctly with custom format") { val testCases = Seq( ("{table}_{partitionId}", "airlineStats_12"), ("{partitionId:05}_{table}", "00012_airlineStats"), - ("{table}_20240805", "airlineStats_20240805") + ("{table}_20240805", "airlineStats_20240805"), + ("{table}_{startTime}_{endTime}_{partitionId:03}", "airlineStats_1234567890_1234567891_012"), ) testCases.foreach { case (format, expected) => @@ -147,19 +166,26 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter tableName = "airlineStats", savePath = "/tmp/pinot", timeColumnName = "ts", + timeFormat = "EPOCH|SECONDS", + timeGranularity = "1:SECONDS", segmentNameFormat = format, invertedIndexColumns = Array("name"), noDictionaryColumns = Array("age"), bloomFilterColumns = Array("name"), - rangeIndexColumns = Array() + rangeIndexColumns = Array(), ) val writeSchema = StructType(Seq( StructField("name", StringType, nullable = false), - StructField("age", IntegerType, nullable = false) + StructField("age", IntegerType, nullable = false), + StructField("ts", LongType, nullable = false), )) - val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName) + val pinotSchema = SparkToPinotTypeTranslator.translate( + writeSchema, writeOptions.tableName, writeOptions.timeColumnName, + writeOptions.timeFormat, writeOptions.timeGranularity) val writer = new PinotDataWriter[InternalRow](12, 0, writeOptions, writeSchema, pinotSchema) + writer.write(new TestInternalRow(Array[Any]("Alice", 30, 1234567890L))) + writer.write(new TestInternalRow(Array[Any]("Bob", 25, 1234567891L))) val segmentName = writer.getSegmentName diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala index 6001339c273c..9d0f23374a91 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala @@ -19,6 +19,7 @@ package org.apache.pinot.connector.spark.v3.datasource import org.apache.pinot.spi.data.FieldSpec +import org.apache.pinot.spi.data.FieldSpec.FieldType import org.apache.spark.sql.types._ import org.scalatest.funsuite.AnyFunSuite @@ -40,11 +41,25 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite { for ((sparkType, expectedPinotType) <- typeMappings) { val fieldName = s"${sparkType.simpleString}Field" val sparkSchema = StructType(Array(StructField(fieldName, sparkType))) - val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table") + val pinotSchema = SparkToPinotTypeTranslator.translate( + sparkSchema, "table", null, null, null) assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType) } } + test("Translate time column") { + val sparkSchema = StructType(Array(StructField("timeField", LongType))) + val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table", "timeField", + "EPOCH|SECONDS", "1:SECONDS") + + val dateTimeField = pinotSchema.getDateTimeSpec("timeField") + + assert(dateTimeField != null) + assert(dateTimeField.getFieldType == FieldType.DATE_TIME) + 
assert(dateTimeField.getFormat == "EPOCH|SECONDS") + assert(dateTimeField.getGranularity == "1:SECONDS") + } + test("Translate multi value data types") { val arrayTypeMappings = List( (ArrayType(StringType), FieldSpec.DataType.STRING), @@ -61,7 +76,8 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite { for ((sparkArrayType, expectedPinotType) <- arrayTypeMappings) { val fieldName = s"${sparkArrayType.simpleString}Field" val sparkSchema = StructType(Array(StructField(fieldName, sparkArrayType))) - val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table") + val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table", + null, null, null) assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType) assert(!pinotSchema.getFieldSpecFor(fieldName).isSingleValueField) } diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala index b9bdee7daf9e..7a43e2392fff 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala @@ -29,6 +29,8 @@ object PinotDataSourceWriteOptions { val CONFIG_BLOOM_FILTER_COLUMNS = "bloomFilterColumns" val CONFIG_RANGE_INDEX_COLUMNS = "rangeIndexColumns" val CONFIG_TIME_COLUMN_NAME = "timeColumnName" + val CONFIG_TIME_FORMAT = "timeFormat" + val CONFIG_TIME_GRANULARITY = "timeGranularity" private[pinot] def from(options: util.Map[String, String]): PinotDataSourceWriteOptions = { if (!options.containsKey(CONFIG_TABLE_NAME)) { @@ -46,6 +48,8 @@ object PinotDataSourceWriteOptions { val bloomFilterColumns = options.getOrDefault(CONFIG_BLOOM_FILTER_COLUMNS, "").split(",").filter(_.nonEmpty) val rangeIndexColumns = options.getOrDefault(CONFIG_RANGE_INDEX_COLUMNS, "").split(",").filter(_.nonEmpty) val timeColumnName = options.getOrDefault(CONFIG_TIME_COLUMN_NAME, null) + val timeFormat = options.getOrDefault(CONFIG_TIME_FORMAT, null) + val timeGranularity = options.getOrDefault(CONFIG_TIME_GRANULARITY, null) if (tableName == null) { throw new IllegalArgumentException("Table name is required") @@ -56,12 +60,22 @@ object PinotDataSourceWriteOptions { if (segmentNameFormat == "") { throw new IllegalArgumentException("Segment name format cannot be empty string") } + if (timeColumnName != null) { + if (timeFormat.isEmpty == null) { + throw new IllegalArgumentException("Time format is required when time column name is specified") + } + if (timeGranularity == null) { + throw new IllegalArgumentException("Time granularity is required when time column name is specified") + } + } PinotDataSourceWriteOptions( tableName, segmentNameFormat, savePath, timeColumnName, + timeFormat, + timeGranularity, invertedIndexColumns, noDictionaryColumns, bloomFilterColumns, @@ -76,6 +90,8 @@ private[pinot] case class PinotDataSourceWriteOptions( segmentNameFormat: String, savePath: String, timeColumnName: String, + timeFormat: String, + timeGranularity: String, invertedIndexColumns: Array[String], noDictionaryColumns: Array[String], bloomFilterColumns: Array[String], diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala 
b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala index aca91d06ea1a..72fbb0461d65 100644 --- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala +++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala @@ -41,6 +41,8 @@ class PinotDataSourceWriteOptionsTest extends BaseTest { "segment_name", "/path/to/save", "timeCol", + "EPOCH|SECONDS", + "1:SECONDS", Array("col1", "col2"), Array("col3", "col4"), Array("col5", "col6"), @@ -109,6 +111,8 @@ class PinotDataSourceWriteOptionsTest extends BaseTest { "tbl-{partitionId:03}", "/path/to/save", null, + null, + null, Array.empty, Array.empty, Array.empty, From d509b2a3483ab72c617ec8a399dcbc1f9d650781 Mon Sep 17 00:00:00 2001 From: Caner Balci Date: Wed, 27 Nov 2024 13:58:17 -0800 Subject: [PATCH 2/4] Fix broken tests --- .../connector/spark/common/PinotDataSourceWriteOptions.scala | 2 +- .../spark/common/PinotDataSourceWriteOptionsTest.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala index 7a43e2392fff..79627447f17d 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala @@ -61,7 +61,7 @@ object PinotDataSourceWriteOptions { throw new IllegalArgumentException("Segment name format cannot be empty string") } if (timeColumnName != null) { - if (timeFormat.isEmpty == null) { + if (timeFormat == null) { throw new IllegalArgumentException("Time format is required when time column name is specified") } if (timeGranularity == null) { diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala index 72fbb0461d65..96d955ec8766 100644 --- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala +++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala @@ -31,7 +31,9 @@ class PinotDataSourceWriteOptionsTest extends BaseTest { PinotDataSourceWriteOptions.CONFIG_NO_DICTIONARY_COLUMNS -> "col3,col4", PinotDataSourceWriteOptions.CONFIG_BLOOM_FILTER_COLUMNS -> "col5,col6", PinotDataSourceWriteOptions.CONFIG_RANGE_INDEX_COLUMNS -> "col7,col8", - PinotDataSourceWriteOptions.CONFIG_TIME_COLUMN_NAME -> "timeCol" + PinotDataSourceWriteOptions.CONFIG_TIME_COLUMN_NAME -> "timeCol", + PinotDataSourceWriteOptions.CONFIG_TIME_FORMAT -> "EPOCH|SECONDS", + PinotDataSourceWriteOptions.CONFIG_TIME_GRANULARITY -> "1:SECONDS", ) val pinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(options.asJava) From dab6d63650d04322a800a4b38df850dff93f54e7 Mon Sep 17 00:00:00 2001 From: Caner Balci Date: Wed, 27 Nov 2024 15:40:26 -0800 Subject: [PATCH 3/4] Fix write test --- 
.../spark/v3/datasource/PinotWriteTest.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala index c9197a67b9f5..0a30f0652979 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala @@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource import org.apache.pinot.spi.data.Schema import org.apache.spark.sql.connector.write.LogicalWriteInfo -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -35,6 +35,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers { "segmentNameFormat" -> "my_segment_format", "path" -> "/path/to/save", "timeColumnName" -> "timeCol", + "timeFormat" -> "EPOCH|SECONDS", + "timeGranularity" -> "1:SECONDS", "invertedIndexColumns" -> "col1,col2", "noDictionaryColumns" -> "col3,col4", "bloomFilterColumns" -> "col5,col6", @@ -43,7 +45,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers { val schema = StructType(Seq( StructField("col1", StringType), - StructField("col2", StringType) + StructField("col2", StringType), + StructField("timeCol", LongType), )) val logicalWriteInfo = new TestLogicalWriteInfo(new CaseInsensitiveStringMap(options.asJava), schema) @@ -54,6 +57,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers { pinotWrite.writeOptions.segmentNameFormat shouldEqual "my_segment_format" pinotWrite.writeOptions.savePath shouldEqual "/path/to/save" pinotWrite.writeOptions.timeColumnName shouldEqual "timeCol" + pinotWrite.writeOptions.timeFormat shouldEqual "EPOCH|SECONDS" + pinotWrite.writeOptions.timeGranularity shouldEqual "1:SECONDS" pinotWrite.writeOptions.invertedIndexColumns shouldEqual Array("col1", "col2") pinotWrite.writeOptions.noDictionaryColumns shouldEqual Array("col3", "col4") pinotWrite.writeOptions.bloomFilterColumns shouldEqual Array("col5", "col6") @@ -68,7 +73,15 @@ class PinotWriteTest extends AnyFunSuite with Matchers { | "dimensionFieldSpecs": [ | {"name": "col1", "dataType": "STRING"}, | {"name": "col2", "dataType": "STRING"} - | ] + | ], + | "dateTimeFieldSpecs" : [ { + | "name" : "timeCol", + | "dataType" : "LONG", + | "fieldType" : "DATE_TIME", + | "notNull" : false, + | "format" : "EPOCH|SECONDS", + | "granularity" : "1:SECONDS" + | } ] |} |""".stripMargin) pinotWrite.pinotSchema shouldEqual expectedPinotSchema From 2663d3b53cc0b936901a1d84d5f504a22ff64097 Mon Sep 17 00:00:00 2001 From: Caner Balci Date: Mon, 2 Dec 2024 13:54:08 -0800 Subject: [PATCH 4/4] Update javadocs to include startTime and endTime vars --- .../connector/spark/v3/datasource/PinotDataWriter.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala 
b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala index 551100794be4..41f86e4d2db8 100644 --- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala @@ -98,7 +98,10 @@ class PinotDataWriter[InternalRow]( /** This method is used to generate the segment name based on the format * provided in the write options (segmentNameFormat). * The format can contain variables like {partitionId}. - * Currently supported variables are `partitionId`, `table` + * Currently supported variables are `partitionId`, `table`, `startTime` and `endTime` + * + * `startTime` and `endTime` are the minimum and maximum values of the time column in the records + * and it is only available if the time column is numeric. * * It also supports the following, python inspired format specifier for digit formatting: * `{partitionId:05}` @@ -109,6 +112,7 @@ class PinotDataWriter[InternalRow]( * "{partitionId:05}_{table}" -> "00012_airlineStats" * "{table}_{partitionId}" -> "airlineStats_12" * "{table}_20240805" -> "airlineStats_20240805" + * "{table}_{startTime}_{endTime}_{partitionId:03}" -> "airlineStats_1234567890_1234567891_012" */ private[pinot] def getSegmentName: String = { val format = writeOptions.segmentNameFormat
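Usage note: with this series applied, a Spark job can write Pinot segments with a declared time column roughly as sketched below. This is an illustrative sketch, not part of the patch: the `spark` session, the input path, the `format` name, and the `table` option key are assumptions based on the connector's existing write path, while the `timeFormat`/`timeGranularity` options and the `{startTime}`/`{endTime}` segment-name variables come from the changes above.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only. The connector format name and the "table" option key
// are assumed from the existing write path; they are not introduced by this patch.
val spark = SparkSession.builder().appName("pinot-segment-writer-example").getOrCreate()
val airlineStats = spark.read.parquet("/path/to/airlineStats") // placeholder input

airlineStats.write
  .format("pinot")                 // assumed connector short name
  .mode("append")
  .option("table", "airlineStats") // assumed table option key
  // Segment names can now embed the min/max of the time column
  // (tracked only when the time column is an Int or Long Spark type).
  .option("segmentNameFormat", "{table}_{startTime}_{endTime}_{partitionId:03}")
  .option("invertedIndexColumns", "airport")
  .option("noDictionaryColumns", "airport,state")
  .option("bloomFilterColumns", "airport")
  .option("timeColumnName", "ts")
  // New in this change: both options below are required once timeColumnName is set,
  // and they populate the DateTimeFieldSpec of the generated Pinot schema.
  .option("timeFormat", "EPOCH|SECONDS")
  .option("timeGranularity", "1:SECONDS")
  .save("myPath")
```

As the options parser now enforces, omitting either `timeFormat` or `timeGranularity` while `timeColumnName` is set raises an `IllegalArgumentException`, so the two must always be supplied together.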