From cc5a9341e3f75ff9b2984a34cfd63d469e7252fd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 16:39:17 +0300 Subject: [PATCH 01/17] Add a test --- .../apache/spark/sql/CsvFunctionsSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 61f0e138cc35..b6247ff2a31d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -200,4 +200,23 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { assert(readback(0).getAs[Row](0).getAs[Date](0).getTime >= 0) } } + + test("support foldable schema by from_csv") { + val options = Map[String, String]().asJava + val schema = concat_ws(",", lit("i int"), lit("s string")) + checkAnswer( + Seq("""1,"a"""").toDS().select(from_csv($"value", schema, options)), + Row(Row(1, "a"))) + + val errMsg = intercept[AnalysisException] { + Seq(("1", "i int")).toDF("csv", "schema") + .select(from_csv($"csv", $"schema", options)).collect() + }.getMessage + assert(errMsg.contains("Schema should be specified in DDL format as a string literal")) + + val errMsg2 = intercept[AnalysisException] { + Seq("1").toDF("csv").select(from_csv($"csv", lit(1), options)).collect() + }.getMessage + assert(errMsg2.contains("must return a valid string")) + } } From 0495763c5f3ca0f3949273515be1ce80a23e9f69 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 16:39:49 +0300 Subject: [PATCH 02/17] Support foldable schema --- .../sql/catalyst/expressions/ExprUtils.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index 3f3d6b2b63a0..1fbf5757e471 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -28,16 +28,18 @@ import org.apache.spark.unsafe.types.UTF8String object ExprUtils { def evalSchemaExpr(exp: Expression): StructType = { - // Use `DataType.fromDDL` since the type string can be struct<...>. - val dataType = exp match { - case Literal(s, StringType) => - DataType.fromDDL(s.toString) - case e @ SchemaOfCsv(_: Literal, _) => - val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String] - DataType.fromDDL(ddlSchema.toString) - case e => throw new AnalysisException( + val dataType = if (exp.foldable) { + exp.eval() match { + case s: UTF8String => + // Use `DataType.fromDDL` since the type string can be struct<...>. + DataType.fromDDL(s.toString) + case _ => throw new AnalysisException( + s"The expression ${exp.sql} must return a valid string.") + } + } else { + throw new AnalysisException( "Schema should be specified in DDL format as a string literal or output of " + - s"the schema_of_csv function instead of ${e.sql}") + s"the schema_of_csv function instead of ${exp.sql}") } if (!dataType.isInstanceOf[StructType]) { @@ -48,7 +50,8 @@ object ExprUtils { } def evalTypeExpr(exp: Expression): DataType = exp match { - case Literal(s, StringType) => DataType.fromDDL(s.toString) + case Literal(s, StringType) => + DataType.fromDDL(s.toString) case e @ SchemaOfJson(_: Literal, _) => val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String] DataType.fromDDL(ddlSchema.toString) From 3b11635bff551e590a8d72d4742f5fc2e91c1acd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 19:33:33 +0300 Subject: [PATCH 03/17] Check null --- .../org/apache/spark/sql/catalyst/expressions/ExprUtils.scala | 4 ++-- .../test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index 1fbf5757e471..12f3d0701a35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -30,11 +30,11 @@ object ExprUtils { def evalSchemaExpr(exp: Expression): StructType = { val dataType = if (exp.foldable) { exp.eval() match { - case s: UTF8String => + case s: UTF8String if s != null => // Use `DataType.fromDDL` since the type string can be struct<...>. DataType.fromDDL(s.toString) case _ => throw new AnalysisException( - s"The expression ${exp.sql} must return a valid string.") + s"The expression '${exp.sql}' must be evaluated to a valid string.") } } else { throw new AnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index b6247ff2a31d..ca71ff43c832 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -217,6 +217,6 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { val errMsg2 = intercept[AnalysisException] { Seq("1").toDF("csv").select(from_csv($"csv", lit(1), options)).collect() }.getMessage - assert(errMsg2.contains("must return a valid string")) + assert(errMsg2.contains("The expression '1' must be evaluated to a valid string")) } } From b5c9f71a61bf4777a86b2036769ba37bd94fe9bf Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 19:50:17 +0300 Subject: [PATCH 04/17] Re-gen csv-functions.sql.out --- .../src/test/resources/sql-tests/results/csv-functions.sql.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index 8495bef9122e..497fe9b52e80 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -24,7 +24,7 @@ select from_csv('1', 1) struct<> -- !query output org.apache.spark.sql.AnalysisException -Schema should be specified in DDL format as a string literal or output of the schema_of_csv function instead of 1;; line 1 pos 7 +The expression '1' must be evaluated to a valid string.;; line 1 pos 7 -- !query From 109cb71b1ec36d3b20275f82cad2a10d34292688 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 19:23:48 +0300 Subject: [PATCH 05/17] Add a test --- .../apache/spark/sql/JsonFunctionsSuite.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index fd1e9e309558..eee005fcc5af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -313,7 +313,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val errMsg1 = intercept[AnalysisException] { df3.selectExpr("from_json(value, 1)") } - assert(errMsg1.getMessage.startsWith("Schema should be specified in DDL format as a string")) + assert(errMsg1.getMessage.startsWith("The expression '1' must be evaluated to a valid string")) val errMsg2 = intercept[AnalysisException] { df3.selectExpr("""from_json(value, 'time InvalidType')""") } @@ -653,4 +653,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { assert(json_tuple_result === len) } } + + test("support foldable schema by from_json") { + val options = Map[String, String]().asJava + val schema = regexp_replace(lit("dpt_org_id INT, dpt_org_city STRING"), "dpt_org_", "") + checkAnswer( + Seq("""{"id":1,"city":"Moscow"}""").toDS().select(from_json($"value", schema, options)), + Row(Row(1, "Moscow"))) + + val errMsg = intercept[AnalysisException] { + Seq(("""{"i":1}""", "i int")).toDF("json", "schema") + .select(from_json($"json", $"schema", options)).collect() + }.getMessage + assert(errMsg.contains("Schema should be specified in DDL format as a string literal")) + } } From d28d0197759900cc5b3e3d801673546d2e18850d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 19:24:12 +0300 Subject: [PATCH 06/17] Support foldable expr --- .../sql/catalyst/expressions/ExprUtils.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index 12f3d0701a35..a361076489ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -49,15 +49,18 @@ object ExprUtils { dataType.asInstanceOf[StructType] } - def evalTypeExpr(exp: Expression): DataType = exp match { - case Literal(s, StringType) => - DataType.fromDDL(s.toString) - case e @ SchemaOfJson(_: Literal, _) => - val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String] - DataType.fromDDL(ddlSchema.toString) - case e => throw new AnalysisException( - "Schema should be specified in DDL format as a string literal or output of " + - s"the schema_of_json function instead of ${e.sql}") + def evalTypeExpr(exp: Expression): DataType = { + if (exp.foldable) { + exp.eval() match { + case s: UTF8String if s != null => DataType.fromDDL(s.toString) + case _ => throw new AnalysisException( + s"The expression '${exp.sql}' must be evaluated to a valid string.") + } + } else { + throw new AnalysisException( + "Schema should be specified in DDL format as a string literal or output of " + + s"the schema_of_json function instead of ${exp.sql}") + } } def convertToMapData(exp: Expression): Map[String, String] = exp match { From 3a9cf3f13c911c72a7f593d510906782a68b82e1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 19:43:15 +0300 Subject: [PATCH 07/17] Re-gen json-functions.sql.out --- .../src/test/resources/sql-tests/results/json-functions.sql.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 21a3531caf73..2e1e374bd913 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -115,7 +115,7 @@ select from_json('{"a":1}', 1) struct<> -- !query output org.apache.spark.sql.AnalysisException -Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of 1;; line 1 pos 7 +The expression '1' must be evaluated to a valid string.;; line 1 pos 7 -- !query From 97287240d8eee5bccc3d71a253d99aaaad52ac0e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 22:39:32 +0300 Subject: [PATCH 08/17] Add a test --- .../scala/org/apache/spark/sql/CsvFunctionsSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index ca71ff43c832..bd8027270991 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -219,4 +219,11 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { }.getMessage assert(errMsg2.contains("The expression '1' must be evaluated to a valid string")) } + + test("schema_of_csv - infers the schema of foldable CSV string") { + val input = concat_ws(",", lit(0.1), lit(1)) + checkAnswer( + spark.range(1).select(schema_of_csv(input)), + Seq(Row("struct<_c0:double,_c1:int>"))) + } } From 10c0bc812dcccd7c617c0d7484938b5db0b1f788 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 3 Mar 2020 22:40:09 +0300 Subject: [PATCH 09/17] Support foldable input --- .../sql/catalyst/expressions/csvExpressions.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 54af314fe417..5140db90c595 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -165,10 +165,14 @@ case class SchemaOfCsv( @transient private lazy val csv = child.eval().asInstanceOf[UTF8String] - override def checkInputDataTypes(): TypeCheckResult = child match { - case Literal(s, StringType) if s != null => super.checkInputDataTypes() - case _ => TypeCheckResult.TypeCheckFailure( - s"The input csv should be a string literal and not null; however, got ${child.sql}.") + override def checkInputDataTypes(): TypeCheckResult = { + if (child.foldable && csv != null) { + super.checkInputDataTypes() + } else { + TypeCheckResult.TypeCheckFailure( + "The input csv should be a foldable string expression and not null; " + + s"however, got ${child.sql}.") + } } override def eval(v: InternalRow): Any = { From 2f889238a3ea5120f2b0f4b6d271180219a276a7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 4 Mar 2020 11:15:07 +0300 Subject: [PATCH 10/17] Re-gen csv-functions.sql.out --- .../test/resources/sql-tests/results/csv-functions.sql.out | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index 497fe9b52e80..fd36737e0cc1 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -91,7 +91,7 @@ select schema_of_csv(null) struct<> -- !query output org.apache.spark.sql.AnalysisException -cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv should be a string literal and not null; however, got NULL.; line 1 pos 7 +cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got NULL.; line 1 pos 7 -- !query @@ -108,7 +108,7 @@ SELECT schema_of_csv(csvField) FROM csvTable struct<> -- !query output org.apache.spark.sql.AnalysisException -cannot resolve 'schema_of_csv(csvtable.`csvField`)' due to data type mismatch: The input csv should be a string literal and not null; however, got csvtable.`csvField`.; line 1 pos 7 +cannot resolve 'schema_of_csv(csvtable.`csvField`)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got csvtable.`csvField`.; line 1 pos 7 -- !query From e06da2a3832b3eb3b056fed11d8ac07dbde75dda Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 4 Mar 2020 21:46:52 +0300 Subject: [PATCH 11/17] Add a test --- .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index eee005fcc5af..6b73eda39310 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -667,4 +667,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { }.getMessage assert(errMsg.contains("Schema should be specified in DDL format as a string literal")) } + + test("schema_of_json - infers the schema of foldable JSON string") { + val input = regexp_replace(lit("""{"item_id": 1, "item_price": 0.1}"""), "item_", "") + checkAnswer( + spark.range(1).select(schema_of_json(input)), + Seq(Row("struct"))) + } } From c0868e445da4609453432210249c7780f303adae Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 4 Mar 2020 21:47:05 +0300 Subject: [PATCH 12/17] Support foldable input --- .../sql/catalyst/expressions/jsonExpressions.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 61afdb6c9492..4fcce95f50df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -764,10 +764,14 @@ case class SchemaOfJson( @transient private lazy val json = child.eval().asInstanceOf[UTF8String] - override def checkInputDataTypes(): TypeCheckResult = child match { - case Literal(s, StringType) if s != null => super.checkInputDataTypes() - case _ => TypeCheckResult.TypeCheckFailure( - s"The input json should be a string literal and not null; however, got ${child.sql}.") + override def checkInputDataTypes(): TypeCheckResult = { + if (child.foldable && json != null) { + super.checkInputDataTypes() + } else { + TypeCheckResult.TypeCheckFailure( + "The input csv should be a foldable string expression and not null; " + + s"however, got ${child.sql}.") + } } override def eval(v: InternalRow): Any = { From 35492f60f9b3775b4bd5b8d9cd3ae145d0d43577 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 4 Mar 2020 23:22:16 +0300 Subject: [PATCH 13/17] Re-gen json-functions.sql.out --- .../test/resources/sql-tests/results/json-functions.sql.out | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 2e1e374bd913..ce7648c26231 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -326,7 +326,7 @@ select schema_of_json(null) struct<> -- !query output org.apache.spark.sql.AnalysisException -cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a string literal and not null; however, got NULL.; line 1 pos 7 +cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got NULL.; line 1 pos 7 -- !query @@ -343,7 +343,7 @@ SELECT schema_of_json(jsonField) FROM jsonTable struct<> -- !query output org.apache.spark.sql.AnalysisException -cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a string literal and not null; however, got jsontable.`jsonField`.; line 1 pos 7 +cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got jsontable.`jsonField`.; line 1 pos 7 -- !query From 715ea1e269ca4907a93b99193f6d8aafe681a13c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 4 Mar 2020 23:26:17 +0300 Subject: [PATCH 14/17] Bug fix --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 2 +- .../test/resources/sql-tests/results/json-functions.sql.out | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 4fcce95f50df..aa4b464850f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -769,7 +769,7 @@ case class SchemaOfJson( super.checkInputDataTypes() } else { TypeCheckResult.TypeCheckFailure( - "The input csv should be a foldable string expression and not null; " + + "The input json should be a foldable string expression and not null; " + s"however, got ${child.sql}.") } } diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index ce7648c26231..57dc7c55ab8c 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -326,7 +326,7 @@ select schema_of_json(null) struct<> -- !query output org.apache.spark.sql.AnalysisException -cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got NULL.; line 1 pos 7 +cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got NULL.; line 1 pos 7 -- !query @@ -343,7 +343,7 @@ SELECT schema_of_json(jsonField) FROM jsonTable struct<> -- !query output org.apache.spark.sql.AnalysisException -cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got jsontable.`jsonField`.; line 1 pos 7 +cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got jsontable.`jsonField`.; line 1 pos 7 -- !query From cd93715861f58035e61fc240ff5eca590757b5b9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 5 Mar 2020 10:12:21 +0300 Subject: [PATCH 15/17] Refactoring --- .../sql/catalyst/expressions/ExprUtils.scala | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index a361076489ae..bb29ee9deff1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -27,21 +27,22 @@ import org.apache.spark.unsafe.types.UTF8String object ExprUtils { - def evalSchemaExpr(exp: Expression): StructType = { - val dataType = if (exp.foldable) { + def evalTypeExpr(exp: Expression): DataType = { + if (exp.foldable) { exp.eval() match { - case s: UTF8String if s != null => - // Use `DataType.fromDDL` since the type string can be struct<...>. - DataType.fromDDL(s.toString) + case s: UTF8String if s != null => DataType.fromDDL(s.toString) case _ => throw new AnalysisException( s"The expression '${exp.sql}' must be evaluated to a valid string.") } } else { throw new AnalysisException( "Schema should be specified in DDL format as a string literal or output of " + - s"the schema_of_csv function instead of ${exp.sql}") + s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}") } + } + def evalSchemaExpr(exp: Expression): StructType = { + val dataType = evalTypeExpr(exp) if (!dataType.isInstanceOf[StructType]) { throw new AnalysisException( s"Schema should be struct type but got ${dataType.sql}.") @@ -49,20 +50,6 @@ object ExprUtils { dataType.asInstanceOf[StructType] } - def evalTypeExpr(exp: Expression): DataType = { - if (exp.foldable) { - exp.eval() match { - case s: UTF8String if s != null => DataType.fromDDL(s.toString) - case _ => throw new AnalysisException( - s"The expression '${exp.sql}' must be evaluated to a valid string.") - } - } else { - throw new AnalysisException( - "Schema should be specified in DDL format as a string literal or output of " + - s"the schema_of_json function instead of ${exp.sql}") - } - } - def convertToMapData(exp: Expression): Map[String, String] = exp match { case m: CreateMap if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) => From 874e17b70b8af45ba7968bbf4b979f3623b399f0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 5 Mar 2020 20:17:38 +0300 Subject: [PATCH 16/17] Address review comments --- .../org/apache/spark/sql/catalyst/expressions/ExprUtils.scala | 2 +- .../src/test/resources/sql-tests/results/csv-functions.sql.out | 2 +- .../src/test/resources/sql-tests/results/json-functions.sql.out | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index bb29ee9deff1..56bd3d7026d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -32,7 +32,7 @@ object ExprUtils { exp.eval() match { case s: UTF8String if s != null => DataType.fromDDL(s.toString) case _ => throw new AnalysisException( - s"The expression '${exp.sql}' must be evaluated to a valid string.") + s"The expression '${exp.sql}' is not a valid schema string.") } } else { throw new AnalysisException( diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index fd36737e0cc1..be7fa5e9d5ff 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -24,7 +24,7 @@ select from_csv('1', 1) struct<> -- !query output org.apache.spark.sql.AnalysisException -The expression '1' must be evaluated to a valid string.;; line 1 pos 7 +The expression '1' is not a valid schema string.;; line 1 pos 7 -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 57dc7c55ab8c..920b45a8fa77 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -115,7 +115,7 @@ select from_json('{"a":1}', 1) struct<> -- !query output org.apache.spark.sql.AnalysisException -The expression '1' must be evaluated to a valid string.;; line 1 pos 7 +The expression '1' is not a valid schema string.;; line 1 pos 7 -- !query From 87c2cf8f5d7bf1167ea32c3c39e84e075278f984 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 5 Mar 2020 22:41:33 +0300 Subject: [PATCH 17/17] Fix expected err in CsvFunctionsSuite and JsonFunctionsSuite --- .../src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala | 2 +- .../test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index bd8027270991..54dfb4597b04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -217,7 +217,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { val errMsg2 = intercept[AnalysisException] { Seq("1").toDF("csv").select(from_csv($"csv", lit(1), options)).collect() }.getMessage - assert(errMsg2.contains("The expression '1' must be evaluated to a valid string")) + assert(errMsg2.contains("The expression '1' is not a valid schema string")) } test("schema_of_csv - infers the schema of foldable CSV string") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 6b73eda39310..ebc2f57a984d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -313,7 +313,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val errMsg1 = intercept[AnalysisException] { df3.selectExpr("from_json(value, 1)") } - assert(errMsg1.getMessage.startsWith("The expression '1' must be evaluated to a valid string")) + assert(errMsg1.getMessage.startsWith("The expression '1' is not a valid schema string")) val errMsg2 = intercept[AnalysisException] { df3.selectExpr("""from_json(value, 'time InvalidType')""") }