diff --git a/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala index 2d9934166..eb6a153c0 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala @@ -160,11 +160,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { plan.isSkewJoin) case plan: WindowExec => if (!enableColumnarWindow) return false - new ColumnarWindowExec( + val window = ColumnarWindowExec.create( plan.windowExpression, plan.partitionSpec, plan.orderSpec, plan.child) + window case p => p } diff --git a/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala index afa4d995b..8272ba667 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala @@ -67,13 +67,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] { new ColumnarBatchScanExec(plan.output, plan.scan) } case plan: ProjectExec => - val columnarPlan = replaceWithColumnarPlan(plan.child) + val columnarChild = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - if (columnarPlan.isInstanceOf[ColumnarConditionProjectExec]) { - val cur_plan = columnarPlan.asInstanceOf[ColumnarConditionProjectExec] - ColumnarConditionProjectExec(cur_plan.condition, plan.projectList, cur_plan.child) - } else { - ColumnarConditionProjectExec(null, plan.projectList, columnarPlan) + columnarChild match { + case ch: ColumnarConditionProjectExec => + if (ch.projectList == null) { + ColumnarConditionProjectExec(ch.condition, plan.projectList, ch.child) + } else { + ColumnarConditionProjectExec(null, plan.projectList, columnarChild) + } + case _ => + ColumnarConditionProjectExec(null, plan.projectList, columnarChild) } case plan: FilterExec => val child = replaceWithColumnarPlan(plan.child) @@ -234,11 +238,12 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] { } logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") try { - return new ColumnarWindowExec( + val window = ColumnarWindowExec.create( plan.windowExpression, plan.partitionSpec, plan.orderSpec, coalesceBatchRemoved) + return window } catch { case _: Throwable => logInfo("Columnar Window: Falling back to regular Window...") diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 4ea693dca..e59a732dd 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -27,20 +27,21 @@ import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, NamedExpression, Rank, SortOrder, WindowExpression, WindowFunction} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Sum} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, MakeDecimal, NamedExpression, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Average, Sum} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.ArrayType +import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, DoubleType, LongType} import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer +import scala.util.Random class ColumnarWindowExec(windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -248,3 +249,106 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression], override def isComplex: Boolean = false } } + +object ColumnarWindowExec { + + def createWithProjection( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): SparkPlan = { + + def makeInputProject(ex: Expression, inputProjects: ListBuffer[NamedExpression]): Expression = { + ex match { + case ae: AggregateExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) + case ae: WindowExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) + case func @ (_: AggregateFunction | _: WindowFunction) => + val params = func.children + // rewrite + val rewritten = func match { + case _: Average => + // rewrite params for AVG + params.map { + param => + param.dataType match { + case _: LongType | _: DecimalType => + Cast(param, DoubleType) + case _ => param + } + } + case _ => params + } + + // alias + func.withNewChildren(rewritten.map { + case param @ (_: Cast | _: UnscaledValue) => + val aliasName = "__alias_%d__".format(Random.nextLong()) + val alias = Alias(param, aliasName)() + inputProjects.append(alias) + alias.toAttribute + case other => other + }) + case other => other + } + } + + def sameType(from: DataType, to: DataType): Boolean = { + if (from == null || to == null) { + throw new IllegalArgumentException("null type found during type enforcement") + } + if (from == to) { + return true + } + DataType.equalsStructurally(from, to) + } + + def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], inputProjects: ListBuffer[NamedExpression]): Expression = { + val out = ex match { + case we: WindowExpression => + val aliasName = "__alias_%d__".format(Random.nextLong()) + val alias = Alias(makeInputProject(we, inputProjects), aliasName)() + windows.append(alias) + alias.toAttribute + case _ => + ex.withNewChildren(ex.children.map(makeOutputProject(_, windows, inputProjects))) + } + // forcibly cast to original type against possible rewriting + val casted = try { + if (sameType(out.dataType, ex.dataType)) { + out + } else { + Cast(out, ex.dataType) + } + } catch { + case t: Throwable => + System.err.println("Warning: " + t.getMessage) + Cast(out, ex.dataType) + } + casted + } + + val windows = ListBuffer[NamedExpression]() + val inProjectExpressions = ListBuffer[NamedExpression]() + val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias]) + .map { a => + a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions))) + .asInstanceOf[NamedExpression] + } + + val inputProject = ColumnarConditionProjectExec(null, child.output ++ inProjectExpressions, child) + + val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject) + + val outputProject = ColumnarConditionProjectExec(null, child.output ++ outProjectExpressions, window) + + outputProject + } + + def create( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): SparkPlan = { + createWithProjection(windowExpression, partitionSpec, orderSpec, child) + } +} diff --git a/core/src/test/resources/tpcds-queries-double/q1.sql b/core/src/test/resources/tpcds-queries/q1.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q1.sql rename to core/src/test/resources/tpcds-queries/q1.sql diff --git a/core/src/test/resources/tpcds-queries-double/q10.sql b/core/src/test/resources/tpcds-queries/q10.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q10.sql rename to core/src/test/resources/tpcds-queries/q10.sql diff --git a/core/src/test/resources/tpcds-queries-double/q11.sql b/core/src/test/resources/tpcds-queries/q11.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q11.sql rename to core/src/test/resources/tpcds-queries/q11.sql diff --git a/core/src/test/resources/tpcds-queries-double/q12.sql b/core/src/test/resources/tpcds-queries/q12.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q12.sql rename to core/src/test/resources/tpcds-queries/q12.sql diff --git a/core/src/test/resources/tpcds-queries-double/q13.sql b/core/src/test/resources/tpcds-queries/q13.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q13.sql rename to core/src/test/resources/tpcds-queries/q13.sql diff --git a/core/src/test/resources/tpcds-queries-double/q14a.sql b/core/src/test/resources/tpcds-queries/q14a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q14a.sql rename to core/src/test/resources/tpcds-queries/q14a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q14b.sql b/core/src/test/resources/tpcds-queries/q14b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q14b.sql rename to core/src/test/resources/tpcds-queries/q14b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q15.sql b/core/src/test/resources/tpcds-queries/q15.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q15.sql rename to core/src/test/resources/tpcds-queries/q15.sql diff --git a/core/src/test/resources/tpcds-queries-double/q16.sql b/core/src/test/resources/tpcds-queries/q16.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q16.sql rename to core/src/test/resources/tpcds-queries/q16.sql diff --git a/core/src/test/resources/tpcds-queries-double/q17.sql b/core/src/test/resources/tpcds-queries/q17.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q17.sql rename to core/src/test/resources/tpcds-queries/q17.sql diff --git a/core/src/test/resources/tpcds-queries-double/q18.sql b/core/src/test/resources/tpcds-queries/q18.sql similarity index 66% rename from core/src/test/resources/tpcds-queries-double/q18.sql rename to core/src/test/resources/tpcds-queries/q18.sql index 567d4da66..4055c80fd 100755 --- a/core/src/test/resources/tpcds-queries-double/q18.sql +++ b/core/src/test/resources/tpcds-queries/q18.sql @@ -3,13 +3,13 @@ SELECT ca_country, ca_state, ca_county, - avg(cast(cs_quantity AS DOUBLE)) agg1, - avg(cast(cs_list_price AS DOUBLE)) agg2, - avg(cast(cs_coupon_amt AS DOUBLE)) agg3, - avg(cast(cs_sales_price AS DOUBLE)) agg4, - avg(cast(cs_net_profit AS DOUBLE)) agg5, - avg(cast(c_birth_year AS DOUBLE)) agg6, - avg(cast(cd1.cd_dep_count AS DOUBLE)) agg7 + avg(cast(cs_quantity AS DECIMAL(12, 2))) agg1, + avg(cast(cs_list_price AS DECIMAL(12, 2))) agg2, + avg(cast(cs_coupon_amt AS DECIMAL(12, 2))) agg3, + avg(cast(cs_sales_price AS DECIMAL(12, 2))) agg4, + avg(cast(cs_net_profit AS DECIMAL(12, 2))) agg5, + avg(cast(c_birth_year AS DECIMAL(12, 2))) agg6, + avg(cast(cd1.cd_dep_count AS DECIMAL(12, 2))) agg7 FROM catalog_sales, customer_demographics cd1, customer_demographics cd2, customer, customer_address, date_dim, item WHERE cs_sold_date_sk = d_date_sk AND diff --git a/core/src/test/resources/tpcds-queries-double/q19.sql b/core/src/test/resources/tpcds-queries/q19.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q19.sql rename to core/src/test/resources/tpcds-queries/q19.sql diff --git a/core/src/test/resources/tpcds-queries-double/q2.sql b/core/src/test/resources/tpcds-queries/q2.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q2.sql rename to core/src/test/resources/tpcds-queries/q2.sql diff --git a/core/src/test/resources/tpcds-queries-double/q20.sql b/core/src/test/resources/tpcds-queries/q20.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q20.sql rename to core/src/test/resources/tpcds-queries/q20.sql diff --git a/core/src/test/resources/tpcds-queries-double/q21.sql b/core/src/test/resources/tpcds-queries/q21.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q21.sql rename to core/src/test/resources/tpcds-queries/q21.sql diff --git a/core/src/test/resources/tpcds-queries-double/q22.sql b/core/src/test/resources/tpcds-queries/q22.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q22.sql rename to core/src/test/resources/tpcds-queries/q22.sql diff --git a/core/src/test/resources/tpcds-queries-double/q23a.sql b/core/src/test/resources/tpcds-queries/q23a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q23a.sql rename to core/src/test/resources/tpcds-queries/q23a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q23b.sql b/core/src/test/resources/tpcds-queries/q23b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q23b.sql rename to core/src/test/resources/tpcds-queries/q23b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q24a.sql b/core/src/test/resources/tpcds-queries/q24a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q24a.sql rename to core/src/test/resources/tpcds-queries/q24a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q24b.sql b/core/src/test/resources/tpcds-queries/q24b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q24b.sql rename to core/src/test/resources/tpcds-queries/q24b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q25.sql b/core/src/test/resources/tpcds-queries/q25.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q25.sql rename to core/src/test/resources/tpcds-queries/q25.sql diff --git a/core/src/test/resources/tpcds-queries-double/q26.sql b/core/src/test/resources/tpcds-queries/q26.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q26.sql rename to core/src/test/resources/tpcds-queries/q26.sql diff --git a/core/src/test/resources/tpcds-queries-double/q27.sql b/core/src/test/resources/tpcds-queries/q27.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q27.sql rename to core/src/test/resources/tpcds-queries/q27.sql diff --git a/core/src/test/resources/tpcds-queries-double/q28.sql b/core/src/test/resources/tpcds-queries/q28.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q28.sql rename to core/src/test/resources/tpcds-queries/q28.sql diff --git a/core/src/test/resources/tpcds-queries-double/q29.sql b/core/src/test/resources/tpcds-queries/q29.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q29.sql rename to core/src/test/resources/tpcds-queries/q29.sql diff --git a/core/src/test/resources/tpcds-queries-double/q3.sql b/core/src/test/resources/tpcds-queries/q3.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q3.sql rename to core/src/test/resources/tpcds-queries/q3.sql diff --git a/core/src/test/resources/tpcds-queries-double/q30.sql b/core/src/test/resources/tpcds-queries/q30.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q30.sql rename to core/src/test/resources/tpcds-queries/q30.sql diff --git a/core/src/test/resources/tpcds-queries-double/q31.sql b/core/src/test/resources/tpcds-queries/q31.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q31.sql rename to core/src/test/resources/tpcds-queries/q31.sql diff --git a/core/src/test/resources/tpcds-queries-double/q32.sql b/core/src/test/resources/tpcds-queries/q32.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q32.sql rename to core/src/test/resources/tpcds-queries/q32.sql diff --git a/core/src/test/resources/tpcds-queries-double/q33.sql b/core/src/test/resources/tpcds-queries/q33.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q33.sql rename to core/src/test/resources/tpcds-queries/q33.sql diff --git a/core/src/test/resources/tpcds-queries-double/q34.sql b/core/src/test/resources/tpcds-queries/q34.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q34.sql rename to core/src/test/resources/tpcds-queries/q34.sql diff --git a/core/src/test/resources/tpcds-queries-double/q35.sql b/core/src/test/resources/tpcds-queries/q35.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q35.sql rename to core/src/test/resources/tpcds-queries/q35.sql diff --git a/core/src/test/resources/tpcds-queries-double/q36.sql b/core/src/test/resources/tpcds-queries/q36.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q36.sql rename to core/src/test/resources/tpcds-queries/q36.sql diff --git a/core/src/test/resources/tpcds-queries-double/q37.sql b/core/src/test/resources/tpcds-queries/q37.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q37.sql rename to core/src/test/resources/tpcds-queries/q37.sql diff --git a/core/src/test/resources/tpcds-queries-double/q38.sql b/core/src/test/resources/tpcds-queries/q38.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q38.sql rename to core/src/test/resources/tpcds-queries/q38.sql diff --git a/core/src/test/resources/tpcds-queries-double/q39a.sql b/core/src/test/resources/tpcds-queries/q39a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q39a.sql rename to core/src/test/resources/tpcds-queries/q39a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q39b.sql b/core/src/test/resources/tpcds-queries/q39b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q39b.sql rename to core/src/test/resources/tpcds-queries/q39b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q4.sql b/core/src/test/resources/tpcds-queries/q4.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q4.sql rename to core/src/test/resources/tpcds-queries/q4.sql diff --git a/core/src/test/resources/tpcds-queries-double/q40.sql b/core/src/test/resources/tpcds-queries/q40.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q40.sql rename to core/src/test/resources/tpcds-queries/q40.sql diff --git a/core/src/test/resources/tpcds-queries-double/q41.sql b/core/src/test/resources/tpcds-queries/q41.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q41.sql rename to core/src/test/resources/tpcds-queries/q41.sql diff --git a/core/src/test/resources/tpcds-queries-double/q42.sql b/core/src/test/resources/tpcds-queries/q42.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q42.sql rename to core/src/test/resources/tpcds-queries/q42.sql diff --git a/core/src/test/resources/tpcds-queries-double/q43.sql b/core/src/test/resources/tpcds-queries/q43.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q43.sql rename to core/src/test/resources/tpcds-queries/q43.sql diff --git a/core/src/test/resources/tpcds-queries-double/q44.sql b/core/src/test/resources/tpcds-queries/q44.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q44.sql rename to core/src/test/resources/tpcds-queries/q44.sql diff --git a/core/src/test/resources/tpcds-queries-double/q45.sql b/core/src/test/resources/tpcds-queries/q45.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q45.sql rename to core/src/test/resources/tpcds-queries/q45.sql diff --git a/core/src/test/resources/tpcds-queries-double/q46.sql b/core/src/test/resources/tpcds-queries/q46.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q46.sql rename to core/src/test/resources/tpcds-queries/q46.sql diff --git a/core/src/test/resources/tpcds-queries-double/q47.sql b/core/src/test/resources/tpcds-queries/q47.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q47.sql rename to core/src/test/resources/tpcds-queries/q47.sql diff --git a/core/src/test/resources/tpcds-queries-double/q48.sql b/core/src/test/resources/tpcds-queries/q48.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q48.sql rename to core/src/test/resources/tpcds-queries/q48.sql diff --git a/core/src/test/resources/tpcds-queries-double/q49.sql b/core/src/test/resources/tpcds-queries/q49.sql similarity index 75% rename from core/src/test/resources/tpcds-queries-double/q49.sql rename to core/src/test/resources/tpcds-queries/q49.sql index 2f7f497cf..9568d8b92 100755 --- a/core/src/test/resources/tpcds-queries-double/q49.sql +++ b/core/src/test/resources/tpcds-queries/q49.sql @@ -18,10 +18,10 @@ FROM ( FROM (SELECT ws.ws_item_sk AS item, - (cast(sum(coalesce(wr.wr_return_quantity, 0)) AS DOUBLE) / - cast(sum(coalesce(ws.ws_quantity, 0)) AS DOUBLE)) AS return_ratio, - (cast(sum(coalesce(wr.wr_return_amt, 0)) AS DOUBLE) / - cast(sum(coalesce(ws.ws_net_paid, 0)) AS DOUBLE)) AS currency_ratio + (cast(sum(coalesce(wr.wr_return_quantity, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(ws.ws_quantity, 0)) AS DECIMAL(15, 4))) AS return_ratio, + (cast(sum(coalesce(wr.wr_return_amt, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(ws.ws_net_paid, 0)) AS DECIMAL(15, 4))) AS currency_ratio FROM web_sales ws LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number AND @@ -60,10 +60,10 @@ FROM ( FROM (SELECT cs.cs_item_sk AS item, - (cast(sum(coalesce(cr.cr_return_quantity, 0)) AS DOUBLE) / - cast(sum(coalesce(cs.cs_quantity, 0)) AS DOUBLE)) AS return_ratio, - (cast(sum(coalesce(cr.cr_return_amount, 0)) AS DOUBLE) / - cast(sum(coalesce(cs.cs_net_paid, 0)) AS DOUBLE)) AS currency_ratio + (cast(sum(coalesce(cr.cr_return_quantity, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(cs.cs_quantity, 0)) AS DECIMAL(15, 4))) AS return_ratio, + (cast(sum(coalesce(cr.cr_return_amount, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(cs.cs_net_paid, 0)) AS DECIMAL(15, 4))) AS currency_ratio FROM catalog_sales cs LEFT OUTER JOIN catalog_returns cr ON (cs.cs_order_number = cr.cr_order_number AND @@ -102,10 +102,10 @@ FROM ( FROM (SELECT sts.ss_item_sk AS item, - (cast(sum(coalesce(sr.sr_return_quantity, 0)) AS DOUBLE) / - cast(sum(coalesce(sts.ss_quantity, 0)) AS DOUBLE)) AS return_ratio, - (cast(sum(coalesce(sr.sr_return_amt, 0)) AS DOUBLE) / - cast(sum(coalesce(sts.ss_net_paid, 0)) AS DOUBLE)) AS currency_ratio + (cast(sum(coalesce(sr.sr_return_quantity, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(sts.ss_quantity, 0)) AS DECIMAL(15, 4))) AS return_ratio, + (cast(sum(coalesce(sr.sr_return_amt, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(sts.ss_net_paid, 0)) AS DECIMAL(15, 4))) AS currency_ratio FROM store_sales sts LEFT OUTER JOIN store_returns sr ON (sts.ss_ticket_number = sr.sr_ticket_number AND sts.ss_item_sk = sr.sr_item_sk) diff --git a/core/src/test/resources/tpcds-queries-double/q5.sql b/core/src/test/resources/tpcds-queries/q5.sql similarity index 85% rename from core/src/test/resources/tpcds-queries-double/q5.sql rename to core/src/test/resources/tpcds-queries/q5.sql index e242d008e..b87cf3a44 100755 --- a/core/src/test/resources/tpcds-queries-double/q5.sql +++ b/core/src/test/resources/tpcds-queries/q5.sql @@ -11,15 +11,15 @@ WITH ssr AS ss_sold_date_sk AS date_sk, ss_ext_sales_price AS sales_price, ss_net_profit AS profit, - cast(0 AS DOUBLE) AS return_amt, - cast(0 AS DOUBLE) AS net_loss + cast(0 AS DECIMAL(7, 2)) AS return_amt, + cast(0 AS DECIMAL(7, 2)) AS net_loss FROM store_sales UNION ALL SELECT sr_store_sk AS store_sk, sr_returned_date_sk AS date_sk, - cast(0 AS DOUBLE) AS sales_price, - cast(0 AS DOUBLE) AS profit, + cast(0 AS DECIMAL(7, 2)) AS sales_price, + cast(0 AS DECIMAL(7, 2)) AS profit, sr_return_amt AS return_amt, sr_net_loss AS net_loss FROM store_returns) @@ -42,15 +42,15 @@ WITH ssr AS cs_sold_date_sk AS date_sk, cs_ext_sales_price AS sales_price, cs_net_profit AS profit, - cast(0 AS DOUBLE) AS return_amt, - cast(0 AS DOUBLE) AS net_loss + cast(0 AS DECIMAL(7, 2)) AS return_amt, + cast(0 AS DECIMAL(7, 2)) AS net_loss FROM catalog_sales UNION ALL SELECT cr_catalog_page_sk AS page_sk, cr_returned_date_sk AS date_sk, - cast(0 AS DOUBLE) AS sales_price, - cast(0 AS DOUBLE) AS profit, + cast(0 AS DECIMAL(7, 2)) AS sales_price, + cast(0 AS DECIMAL(7, 2)) AS profit, cr_return_amount AS return_amt, cr_net_loss AS net_loss FROM catalog_returns @@ -74,15 +74,15 @@ WITH ssr AS ws_sold_date_sk AS date_sk, ws_ext_sales_price AS sales_price, ws_net_profit AS profit, - cast(0 AS DOUBLE) AS return_amt, - cast(0 AS DOUBLE) AS net_loss + cast(0 AS DECIMAL(7, 2)) AS return_amt, + cast(0 AS DECIMAL(7, 2)) AS net_loss FROM web_sales UNION ALL SELECT ws_web_site_sk AS wsr_web_site_sk, wr_returned_date_sk AS date_sk, - cast(0 AS DOUBLE) AS sales_price, - cast(0 AS DOUBLE) AS profit, + cast(0 AS DECIMAL(7, 2)) AS sales_price, + cast(0 AS DECIMAL(7, 2)) AS profit, wr_return_amt AS return_amt, wr_net_loss AS net_loss FROM web_returns diff --git a/core/src/test/resources/tpcds-queries-double/q50.sql b/core/src/test/resources/tpcds-queries/q50.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q50.sql rename to core/src/test/resources/tpcds-queries/q50.sql diff --git a/core/src/test/resources/tpcds-queries-double/q51.sql b/core/src/test/resources/tpcds-queries/q51.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q51.sql rename to core/src/test/resources/tpcds-queries/q51.sql diff --git a/core/src/test/resources/tpcds-queries-double/q52.sql b/core/src/test/resources/tpcds-queries/q52.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q52.sql rename to core/src/test/resources/tpcds-queries/q52.sql diff --git a/core/src/test/resources/tpcds-queries-double/q53.sql b/core/src/test/resources/tpcds-queries/q53.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q53.sql rename to core/src/test/resources/tpcds-queries/q53.sql diff --git a/core/src/test/resources/tpcds-queries-double/q54.sql b/core/src/test/resources/tpcds-queries/q54.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q54.sql rename to core/src/test/resources/tpcds-queries/q54.sql diff --git a/core/src/test/resources/tpcds-queries-double/q55.sql b/core/src/test/resources/tpcds-queries/q55.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q55.sql rename to core/src/test/resources/tpcds-queries/q55.sql diff --git a/core/src/test/resources/tpcds-queries-double/q56.sql b/core/src/test/resources/tpcds-queries/q56.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q56.sql rename to core/src/test/resources/tpcds-queries/q56.sql diff --git a/core/src/test/resources/tpcds-queries-double/q57.sql b/core/src/test/resources/tpcds-queries/q57.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q57.sql rename to core/src/test/resources/tpcds-queries/q57.sql diff --git a/core/src/test/resources/tpcds-queries-double/q58.sql b/core/src/test/resources/tpcds-queries/q58.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q58.sql rename to core/src/test/resources/tpcds-queries/q58.sql diff --git a/core/src/test/resources/tpcds-queries-double/q59.sql b/core/src/test/resources/tpcds-queries/q59.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q59.sql rename to core/src/test/resources/tpcds-queries/q59.sql diff --git a/core/src/test/resources/tpcds-queries-double/q6.sql b/core/src/test/resources/tpcds-queries/q6.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q6.sql rename to core/src/test/resources/tpcds-queries/q6.sql diff --git a/core/src/test/resources/tpcds-queries-double/q60.sql b/core/src/test/resources/tpcds-queries/q60.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q60.sql rename to core/src/test/resources/tpcds-queries/q60.sql diff --git a/core/src/test/resources/tpcds-queries-double/q61.sql b/core/src/test/resources/tpcds-queries/q61.sql similarity index 93% rename from core/src/test/resources/tpcds-queries-double/q61.sql rename to core/src/test/resources/tpcds-queries/q61.sql index 79e5d975c..b0a872b4b 100755 --- a/core/src/test/resources/tpcds-queries-double/q61.sql +++ b/core/src/test/resources/tpcds-queries/q61.sql @@ -1,7 +1,7 @@ SELECT promotions, total, - cast(promotions AS DOUBLE) / cast(total AS DOUBLE) * 100 + cast(promotions AS DECIMAL(15, 4)) / cast(total AS DECIMAL(15, 4)) * 100 FROM (SELECT sum(ss_ext_sales_price) promotions FROM store_sales, store, promotion, date_dim, customer, customer_address, item diff --git a/core/src/test/resources/tpcds-queries-double/q62.sql b/core/src/test/resources/tpcds-queries/q62.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q62.sql rename to core/src/test/resources/tpcds-queries/q62.sql diff --git a/core/src/test/resources/tpcds-queries-double/q63.sql b/core/src/test/resources/tpcds-queries/q63.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q63.sql rename to core/src/test/resources/tpcds-queries/q63.sql diff --git a/core/src/test/resources/tpcds-queries-double/q64.sql b/core/src/test/resources/tpcds-queries/q64.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q64.sql rename to core/src/test/resources/tpcds-queries/q64.sql diff --git a/core/src/test/resources/tpcds-queries-double/q65.sql b/core/src/test/resources/tpcds-queries/q65.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q65.sql rename to core/src/test/resources/tpcds-queries/q65.sql diff --git a/core/src/test/resources/tpcds-queries-double/q66.sql b/core/src/test/resources/tpcds-queries/q66.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q66.sql rename to core/src/test/resources/tpcds-queries/q66.sql diff --git a/core/src/test/resources/tpcds-queries-double/q67.sql b/core/src/test/resources/tpcds-queries/q67.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q67.sql rename to core/src/test/resources/tpcds-queries/q67.sql diff --git a/core/src/test/resources/tpcds-queries-double/q68.sql b/core/src/test/resources/tpcds-queries/q68.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q68.sql rename to core/src/test/resources/tpcds-queries/q68.sql diff --git a/core/src/test/resources/tpcds-queries-double/q69.sql b/core/src/test/resources/tpcds-queries/q69.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q69.sql rename to core/src/test/resources/tpcds-queries/q69.sql diff --git a/core/src/test/resources/tpcds-queries-double/q7.sql b/core/src/test/resources/tpcds-queries/q7.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q7.sql rename to core/src/test/resources/tpcds-queries/q7.sql diff --git a/core/src/test/resources/tpcds-queries-double/q70.sql b/core/src/test/resources/tpcds-queries/q70.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q70.sql rename to core/src/test/resources/tpcds-queries/q70.sql diff --git a/core/src/test/resources/tpcds-queries-double/q71.sql b/core/src/test/resources/tpcds-queries/q71.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q71.sql rename to core/src/test/resources/tpcds-queries/q71.sql diff --git a/core/src/test/resources/tpcds-queries-double/q72.sql b/core/src/test/resources/tpcds-queries/q72.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q72.sql rename to core/src/test/resources/tpcds-queries/q72.sql diff --git a/core/src/test/resources/tpcds-queries-double/q73.sql b/core/src/test/resources/tpcds-queries/q73.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q73.sql rename to core/src/test/resources/tpcds-queries/q73.sql diff --git a/core/src/test/resources/tpcds-queries-double/q74.sql b/core/src/test/resources/tpcds-queries/q74.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q74.sql rename to core/src/test/resources/tpcds-queries/q74.sql diff --git a/core/src/test/resources/tpcds-queries-double/q75.sql b/core/src/test/resources/tpcds-queries/q75.sql similarity index 96% rename from core/src/test/resources/tpcds-queries-double/q75.sql rename to core/src/test/resources/tpcds-queries/q75.sql index 3f7b67926..2a143232b 100755 --- a/core/src/test/resources/tpcds-queries-double/q75.sql +++ b/core/src/test/resources/tpcds-queries/q75.sql @@ -71,6 +71,6 @@ WHERE curr_yr.i_brand_id = prev_yr.i_brand_id AND curr_yr.i_manufact_id = prev_yr.i_manufact_id AND curr_yr.d_year = 2002 AND prev_yr.d_year = 2002 - 1 - AND CAST(curr_yr.sales_cnt AS DOUBLE) / CAST(prev_yr.sales_cnt AS DOUBLE) < 0.9 + AND CAST(curr_yr.sales_cnt AS DECIMAL(17, 2)) / CAST(prev_yr.sales_cnt AS DECIMAL(17, 2)) < 0.9 ORDER BY sales_cnt_diff LIMIT 100 diff --git a/core/src/test/resources/tpcds-queries-double/q76.sql b/core/src/test/resources/tpcds-queries/q76.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q76.sql rename to core/src/test/resources/tpcds-queries/q76.sql diff --git a/core/src/test/resources/tpcds-queries-double/q77.sql b/core/src/test/resources/tpcds-queries/q77.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q77.sql rename to core/src/test/resources/tpcds-queries/q77.sql diff --git a/core/src/test/resources/tpcds-queries-double/q78.sql b/core/src/test/resources/tpcds-queries/q78.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q78.sql rename to core/src/test/resources/tpcds-queries/q78.sql diff --git a/core/src/test/resources/tpcds-queries-double/q79.sql b/core/src/test/resources/tpcds-queries/q79.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q79.sql rename to core/src/test/resources/tpcds-queries/q79.sql diff --git a/core/src/test/resources/tpcds-queries-double/q8.sql b/core/src/test/resources/tpcds-queries/q8.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q8.sql rename to core/src/test/resources/tpcds-queries/q8.sql diff --git a/core/src/test/resources/tpcds-queries-double/q80.sql b/core/src/test/resources/tpcds-queries/q80.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q80.sql rename to core/src/test/resources/tpcds-queries/q80.sql diff --git a/core/src/test/resources/tpcds-queries-double/q81.sql b/core/src/test/resources/tpcds-queries/q81.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q81.sql rename to core/src/test/resources/tpcds-queries/q81.sql diff --git a/core/src/test/resources/tpcds-queries-double/q82.sql b/core/src/test/resources/tpcds-queries/q82.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q82.sql rename to core/src/test/resources/tpcds-queries/q82.sql diff --git a/core/src/test/resources/tpcds-queries-double/q83.sql b/core/src/test/resources/tpcds-queries/q83.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q83.sql rename to core/src/test/resources/tpcds-queries/q83.sql diff --git a/core/src/test/resources/tpcds-queries-double/q84.sql b/core/src/test/resources/tpcds-queries/q84.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q84.sql rename to core/src/test/resources/tpcds-queries/q84.sql diff --git a/core/src/test/resources/tpcds-queries-double/q85.sql b/core/src/test/resources/tpcds-queries/q85.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q85.sql rename to core/src/test/resources/tpcds-queries/q85.sql diff --git a/core/src/test/resources/tpcds-queries-double/q86.sql b/core/src/test/resources/tpcds-queries/q86.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q86.sql rename to core/src/test/resources/tpcds-queries/q86.sql diff --git a/core/src/test/resources/tpcds-queries-double/q87.sql b/core/src/test/resources/tpcds-queries/q87.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q87.sql rename to core/src/test/resources/tpcds-queries/q87.sql diff --git a/core/src/test/resources/tpcds-queries-double/q88.sql b/core/src/test/resources/tpcds-queries/q88.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q88.sql rename to core/src/test/resources/tpcds-queries/q88.sql diff --git a/core/src/test/resources/tpcds-queries-double/q89.sql b/core/src/test/resources/tpcds-queries/q89.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q89.sql rename to core/src/test/resources/tpcds-queries/q89.sql diff --git a/core/src/test/resources/tpcds-queries-double/q9.sql b/core/src/test/resources/tpcds-queries/q9.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q9.sql rename to core/src/test/resources/tpcds-queries/q9.sql diff --git a/core/src/test/resources/tpcds-queries-double/q90.sql b/core/src/test/resources/tpcds-queries/q90.sql similarity index 91% rename from core/src/test/resources/tpcds-queries-double/q90.sql rename to core/src/test/resources/tpcds-queries/q90.sql index 2ecf7d571..85e35bf8b 100755 --- a/core/src/test/resources/tpcds-queries-double/q90.sql +++ b/core/src/test/resources/tpcds-queries/q90.sql @@ -1,4 +1,4 @@ -SELECT cast(amc AS DOUBLE) / cast(pmc AS DOUBLE) am_pm_ratio +SELECT cast(amc AS DECIMAL(15, 4)) / cast(pmc AS DECIMAL(15, 4)) am_pm_ratio FROM (SELECT count(*) amc FROM web_sales, household_demographics, time_dim, web_page WHERE ws_sold_time_sk = time_dim.t_time_sk diff --git a/core/src/test/resources/tpcds-queries-double/q91.sql b/core/src/test/resources/tpcds-queries/q91.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q91.sql rename to core/src/test/resources/tpcds-queries/q91.sql diff --git a/core/src/test/resources/tpcds-queries-double/q92.sql b/core/src/test/resources/tpcds-queries/q92.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q92.sql rename to core/src/test/resources/tpcds-queries/q92.sql diff --git a/core/src/test/resources/tpcds-queries-double/q93.sql b/core/src/test/resources/tpcds-queries/q93.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q93.sql rename to core/src/test/resources/tpcds-queries/q93.sql diff --git a/core/src/test/resources/tpcds-queries-double/q94.sql b/core/src/test/resources/tpcds-queries/q94.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q94.sql rename to core/src/test/resources/tpcds-queries/q94.sql diff --git a/core/src/test/resources/tpcds-queries-double/q95.sql b/core/src/test/resources/tpcds-queries/q95.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q95.sql rename to core/src/test/resources/tpcds-queries/q95.sql diff --git a/core/src/test/resources/tpcds-queries-double/q96.sql b/core/src/test/resources/tpcds-queries/q96.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q96.sql rename to core/src/test/resources/tpcds-queries/q96.sql diff --git a/core/src/test/resources/tpcds-queries-double/q97.sql b/core/src/test/resources/tpcds-queries/q97.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q97.sql rename to core/src/test/resources/tpcds-queries/q97.sql diff --git a/core/src/test/resources/tpcds-queries-double/q98.sql b/core/src/test/resources/tpcds-queries/q98.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q98.sql rename to core/src/test/resources/tpcds-queries/q98.sql diff --git a/core/src/test/resources/tpcds-queries-double/q99.sql b/core/src/test/resources/tpcds-queries/q99.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q99.sql rename to core/src/test/resources/tpcds-queries/q99.sql diff --git a/core/src/test/resources/tpch-queries-double/q1.sql b/core/src/test/resources/tpch-queries/q1.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q1.sql rename to core/src/test/resources/tpch-queries/q1.sql diff --git a/core/src/test/resources/tpch-queries-double/q10.sql b/core/src/test/resources/tpch-queries/q10.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q10.sql rename to core/src/test/resources/tpch-queries/q10.sql diff --git a/core/src/test/resources/tpch-queries-double/q11.sql b/core/src/test/resources/tpch-queries/q11.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q11.sql rename to core/src/test/resources/tpch-queries/q11.sql diff --git a/core/src/test/resources/tpch-queries-double/q12.sql b/core/src/test/resources/tpch-queries/q12.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q12.sql rename to core/src/test/resources/tpch-queries/q12.sql diff --git a/core/src/test/resources/tpch-queries-double/q13.sql b/core/src/test/resources/tpch-queries/q13.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q13.sql rename to core/src/test/resources/tpch-queries/q13.sql diff --git a/core/src/test/resources/tpch-queries-double/q14.sql b/core/src/test/resources/tpch-queries/q14.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q14.sql rename to core/src/test/resources/tpch-queries/q14.sql diff --git a/core/src/test/resources/tpch-queries-double/q15.sql b/core/src/test/resources/tpch-queries/q15.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q15.sql rename to core/src/test/resources/tpch-queries/q15.sql diff --git a/core/src/test/resources/tpch-queries-double/q16.sql b/core/src/test/resources/tpch-queries/q16.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q16.sql rename to core/src/test/resources/tpch-queries/q16.sql diff --git a/core/src/test/resources/tpch-queries-double/q17.sql b/core/src/test/resources/tpch-queries/q17.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q17.sql rename to core/src/test/resources/tpch-queries/q17.sql diff --git a/core/src/test/resources/tpch-queries-double/q18.sql b/core/src/test/resources/tpch-queries/q18.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q18.sql rename to core/src/test/resources/tpch-queries/q18.sql diff --git a/core/src/test/resources/tpch-queries-double/q19.sql b/core/src/test/resources/tpch-queries/q19.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q19.sql rename to core/src/test/resources/tpch-queries/q19.sql diff --git a/core/src/test/resources/tpch-queries-double/q2.sql b/core/src/test/resources/tpch-queries/q2.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q2.sql rename to core/src/test/resources/tpch-queries/q2.sql diff --git a/core/src/test/resources/tpch-queries-double/q20.sql b/core/src/test/resources/tpch-queries/q20.sql similarity index 92% rename from core/src/test/resources/tpch-queries-double/q20.sql rename to core/src/test/resources/tpch-queries/q20.sql index 472a3b8c5..e161d340b 100644 --- a/core/src/test/resources/tpch-queries-double/q20.sql +++ b/core/src/test/resources/tpch-queries/q20.sql @@ -23,7 +23,7 @@ where ) and ps_availqty > ( select - 0.5 * sum(CAST(l_quantity AS DOUBLE)) + 0.5 * sum(l_quantity) from lineitem where diff --git a/core/src/test/resources/tpch-queries-double/q21.sql b/core/src/test/resources/tpch-queries/q21.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q21.sql rename to core/src/test/resources/tpch-queries/q21.sql diff --git a/core/src/test/resources/tpch-queries-double/q22.sql b/core/src/test/resources/tpch-queries/q22.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q22.sql rename to core/src/test/resources/tpch-queries/q22.sql diff --git a/core/src/test/resources/tpch-queries-double/q3.sql b/core/src/test/resources/tpch-queries/q3.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q3.sql rename to core/src/test/resources/tpch-queries/q3.sql diff --git a/core/src/test/resources/tpch-queries-double/q4.sql b/core/src/test/resources/tpch-queries/q4.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q4.sql rename to core/src/test/resources/tpch-queries/q4.sql diff --git a/core/src/test/resources/tpch-queries-double/q5.sql b/core/src/test/resources/tpch-queries/q5.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q5.sql rename to core/src/test/resources/tpch-queries/q5.sql diff --git a/core/src/test/resources/tpch-queries-double/q6.sql b/core/src/test/resources/tpch-queries/q6.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q6.sql rename to core/src/test/resources/tpch-queries/q6.sql diff --git a/core/src/test/resources/tpch-queries-double/q7.sql b/core/src/test/resources/tpch-queries/q7.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q7.sql rename to core/src/test/resources/tpch-queries/q7.sql diff --git a/core/src/test/resources/tpch-queries-double/q8.sql b/core/src/test/resources/tpch-queries/q8.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q8.sql rename to core/src/test/resources/tpch-queries/q8.sql diff --git a/core/src/test/resources/tpch-queries-double/q9.sql b/core/src/test/resources/tpch-queries/q9.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q9.sql rename to core/src/test/resources/tpch-queries/q9.sql diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index feedd53d6..b9cec3729 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.test.SharedSparkSession class TPCDSSuite extends QueryTest with SharedSparkSession { private val MAX_DIRECT_MEMORY = "6g" - private val TPCDS_QUERIES_RESOURCE = "tpcds-queries-double" + private val TPCDS_QUERIES_RESOURCE = "tpcds-queries" private val TPCDS_WRITE_PATH = "/tmp/tpcds-generated" private var runner: TPCRunner = _ @@ -88,6 +88,34 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { test("window query") { runner.runTPCQuery("q67", 1, true) } + + test("window function with non-decimal input") { + val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_category_id)" + + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") + df.explain() + df.show() + } + + test("window function with decimal input") { + val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" + + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") + df.explain() + df.show() + } + + test("window function with decimal input 2") { + val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" + + " OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000") + df.explain() + df.show() + } + + test("window function with decimal input 3") { + val df = spark.sql("SELECT i_item_sk, i_class_id, AVG(i_current_price)" + + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") + df.explain() + df.show() + } } object TPCDSSuite { diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala index e2bdaa264..0cbd13232 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala @@ -177,21 +177,21 @@ object TPCDSTableGen { StructField("cs_promo_sk", LongType), StructField("cs_order_number", LongType), StructField("cs_quantity", LongType), - StructField("cs_wholesale_cost", DoubleType), - StructField("cs_list_price", DoubleType), - StructField("cs_sales_price", DoubleType), - StructField("cs_ext_discount_amt", DoubleType), - StructField("cs_ext_sales_price", DoubleType), - StructField("cs_ext_wholesale_cost", DoubleType), - StructField("cs_ext_list_price", DoubleType), - StructField("cs_ext_tax", DoubleType), - StructField("cs_coupon_amt", DoubleType), - StructField("cs_ext_ship_cost", DoubleType), - StructField("cs_net_paid", DoubleType), - StructField("cs_net_paid_inc_tax", DoubleType), - StructField("cs_net_paid_inc_ship", DoubleType), - StructField("cs_net_paid_inc_ship_tax", DoubleType), - StructField("cs_net_profit", DoubleType) + StructField("cs_wholesale_cost", DecimalType(7, 2)), + StructField("cs_list_price", DecimalType(7, 2)), + StructField("cs_sales_price", DecimalType(7, 2)), + StructField("cs_ext_discount_amt", DecimalType(7, 2)), + StructField("cs_ext_sales_price", DecimalType(7, 2)), + StructField("cs_ext_wholesale_cost", DecimalType(7, 2)), + StructField("cs_ext_list_price", DecimalType(7, 2)), + StructField("cs_ext_tax", DecimalType(7, 2)), + StructField("cs_coupon_amt", DecimalType(7, 2)), + StructField("cs_ext_ship_cost", DecimalType(7, 2)), + StructField("cs_net_paid", DecimalType(7, 2)), + StructField("cs_net_paid_inc_tax", DecimalType(7, 2)), + StructField("cs_net_paid_inc_ship", DecimalType(7, 2)), + StructField("cs_net_paid_inc_ship_tax", DecimalType(7, 2)), + StructField("cs_net_profit", DecimalType(7, 2)) )) } private def catalogReturnsSchema = { @@ -214,15 +214,15 @@ object TPCDSTableGen { StructField("cr_reason_sk", LongType), StructField("cr_order_number", LongType), StructField("cr_return_quantity", LongType), - StructField("cr_return_amount", DoubleType), - StructField("cr_return_tax", DoubleType), - StructField("cr_return_amt_inc_tax", DoubleType), - StructField("cr_fee", DoubleType), - StructField("cr_return_ship_cost", DoubleType), - StructField("cr_refunded_cash", DoubleType), - StructField("cr_reversed_charge", DoubleType), - StructField("cr_store_credit", DoubleType), - StructField("cr_net_loss", DoubleType) + StructField("cr_return_amount", DecimalType(7, 2)), + StructField("cr_return_tax", DecimalType(7, 2)), + StructField("cr_return_amt_inc_tax", DecimalType(7, 2)), + StructField("cr_fee", DecimalType(7, 2)), + StructField("cr_return_ship_cost", DecimalType(7, 2)), + StructField("cr_refunded_cash", DecimalType(7, 2)), + StructField("cr_reversed_charge", DecimalType(7, 2)), + StructField("cr_store_credit", DecimalType(7, 2)), + StructField("cr_net_loss", DecimalType(7, 2)) )) } private def inventorySchema = { @@ -246,18 +246,18 @@ object TPCDSTableGen { StructField("ss_promo_sk", LongType), StructField("ss_ticket_number", LongType), StructField("ss_quantity", LongType), - StructField("ss_wholesale_cost", DoubleType), - StructField("ss_list_price", DoubleType), - StructField("ss_sales_price", DoubleType), - StructField("ss_ext_discount_amt", DoubleType), - StructField("ss_ext_sales_price", DoubleType), - StructField("ss_ext_wholesale_cost", DoubleType), - StructField("ss_ext_list_price", DoubleType), - StructField("ss_ext_tax", DoubleType), - StructField("ss_coupon_amt", DoubleType), - StructField("ss_net_paid", DoubleType), - StructField("ss_net_paid_inc_tax", DoubleType), - StructField("ss_net_profit", DoubleType) + StructField("ss_wholesale_cost", DecimalType(7, 2)), + StructField("ss_list_price", DecimalType(7, 2)), + StructField("ss_sales_price", DecimalType(7, 2)), + StructField("ss_ext_discount_amt", DecimalType(7, 2)), + StructField("ss_ext_sales_price", DecimalType(7, 2)), + StructField("ss_ext_wholesale_cost", DecimalType(7, 2)), + StructField("ss_ext_list_price", DecimalType(7, 2)), + StructField("ss_ext_tax", DecimalType(7, 2)), + StructField("ss_coupon_amt", DecimalType(7, 2)), + StructField("ss_net_paid", DecimalType(7, 2)), + StructField("ss_net_paid_inc_tax", DecimalType(7, 2)), + StructField("ss_net_profit", DecimalType(7, 2)) )) } private def storeReturnsSchema = { @@ -273,15 +273,15 @@ object TPCDSTableGen { StructField("sr_reason_sk", LongType), StructField("sr_ticket_number", LongType), StructField("sr_return_quantity", LongType), - StructField("sr_return_amt", DoubleType), - StructField("sr_return_tax", DoubleType), - StructField("sr_return_amt_inc_tax", DoubleType), - StructField("sr_fee", DoubleType), - StructField("sr_return_ship_cost", DoubleType), - StructField("sr_refunded_cash", DoubleType), - StructField("sr_reversed_charge", DoubleType), - StructField("sr_store_credit", DoubleType), - StructField("sr_net_loss", DoubleType) + StructField("sr_return_amt", DecimalType(7, 2)), + StructField("sr_return_tax", DecimalType(7, 2)), + StructField("sr_return_amt_inc_tax", DecimalType(7, 2)), + StructField("sr_fee", DecimalType(7, 2)), + StructField("sr_return_ship_cost", DecimalType(7, 2)), + StructField("sr_refunded_cash", DecimalType(7, 2)), + StructField("sr_reversed_charge", DecimalType(7, 2)), + StructField("sr_store_credit", DecimalType(7, 2)), + StructField("sr_net_loss", DecimalType(7, 2)) )) } private def webSalesSchema = { @@ -305,21 +305,21 @@ object TPCDSTableGen { StructField("ws_promo_sk", LongType), StructField("ws_order_number", LongType), StructField("ws_quantity", LongType), - StructField("ws_wholesale_cost", DoubleType), - StructField("ws_list_price", DoubleType), - StructField("ws_sales_price", DoubleType), - StructField("ws_ext_discount_amt", DoubleType), - StructField("ws_ext_sales_price", DoubleType), - StructField("ws_ext_wholesale_cost", DoubleType), - StructField("ws_ext_list_price", DoubleType), - StructField("ws_ext_tax", DoubleType), - StructField("ws_coupon_amt", DoubleType), - StructField("ws_ext_ship_cost", DoubleType), - StructField("ws_net_paid", DoubleType), - StructField("ws_net_paid_inc_tax", DoubleType), - StructField("ws_net_paid_inc_ship", DoubleType), - StructField("ws_net_paid_inc_ship_tax", DoubleType), - StructField("ws_net_profit", DoubleType) + StructField("ws_wholesale_cost", DecimalType(7, 2)), + StructField("ws_list_price", DecimalType(7, 2)), + StructField("ws_sales_price", DecimalType(7, 2)), + StructField("ws_ext_discount_amt", DecimalType(7, 2)), + StructField("ws_ext_sales_price", DecimalType(7, 2)), + StructField("ws_ext_wholesale_cost", DecimalType(7, 2)), + StructField("ws_ext_list_price", DecimalType(7, 2)), + StructField("ws_ext_tax", DecimalType(7, 2)), + StructField("ws_coupon_amt", DecimalType(7, 2)), + StructField("ws_ext_ship_cost", DecimalType(7, 2)), + StructField("ws_net_paid", DecimalType(7, 2)), + StructField("ws_net_paid_inc_tax", DecimalType(7, 2)), + StructField("ws_net_paid_inc_ship", DecimalType(7, 2)), + StructField("ws_net_paid_inc_ship_tax", DecimalType(7, 2)), + StructField("ws_net_profit", DecimalType(7, 2)) )) } private def webReturnsSchema = { @@ -339,15 +339,15 @@ object TPCDSTableGen { StructField("wr_reason_sk", LongType), StructField("wr_order_number", LongType), StructField("wr_return_quantity", LongType), - StructField("wr_return_amt", DoubleType), - StructField("wr_return_tax", DoubleType), - StructField("wr_return_amt_inc_tax", DoubleType), - StructField("wr_fee", DoubleType), - StructField("wr_return_ship_cost", DoubleType), - StructField("wr_refunded_cash", DoubleType), - StructField("wr_reversed_charge", DoubleType), - StructField("wr_account_credit", DoubleType), - StructField("wr_net_loss", DoubleType) + StructField("wr_return_amt", DecimalType(7, 2)), + StructField("wr_return_tax", DecimalType(7, 2)), + StructField("wr_return_amt_inc_tax", DecimalType(7, 2)), + StructField("wr_fee", DecimalType(7, 2)), + StructField("wr_return_ship_cost", DecimalType(7, 2)), + StructField("wr_refunded_cash", DecimalType(7, 2)), + StructField("wr_reversed_charge", DecimalType(7, 2)), + StructField("wr_account_credit", DecimalType(7, 2)), + StructField("wr_net_loss", DecimalType(7, 2)) )) } private def callCenterSchema = { @@ -381,8 +381,8 @@ object TPCDSTableGen { StructField("cc_state", StringType), StructField("cc_zip", StringType), StructField("cc_country", StringType), - StructField("cc_gmt_offset", DoubleType), - StructField("cc_tax_percentage", DoubleType) + StructField("cc_gmt_offset", DecimalType(5, 2)), + StructField("cc_tax_percentage", DecimalType(5, 2)) )) } private def catalogPageSchema = { @@ -433,7 +433,7 @@ object TPCDSTableGen { StructField("ca_state", StringType), StructField("ca_zip", StringType), StructField("ca_country", StringType), - StructField("ca_gmt_offset", DoubleType), + StructField("ca_gmt_offset", DecimalType(5, 2)), StructField("ca_location_type", StringType) )) } @@ -505,8 +505,8 @@ object TPCDSTableGen { StructField("i_rec_start_date", StringType), StructField("i_rec_end_date", StringType), StructField("i_item_desc", StringType), - StructField("i_current_price", DoubleType), - StructField("i_wholesale_cost", DoubleType), + StructField("i_current_price", DecimalType(7, 2)), + StructField("i_wholesale_cost", DecimalType(7, 2)), StructField("i_brand_id", LongType), StructField("i_brand", StringType), StructField("i_class_id", LongType), @@ -531,7 +531,7 @@ object TPCDSTableGen { StructField("p_start_date_sk", LongType), StructField("p_end_date_sk", LongType), StructField("p_item_sk", LongType), - StructField("p_cost", DoubleType), + StructField("p_cost", DecimalType(15, 2)), StructField("p_response_target", LongType), StructField("p_promo_name", StringType), StructField("p_channel_dmail", StringType), @@ -593,8 +593,8 @@ object TPCDSTableGen { StructField("s_state", StringType), StructField("s_zip", StringType), StructField("s_country", StringType), - StructField("s_gmt_offset", DoubleType), - StructField("s_tax_precentage", DoubleType) + StructField("s_gmt_offset", DecimalType(5, 2)), + StructField("s_tax_precentage", DecimalType(5, 2)) )) } private def timeDimSchema = { @@ -626,7 +626,7 @@ object TPCDSTableGen { StructField("w_state", StringType), StructField("w_zip", StringType), StructField("w_country", StringType), - StructField("w_gmt_offset", DoubleType) + StructField("w_gmt_offset", DecimalType(5, 2)) )) } private def webPageSchema = { @@ -674,7 +674,7 @@ object TPCDSTableGen { StructField("web_zip", StringType), StructField("web_country", StringType), StructField("web_gmt_offset", StringType), - StructField("web_tax_percentage", DoubleType) + StructField("web_tax_percentage", DecimalType(5, 2)) )) } } diff --git a/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala b/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala index 7776a2b34..81a55569b 100644 --- a/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala @@ -41,7 +41,7 @@ import scala.collection.mutable.ArrayBuffer class TPCHSuite extends QueryTest with SharedSparkSession { private val MAX_DIRECT_MEMORY = "6g" - private val TPCH_QUERIES_RESOURCE = "tpch-queries-double" + private val TPCH_QUERIES_RESOURCE = "tpch-queries" private val TPCH_WRITE_PATH = "/tmp/tpch-generated" private var runner: TPCRunner = _ diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index c2ac266c7..236237e2e 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -116,6 +116,10 @@ add_dependencies(jni_proto protobuf::libprotobuf) set(PROTO_SRCS "${PROTO_OUTPUT_DIR}/Exprs.pb.cc") set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/Exprs.pb.h") +if(DEBUG) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0 -DDEBUG -DDEBUG_LEVEL_1 -DDEBUG_LEVEL_2") +endif() + if(USE_AVX512) # Only enable additional instruction sets if they are supported message(STATUS "System processor: ${CMAKE_SYSTEM_PROCESSOR}") @@ -167,10 +171,6 @@ endmacro() add_subdirectory(benchmarks) endif() -if(DEBUG) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0 -DDEBUG -DDEBUG_LEVEL_1 -DDEBUG_LEVEL_2") -endif() - find_library(ARROW_LIB arrow) find_library(GANDIVA_LIB gandiva) diff --git a/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/cpp/src/codegen/arrow_compute/ext/kernels_ext.h index c8051150b..4ede6fd30 100644 --- a/cpp/src/codegen/arrow_compute/ext/kernels_ext.h +++ b/cpp/src/codegen/arrow_compute/ext/kernels_ext.h @@ -134,10 +134,17 @@ class WindowAggregateFunctionKernel : public KernalBase { std::shared_ptr* out); arrow::Status Evaluate(const ArrayList& in) override; arrow::Status Finish(ArrayList* out) override; - template - arrow::Status Finish0(ArrayList* out); private: + template + arrow::Status Finish0(ArrayList* out, std::shared_ptr data_type); + + template + typename arrow::enable_if_decimal128>> createBuilder(std::shared_ptr data_type); + + template + typename arrow::enable_if_number>> createBuilder(std::shared_ptr data_type); + arrow::compute::ExecContext* ctx_; std::shared_ptr action_; std::vector> accumulated_group_ids_; diff --git a/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/cpp/src/codegen/arrow_compute/ext/window_kernel.cc index 1d793f08a..d8ec9db3c 100644 --- a/cpp/src/codegen/arrow_compute/ext/window_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/window_kernel.cc @@ -33,12 +33,13 @@ class WindowAggregateFunctionKernel::ActionFactory { static arrow::Status Make(std::string action_name, arrow::compute::ExecContext *ctx, std::shared_ptr type, + std::shared_ptr return_type, std::shared_ptr *out) { std::shared_ptr action; if (action_name == "sum") { - RETURN_NOT_OK(MakeSumAction(ctx, type, {type}, &action)); + RETURN_NOT_OK(MakeSumAction(ctx, type, {return_type}, &action)); } else if (action_name == "avg") { - RETURN_NOT_OK(MakeAvgAction(ctx, type, {type}, &action)); + RETURN_NOT_OK(MakeAvgAction(ctx, type, {return_type}, &action)); } else { return arrow::Status::Invalid("window aggregate function: unsupported action name: " + action_name); } @@ -65,7 +66,7 @@ arrow::Status WindowAggregateFunctionKernel::Make(arrow::compute::ExecContext *c std::shared_ptr action; if (function_name == "sum" || function_name == "avg") { - RETURN_NOT_OK(ActionFactory::Make(function_name, ctx, type_list[0], &action)); + RETURN_NOT_OK(ActionFactory::Make(function_name, ctx, type_list[0], result_type, &action)); } else { return arrow::Status::Invalid("window function not supported: " + function_name); } @@ -127,45 +128,48 @@ arrow::Status WindowAggregateFunctionKernel::Evaluate(const ArrayList &in) { return arrow::Status::OK(); } -#define PROCESS_SUPPORTED_TYPES(PROCESS) \ - PROCESS(arrow::UInt8Type) \ - PROCESS(arrow::Int8Type) \ - PROCESS(arrow::UInt16Type) \ - PROCESS(arrow::Int16Type) \ - PROCESS(arrow::UInt32Type) \ - PROCESS(arrow::Int32Type) \ - PROCESS(arrow::UInt64Type) \ - PROCESS(arrow::Int64Type) \ - PROCESS(arrow::FloatType) \ - PROCESS(arrow::DoubleType) +#define PROCESS_SUPPORTED_TYPES_WINDOW(PROC) \ + PROC(arrow::UInt8Type, arrow::UInt8Builder, arrow::UInt8Array) \ + PROC(arrow::Int8Type, arrow::Int8Builder, arrow::Int8Array) \ + PROC(arrow::UInt16Type, arrow::UInt16Builder, arrow::UInt16Array) \ + PROC(arrow::Int16Type, arrow::Int16Builder, arrow::Int16Array) \ + PROC(arrow::UInt32Type, arrow::UInt32Builder, arrow::UInt32Array) \ + PROC(arrow::Int32Type, arrow::Int32Builder, arrow::Int32Array) \ + PROC(arrow::UInt64Type, arrow::UInt64Builder, arrow::UInt64Array) \ + PROC(arrow::Int64Type, arrow::Int64Builder, arrow::Int64Array) \ + PROC(arrow::FloatType, arrow::FloatBuilder, arrow::FloatArray) \ + PROC(arrow::DoubleType, arrow::DoubleBuilder, arrow::DoubleArray) \ + PROC(arrow::Decimal128Type, arrow::Decimal128Builder, arrow::Decimal128Array) arrow::Status WindowAggregateFunctionKernel::Finish(ArrayList *out) { std::shared_ptr value_type = result_type_; switch (value_type->id()) { -#define PROCESS(NUMERIC_TYPE) \ - case NUMERIC_TYPE::type_id: { \ - RETURN_NOT_OK(Finish0(out)); \ + +#define PROCESS(VALUE_TYPE, BUILDER_TYPE, ARRAY_TYPE) \ + case VALUE_TYPE::type_id: { \ + RETURN_NOT_OK((Finish0(out, value_type))); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + + PROCESS_SUPPORTED_TYPES_WINDOW(PROCESS) #undef PROCESS - default:return arrow::Status::Invalid("window function: unsupported input type"); + default: return arrow::Status::Invalid("window function: unsupported input type: " + value_type->name()); } return arrow::Status::OK(); } -template -arrow::Status WindowAggregateFunctionKernel::Finish0(ArrayList *out) { +template +arrow::Status WindowAggregateFunctionKernel::Finish0(ArrayList *out, std::shared_ptr data_type) { ArrayList action_output; RETURN_NOT_OK(action_->Get()->Finish(&action_output)); if (action_output.size() != 1) { return arrow::Status::Invalid("window function: got invalid result from corresponding action"); } - auto action_output_values = std::dynamic_pointer_cast>(action_output.at(0)); + auto action_output_values = std::dynamic_pointer_cast(action_output.at(0)); for (const auto &accumulated_group_ids_single_part : accumulated_group_ids_) { - std::unique_ptr> output_builder - = std::make_unique>(ctx_->memory_pool()); + std::shared_ptr output_builder; + ARROW_ASSIGN_OR_RAISE(output_builder, (createBuilder(data_type))) for (int i = 0; i < accumulated_group_ids_single_part->length(); i++) { if (accumulated_group_ids_single_part->IsNull(i)) { @@ -182,6 +186,18 @@ arrow::Status WindowAggregateFunctionKernel::Finish0(ArrayList *out) { return arrow::Status::OK(); } +template +typename arrow::enable_if_decimal128>> + WindowAggregateFunctionKernel::createBuilder(std::shared_ptr data_type) { + return std::make_shared(data_type, ctx_->memory_pool()); +} + +template +typename arrow::enable_if_number>> + WindowAggregateFunctionKernel::createBuilder(std::shared_ptr data_type) { + return std::make_shared(ctx_->memory_pool()); +} + WindowRankKernel::WindowRankKernel(arrow::compute::ExecContext *ctx, std::vector> type_list, std::shared_ptr sorter, @@ -215,15 +231,15 @@ arrow::Status WindowRankKernel::Make(arrow::compute::ExecContext *ctx, result_schema, nulls_first, asc)); } else { switch (key_field->type()->id()) { -#define PROCESS(InType) \ +#define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ case InType::type_id: { \ - using CType = typename arrow::TypeTraits::CType; \ + using CType = typename TypeTraits::CType; \ sorter.reset(new WindowSortOnekeyKernel(ctx, key_fields, result_schema, nulls_first, asc)); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + PROCESS_SUPPORTED_TYPES_WINDOW(PROCESS) #undef PROCESS default: { - std::cout << "WindowSortOnekeyKernel type not supported, type is " + std::cout << "WindowRankKernel type not supported, type is " << key_field->type() << std::endl; } break; @@ -352,12 +368,11 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { bool s; std::shared_ptr type = type_list_.at(column_id); switch (type->id()) { -#define PROCESS(InType) \ +#define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ case InType::type_id: { \ - using ArrayType = typename arrow::TypeTraits::ArrayType; \ - RETURN_NOT_OK(AreTheSameValue(values, column_id, index, last_index, &s)); \ + RETURN_NOT_OK(AreTheSameValue(values, column_id, index, last_index, &s)); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + PROCESS_SUPPORTED_TYPES_WINDOW(PROCESS) #undef PROCESS default: { std::cout << "WindowRankKernel: type not supported: " @@ -458,6 +473,8 @@ arrow::Status WindowRankKernel::AreTheSameValue(const std::vector& va return arrow::Status::OK(); } +#undef PROCESS_SUPPORTED_TYPES_WINDOW + } } } diff --git a/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h b/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h index 5ada38824..087c15577 100644 --- a/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h +++ b/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h @@ -696,115 +696,6 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { uint64_t num_batches_ = 0; uint64_t col_num_; int key_id_; - -#define PROCESS_SUPPORTED_TYPES(PROCESS) \ - PROCESS(arrow::UInt8Type) \ - PROCESS(arrow::Int8Type) \ - PROCESS(arrow::UInt16Type) \ - PROCESS(arrow::Int16Type) \ - PROCESS(arrow::UInt32Type) \ - PROCESS(arrow::Int32Type) \ - PROCESS(arrow::UInt64Type) \ - PROCESS(arrow::Int64Type) \ - PROCESS(arrow::FloatType) \ - PROCESS(arrow::DoubleType) - class SorterResultIterator : public ResultIterator { - public: - SorterResultIterator(arrow::compute::ExecContext* ctx, - std::shared_ptr schema, - std::shared_ptr result_schema, - std::shared_ptr indices_in, - std::vector& cached) - : ctx_(ctx), - result_schema_(result_schema), - indices_in_cache_(indices_in), - total_length_(indices_in->length()), - cached_in_(cached) { - col_num_ = result_schema->num_fields(); - indices_begin_ = (ArrayItemIndex*)indices_in->value_data(); - for (uint64_t i = 0; i < col_num_; i++) { - auto field = result_schema->field(i); - if (field->type()->id() == arrow::Type::STRING) { - auto app_ptr = std::make_shared>(ctx); - auto appender = std::dynamic_pointer_cast(app_ptr); - appender_list_.push_back(appender); - } else { - switch (field->type()->id()) { -#define PROCESS(InType) \ - case InType::type_id: { \ - auto app_ptr = std::make_shared>(ctx); \ - auto appender = std::dynamic_pointer_cast(app_ptr); \ - appender_list_.push_back(appender); \ - } break; - PROCESS_SUPPORTED_TYPES(PROCESS) - default: { - std::cout << "WindowSortOnekeyKernel type not supported, type is " - << field->type() << std::endl; - } break; -#undef PROCESS - } - } - } - for (int i = 0; i < col_num_; i++) { - arrow::ArrayVector array_vector = cached_in_[i]; - int array_num = array_vector.size(); - for (int array_id = 0; array_id < array_num; array_id++) { - auto arr = array_vector[array_id]; - appender_list_[i]->AddArray(arr); - } - } - batch_size_ = GetBatchSize(); - } - - std::string ToString() override { return "SortArraysToIndicesResultIterator"; } - - bool HasNext() override { - if (offset_ >= total_length_) { - return false; - } - return true; - } - - arrow::Status Next(std::shared_ptr* out) { - auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ - : (total_length_ - offset_); - uint64_t count = 0; - for (int i = 0; i < col_num_; i++) { - while (count < length) { - auto item = indices_begin_ + offset_ + count++; - RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); - } - count = 0; - } - offset_ += length; - ArrayList arrays; - for (int i = 0; i < col_num_; i++) { - std::shared_ptr out_; - RETURN_NOT_OK(appender_list_[i]->Finish(&out_)); - arrays.push_back(out_); - appender_list_[i]->Reset(); - } - - *out = arrow::RecordBatch::Make(result_schema_, length, arrays); - return arrow::Status::OK(); - } - - private: - uint64_t offset_ = 0; - const uint64_t total_length_; - std::shared_ptr schema_; - std::shared_ptr result_schema_; - arrow::compute::ExecContext* ctx_; - uint64_t batch_size_; - uint64_t col_num_; - ArrayItemIndex* indices_begin_; - std::vector cached_in_; - std::vector> type_list_; - std::vector> appender_list_; - std::vector> array_list_; - std::shared_ptr indices_in_cache_; - }; -#undef PROCESS_SUPPORTED_TYPES }; arrow::Status WindowSortKernel::Make( @@ -816,17 +707,20 @@ arrow::Status WindowSortKernel::Make( nulls_first, asc); return arrow::Status::OK(); } -#define PROCESS_SUPPORTED_TYPES(PROCESS) \ - PROCESS(arrow::UInt8Type) \ - PROCESS(arrow::Int8Type) \ - PROCESS(arrow::UInt16Type) \ - PROCESS(arrow::Int16Type) \ - PROCESS(arrow::UInt32Type) \ - PROCESS(arrow::Int32Type) \ - PROCESS(arrow::UInt64Type) \ - PROCESS(arrow::Int64Type) \ - PROCESS(arrow::FloatType) \ - PROCESS(arrow::DoubleType) + +#define PROCESS_SUPPORTED_TYPES_WINDOW_SORT(PROC) \ + PROC(arrow::UInt8Type, arrow::UInt8Builder, arrow::UInt8Array) \ + PROC(arrow::Int8Type, arrow::Int8Builder, arrow::Int8Array) \ + PROC(arrow::UInt16Type, arrow::UInt16Builder, arrow::UInt16Array) \ + PROC(arrow::Int16Type, arrow::Int16Builder, arrow::Int16Array) \ + PROC(arrow::UInt32Type, arrow::UInt32Builder, arrow::UInt32Array) \ + PROC(arrow::Int32Type, arrow::Int32Builder, arrow::Int32Array) \ + PROC(arrow::UInt64Type, arrow::UInt64Builder, arrow::UInt64Array) \ + PROC(arrow::Int64Type, arrow::Int64Builder, arrow::Int64Array) \ + PROC(arrow::FloatType, arrow::FloatBuilder, arrow::FloatArray) \ + PROC(arrow::DoubleType, arrow::DoubleBuilder, arrow::DoubleArray) \ + PROC(arrow::Decimal128Type, arrow::Decimal128Builder, arrow::Decimal128Array) + WindowSortKernel::WindowSortKernel( arrow::compute::ExecContext* ctx, std::vector> key_field_list, @@ -841,12 +735,12 @@ WindowSortKernel::WindowSortKernel( result_schema, nulls_first, asc)); } else { switch (key_field_list[0]->type()->id()) { -#define PROCESS(InType) \ +#define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ case InType::type_id: { \ - using CType = typename arrow::TypeTraits::CType; \ + using CType = typename TypeTraits::CType; \ impl_.reset(new WindowSortOnekeyKernel(ctx, key_field_list, result_schema, nulls_first, asc)); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + PROCESS_SUPPORTED_TYPES_WINDOW_SORT(PROCESS) #undef PROCESS default: { std::cout << "WindowSortOnekeyKernel type not supported, type is " @@ -864,7 +758,7 @@ WindowSortKernel::WindowSortKernel( } kernel_name_ = "WindowSortKernel"; } -#undef PROCESS_SUPPORTED_TYPES +#undef PROCESS_SUPPORTED_TYPES_WINDOW_SORT arrow::Status WindowSortKernel::Evaluate(const ArrayList& in) { return impl_->Evaluate(in); diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index f01c5c149..2d677f18a 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -7,3 +7,5 @@ package_add_test(TestArrowComputeCondition arrow_compute_test_check_condition.cc package_add_test(TestArrowComputeWSCG arrow_compute_test_wscg.cc) package_add_test(TestArrowComputeJoinWOCG arrow_compute_test_join_wocg.cc) package_add_test(TestShuffleSplit shuffle_split_test.cc) +package_add_test(TestArrowComputeWindow arrow_compute_test_window.cc) + diff --git a/cpp/src/tests/arrow_compute_test_window.cc b/cpp/src/tests/arrow_compute_test_window.cc new file mode 100644 index 000000000..cd98793ab --- /dev/null +++ b/cpp/src/tests/arrow_compute_test_window.cc @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "precompile/array.h" +#include "tests/test_utils.h" +#include "codegen/code_generator.h" +#include "codegen/code_generator_factory.h" +#include "precompile/gandiva.h" + +using arrow::int64; +using arrow::uint32; +using gandiva::TreeExprBuilder; + +namespace sparkcolumnarplugin { +namespace codegen { + +TEST(TestArrowComputeWindow, DoubleTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dou", arrow::float64())}); + std::vector input_data = { + "[1, 2, 1]", + "[35.612, 37.244, 82.664]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::float64()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("sum", + {TreeExprBuilder::MakeField(field("col_dou", arrow::float64()))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[118.276, 37.244, 118.276]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, LongAvgTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_long", arrow::int64())}); + std::vector input_data = { + "[1, 2, 1]", + "[35612, 37244, 82664]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::int64()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("avg", + {TreeExprBuilder::MakeField(field("col_long", arrow::int64()))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[59138, 37244, 59138]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"82.664\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::decimal128(8, 3)); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("sum", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[\"118.276\", \"37.244\", \"118.276\"]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalAvgTest) { + return; // fixme decimal avg not supported? + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"82.664\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::decimal128(8, 3)); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("avg", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[\"118.276\", \"37.244\", \"118.276\"]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalRankTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"35.613\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::int32()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("rank_desc", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[2, 1, 1]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalRankTest2) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"35.612\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::int32()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("rank_desc", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[1, 1, 1]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +} // namespace codegen +} // namespace sparkcolumnarplugin