Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CH-186] Support RangePartitioning #524

Merged
merged 33 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
828ac27
enable sort op on clickhouse
lgbo-ustc Oct 18, 2022
44c48da
add union UT for velox but ignore them
lgbo-ustc Oct 21, 2022
dec2c37
WIP
lgbo-ustc Oct 28, 2022
e39d01a
WIP
lgbo-ustc Nov 1, 2022
b863a03
WIP
lgbo-ustc Nov 3, 2022
80d3e7c
wip
lgbo-ustc Nov 7, 2022
c6679de
WIP
lgbo-ustc Nov 8, 2022
ae4b677
update comment
lgbo-ustc Nov 8, 2022
b8fd861
run spotless
lgbo-ustc Nov 10, 2022
6cedecf
add private scope
lgbo-ustc Nov 10, 2022
e37fe9f
rebase
lgbo-ustc Nov 12, 2022
d213b21
updated
lgbo-ustc Nov 14, 2022
ce2abbc
rebase
lgbo-ustc Nov 14, 2022
1246f0f
update
lgbo-ustc Nov 14, 2022
e82981b
update
lgbo-ustc Nov 14, 2022
e874f8c
update
lgbo-ustc Nov 14, 2022
98d1b8d
update ut for columnar range partition
lgbo-ustc Nov 15, 2022
62379d2
test
lgbo-ustc Nov 15, 2022
75415e1
disable native range partitioning when have complex expression in ord…
lgbo-ustc Nov 16, 2022
92ff90b
remove unused codes
lgbo-ustc Nov 16, 2022
13e1970
support calculating expressions in range partition
lgbo-ustc Nov 17, 2022
f216f1c
not support complex type in range partition
lgbo-ustc Nov 17, 2022
c5ddbe5
add a switch option for range partitioning
lgbo-ustc Nov 17, 2022
5e5cf0b
update
lgbo-ustc Nov 18, 2022
33cbb2d
rebase & format
lgbo-ustc Nov 18, 2022
653cb37
update
lgbo-ustc Nov 21, 2022
0bc9b88
updated
lgbo-ustc Nov 21, 2022
b303bc1
update ut
lgbo-ustc Nov 21, 2022
81c1f7b
replace play json with jackson
lgbo-ustc Nov 22, 2022
5cdd41e
updated
lgbo-ustc Nov 22, 2022
529f716
update
lgbo-ustc Nov 22, 2022
a095aef
rebase
Nov 22, 2022
b3489a8
enable sort exec
lgbo-ustc Nov 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,22 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ object CHBackendSettings extends BackendSettings {

override def utilizeShuffledHashJoinHint(): Boolean = true
override def excludeScanExecFromCollapsedStage(): Boolean = true
// Whether SortExec can be offloaded to this backend; gated by the
// columnar-sort session configuration flag.
override def supportSortExec(): Boolean = {
  GlutenConfig.getSessionConf.enableColumnarSort
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.glutenproject.substrait.expression.SelectionNode
import io.glutenproject.utils.InputPartitionsUtil

import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.connector.read.InputPartition
Expand Down Expand Up @@ -61,7 +62,8 @@ class CHTransformerApi extends ITransformerApi with Logging {
}
})
.exists(_ == false))
case RangePartitioning(_, _) => false
case RangePartitioning(orderings, _) =>
RangePartitionerBoundsGenerator.supportedOrderings(orderings)
case _ => true
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.utils

import io.glutenproject.execution.SortExecTransformer
import io.glutenproject.expression.ExpressionConverter
import io.glutenproject.expression.ExpressionTransformer
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.expression.ExpressionNode
import io.glutenproject.substrait.extensions.ExtensionBuilder
import io.glutenproject.substrait.plan.PlanBuilder
import io.glutenproject.substrait.plan.PlanNode
import io.glutenproject.substrait.rel.{RelBuilder, RelNode}
import io.glutenproject.vectorized.{BlockNativeConverter, BlockSplitIterator, CHNativeBlock, CloseablePartitionedBlockIterator, NativePartitioning}

import org.apache.spark.{Partitioner, RangePartitioner, ShuffleDependency}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BoundReference, NamedExpression, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.types._

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode

import java.util
import java.util.Base64

import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.control.Breaks.{break, breakable}
import scala.util.hashing.byteswap32

/**
* In spark RangePartitioner, the rangeBounds is private, so we make a copied-implementation here.
* It is based on the fact that, there has been a pre-projection before the range partition and
* remove all function expressions in the sort ordering expressions.
*/
/**
 * In Spark's `RangePartitioner` the `rangeBounds` member is private, so this class carries a
 * copied implementation of the bounds-sampling logic. It relies on the fact that a
 * pre-projection has been applied before the range partition, removing all function
 * expressions from the sort ordering expressions.
 *
 * @param partitions number of output partitions
 * @param rdd the RDD whose keys are sampled to determine the range bounds
 * @param ordering the sort orders that define the range partitioning
 * @param inputAttributes output attributes of the child plan
 * @param ascending kept for constructor compatibility; currently unused here
 * @param samplePointsPerPartitionHint per-partition sample-size hint (same default as Spark)
 */
class RangePartitionerBoundsGenerator[K: Ordering: ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    ordering: Seq[SortOrder],
    inputAttributes: Seq[Attribute],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20
) {

  /**
   * Computes the range bounds by sketching/sampling the RDD, mirroring the private
   * `RangePartitioner.rangeBounds` logic in Spark. Returns an empty array when there is at
   * most one partition or the RDD has no items.
   */
  def getRangeBounds(): Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // Cap the total sample size at 1M to bound driver memory.
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Oversample by a factor of 3 to cope with unbalanced input partitions.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // Partitions that contain far more items than average are re-sampled at the exact
        // target rate to obtain a more accurate distribution.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach {
          case (idx, n, sample) =>
            if (fraction * n > sampleSizePerPartition) {
              imbalancedPartitions += idx
            } else {
              // Weight each candidate key by the inverse of its sampling probability.
              val weight = (n.toDouble / sample.length).toFloat
              for (key <- sample) {
                candidates += ((key, weight))
              }
            }
        }
        if (imbalancedPartitions.nonEmpty) {
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

  /*
  return json structure
  {
    "projection_plan":"xxxx",
    "ordering":[
      {
        "column_ref":0,
        "data_type":"xxx",
        "is_nullable":true,
        "direction":0
      },
      ...
    ],
    "range_bounds":[
      {
        "is_null":false,
        "value": ...
      },
      {
        "is_null":true
      },
      ...
    ]
  }
   */

  /**
   * Resolves the ordering's child expression against `attributes` and returns its field
   * index. Throws if the expression does not transform into a direct field selection,
   * i.e. the sorting field must be an attribute at this point.
   */
  private def getExpressionFieldReference(
      context: SubstraitContext,
      ordering: SortOrder,
      attributes: Seq[Attribute]): Int = {
    val funcs = context.registeredFunction
    val colExpr =
      ExpressionConverter.replaceWithExpressionTransformer(ordering.child, attributes)
    val projExprNode = colExpr.asInstanceOf[ExpressionTransformer].doTransform(funcs)
    val pb = projExprNode.toProtobuf
    if (!pb.hasSelection()) {
      throw new IllegalArgumentException(s"A sorting field should be an attribute")
    } else {
      pb.getSelection().getDirectReference().getStructField.getField()
    }
  }

  /**
   * Builds a substrait projection plan that evaluates `sortExpressions` against the input
   * attributes; passed to the native engine so it can compute the ordering expressions.
   */
  private def buildProjectionPlan(
      context: SubstraitContext,
      sortExpressions: Seq[NamedExpression]): PlanNode = {
    val args = context.registeredFunction
    val columnarProjExprs = sortExpressions.map(
      expr => {
        ExpressionConverter
          .replaceWithExpressionTransformer(expr, attributeSeq = inputAttributes)
      })
    val projExprNodeList = new java.util.ArrayList[ExpressionNode]()
    for (expr <- columnarProjExprs) {
      projExprNodeList.add(expr.asInstanceOf[ExpressionTransformer].doTransform(args))
    }
    val projectRel = RelBuilder.makeProjectRel(null, projExprNodeList, context, 0)
    val outNames = new util.ArrayList[String]
    val relNodes = new util.ArrayList[RelNode]()
    relNodes.add(projectRel)
    PlanBuilder.makePlan(context, relNodes, outNames)
  }

  /**
   * Replaces every non-attribute ordering child with an alias (`sort_col_N`) so that all
   * orderings reference plain attributes. Returns the alias expressions that need a
   * pre-projection together with the rewritten sort orders.
   */
  private def buildProjectionAttributesByOrderings(
      sortOrders: Seq[SortOrder]): (Seq[NamedExpression], Seq[SortOrder]) = {
    val projectionAttrs = new util.ArrayList[NamedExpression]()
    val newSortOrders = new util.ArrayList[SortOrder]()
    var aliasNo = 0
    sortOrders.foreach(
      order => {
        if (!order.child.isInstanceOf[Attribute]) {
          val alias = new Alias(order.child, s"sort_col_$aliasNo")()
          aliasNo += 1
          projectionAttrs.add(alias)
          newSortOrders.add(
            SortOrder(
              alias.toAttribute,
              order.direction,
              order.nullOrdering,
              order.sameOrderExpressions))
        } else {
          newSortOrders.add(order)
        }
      })
    (projectionAttrs.asScala, newSortOrders.asScala)
  }

  /** Serializes each sort order (field ref, type, nullability, direction) into `arrayNode`. */
  private def buildOrderingJson(
      context: SubstraitContext,
      orderings: Seq[SortOrder],
      attributes: Seq[Attribute],
      jsonMapper: ObjectMapper,
      arrayNode: ArrayNode): Unit = {
    orderings.foreach {
      ordering =>
        val node = jsonMapper.createObjectNode()
        node.put("column_ref", getExpressionFieldReference(context, ordering, attributes))
        node.put("data_type", ordering.dataType.toString)
        node.put("is_nullable", ordering.nullable)
        node.put(
          "direction",
          SortExecTransformer.transformSortDirection(
            ordering.direction.sql,
            ordering.nullOrdering.sql))
        arrayNode.add(node)
    }
  }

  /**
   * Serializes one range-bound row into a JSON array, one element per ordering field.
   * Each element carries `is_null` and, for non-null fields, the typed `value`.
   */
  private def buildRangeBoundJson(
      row: UnsafeRow,
      orderings: Seq[SortOrder],
      jsonMapper: ObjectMapper): ArrayNode = {
    val arrayNode = jsonMapper.createArrayNode()
    (0 until row.numFields).foreach {
      i =>
        if (row.isNullAt(i)) {
          val node = jsonMapper.createObjectNode()
          node.put("is_null", true)
          arrayNode.add(node)
        } else {
          val ordering = orderings(i)
          val node = jsonMapper.createObjectNode()
          node.put("is_null", false)
          ordering.dataType match {
            case _: BooleanType => node.put("value", row.getBoolean(i))
            case _: ByteType => node.put("value", row.getByte(i))
            case _: ShortType => node.put("value", row.getShort(i))
            case _: IntegerType => node.put("value", row.getInt(i))
            case _: LongType => node.put("value", row.getLong(i))
            case _: FloatType => node.put("value", row.getFloat(i))
            case _: DoubleType => node.put("value", row.getDouble(i))
            case _: StringType => node.put("value", row.getString(i))
            // DateType is physically stored as a 4-byte Int (days since epoch);
            // reading it with getShort would truncate the value.
            case _: DateType => node.put("value", row.getInt(i))
            case _ =>
              throw new IllegalArgumentException(
                s"Unsupported data type ${ordering.dataType.toString}")
          }
          arrayNode.add(node)
        }
    }
    arrayNode
  }

  /** Appends every computed range bound (as a JSON array of fields) to `arrayNode`. */
  private def buildRangeBoundsJson(jsonMapper: ObjectMapper, arrayNode: ArrayNode): Unit = {
    val bounds = getRangeBounds()
    bounds.foreach {
      bound =>
        val row = bound.asInstanceOf[UnsafeRow]
        arrayNode.add(buildRangeBoundJson(row, ordering, jsonMapper))
    }
  }

  /**
   * Builds the JSON structure (see the format comment above) that is passed to the native
   * engine: the orderings, the sampled range bounds and, when any ordering contains a
   * non-attribute expression, a serialized pre-projection plan.
   */
  def getRangeBoundsJsonString(): String = {
    val context = new SubstraitContext()
    val (sortExpressions, newOrderings) = buildProjectionAttributesByOrderings(ordering)
    val totalAttributes = new util.ArrayList[Attribute]()
    inputAttributes.foreach(attr => totalAttributes.add(attr))
    sortExpressions.foreach(expr => totalAttributes.add(expr.toAttribute))
    val mapper = new ObjectMapper
    val rootNode = mapper.createObjectNode
    val orderingArray = rootNode.putArray("ordering")
    buildOrderingJson(context, newOrderings, totalAttributes.asScala, mapper, orderingArray)
    val boundArray = rootNode.putArray("range_bounds")
    buildRangeBoundsJson(mapper, boundArray)
    if (sortExpressions.nonEmpty) {
      // If there are any expressions in the orderings, build a projection plan and pass
      // it (base64-encoded protobuf) to the backend.
      val projectPlan = buildProjectionPlan(context, sortExpressions).toProtobuf
      val serializeProjectPlan = Base64.getEncoder().encodeToString(projectPlan.toByteArray)
      rootNode.put("projection_plan", serializeProjectPlan)
    }
    mapper.writeValueAsString(rootNode)
  }
}

object RangePartitionerBoundsGenerator {

  /** Returns true if `dataType` can be used as a range-partition ordering key. */
  def supportedFieldType(dataType: DataType): Boolean = {
    dataType match {
      case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType |
          _: FloatType | _: DoubleType | _: StringType | _: DateType =>
        true
      case _ => false
    }
  }

  /**
   * Returns true when every sort ordering uses a supported (non-complex) data type, i.e.
   * native range partitioning can be enabled. Vacuously true for an empty ordering list.
   */
  // TODO: support complex data types in orderings.
  def supportedOrderings(orderings: Seq[SortOrder]): Boolean = {
    orderings.forall(ordering => supportedFieldType(ordering.dataType))
  }
}
Loading