Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you committed Sep 26, 2024
1 parent 8cd85b2 commit ac5c755
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ abstract class PaimonBaseScanBuilder(table: Table)

protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())

protected var pushed: Array[(Filter, Predicate)] = Array.empty
protected var pushedPredicates: Array[(Filter, Predicate)] = Array.empty

protected var partitionFilter: Array[Filter] = Array.empty

protected var pushDownLimit: Option[Int] = None

override def build(): Scan = {
PaimonScan(table, requiredSchema, pushed.map(_._2), partitionFilter, pushDownLimit)
PaimonScan(table, requiredSchema, pushedPredicates.map(_._2), partitionFilter, pushDownLimit)
}

/**
Expand Down Expand Up @@ -74,7 +74,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
}

if (pushable.nonEmpty) {
this.pushed = pushable.toArray
this.pushedPredicates = pushable.toArray
}
if (partitionFilter.nonEmpty) {
this.partitionFilter = partitionFilter.toArray
Expand All @@ -83,7 +83,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
}

override def pushedFilters(): Array[Filter] = {
pushed.map(_._1)
pushedPredicates.map(_._1)
}

override def pruneColumns(requiredSchema: StructType): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,28 @@ class PaimonScanBuilder(table: Table)
}

// Only support with push down partition filter
if (pushed.length != partitionFilter.length) {
if (pushedPredicates.length != partitionFilter.length) {
return false
}

val aggregator = new LocalAggregator(table)
if (!aggregator.supportAggregation(aggregation)) {
if (!aggregator.pushAggregation(aggregation)) {
return false
}

val readBuilder = table.newReadBuilder
if (pushed.nonEmpty) {
val pushedPartitionPredicate = PredicateBuilder.and(pushed.map(_._2): _*)
if (pushedPredicates.nonEmpty) {
val pushedPartitionPredicate = PredicateBuilder.and(pushedPredicates.map(_._2): _*)
readBuilder.withFilter(pushedPartitionPredicate)
}
val scan = readBuilder.newScan()
scan.listPartitionEntries.asScala.foreach(aggregator.update)
localScan = Some(
PaimonLocalScan(aggregator.result(), aggregator.resultSchema(), table, pushed.map(_._1)))
PaimonLocalScan(
aggregator.result(),
aggregator.resultSchema(),
table,
pushedPredicates.map(_._1)))
true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType

class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonBaseScanBuilder(table) {
override def build(): Scan = {
PaimonSplitScan(table, table.splits(), requiredSchema, pushed.map(_._2))
PaimonSplitScan(table, table.splits(), requiredSchema, pushedPredicates.map(_._2))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class LocalAggregator(table: Table) {
}
}

def supportAggregation(aggregation: Aggregation): Boolean = {
def pushAggregation(aggregation: Aggregation): Boolean = {
if (
!table.isInstanceOf[DataTable] ||
!table.primaryKeys.isEmpty
Expand Down

0 comments on commit ac5c755

Please sign in to comment.