Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Nov 8, 2022
1 parent 1ab40fa commit 3aafdc7
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,6 @@ class RangePartitionerBoundsGenerator [K : Ordering : ClassTag, V]

/*
Returns a JSON string with the following structure:
{
"ordering":[
{
"expression":"...",
"data_type":"...",
"direction":0
},
...
],
"range_bounds":[
[...],
[...],
...
]
}
{
"ordering":[
{
Expand Down Expand Up @@ -135,8 +120,6 @@ class RangePartitionerBoundsGenerator [K : Ordering : ClassTag, V]
}
}
*/
// We need to resolve the column reference, so we use
// io.substrait.proto.Expression here.
def getExpressionFiedlReference(ordering: SortOrder): Int = {
val substraitCtx = new SubstraitContext()
val funcs = substraitCtx.registeredFunction
Expand All @@ -145,7 +128,6 @@ class RangePartitionerBoundsGenerator [K : Ordering : ClassTag, V]
val projExprNode = colExpr.asInstanceOf[ExpressionTransformer].doTransform(funcs)
val pb = projExprNode.toProtobuf
if (!pb.hasSelection()) {
// scalastyle:off println
throw new IllegalArgumentException(s"A sorting field should be an attribute")
}
pb.getSelection().getDirectReference().getStructField.getField()
Expand Down Expand Up @@ -178,6 +160,7 @@ class RangePartitionerBoundsGenerator [K : Ordering : ClassTag, V]
case _: FloatType => Json.toJson(row.getFloat(field))
case _: DoubleType => Json.toJson(row.getDouble(field))
case _: StringType => Json.toJson(row.getString(field))
case _: DateType => Json.toJson(row.getShort(field))
case d =>
throw new IllegalArgumentException(s"Unsupported data type ${d.toString}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,6 @@ object CHExecUtil {
numPartitions, rddForSampling, sortingExpressions, outputAttributes,
true, samplePointsPerPartitionHint = 20)
val orderingAndRangeBounds = generator.getRangeBoundsJsonString()
// scalastyle:off println
println(s"xxx orderingAndRangeBounds=${orderingAndRangeBounds}")

new NativePartitioning("range", numPartitions, null, orderingAndRangeBounds.getBytes())
case p =>
throw new IllegalStateException(s"Unknow partition type: ${p.getClass.toString}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ object SortExecTransformer {
(Seq[NamedExpression], Seq[SortOrder]) = {
val projectionAttrs = new util.ArrayList[NamedExpression]()
val newSortOrders = new util.ArrayList[SortOrder]()
// projectionAttrs.addAll(inputAttributes.asJava)
var aliasNo = 0
sortOrders.foreach(
order => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ case class TransformPreOverrides() extends Rule[SparkPlan] {
case plan: ProjectExec =>
val columnarChild = replaceWithTransformerPlan(plan.child, isSupportAdaptive)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
logDebug(s"xxxx ${plan.projectList}; ${plan.output}; ${plan.child.output}")
ProjectExecTransformer(plan.projectList, columnarChild)
case plan: FilterExec =>
// Push down the left conditions in Filter into Scan.
Expand Down Expand Up @@ -157,7 +156,6 @@ case class TransformPreOverrides() extends Rule[SparkPlan] {
case plan: SortExec =>
val child = replaceWithTransformerPlan(plan.child, isSupportAdaptive)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
// SortExecTransformer(plan.sortOrder, plan.global, child, plan.testSpillFrequency)
transformSupportSortWithProjection(plan, isSupportAdaptive)
case plan: ShuffleExchangeExec =>
val child = replaceWithTransformerPlan(plan.child, isSupportAdaptive)
Expand All @@ -167,12 +165,6 @@ case class TransformPreOverrides() extends Rule[SparkPlan] {
if (isSupportAdaptive) {
ColumnarShuffleExchangeAdaptor(plan.outputPartitioning, child)
} else {
plan.outputPartitioning match {
case RangePartitioning(ordering, n) =>
logDebug(s"is a range partitioning.${ordering} ${n}")
case _ =>
logDebug(s"not a range partitionoing")
}
CoalesceBatchesExec(ColumnarShuffleExchangeExec(plan.outputPartitioning, child))
}
} else {
Expand Down Expand Up @@ -258,7 +250,6 @@ case class TransformPreOverrides() extends Rule[SparkPlan] {
}

def transformSupportSortWithProjection(plan: SortExec, isSupportAdaptive: Boolean) : SparkPlan = {
logDebug(s"xxxx sort input atrtributes:${plan.child.output}")
val needProjection = SortExecTransformer.needProjection(plan.sortOrder)
if (!needProjection) {
val newChild = replaceWithTransformerPlan(plan.child, isSupportAdaptive)
Expand All @@ -284,7 +275,6 @@ case class TransformPreOverrides() extends Rule[SparkPlan] {
// range-partition shuffle
val child = ProjectExec(originalInputs ++ projectAttrs, shuffle.child)
TransformHints.tagTransformable(child)
logDebug(s"xxx projectNode1 ${child}; ${child.output}")
// the ordering in RangePartitioning should be the same as in
// the SortExec
val rangePartitioning = RangePartitioning(newOrderings, n)
Expand All @@ -300,11 +290,9 @@ case class TransformPreOverrides() extends Rule[SparkPlan] {
case c =>
replaceWithTransformerPlan(c, isSupportAdaptive)
}
logDebug(s"xxxx update sort child is:${sortChild}")
val newSort = SortExecTransformer(newOrderings, plan.global,
sortChild, plan.testSpillFrequency)
val ret = ProjectExecTransformer(originalInputs, newSort)
logDebug(s"final sort is : ${ret}")
ret
}
}
Expand Down

0 comments on commit 3aafdc7

Please sign in to comment.