From b7861d2e8565d212e6fd545b8cda6dc7645817d9 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 21 Apr 2014 12:36:28 +0800 Subject: [PATCH 1/4] Add Support for Deferred Expression Evaluation --- .../sql/catalyst/expressions/predicates.scala | 44 +++++++++++++------ .../org/apache/spark/sql/hive/hiveUdfs.scala | 28 +++++++++--- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 6ee479939d25..a2d0bf11056e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -98,13 +98,17 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate { override def eval(input: Row): Any = { val l = left.eval(input) - val r = right.eval(input) - if (l == false || r == false) { + if(l == null) { + null + } else if(l == false) { false - } else if (l == null || r == null ) { - null } else { - true + val r = right.eval(input) + if(r == null) { + null + } else { + r + } } } } @@ -114,13 +118,17 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate { override def eval(input: Row): Any = { val l = left.eval(input) - val r = right.eval(input) - if (l == true || r == true) { - true - } else if (l == null || r == null) { + if(l == null) { null + } else if(l == true) { + true } else { - false + val r = right.eval(input) + if(r == null) { + null + } else { + r + } } } } @@ -133,8 +141,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison def symbol = "=" override def eval(input: Row): Any = { val l = left.eval(input) - val r = right.eval(input) - if (l == null || r == null) null else l == r + if (l == null) { + null + } else { + val r = right.eval(input) + if(r == null) null else l == r + } } } @@ -162,7 +174,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi extends Expression { def children = predicate :: trueValue :: falseValue :: Nil - def nullable = trueValue.nullable || falseValue.nullable + override def nullable = predicate.nullable || (trueValue.nullable && falseValue.nullable) def references = children.flatMap(_.references).toSet override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType def dataType = { @@ -175,8 +187,12 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi } type EvaluatedType = Any + override def eval(input: Row): Any = { - if (predicate.eval(input).asInstanceOf[Boolean]) { + val condition = predicate.eval(input) + if (null == condition) { + null + } else if(condition == true) { trueValue.eval(input) } else { falseValue.eval(input) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index d50e2c65b7b3..e578e6ecb209 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression]) isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable) } + protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({ + new DeferredObjectAdapter + }) + + // Adapter from Catalyst ExpressionResult to Hive DeferredObject + class DeferredObjectAdapter extends DeferredObject { + private var func: () => Any = _ + def set(func: () => Any) { + this.func = func + } + override def prepare(i: Int) = {} + override def get(): AnyRef = wrap(func()) + } + val dataType: DataType = inspectorToDataType(returnInspector) override def eval(input: Row): Any = { returnInspector // Make sure initialized. - val args = children.map { v => - new DeferredObject { - override def prepare(i: Int) = {} - override def get(): AnyRef = wrap(v.eval(input)) - } - }.toArray - unwrap(function.evaluate(args)) + var i = 0 + while(i < children.length) { + val idx = i + deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)}) + i += 1 + } + unwrap(function.evaluate(deferedObjects)) } } From af2236bfd971f414956667027cee333f77bd04ed Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 22 Apr 2014 08:35:40 +0800 Subject: [PATCH 2/4] revert the short-circuit expression evaluation for IF --- .../apache/spark/sql/catalyst/expressions/predicates.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index a2d0bf11056e..27758b0add46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -174,7 +174,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi extends Expression { def children = predicate :: trueValue :: falseValue :: Nil - override def nullable = predicate.nullable || (trueValue.nullable && falseValue.nullable) + override def nullable = trueValue.nullable || falseValue.nullable def references = children.flatMap(_.references).toSet override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType def dataType = { @@ -189,10 +189,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi type EvaluatedType = Any override def eval(input: Row): Any = { - val condition = predicate.eval(input) - if (null == condition) { - null - } else if(condition == true) { + if (true == predicate.eval(input)) { trueValue.eval(input) } else { falseValue.eval(input) From a08f09cabe7c2fe49ce8289c4c9b05ecab3870f7 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 22 Apr 2014 10:42:59 +0800 Subject: [PATCH 3/4] fix bug in or/and short-circuit evaluation --- .../sql/catalyst/expressions/predicates.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 27758b0add46..4f8d22e71c44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -98,16 +98,18 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate { override def eval(input: Row): Any = { val l = left.eval(input) - if(l == null) { - null - } else if(l == false) { - false + if(l == false) { + false } else { val r = right.eval(input) - if(r == null) { - null + if(r == false) { + false } else { - r + if(l != null && r != null) { + true + } else { + null + } } } } @@ -118,16 +120,18 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate { override def eval(input: Row): Any = { val l = left.eval(input) - if(l == null) { - null - } else if(l == true) { + if(l == true) { true } else { val r = right.eval(input) - if(r == null) { - null + if(r == true) { + true } else { - r + if(l != null && r != null) { + false + } else { + null + } } } } From d2729deea77df1f0f77ea2559682f4953c49f122 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Fri, 16 May 2014 09:41:04 +0800 Subject: [PATCH 4/4] Fix the codestyle issues --- .../sql/catalyst/expressions/predicates.scala | 14 +++++++------- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 4f8d22e71c44..d11157853050 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -98,14 +98,14 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate { override def eval(input: Row): Any = { val l = left.eval(input) - if(l == false) { + if (l == false) { false } else { val r = right.eval(input) - if(r == false) { + if (r == false) { false } else { - if(l != null && r != null) { + if (l != null && r != null) { true } else { null @@ -120,14 +120,14 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate { override def eval(input: Row): Any = { val l = left.eval(input) - if(l == true) { + if (l == true) { true } else { val r = right.eval(input) - if(r == true) { + if (r == true) { true } else { - if(l != null && r != null) { + if (l != null && r != null) { false } else { null @@ -149,7 +149,7 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison null } else { val r = right.eval(input) - if(r == null) null else l == r + if (r == null) null else l == r } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index e578e6ecb209..572902042337 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -267,7 +267,7 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression]) override def eval(input: Row): Any = { returnInspector // Make sure initialized. var i = 0 - while(i < children.length) { + while (i < children.length) { val idx = i deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)}) i += 1