From b1cb4fb4e3da7ed54ac875afc20a81f25310fa87 Mon Sep 17 00:00:00 2001
From: chuxi
Date: Thu, 21 Aug 2014 20:47:25 +0800
Subject: [PATCH 1/3] Correctly parse dot notations for accessing an array of structs

---
 .../apache/spark/sql/catalyst/SqlParser.scala | 10 +-
 .../catalyst/expressions/complexTypes.scala   | 99 +++++++++++++++++++
 .../catalyst/plans/logical/LogicalPlan.scala  |  5 +-
 .../org/apache/spark/sql/json/JsonSuite.scala | 23 +++--
 4 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 2c73a80f64eb..edc3ad90ce8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -343,6 +343,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
 
   protected lazy val baseExpression: PackratParser[Expression] =
+    expression ~ "[" ~ expression ~ "]" ~ expression ^^ {
+      case base ~ _ ~ ordinal ~ _ ~ field => GetArrayOfStructItem(base, ordinal, field)
+    } |
     expression ~ "[" ~ expression <~ "]" ^^ {
       case base ~ _ ~ ordinal => GetItem(base, ordinal)
     } |
@@ -373,8 +376,11 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
   )
 
   override lazy val token: Parser[Token] = (
-    identChar ~ rep( identChar | digit ) ^^
-      { case first ~ rest => processIdent(first :: rest mkString "") }
+    identChar ~ rep( identChar | digit ) ^^
+      {
+        case first ~ rest if(first != '.') => processIdent(first :: rest mkString "")
+        case first ~ rest if(first == '.') => StringLit(rest mkString "")
+      }
     | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
         case i ~ None => NumericLit(i mkString "")
        case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
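The grammar change leans on the lexer: a word introduced by '.' is emitted as a StringLit carrying the field name instead of being folded into the preceding identifier, so `arrayOfStruct[0].field1` reaches the parser as an identifier, a bracketed ordinal, and a trailing string literal. A minimal standalone sketch of that tokenization (a toy scanner with illustrative names, not the Spark lexer):

    import scala.collection.mutable.ListBuffer

    object DotTokenSketch {
      sealed trait Token
      case class Ident(chars: String) extends Token
      case class NumericLit(chars: String) extends Token
      case class StringLit(chars: String) extends Token
      case class Delim(chars: String) extends Token

      // Toy scanner: a word introduced by '.' loses the dot and becomes a
      // StringLit (the field name); everything else tokenizes conventionally.
      def tokenize(s: String): List[Token] = {
        val in = s.iterator.buffered
        val out = ListBuffer.empty[Token]
        def word(): String = {
          val sb = new StringBuilder
          while (in.hasNext && (in.head.isLetterOrDigit || in.head == '_')) sb += in.next()
          sb.toString
        }
        while (in.hasNext) {
          in.head match {
            case c if c.isLetter || c == '_' => out += Ident(word())
            case c if c.isDigit              => out += NumericLit(word())
            case '.'                         => in.next(); out += StringLit(word())
            case _                           => out += Delim(in.next().toString)
          }
        }
        out.toList
      }

      def main(args: Array[String]): Unit = {
        // List(Ident(arrayOfStruct), Delim([), NumericLit(0), Delim(]), StringLit(field1))
        println(tokenize("arrayOfStruct[0].field1"))
      }
    }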
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index c1154eb81c31..0246d8633c6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -101,3 +101,102 @@ case class GetField(child: Expression, fieldName: String) extends UnaryExpressio
 
   override def toString = s"$child.$fieldName"
 }
+
+/**
+ * Returns the value of fields[] in the Struct `child`.
+ * for array of structs
+ */
+
+case class GetArrayField(child: Expression, fieldName: String) extends UnaryExpression {
+  type EvaluatedType = Any
+
+  def dataType = field.dataType
+  override def nullable = child.nullable || field.nullable
+  override def foldable = child.foldable
+
+  protected def arrayType = child.dataType match {
+    case s: ArrayType => s.elementType match {
+      case t :StructType => t
+      case otherType => sys.error(s"GetArrayField is not valid on fields of type $otherType")
+    }
+    case otherType => sys.error(s"GetArrayField is not valid on fields of type $otherType")
+  }
+
+  lazy val field =
+    arrayType.fields
+      .find(_.name == fieldName)
+      .getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
+
+
+  lazy val ordinal = arrayType.fields.indexOf(field)
+
+  override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[ArrayType]
+
+  override def eval(input: Row): Any = {
+    val value : Seq[Row] = child.eval(input).asInstanceOf[Seq[Row]]
+    val v = value.map{ t =>
+      if (t == null) null else t(ordinal)
+    }
+    v
+  }
+
+  override def toString = s"$child.$fieldName"
+}
+
+
+/**
+ * Returns the field at `ordinal` in the ArrayOfStruct `child`
+ */
+case class GetArrayOfStructItem(child: Expression, ordinal: Expression, field :Expression) extends Expression {
+  type EvaluatedType = Any
+
+  val fieldName = field.toString
+
+  val children = child :: ordinal :: field :: Nil
+  /** `Null` is returned for invalid ordinals. */
+  override def nullable = true
+  override def foldable = child.foldable && ordinal.foldable && field.foldable
+  override def references = children.flatMap(_.references).toSet
+  def dataType = ifield.dataType
+
+  def fieldType = child.dataType match {
+    case a :ArrayType => a.elementType match {
+      case t :StructType => t
+      case otherType => sys.error(s"GetArrayOfStructItem is not valid on fields of type $otherType")
+    }
+    case otherType => sys.error(s"GetArrayOfStructItem is not valid on fields of type $otherType")
+  }
+
+  override lazy val resolved =
+    childrenResolved &&
+    (child.dataType.isInstanceOf[ArrayType])
+
+  override def toString = s"$child[$ordinal].$field"
+
+  lazy val ifield = fieldType.fields.find(_.name == fieldName).getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
+
+  lazy val fordinal = fieldType.fields.indexOf(ifield)
+
+  override def eval(input: Row): Any = {
+    val value = child.eval(input)
+    if (value == null) {
+      null
+    } else {
+      val key = ordinal.eval(input)
+      if (key == null) {
+        null
+      } else {
+
+        // TODO: consider using Array[_] for ArrayType child to avoid
+        // boxing of primitives
+        val baseValue = value.asInstanceOf[Seq[_]]
+        val o = key.asInstanceOf[Int]
+        if (o >= baseValue.size || o < 0) {
+          null
+        } else {
+          baseValue(o).asInstanceOf[Row](fordinal)
+        }
+
+      }
+    }
+  }
+}
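At evaluation time the two new expressions are simple collection operations: GetArrayField maps one field lookup over every element of the array, while GetArrayOfStructItem indexes the array once and reads a single field, returning null for out-of-range ordinals. A plain-Scala sketch of those semantics, with Seq[Seq[Any]] standing in for Seq[Row] (illustrative names, not the Catalyst classes):

    object ArrayOfStructEvalSketch {
      // GetArrayField semantics: arrayOfStruct.field -> one value per element.
      def getArrayField(arr: Seq[Seq[Any]], ordinal: Int): Seq[Any] =
        arr.map(row => if (row == null) null else row(ordinal))

      // GetArrayOfStructItem semantics: arrayOfStruct[i].field -> a single value,
      // null for an out-of-range index or a null element.
      def getArrayOfStructItem(arr: Seq[Seq[Any]], i: Int, ordinal: Int): Any =
        if (arr == null || i < 0 || i >= arr.size) null
        else {
          val row = arr(i)
          if (row == null) null else row(ordinal)
        }

      def main(args: Array[String]): Unit = {
        // Two structs {field1: Boolean, field2: String}, echoing the JsonSuite data.
        val arrayOfStruct: Seq[Seq[Any]] = Seq(Seq(true, "str1"), Seq(false, null))
        println(getArrayField(arrayOfStruct, 0))           // List(true, false)
        println(getArrayOfStructItem(arrayOfStruct, 0, 1)) // str1
        println(getArrayOfStructItem(arrayOfStruct, 5, 1)) // null
      }
    }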
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 278569f0cb14..48424e56954d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.catalyst.types.{ArrayType, StructType}
 import org.apache.spark.sql.catalyst.trees
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -94,6 +94,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
     // matches the name or where the first part matches the scope and the second part matches the
     // name. Return these matches along with any remaining parts, which represent dotted access to
     // struct fields.
+
     val options = input.flatMap { option =>
       // If the first part of the desired name matches a qualifier for this possible match, drop it.
       val remainingParts =
@@ -108,6 +109,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
           a.dataType match {
             case StructType(fields) =>
               Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
+            case fields :ArrayType =>
+              Some(Alias(nestedFields.foldLeft(a :Expression)(GetArrayField), nestedFields.last)())
             case _ => None // Don't know how to resolve these field references
           }
         case Seq() => None // No matches.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 58b1e23891a3..c6ca2c5e7cc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -292,24 +292,29 @@ class JsonSuite extends QueryTest {
       sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
       (5, null) :: Nil
     )
-  }
 
-  ignore("Complex field and type inferring (Ignored)") {
-    val jsonSchemaRDD = jsonRDD(complexFieldAndType)
-    jsonSchemaRDD.registerTempTable("jsonTable")
+    checkAnswer(
+      sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
+      (Seq(true, false, null), Seq("str1", null, null)) :: Nil
+    )
 
-    // Right now, "field1" and "field2" are treated as aliases. We should fix it.
     checkAnswer(
       sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
       (true, "str1") :: Nil
     )
 
-    // Right now, the analyzer cannot resolve arrayOfStruct.field1 and arrayOfStruct.field2.
-    // Getting all values of a specific field from an array of structs.
+  }
+
+  ignore("Complex field and type inferring (Ignored)") {
+    val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    // Still need to add a filter test? I am not sure whether it is necessary; it is quite complex.
     checkAnswer(
-      sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
-      (Seq(true, false), Seq("str1", null)) :: Nil
+      sql("select arrayOfStruct.field1 from jsonTable where arrayOfStruct.field1 = true"),
+      (Seq(true)) :: Nil
     )
+
   }
 
   test("Type conflict in primitive field values") {
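The updated tests pin down both access styles end to end. A hypothetical driver (assuming a Spark 1.1-era SQLContext and an RDD[String] of the JsonSuite sample JSON; jsonRDD, registerTempTable, and the queries are taken from the test above, everything else is illustrative):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    object DotNotationDemo {
      def run(sqlContext: SQLContext, complexFieldAndType: RDD[String]): Unit = {
        sqlContext.jsonRDD(complexFieldAndType).registerTempTable("jsonTable")

        // Indexed access: one struct out of the array, then two of its fields.
        sqlContext.sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable")
          .collect().foreach(println) // expected: (true, "str1")

        // Dotted access: all values of a field across the array, one Seq per field.
        sqlContext.sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable")
          .collect().foreach(println) // expected: (Seq(true, false, null), Seq("str1", null, null))
      }
    }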
From ebf033bfb2a658d4ca25cfdbe4d7def105793486 Mon Sep 17 00:00:00 2001
From: chuxi
Date: Sun, 24 Aug 2014 19:29:40 +0800
Subject: [PATCH 2/3] improve the code of SPARK-2096 [SQL]

---
 .../apache/spark/sql/catalyst/SqlParser.scala |  8 ++-
 .../catalyst/expressions/complexTypes.scala   | 60 +------------------
 .../catalyst/plans/logical/LogicalPlan.scala  |  6 +-
 3 files changed, 10 insertions(+), 64 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index edc3ad90ce8d..96f5431f766c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -344,7 +344,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
 
   protected lazy val baseExpression: PackratParser[Expression] =
     expression ~ "[" ~ expression ~ "]" ~ expression ^^ {
-      case base ~ _ ~ ordinal ~ _ ~ field => GetArrayOfStructItem(base, ordinal, field)
+      case base ~ _ ~ ordinal ~ _ ~ field => GetField(GetItem(base, ordinal), field.toString)
     } |
     expression ~ "[" ~ expression <~ "]" ^^ {
       case base ~ _ ~ ordinal => GetItem(base, ordinal)
@@ -378,8 +378,10 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
   override lazy val token: Parser[Token] = (
     identChar ~ rep( identChar | digit ) ^^
       {
-        case first ~ rest if(first != '.') => processIdent(first :: rest mkString "")
-        case first ~ rest if(first == '.') => StringLit(rest mkString "")
+        case first ~ rest => first match {
+          case '.' => StringLit(rest mkString "")
+          case _ => processIdent(first :: rest mkString "")
+        }
       }
     | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
         case i ~ None => NumericLit(i mkString "")
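Patch 2 drops the dedicated GetArrayOfStructItem expression: `base[ordinal].field` is just indexing followed by field access, so it composes from the existing GetItem and GetField primitives. A small sketch of that equivalence over plain collections (stand-in functions, not the Catalyst classes):

    object ComposeSketch {
      // GetItem over an array: null for out-of-range ordinals.
      def getItem(arr: Seq[Seq[Any]], i: Int): Seq[Any] =
        if (arr == null || i < 0 || i >= arr.size) null else arr(i)

      // GetField over a struct (the field name is resolved to an ordinal during analysis).
      def getField(row: Seq[Any], ordinal: Int): Any =
        if (row == null) null else row(ordinal)

      def main(args: Array[String]): Unit = {
        val arrayOfStruct: Seq[Seq[Any]] = Seq(Seq(true, "str1"), Seq(false, null))
        // arrayOfStruct[0].field2 composes as getField(getItem(...), ...)
        println(getField(getItem(arrayOfStruct, 0), 1)) // str1
        println(getField(getItem(arrayOfStruct, 9), 1)) // null: bad index propagates
      }
    }

This composition is also why the lexer keeps producing StringLit tokens: GetField only needs the field name as a string, which the parser obtains via field.toString.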
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 0246d8633c6a..705cd8f27922 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -141,62 +141,4 @@ case class GetArrayField(child: Expression, fieldName: String) extends UnaryExpr
   }
 
   override def toString = s"$child.$fieldName"
-}
-
-
-/**
- * Returns the field at `ordinal` in the ArrayOfStruct `child`
- */
-case class GetArrayOfStructItem(child: Expression, ordinal: Expression, field :Expression) extends Expression {
-  type EvaluatedType = Any
-
-  val fieldName = field.toString
-
-  val children = child :: ordinal :: field :: Nil
-  /** `Null` is returned for invalid ordinals. */
-  override def nullable = true
-  override def foldable = child.foldable && ordinal.foldable && field.foldable
-  override def references = children.flatMap(_.references).toSet
-  def dataType = ifield.dataType
-
-  def fieldType = child.dataType match {
-    case a :ArrayType => a.elementType match {
-      case t :StructType => t
-      case otherType => sys.error(s"GetArrayOfStructItem is not valid on fields of type $otherType")
-    }
-    case otherType => sys.error(s"GetArrayOfStructItem is not valid on fields of type $otherType")
-  }
-
-  override lazy val resolved =
-    childrenResolved &&
-    (child.dataType.isInstanceOf[ArrayType])
-
-  override def toString = s"$child[$ordinal].$field"
-
-  lazy val ifield = fieldType.fields.find(_.name == fieldName).getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
-
-  lazy val fordinal = fieldType.fields.indexOf(ifield)
-
-  override def eval(input: Row): Any = {
-    val value = child.eval(input)
-    if (value == null) {
-      null
-    } else {
-      val key = ordinal.eval(input)
-      if (key == null) {
-        null
-      } else {
-
-        // TODO: consider using Array[_] for ArrayType child to avoid
-        // boxing of primitives
-        val baseValue = value.asInstanceOf[Seq[_]]
-        val o = key.asInstanceOf[Int]
-        if (o >= baseValue.size || o < 0) {
-          null
-        } else {
-          baseValue(o).asInstanceOf[Row](fordinal)
-        }
-
-      }
-    }
-  }
-}
+}
\ No newline at end of file
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 48424e56954d..6ebe13303dbc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -109,8 +109,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
           a.dataType match {
             case StructType(fields) =>
               Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
-            case fields :ArrayType =>
-              Some(Alias(nestedFields.foldLeft(a :Expression)(GetArrayField), nestedFields.last)())
+            case ArrayType(fields, _) => nestedFields.length match {
+              case 1 => Some(Alias(GetArrayField(a, nestedFields.head), nestedFields.last)())
+              case _ => None // can't resolve arrayOfStruct.field1._
+            }
             case _ => None // Don't know how to resolve these field references
           }
         case Seq() => None // No matches.
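The LogicalPlan change above restricts dotted access on arrays to a single level: nested struct paths still fold GetField over the remaining name parts, while arrayOfStruct.field1 resolves through GetArrayField and anything deeper is left unresolved. A toy sketch of that resolution logic (an illustrative expression tree, not the Catalyst one):

    object ResolveSketch {
      sealed trait Expr
      case class Attr(name: String) extends Expr
      case class GetField(child: Expr, fieldName: String) extends Expr
      case class GetArrayField(child: Expr, fieldName: String) extends Expr

      // Struct case: fold field access over every remaining name part.
      def resolveStructPath(base: Expr, nestedFields: Seq[String]): Expr =
        nestedFields.foldLeft(base)(GetField(_, _))

      // Array-of-struct case: only a single trailing field name is resolvable.
      def resolveArrayPath(base: Expr, nestedFields: Seq[String]): Option[Expr] =
        nestedFields match {
          case Seq(single) => Some(GetArrayField(base, single))
          case _           => None // can't resolve arrayOfStruct.field1._
        }

      def main(args: Array[String]): Unit = {
        println(resolveStructPath(Attr("s"), Seq("a", "b")))  // GetField(GetField(Attr(s),a),b)
        println(resolveArrayPath(Attr("arr"), Seq("field1"))) // Some(GetArrayField(Attr(arr),field1))
        println(resolveArrayPath(Attr("arr"), Seq("f", "g"))) // None
      }
    }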
From e5a3db19bf5fc7365e4280f17d9bba27b08d29dd Mon Sep 17 00:00:00 2001
From: chuxi
Date: Thu, 28 Aug 2014 12:41:42 +0800
Subject: [PATCH 3/3] modify scala code style by sbt scalastyle

---
 .../sql/catalyst/expressions/complexTypes.scala | 15 ++++++---
 .../sql/catalyst/plans/logical/LogicalPlan.scala |  1 -
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 705cd8f27922..823e407bdde0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -103,10 +103,9 @@ case class GetField(child: Expression, fieldName: String) extends UnaryExpressio
 }
 
 /**
- * Returns the value of fields[] in the Struct `child`.
- * for array of structs
+ * Returns an array containing the value of fieldName
+ * for each element in the input array of type struct
  */
-
 case class GetArrayField(child: Expression, fieldName: String) extends UnaryExpression {
   type EvaluatedType = Any
 
@@ -115,17 +114,15 @@ case class GetArrayField(child: Expression, fieldName: String) extends UnaryExpr
   override def foldable = child.foldable
 
   protected def arrayType = child.dataType match {
-    case s: ArrayType => s.elementType match {
-      case t :StructType => t
-      case otherType => sys.error(s"GetArrayField is not valid on fields of type $otherType")
-    }
+    case ArrayType(s: StructType, _) => s
     case otherType => sys.error(s"GetArrayField is not valid on fields of type $otherType")
   }
 
-  lazy val field =
+  lazy val field = if (arrayType.isInstanceOf[StructType]) {
     arrayType.fields
       .find(_.name == fieldName)
       .getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
+  } else null
 
 
   lazy val ordinal = arrayType.fields.indexOf(field)
@@ -141,4 +138,4 @@ case class GetArrayField(child: Expression, fieldName: String) extends UnaryExpr
   }
 
   override def toString = s"$child.$fieldName"
-}
\ No newline at end of file
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 6ebe13303dbc..d08c293e910d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -94,7 +94,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
     // matches the name or where the first part matches the scope and the second part matches the
     // name. Return these matches along with any remaining parts, which represent dotted access to
     // struct fields.
-
     val options = input.flatMap { option =>
       // If the first part of the desired name matches a qualifier for this possible match, drop it.
       val remainingParts =