From a252da05fd770428d87022cee9b3cddf21715671 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Mon, 13 Oct 2025 23:35:55 -0700 Subject: [PATCH 01/10] Fix src Signed-off-by: Andy HF Kwok --- pom.xml | 14 +++++++------- .../apache/comet/parquet/SourceFilterSerde.scala | 4 ++-- .../scala/org/apache/comet/serde/literals.scala | 4 ++-- .../spark/sql/comet/CometBatchScanExec.scala | 2 +- .../sql/comet/CometBroadcastExchangeExec.scala | 2 +- .../spark/sql/comet/CometCollectLimitExec.scala | 2 +- .../spark/sql/comet/CometColumnarToRowExec.scala | 4 ++-- .../org/apache/spark/sql/comet/CometScanExec.scala | 4 ++-- .../spark/sql/comet/CometSparkToColumnarExec.scala | 4 ++-- .../sql/comet/CometTakeOrderedAndProjectExec.scala | 2 +- .../shuffle/CometBlockStoreShuffleReader.scala | 2 +- .../shuffle/CometShuffleExchangeExec.scala | 8 ++++---- 12 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 6a6254dfcf..e67e88d391 100644 --- a/pom.xml +++ b/pom.xml @@ -738,14 +738,14 @@ under the License. scala-maven-plugin - -deprecation - -unchecked - -feature - -Xlint:_ - -Ywarn-dead-code + + + + + -Ywarn-numeric-widen - -Ywarn-value-discard - -Ywarn-unused:imports,patvars,privates,locals,params,-implicits + + -Xfatal-warnings diff --git a/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala b/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala index ac6a89ca3b..6618a7ddb0 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala @@ -80,8 +80,8 @@ object SourceFilterSerde extends Logging { // refer to org.apache.spark.sql.catalyst.CatalystTypeConverters.CatalystTypeConverter#toScala dataType match { case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean]) - case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte]) - case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short]) + case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt) + case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt) case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int]) case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long]) case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float]) diff --git a/spark/src/main/scala/org/apache/comet/serde/literals.scala b/spark/src/main/scala/org/apache/comet/serde/literals.scala index e24b55449c..fff9474179 100644 --- a/spark/src/main/scala/org/apache/comet/serde/literals.scala +++ b/spark/src/main/scala/org/apache/comet/serde/literals.scala @@ -72,8 +72,8 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging { exprBuilder.setIsNull(false) dataType match { case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean]) - case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte]) - case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short]) + case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt) + case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt) case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int]) case _: LongType | _: TimestampType | _: TimestampNTZType => exprBuilder.setLongVal(value.asInstanceOf[Long]) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala index 
a4af5f1f34..3d30776812 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala @@ -70,7 +70,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres override def next(): ColumnarBatch = { val batch = batches.next() - numOutputRows += batch.numRows() + numOutputRows += batch.numRows().toLong batch } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala index 95770592fd..8da97271a2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala @@ -146,7 +146,7 @@ case class CometBroadcastExchangeExec( longMetric("numOutputRows") += numRows if (numRows >= maxBroadcastRows) { throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableRowsError( - maxBroadcastRows, + maxBroadcastRows.toLong, numRows) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 09794e8e26..5dd45a3345 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -88,7 +88,7 @@ case class CometCollectLimitExec( outputPartitioning, serializer, metrics) - metrics("numPartitions").set(dep.partitioner.numPartitions) + metrics("numPartitions").set(dep.partitioner.numPartitions.toLong) new CometShuffledBatchRDD(dep, readMetrics) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index d965a6ff7b..c6f1dd14d5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -82,7 +82,7 @@ case class CometColumnarToRowExec(child: SparkPlan) val toUnsafe = UnsafeProjection.create(localOutput, localOutput) batches.flatMap { batch => numInputBatches += 1 - numOutputRows += batch.numRows() + numOutputRows += batch.numRows().toLong batch.rowIterator().asScala.map(toUnsafe) } } @@ -120,7 +120,7 @@ case class CometColumnarToRowExec(child: SparkPlan) .flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName)) .flatMap { batch => numInputBatches += 1 - numOutputRows += batch.numRows() + numOutputRows += batch.numRows().toLong batch.rowIterator().asScala.map(toUnsafe) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 4dd889d231..2aee93effe 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -242,7 +242,7 @@ case class CometScanExec( driverMetrics("staticFilesSize") = filesSize } if (relation.partitionSchema.nonEmpty) { - driverMetrics("numPartitions") = partitions.length + driverMetrics("numPartitions") = partitions.length.toLong } } @@ -277,7 +277,7 @@ case class CometScanExec( override def next(): ColumnarBatch = { val batch = batches.next() - numOutputRows += batch.numRows() + numOutputRows += batch.numRows().toLong batch } } diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala index bcf8918575..630a33172f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala @@ -83,7 +83,7 @@ case class CometSparkToColumnarExec(child: SparkPlan) val startNs = System.nanoTime() val batch = iter.next() conversionTime += System.nanoTime() - startNs - numInputRows += batch.numRows() + numInputRows += batch.numRows().toLong numOutputBatches += 1 batch } @@ -123,7 +123,7 @@ case class CometSparkToColumnarExec(child: SparkPlan) CometArrowConverters.rowToArrowBatchIter( sparkBatches, schema, - maxRecordsPerBatch, + maxRecordsPerBatch.toLong, timeZoneId, context) createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index aa89dec137..027aacb602 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -96,7 +96,7 @@ case class CometTakeOrderedAndProjectExec( outputPartitioning, serializer, metrics) - metrics("numPartitions").set(dep.partitioner.numPartitions) + metrics("numPartitions").set(dep.partitioner.numPartitions.toLong) new CometShuffledBatchRDD(dep, readMetrics) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala index 1283a745a6..0a9c0bed12 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala @@ -111,7 +111,7 @@ class CometBlockStoreShuffleReader[K, C]( // Update the context task metrics for each record read. 
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( recordIter.map { record => - readMetrics.incRecordsRead(record._2.numRows()) + readMetrics.incRecordsRead(record._2.numRows().toLong) record }, context.taskMetrics().mergeShuffleReadMetrics()) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 1f7d37a108..14a6dbe589 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -137,7 +137,7 @@ case class CometShuffleExchangeExec( outputPartitioning, serializer, metrics) - metrics("numPartitions").set(dep.partitioner.numPartitions) + metrics("numPartitions").set(dep.partitioner.numPartitions.toLong) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) SQLMetrics.postDriverMetricUpdates( sparkContext, @@ -151,7 +151,7 @@ case class CometShuffleExchangeExec( outputPartitioning, serializer, metrics) - metrics("numPartitions").set(dep.partitioner.numPartitions) + metrics("numPartitions").set(dep.partitioner.numPartitions.toLong) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) SQLMetrics.postDriverMetricUpdates( sparkContext, @@ -385,7 +385,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { // end up being almost the same regardless of the index. substantially scrambling the // seed by hashing will help. Refer to SPARK-21782 for more details. val partitionId = TaskContext.get().partitionId() - var position = new XORShiftRandom(partitionId).nextInt(numPartitions) + var position = new XORShiftRandom(partitionId.toLong).nextInt(numPartitions) (_: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions position += 1 @@ -432,7 +432,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = { // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null. result.isNull = false - result.value = row.hashCode() + result.value = row.hashCode().toLong result } } From b264240c6bd4d9ee0b563da274388aeb3dae1bab Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Mon, 13 Oct 2025 23:42:37 -0700 Subject: [PATCH 02/10] Fix test - pt1 Signed-off-by: Andy HF Kwok --- .../sql/benchmark/CometReadBenchmark.scala | 29 +++++++++++------ .../sql/benchmark/CometShuffleBenchmark.scala | 12 +++---- .../CometStringExpressionBenchmark.scala | 32 +++++++++---------- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala index a5db4f290d..536d02f663 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala @@ -52,7 +52,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def numericScanBenchmark(values: Int, dataType: DataType): Unit = { // Benchmarks running through spark sql. 
val sqlBenchmark = - new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values, output = output) + new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -101,7 +101,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def encryptedScanBenchmark(values: Int, dataType: DataType): Unit = { // Benchmarks running through spark sql. val sqlBenchmark = - new Benchmark(s"SQL Single ${dataType.sql} Encrypted Column Scan", values, output = output) + new Benchmark( + s"SQL Single ${dataType.sql} Encrypted Column Scan", + values.toLong, + output = output) val encoder = Base64.getEncoder val footerKey = @@ -189,7 +192,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def decimalScanBenchmark(values: Int, precision: Int, scale: Int): Unit = { val sqlBenchmark = new Benchmark( s"SQL Single Decimal(precision: $precision, scale: $scale) Column Scan", - values, + values.toLong, output = output) withTempPath { dir => @@ -237,7 +240,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def readerBenchmark(values: Int, dataType: DataType): Unit = { val sqlBenchmark = - new Benchmark(s"Parquet reader benchmark for $dataType", values, output = output) + new Benchmark(s"Parquet reader benchmark for $dataType", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -318,7 +321,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def numericFilterScanBenchmark(values: Int, fractionOfZeros: Double): Unit = { val percentageOfZeros = fractionOfZeros * 100 val benchmark = - new Benchmark(s"Numeric Filter Scan ($percentageOfZeros% zeros)", values, output = output) + new Benchmark( + s"Numeric Filter Scan ($percentageOfZeros% zeros)", + values.toLong, + output = output) withTempPath { dir => withTempTable("parquetV1Table", "parquetV2Table") { @@ -365,7 +371,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def stringWithDictionaryScanBenchmark(values: Int): Unit = { val sqlBenchmark = - new Benchmark("String Scan with Dictionary Encoding", values, output = output) + new Benchmark("String Scan with Dictionary Encoding", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table", "parquetV2Table") { @@ -424,7 +430,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = { val percentageOfNulls = fractionOfNulls * 100 val benchmark = - new Benchmark(s"String with Nulls Scan ($percentageOfNulls%)", values, output = output) + new Benchmark( + s"String with Nulls Scan ($percentageOfNulls%)", + values.toLong, + output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -483,7 +492,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { def columnsBenchmark(values: Int, width: Int): Unit = { val benchmark = - new Benchmark(s"Single Column Scan from $width columns", values, output = output) + new Benchmark(s"Single Column Scan from $width columns", values.toLong, output = output) withTempPath { dir => withTempTable("t1", "parquetV1Table") { @@ -533,7 +542,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"Large String Filter Scan ($percentageOfZeros% zeros)", - values, + values.toLong, output = output) withTempPath { dir => @@ -584,7 +593,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"Sorted Lg Str Filter Scan 
($percentageOfZeros% zeros)", - values, + values.toLong, output = output) withTempPath { dir => diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala index 69e0d8b9a0..48fa38b17e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala @@ -70,7 +70,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"SQL ${dataType.sql} shuffle on array ($partitionNum Partition)", - values, + values.toLong, output = output) withTempPath { dir => @@ -122,7 +122,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"SQL ${dataType.sql} shuffle on struct ($partitionNum Partition)", - values, + values.toLong, output = output) withTempPath { dir => @@ -181,7 +181,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"SQL ${dataType.sql} Dictionary Shuffle($partitionNum Partition)", - values, + values.toLong, output = output) withTempPath { dir => @@ -272,7 +272,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"SQL Single ${dataType.sql} Shuffle($partitionNum Partition) $randomTitle", - values, + values.toLong, output = output) withTempPath { dir => @@ -358,7 +358,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"SQL Wide ($width cols) ${dataType.sql} Shuffle($partitionNum Partition)", - values, + values.toLong, output = output) val projection = (1 to width) @@ -428,7 +428,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"SQL Wide ($width cols) ${dataType.sql} Range Partition Shuffle($partitionNum Partition)", - values, + values.toLong, output = output) val projection = (1 to width) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index 0546c91738..ac3d30454a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -32,7 +32,7 @@ import org.apache.comet.CometConf object CometStringExpressionBenchmark extends CometBenchmarkBase { def subStringExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Substring Expr", values, output = output) + val benchmark = new Benchmark("Substring Expr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -62,7 +62,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def stringSpaceExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("StringSpace Expr", values, output = output) + val benchmark = new Benchmark("StringSpace Expr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -92,7 +92,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def asciiExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr ascii", values, output = output) + val benchmark = new Benchmark("Expr ascii", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -122,7 +122,7 @@ object CometStringExpressionBenchmark extends 
CometBenchmarkBase { } def bitLengthExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr bit_length", values, output = output) + val benchmark = new Benchmark("Expr bit_length", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -152,7 +152,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def octetLengthExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr octet_length", values, output = output) + val benchmark = new Benchmark("Expr octet_length", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -182,7 +182,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def upperExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr upper", values, output = output) + val benchmark = new Benchmark("Expr upper", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -213,7 +213,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def lowerExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr lower", values, output = output) + val benchmark = new Benchmark("Expr lower", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -243,7 +243,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def chrExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr chr", values, output = output) + val benchmark = new Benchmark("Expr chr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -273,7 +273,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def initCapExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr initCap", values, output = output) + val benchmark = new Benchmark("Expr initCap", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -303,7 +303,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def trimExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr trim", values, output = output) + val benchmark = new Benchmark("Expr trim", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -333,7 +333,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def concatwsExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr concatws", values, output = output) + val benchmark = new Benchmark("Expr concatws", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -363,7 +363,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def lengthExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr length", values, output = output) + val benchmark = new Benchmark("Expr length", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -393,7 +393,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def repeatExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr repeat", values, output = output) + val benchmark = new Benchmark("Expr repeat", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -423,7 +423,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def reverseExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr 
reverse", values, output = output) + val benchmark = new Benchmark("Expr reverse", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -483,7 +483,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def replaceExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr replace", values, output = output) + val benchmark = new Benchmark("Expr replace", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -513,7 +513,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { } def translateExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr translate", values, output = output) + val benchmark = new Benchmark("Expr translate", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { From 3a1754a07ff282b5f5c42d69bee15fb8ae7131f8 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Tue, 14 Oct 2025 22:11:54 -0700 Subject: [PATCH 03/10] Pt2 Signed-off-by: Andy HF Kwok --- .../apache/comet/CometExpressionSuite.scala | 2 +- .../apache/comet/exec/CometExecSuite.scala | 6 ++-- .../comet/parquet/ParquetReadSuite.scala | 24 ++++++++-------- .../org/apache/spark/sql/CometTestBase.scala | 28 +++++++++---------- .../org/apache/spark/sql/GenTPCHData.scala | 6 ++-- .../benchmark/CometAggregateBenchmark.scala | 8 +++--- .../benchmark/CometArithmeticBenchmark.scala | 4 +-- .../sql/benchmark/CometBenchmarkBase.scala | 4 +-- .../CometConditionalExpressionBenchmark.scala | 4 +-- .../CometDatetimeExpressionBenchmark.scala | 4 +-- .../sql/benchmark/CometExecBenchmark.scala | 13 +++++---- .../CometPredicateExpressionBenchmark.scala | 2 +- .../CometStringExpressionBenchmark.scala | 2 +- 13 files changed, 56 insertions(+), 51 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index e4fb08101d..806c3d913a 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1202,7 +1202,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = { val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded spark - .range(num) + .range(num.toLong) .map(_ % div) // Parquet doesn't allow column names with spaces, have to add an alias here. // Minus 500 here so that negative decimals are also tested. 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index fb8af7efbc..1b5b4e690a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1700,7 +1700,7 @@ class CometExecSuite extends CometTestBase { withTable("t1") { val numRows = 10 spark - .range(numRows) + .range(numRows.toLong) .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b") .repartition(3) // Move data across multiple partitions .write @@ -1737,7 +1737,7 @@ class CometExecSuite extends CometTestBase { withTable("t1") { val numRows = 10 spark - .range(numRows) + .range(numRows.toLong) .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b") .repartition(3) // Force repartition to test data will come to single partition .write @@ -1768,7 +1768,7 @@ class CometExecSuite extends CometTestBase { withTable("t1") { val numRows = 10 spark - .range(numRows) + .range(numRows.toLong) .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b") .repartition(3) // Force repartition to test data will come to single partition .write diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 73ddf750bd..31d66b78b2 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -416,15 +416,15 @@ abstract class ParquetReadSuite extends CometTestBase { opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) + record.add(1, i.toByte.toInt) + record.add(2, i.toShort.toInt) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) record.add(6, i.toDouble) record.add(7, i.toString * 48) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) + record.add(8, (-i).toByte.toInt) + record.add(9, (-i).toShort.toInt) record.add(10, -i) record.add(11, (-i).toLong) record.add(12, i.toString) @@ -639,8 +639,8 @@ abstract class ParquetReadSuite extends CometTestBase { opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) + record.add(1, i.toByte.toInt) + record.add(2, i.toShort.toInt) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) @@ -1577,15 +1577,15 @@ abstract class ParquetReadSuite extends CometTestBase { opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) + record.add(1, i.toByte.toInt) + record.add(2, i.toShort.toInt) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) record.add(6, i.toDouble) record.add(7, i.toString * 48) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) + record.add(8, (-i).toByte.toInt) + record.add(9, (-i).toShort.toInt) record.add(10, -i) record.add(11, (-i).toLong) record.add(12, i.toString) @@ -1674,7 +1674,7 @@ abstract class ParquetReadSuite extends CometTestBase { val record = new SimpleGroup(schema) opt match { case Some(i) => - record.add(0, i.toShort) + record.add(0, i.toShort.toInt) record.add(1, i) record.add(2, i.toLong) case _ => @@ -1767,7 +1767,7 @@ abstract class ParquetReadSuite extends CometTestBase { } private def withId(id: Int) = - new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build() + new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, 
id.toLong).build() // Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct") test("array of nested struct with and without field id") { diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 7c9ca6ea08..81a7bbbf6d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -682,15 +682,15 @@ abstract class CometTestBase opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) + record.add(1, i.toByte.toInt) + record.add(2, i.toShort.toInt) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) record.add(6, i.toDouble) record.add(7, i.toString * 48) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) + record.add(8, (-i).toByte.toInt) + record.add(9, (-i).toShort.toInt) record.add(10, -i) record.add(11, (-i).toLong) record.add(12, i.toString) @@ -711,15 +711,15 @@ abstract class CometTestBase val i = rand.nextLong() val record = new SimpleGroup(schema) record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) + record.add(1, i.toByte.toInt) + record.add(2, i.toShort.toInt) record.add(3, i.toInt) record.add(4, i) record.add(5, java.lang.Float.intBitsToFloat(i.toInt)) record.add(6, java.lang.Double.longBitsToDouble(i)) record.add(7, i.toString * 24) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) + record.add(8, (-i).toByte.toInt) + record.add(9, (-i).toShort.toInt) record.add(10, (-i).toInt) record.add(11, -i) record.add(12, i.toString) @@ -768,7 +768,7 @@ abstract class CometTestBase if (rand.nextBoolean()) { None } else { - Some(getValue(i, div)) + Some(getValue(i.toLong, div.toLong)) } } expected.foreach { opt => @@ -822,7 +822,7 @@ abstract class CometTestBase if (rand.nextBoolean()) { None } else { - Some(getValue(i, div)) + Some(getValue(i.toLong, div.toLong)) } } expected.foreach { opt => @@ -1000,7 +1000,7 @@ abstract class CometTestBase val div = if (dictionaryEnabled) 10 else n // maps value to a small range for dict to kick in val expected = (0 until n).map { i => - Some(getValue(i, div)) + Some(getValue(i.toLong, div.toLong)) } expected.foreach { opt => val timestampFormats = List( @@ -1048,7 +1048,7 @@ abstract class CometTestBase def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = { val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded spark - .range(num) + .range(num.toLong) .map(_ % div) // Parquet doesn't allow column names with spaces, have to add an alias here. // Minus 500 here so that negative decimals are also tested. 
@@ -1228,8 +1228,8 @@ abstract class CometTestBase val record = new SimpleGroup(schema) opt match { case Some(i) => - record.add(0, i.toByte) - record.add(1, i.toShort) + record.add(0, i.toByte.toInt) + record.add(1, i.toShort.toInt) record.add(2, i) record.add(3, i.toLong) record.add(4, rand.nextFloat()) diff --git a/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala b/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala index e25d4e51e4..4087bbd28c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala +++ b/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala @@ -65,7 +65,9 @@ object GenTPCHData { // Install the data generators in all nodes // TODO: think a better way to install on each worker node // such as https://stackoverflow.com/a/40876671 - spark.range(0, workers, 1, workers).foreach(worker => installDBGEN(baseDir)(worker)) + spark + .range(0L, workers.toLong, 1L, workers) + .foreach(worker => installDBGEN(baseDir)(worker)) s"${baseDir}/dbgen" } else { config.dbgenDir @@ -91,7 +93,7 @@ object GenTPCHData { // Clean up if (defaultDbgenDir != null) { - spark.range(0, workers, 1, workers).foreach { _ => + spark.range(0L, workers.toLong, 1L, workers).foreach { _ => val _ = FileUtils.deleteQuietly(defaultDbgenDir) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala index 5fb2d63fbf..df98dc5013 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala @@ -66,7 +66,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { new Benchmark( s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " + s"single aggregate ${aggregateFunction.toString}", - values, + values.toLong, output = output) withTempPath { dir => @@ -104,7 +104,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { new Benchmark( s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " + s"single aggregate ${aggregateFunction.toString} on decimal", - values, + values.toLong, output = output) val df = makeDecimalDataFrame(values, dataType, false); @@ -145,7 +145,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { new Benchmark( s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " + s"single aggregate ${aggregateFunction.toString}", - values, + values.toLong, output = output) withTempPath { dir => @@ -186,7 +186,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { new Benchmark( s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " + s"multiple aggregates ${aggregateFunction.toString}", - values, + values.toLong, output = output) withTempPath { dir => diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala index c6fe55b56b..af6648241a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala @@ -37,7 +37,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase { val dataType = IntegerType val benchmark = new Benchmark( s"Binary op ${dataType.sql}, dictionary = $useDictionary", - values, + values.toLong, output = output) withTempPath { dir => @@ 
-78,7 +78,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase { useDictionary: Boolean): Unit = { val benchmark = new Benchmark( s"Binary op ${dataType.sql}, dictionary = $useDictionary", - values, + values.toLong, output = output) val df = makeDecimalDataFrame(values, dataType, useDictionary) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala index 1cbe27be91..22bb5b0b0b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala @@ -81,7 +81,7 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { withTempTable(tbl) { import spark.implicits._ spark - .range(values) + .range(values.toLong) .map(_ => if (useDictionary) Random.nextLong % 5 else Random.nextLong) .createOrReplaceTempView(tbl) runBenchmark(benchmarkName)(f(values)) @@ -168,7 +168,7 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { val div = if (useDictionary) 5 else values spark - .range(values) + .range(values.toLong) .map(_ % div) .select((($"value" - 500) / 100.0) cast decimal as Symbol("dec")) } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala index 0dddfb36a5..4495c9d075 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala @@ -32,7 +32,7 @@ import org.apache.comet.CometConf object CometConditionalExpressionBenchmark extends CometBenchmarkBase { def caseWhenExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Case When Expr", values, output = output) + val benchmark = new Benchmark("Case When Expr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -65,7 +65,7 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase { } def ifExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("If Expr", values, output = output) + val benchmark = new Benchmark("If Expr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 0af1ecade5..d9e49a6ca3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -39,7 +39,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level => val isDictionary = if (useDictionary) "(Dictionary)" else "" - runWithComet(s"Date Truncate $isDictionary - $level", values) { + runWithComet(s"Date Truncate $isDictionary - $level", values.toLong) { spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop() } } @@ -68,7 +68,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { "WEEK", "QUARTER").foreach { level => val isDictionary = if (useDictionary) "(Dictionary)" else "" - runWithComet(s"Timestamp Truncate 
$isDictionary - $level", values) { + runWithComet(s"Timestamp Truncate $isDictionary - $level", values.toLong) { spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala index 277cbdae62..f55a8a32a5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala @@ -70,7 +70,10 @@ object CometExecBenchmark extends CometBenchmarkBase { def numericFilterExecBenchmark(values: Int, fractionOfZeros: Double): Unit = { val percentageOfZeros = fractionOfZeros * 100 val benchmark = - new Benchmark(s"Project + Filter Exec ($percentageOfZeros% zeros)", values, output = output) + new Benchmark( + s"Project + Filter Exec ($percentageOfZeros% zeros)", + values.toLong, + output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -114,7 +117,7 @@ object CometExecBenchmark extends CometBenchmarkBase { } def subqueryExecBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Subquery", values, output = output) + val benchmark = new Benchmark("Subquery", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -154,7 +157,7 @@ object CometExecBenchmark extends CometBenchmarkBase { } def sortExecBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Sort Exec", values, output = output) + val benchmark = new Benchmark("Sort Exec", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -184,7 +187,7 @@ object CometExecBenchmark extends CometBenchmarkBase { } def expandExecBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expand Exec", values, output = output) + val benchmark = new Benchmark("Expand Exec", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { @@ -231,7 +234,7 @@ object CometExecBenchmark extends CometBenchmarkBase { val benchmark = new Benchmark( s"BloomFilterAggregate Exec (cardinality $cardinality)", - values, + values.toLong, output = output) val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg") diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala index 2ca924821c..43971ea9cc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala @@ -32,7 +32,7 @@ import org.apache.comet.CometConf object CometPredicateExpressionBenchmark extends CometBenchmarkBase { def inExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("in Expr", values, output = output) + val benchmark = new Benchmark("in Expr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index ac3d30454a..3e42a2ece0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -453,7 +453,7 @@ object 
CometStringExpressionBenchmark extends CometBenchmarkBase { } def instrExprBenchmark(values: Int): Unit = { - val benchmark = new Benchmark("Expr instr", values, output = output) + val benchmark = new Benchmark("Expr instr", values.toLong, output = output) withTempPath { dir => withTempTable("parquetV1Table") { From 5661a67bd1955644de8c9c5da73925870170b408 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Tue, 14 Oct 2025 22:17:49 -0700 Subject: [PATCH 04/10] Spark - pt3 Signed-off-by: Andy HF Kwok --- spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 81a7bbbf6d..ed7356da0e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql import java.util.concurrent.atomic.AtomicInteger +import scala.annotation.nowarn import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -550,6 +551,7 @@ abstract class CometTestBase case None => f(spark.read.format("parquet").load(path)) } + @nowarn("cat=deprecation") protected def createParquetWriter( schema: MessageType, path: Path, @@ -559,7 +561,6 @@ abstract class CometTestBase pageRowCountLimit: Int = ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT, rowGroupSize: Long = 1024 * 1024L): ParquetWriter[Group] = { val hadoopConf = spark.sessionState.newHadoopConf() - ExampleParquetWriter .builder(path) .withDictionaryEncoding(dictionaryEnabled) From 3c8d1788172e5573133a3cde73fe2d5e5a6adc93 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Thu, 16 Oct 2025 21:59:02 -0700 Subject: [PATCH 05/10] Revert pom --- pom.xml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index e67e88d391..6a6254dfcf 100644 --- a/pom.xml +++ b/pom.xml @@ -738,14 +738,14 @@ under the License. scala-maven-plugin - - - - - + -deprecation + -unchecked + -feature + -Xlint:_ + -Ywarn-dead-code -Ywarn-numeric-widen - - + -Ywarn-value-discard + -Ywarn-unused:imports,patvars,privates,locals,params,-implicits -Xfatal-warnings From 07d8f3580ddc48c7fc65e7e73082f80680e5d729 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Sat, 1 Nov 2025 10:14:11 -0700 Subject: [PATCH 06/10] WIP --- pom.xml | 14 +++++++------- .../apache/comet/parquet/ParquetReadSuite.scala | 4 +++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 6a6254dfcf..e67e88d391 100644 --- a/pom.xml +++ b/pom.xml @@ -738,14 +738,14 @@ under the License. 
scala-maven-plugin - -deprecation - -unchecked - -feature - -Xlint:_ - -Ywarn-dead-code + + + + + -Ywarn-numeric-widen - -Ywarn-value-discard - -Ywarn-unused:imports,patvars,privates,locals,params,-implicits + + -Xfatal-warnings diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 31d66b78b2..ddd280866f 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -23,6 +23,7 @@ import java.io.{File, FileFilter} import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} +import scala.annotation.nowarn import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -1532,6 +1533,7 @@ abstract class ParquetReadSuite extends CometTestBase { } test("test pre-fetching multiple files") { + @nowarn("msg=implicit numeric widening") def makeRawParquetFile( path: Path, dictionaryEnabled: Boolean, @@ -1577,7 +1579,7 @@ abstract class ParquetReadSuite extends CometTestBase { opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte.toInt) + record.add(1, i.toByte) record.add(2, i.toShort.toInt) record.add(3, i) record.add(4, i.toLong) From 0e54d60023d2dd119ddcdae8b12a542e0446b003 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Sat, 8 Nov 2025 15:50:28 -0800 Subject: [PATCH 07/10] Revert widen changes --- .../comet/parquet/ParquetReadSuite.scala | 21 ++++++++++--------- .../org/apache/spark/sql/CometTestBase.scala | 20 ++++++++++-------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index ddd280866f..75ec0e776d 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -53,6 +53,7 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus import org.apache.comet.rules.CometScanTypeChecker +@nowarn("cat=w-flag-numeric-widen") abstract class ParquetReadSuite extends CometTestBase { import testImplicits._ @@ -417,15 +418,15 @@ abstract class ParquetReadSuite extends CometTestBase { opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte.toInt) - record.add(2, i.toShort.toInt) + record.add(1, i.toByte) + record.add(2, i.toShort) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) record.add(6, i.toDouble) record.add(7, i.toString * 48) - record.add(8, (-i).toByte.toInt) - record.add(9, (-i).toShort.toInt) + record.add(8, (-i).toByte) + record.add(9, (-i).toShort) record.add(10, -i) record.add(11, (-i).toLong) record.add(12, i.toString) @@ -640,8 +641,8 @@ abstract class ParquetReadSuite extends CometTestBase { opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte.toInt) - record.add(2, i.toShort.toInt) + record.add(1, i.toByte) + record.add(2, i.toShort) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) @@ -1580,14 +1581,14 @@ abstract class ParquetReadSuite extends CometTestBase { case Some(i) => record.add(0, i % 2 == 0) record.add(1, i.toByte) - record.add(2, i.toShort.toInt) + record.add(2, i.toShort) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) record.add(6, i.toDouble) record.add(7, i.toString * 48) - 
record.add(8, (-i).toByte.toInt) - record.add(9, (-i).toShort.toInt) + record.add(8, (-i).toByte) + record.add(9, (-i).toShort) record.add(10, -i) record.add(11, (-i).toLong) record.add(12, i.toString) @@ -1676,7 +1677,7 @@ abstract class ParquetReadSuite extends CometTestBase { val record = new SimpleGroup(schema) opt match { case Some(i) => - record.add(0, i.toShort.toInt) + record.add(0, i.toShort) record.add(1, i) record.add(2, i.toLong) case _ => diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index ed7356da0e..c248d91b60 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -560,6 +560,7 @@ abstract class CometTestBase dictionaryPageSize: Int = 1024, pageRowCountLimit: Int = ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT, rowGroupSize: Long = 1024 * 1024L): ParquetWriter[Group] = { + val hadoopConf = spark.sessionState.newHadoopConf() ExampleParquetWriter .builder(path) @@ -648,6 +649,7 @@ abstract class CometTestBase } } + @nowarn("cat=w-flag-numeric-widen") def makeParquetFileAllPrimitiveTypes( path: Path, dictionaryEnabled: Boolean, @@ -683,15 +685,15 @@ abstract class CometTestBase opt match { case Some(i) => record.add(0, i % 2 == 0) - record.add(1, i.toByte.toInt) - record.add(2, i.toShort.toInt) + record.add(1, i.toByte) + record.add(2, i.toShort) record.add(3, i) record.add(4, i.toLong) record.add(5, i.toFloat) record.add(6, i.toDouble) record.add(7, i.toString * 48) - record.add(8, (-i).toByte.toInt) - record.add(9, (-i).toShort.toInt) + record.add(8, (-i).toByte) + record.add(9, (-i).toShort) record.add(10, -i) record.add(11, (-i).toLong) record.add(12, i.toString) @@ -712,15 +714,15 @@ abstract class CometTestBase val i = rand.nextLong() val record = new SimpleGroup(schema) record.add(0, i % 2 == 0) - record.add(1, i.toByte.toInt) - record.add(2, i.toShort.toInt) - record.add(3, i.toInt) + record.add(1, i.toByte) + record.add(2, i.toShort) + record.add(3, i) record.add(4, i) record.add(5, java.lang.Float.intBitsToFloat(i.toInt)) record.add(6, java.lang.Double.longBitsToDouble(i)) record.add(7, i.toString * 24) - record.add(8, (-i).toByte.toInt) - record.add(9, (-i).toShort.toInt) + record.add(8, (-i).toByte) + record.add(9, (-i).toShort) record.add(10, (-i).toInt) record.add(11, -i) record.add(12, i.toString) From 666f5ded41fc98c6d02fb7ee2b738d1b33268969 Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Sat, 8 Nov 2025 15:54:38 -0800 Subject: [PATCH 08/10] revert pt 2 --- .../src/test/scala/org/apache/spark/sql/CometTestBase.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index c248d91b60..9aa9837c80 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -1178,6 +1178,7 @@ abstract class CometTestBase df.showString(_numRows, truncate, vertical) } + @nowarn("cat=w-flag-numeric-widen") def makeParquetFile( path: Path, total: Int, @@ -1231,8 +1232,8 @@ abstract class CometTestBase val record = new SimpleGroup(schema) opt match { case Some(i) => - record.add(0, i.toByte.toInt) - record.add(1, i.toShort.toInt) + record.add(0, i.toByte) + record.add(1, i.toShort) record.add(2, i) record.add(3, i.toLong) record.add(4, rand.nextFloat()) From 
8712bde1e9c42064dc9f83c2725e481aa61f736a Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Sat, 8 Nov 2025 15:55:58 -0800 Subject: [PATCH 09/10] Revert mvn changes --- pom.xml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index e67e88d391..6a6254dfcf 100644 --- a/pom.xml +++ b/pom.xml @@ -738,14 +738,14 @@ under the License. scala-maven-plugin - - - - - + -deprecation + -unchecked + -feature + -Xlint:_ + -Ywarn-dead-code -Ywarn-numeric-widen - - + -Ywarn-value-discard + -Ywarn-unused:imports,patvars,privates,locals,params,-implicits -Xfatal-warnings From 4fa23a77fbe99cef833244df41a7ca12788a9ccc Mon Sep 17 00:00:00 2001 From: Andy HF Kwok Date: Sat, 8 Nov 2025 15:57:09 -0800 Subject: [PATCH 10/10] Revert-pt3 --- spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 9aa9837c80..b58399ee5a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -716,7 +716,7 @@ abstract class CometTestBase record.add(0, i % 2 == 0) record.add(1, i.toByte) record.add(2, i.toShort) - record.add(3, i) + record.add(3, i.toInt) record.add(4, i) record.add(5, java.lang.Float.intBitsToFloat(i.toInt)) record.add(6, java.lang.Double.longBitsToDouble(i))
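
A minimal sketch of the pattern applied throughout this series (the object and member names below are illustrative, not taken from the patches): with -Ywarn-numeric-widen promoted to an error by -Xfatal-warnings, implicit Int-to-Long and Byte/Short-to-Int widenings fail the build, so call sites gain explicit .toLong/.toInt conversions, and definitions where the widening is deliberate (such as the Parquet Group.add calls in the test helpers) are annotated with @nowarn instead of converting every argument.

    import scala.annotation.nowarn

    object NumericWidenSketch {
      val numRows: Int = 10

      // val rows: Long = numRows          // would warn: implicit numeric widening
      val rows: Long = numRows.toLong      // explicit conversion, no warning

      // Where the widening is intentional, the enclosing definition is annotated
      // rather than converting at every call site (mirrors the record.add cases above).
      @nowarn("cat=w-flag-numeric-widen")
      def asInt(b: Byte): Int = b          // Byte widened to Int deliberately
    }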