From ac5c75537f4b84f5517a71d05900de196a40a900 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Thu, 26 Sep 2024 13:56:30 +0800
Subject: [PATCH] address comments

---
 .../paimon/spark/PaimonBaseScanBuilder.scala       |  8 ++++----
 .../apache/paimon/spark/PaimonScanBuilder.scala    | 14 +++++++++-----
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |  2 +-
 .../paimon/spark/aggregate/LocalAggregator.scala   |  2 +-
 4 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index b7ffffe5bff2..da778bb1c5fa 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -36,14 +36,14 @@ abstract class PaimonBaseScanBuilder(table: Table)
 
   protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
 
-  protected var pushed: Array[(Filter, Predicate)] = Array.empty
+  protected var pushedPredicates: Array[(Filter, Predicate)] = Array.empty
 
   protected var partitionFilter: Array[Filter] = Array.empty
 
   protected var pushDownLimit: Option[Int] = None
 
   override def build(): Scan = {
-    PaimonScan(table, requiredSchema, pushed.map(_._2), partitionFilter, pushDownLimit)
+    PaimonScan(table, requiredSchema, pushedPredicates.map(_._2), partitionFilter, pushDownLimit)
   }
 
   /**
@@ -74,7 +74,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
     }
 
     if (pushable.nonEmpty) {
-      this.pushed = pushable.toArray
+      this.pushedPredicates = pushable.toArray
     }
     if (partitionFilter.nonEmpty) {
       this.partitionFilter = partitionFilter.toArray
@@ -83,7 +83,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
   }
 
   override def pushedFilters(): Array[Filter] = {
-    pushed.map(_._1)
+    pushedPredicates.map(_._1)
   }
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index f87427154f59..b5c0ee53cd1c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -53,24 +53,28 @@ class PaimonScanBuilder(table: Table)
     }
 
     // Only support with push down partition filter
-    if (pushed.length != partitionFilter.length) {
+    if (pushedPredicates.length != partitionFilter.length) {
       return false
     }
 
     val aggregator = new LocalAggregator(table)
-    if (!aggregator.supportAggregation(aggregation)) {
+    if (!aggregator.pushAggregation(aggregation)) {
       return false
     }
 
     val readBuilder = table.newReadBuilder
-    if (pushed.nonEmpty) {
-      val pushedPartitionPredicate = PredicateBuilder.and(pushed.map(_._2): _*)
+    if (pushedPredicates.nonEmpty) {
+      val pushedPartitionPredicate = PredicateBuilder.and(pushedPredicates.map(_._2): _*)
       readBuilder.withFilter(pushedPartitionPredicate)
     }
     val scan = readBuilder.newScan()
     scan.listPartitionEntries.asScala.foreach(aggregator.update)
     localScan = Some(
-      PaimonLocalScan(aggregator.result(), aggregator.resultSchema(), table, pushed.map(_._1)))
+      PaimonLocalScan(
+        aggregator.result(),
+        aggregator.resultSchema(),
+        table,
+        pushedPredicates.map(_._1)))
     true
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 191c0104ee08..8d9e643f9485 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
 
 class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonBaseScanBuilder(table) {
   override def build(): Scan = {
-    PaimonSplitScan(table, table.splits(), requiredSchema, pushed.map(_._2))
+    PaimonSplitScan(table, table.splits(), requiredSchema, pushedPredicates.map(_._2))
   }
 }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
index b2a03ea0b43b..1631c6dedc12 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
@@ -42,7 +42,7 @@ class LocalAggregator(table: Table) {
     }
   }
 
-  def supportAggregation(aggregation: Aggregation): Boolean = {
+  def pushAggregation(aggregation: Aggregation): Boolean = {
     if (
       !table.isInstanceOf[DataTable] ||
       !table.primaryKeys.isEmpty