From 6059f6fa9dcbf0083d6449dcd5f58f2568864376 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Thu, 10 Dec 2015 17:40:45 -0800 Subject: [PATCH 1/3] [SPARK-12271][SQL] Improve error message when Dataset.as has incompatible schemas. --- .../catalyst/encoders/ExpressionEncoder.scala | 21 +++++++++++++++++++ .../org/apache/spark/sql/DatasetSuite.scala | 9 +++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 3e8420ecb9cc..a6b186d06772 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -251,6 +251,27 @@ case class ExpressionEncoder[T]( val plan = Project(Alias(unbound, "")() :: Nil, LocalRelation(schema)) val analyzedPlan = SimpleAnalyzer.execute(plan) + if (!analyzedPlan.resolved) { + // We couldn't resolve, this means something is wrong with the schemas. + analyzedPlan match { + case p @ Project(projectList, _) => { + p.projectList.foreach { e: NamedExpression => e.foreach { _ match { + case u @ UpCast(child, dataType, walkedTypePath) => { + throw new AnalysisException(s"Cannot resolve `${child.prettyString}`. " + + "The type path of the target object is:\n" + + walkedTypePath.mkString("", "\n", "\n") + + "Ensure that the input schema contains this field.\n - Input schema is " + + schema.map(_.simpleString).mkString(", ")) + } + case _ => + } + }} + }} + throw new AnalysisException(s"Cannot resolve. Ensure the input and output schemas match.\n" + + " - Input schema = " + schema.map(_.simpleString).mkString(", ") + "\n" + + " - Target schema = " + this.schema.toAttributes.map(_.simpleString).mkString(", ")) + } + val optimizedPlan = SimplifyCasts(analyzedPlan) // In order to construct instances of inner classes (for example those declared in a REPL cell), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 542e4d6c43b9..6922b1125e4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -481,10 +481,17 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val ds = Seq(2 -> 2.toByte, 3 -> 3.toByte).toDF("a", "b").as[ClassData] assert(ds.collect().toSeq == Seq(ClassData("2", 2), ClassData("3", 3))) } -} + test("verify mismatching field names fail with a good error") { + val ds = Seq(ClassData("a", 1)).toDS() + intercept[AnalysisException] { + ds.as[ClassData2].collect() + } + } +} case class ClassData(a: String, b: Int) +case class ClassData2(c: String, d: Int) case class ClassNullableData(a: String, b: Integer) /** From a78ad65c4c238bd6e7a880cfad8bc14d2241404a Mon Sep 17 00:00:00 2001 From: Nong Li Date: Mon, 14 Dec 2015 13:30:51 -0800 Subject: [PATCH 2/3] Fix the check to be more specific. --- .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index a6b186d06772..298eeb5abe87 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -252,8 +252,9 @@ case class ExpressionEncoder[T]( val plan = Project(Alias(unbound, "")() :: Nil, LocalRelation(schema)) val analyzedPlan = SimpleAnalyzer.execute(plan) if (!analyzedPlan.resolved) { - // We couldn't resolve, this means something is wrong with the schemas. analyzedPlan match { + // Looked for UpCasts. If these still exist, it indicates they could not resolve and + // something about the schemas is mismatched. case p @ Project(projectList, _) => { p.projectList.foreach { e: NamedExpression => e.foreach { _ match { case u @ UpCast(child, dataType, walkedTypePath) => { @@ -261,15 +262,12 @@ case class ExpressionEncoder[T]( "The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") + "Ensure that the input schema contains this field.\n - Input schema is " + - schema.map(_.simpleString).mkString(", ")) + schema.map { f => f.name + ": " + f.dataType }.mkString(", ")) } case _ => } }} }} - throw new AnalysisException(s"Cannot resolve. Ensure the input and output schemas match.\n" + - " - Input schema = " + schema.map(_.simpleString).mkString(", ") + "\n" + - " - Target schema = " + this.schema.toAttributes.map(_.simpleString).mkString(", ")) } val optimizedPlan = SimplifyCasts(analyzedPlan) From b5007d7c01287ef578d1153dd2123fbb869cfe1d Mon Sep 17 00:00:00 2001 From: Nong Li Date: Tue, 15 Dec 2015 11:43:20 -0800 Subject: [PATCH 3/3] Change to catch this using CheckAnalysis. Fixed a type erasure issue with wrap option. --- .../spark/sql/catalyst/ScalaReflection.scala | 2 +- .../catalyst/encoders/ExpressionEncoder.scala | 20 +------------------ .../sql/catalyst/expressions/objects.scala | 12 ++++++----- .../org/apache/spark/sql/DatasetSuite.scala | 3 ++- 4 files changed, 11 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 9013fd050b5f..ecff8605706d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -184,7 +184,7 @@ object ScalaReflection extends ScalaReflection { val TypeRef(_, _, Seq(optType)) = t val className = getClassNameFromType(optType) val newTypePath = s"""- option value class: "$className"""" +: walkedTypePath - WrapOption(constructorFor(optType, path, newTypePath)) + WrapOption(constructorFor(optType, path, newTypePath), dataTypeFor(optType)) case t if t <:< localTypeOf[java.lang.Integer] => val boxedType = classOf[java.lang.Integer] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 298eeb5abe87..363178b0e21a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -251,25 +251,7 @@ case class ExpressionEncoder[T]( val plan = Project(Alias(unbound, "")() :: Nil, LocalRelation(schema)) val analyzedPlan = SimpleAnalyzer.execute(plan) - if (!analyzedPlan.resolved) { - analyzedPlan match { - // Looked for UpCasts. If these still exist, it indicates they could not resolve and - // something about the schemas is mismatched. - case p @ Project(projectList, _) => { - p.projectList.foreach { e: NamedExpression => e.foreach { _ match { - case u @ UpCast(child, dataType, walkedTypePath) => { - throw new AnalysisException(s"Cannot resolve `${child.prettyString}`. " + - "The type path of the target object is:\n" + - walkedTypePath.mkString("", "\n", "\n") + - "Ensure that the input schema contains this field.\n - Input schema is " + - schema.map { f => f.name + ": " + f.dataType }.mkString(", ")) - } - case _ => - } - }} - }} - } - + SimpleAnalyzer.checkAnalysis(analyzedPlan) val optimizedPlan = SimplifyCasts(analyzedPlan) // In order to construct instances of inner classes (for example those declared in a REPL cell), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index 96bc4fe67a98..10ec75eca37f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -23,11 +23,9 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.serializer._ import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer -import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation} -import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} +import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.types._ /** @@ -295,13 +293,17 @@ case class UnwrapOption( * Converts the result of evaluating `child` into an option, checking both the isNull bit and * (in the case of reference types) equality with null. * @param child The expression to evaluate and wrap. + * @param optType The type of this option. */ -case class WrapOption(child: Expression) extends UnaryExpression { +case class WrapOption(child: Expression, optType: DataType) + extends UnaryExpression with ExpectsInputTypes { override def dataType: DataType = ObjectType(classOf[Option[_]]) override def nullable: Boolean = true + override def inputTypes: Seq[AbstractDataType] = optType :: Nil + override def eval(input: InternalRow): Any = throw new UnsupportedOperationException("Only code-generated evaluation is supported") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 6922b1125e4f..8f8db318261d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -484,9 +484,10 @@ class DatasetSuite extends QueryTest with SharedSQLContext { test("verify mismatching field names fail with a good error") { val ds = Seq(ClassData("a", 1)).toDS() - intercept[AnalysisException] { + val e = intercept[AnalysisException] { ds.as[ClassData2].collect() } + assert(e.getMessage.contains("cannot resolve 'c' given input columns a, b"), e.getMessage) } }