diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index b61583d0dafb..79493265ec7c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -496,6 +496,21 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:

   override def dataType: DataType = schema

+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (StringType.acceptsType(child.dataType)) {
+      try {
+        JacksonUtils.verifySchema(schema)
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    } else {
+      TypeCheckResult.TypeCheckFailure(
+        s"$prettyName requires that the expression is a string expression.")
+    }
+  }
+
   override def nullSafeEval(json: Any): Any = {
     try parser.parse(json.toString).head catch {
       case _: SparkSQLJsonProcessingException => null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 3b23c6cd2816..1fe9351e507d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -50,7 +50,7 @@ object JacksonUtils {

       case _ =>
         throw new UnsupportedOperationException(
-          s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
+          s"Unsupported type ${dataType.simpleString} of column $name in JSON conversion.")
     }

     schema.foreach(field => verifyType(field.name, field.dataType))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index a77937efd7e1..a2e553274901 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JacksonUtils, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
@@ -329,6 +329,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         columnNameOfCorruptRecord,
         parsedOptions)
     }
+    if (parsedOptions.failFast) {
+      // We can fail before starting to parse in case of "FAILFAST" mode. In case of "PERMISSIVE"
+      // mode, it allows reading values as null for unsupported types. In case of "DROPMALFORMED"
+      // mode, it drops only records containing non-null values in unsupported types.
+      JacksonUtils.verifySchema(schema)
+    }
     val parsed = jsonRDD.mapPartitions { iter =>
       val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions)
       iter.flatMap(parser.parse)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index a3691158ee75..c4f2c352f769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -143,6 +143,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

+    if (csvOptions.failFast) {
+      // We can fail before starting to parse in case of "FAILFAST" mode. In case of "PERMISSIVE"
+      // mode, it allows reading values as null for unsupported types. In case of "DROPMALFORMED"
+      // mode, it drops only records containing non-null values in unsupported types. We should
+      // use `requiredSchema` instead of the whole schema `dataSchema` here so as not to break
+      // the original behaviour.
+      verifySchema(requiredSchema)
+    }
+
     (file: PartitionedFile) => {
       val lineIterator = {
         val conf = broadcastedHadoopConf.value.value
@@ -223,15 +232,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {

   private def verifySchema(schema: StructType): Unit = {
     def verifyType(dataType: DataType): Unit = dataType match {
-      case ByteType | ShortType | IntegerType | LongType | FloatType |
-        DoubleType | BooleanType | _: DecimalType | TimestampType |
-        DateType | StringType =>
+      case ByteType | ShortType | IntegerType | LongType | FloatType |
+           DoubleType | BooleanType | _: DecimalType | TimestampType |
+           DateType | StringType =>

-      case udt: UserDefinedType[_] => verifyType(udt.sqlType)
+      case udt: UserDefinedType[_] => verifyType(udt.sqlType)

-      case _ =>
-        throw new UnsupportedOperationException(
-          s"CSV data source does not support ${dataType.simpleString} data type.")
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"CSV data source does not support ${dataType.simpleString} data type.")
     }

     schema.foreach(field => verifyType(field.dataType))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 0e38aefecb67..851e55e78b2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JacksonUtils, JSONOptions}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
@@ -75,6 +75,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    JacksonUtils.verifySchema(dataSchema)
     val conf = job.getConfiguration
     val parsedOptions: JSONOptions = new JSONOptions(options)
     parsedOptions.compressionCodec.foreach { codec =>
@@ -110,6 +111,15 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)

+    if (parsedOptions.failFast) {
+      // We can fail before starting to parse in case of "FAILFAST" mode. In case of "PERMISSIVE"
+      // mode, it allows reading values as null for unsupported types. In case of "DROPMALFORMED"
+      // mode, it drops only records containing non-null values in unsupported types. We should
+      // use `requiredSchema` instead of the whole schema `dataSchema` here so as not to break
+      // the original behaviour.
+      JacksonUtils.verifySchema(requiredSchema)
+    }
+
     (file: PartitionedFile) => {
       val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 8e043960326d..7704e1fa6b87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -45,12 +45,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {

   private def verifySchema(schema: StructType): Unit = {
     if (schema.size != 1) {
-      throw new AnalysisException(
+      throw new UnsupportedOperationException(
         s"Text data source supports only a single column, and you have ${schema.size} columns.")
     }
     val tpe = schema(0).dataType
     if (tpe != StringType) {
-      throw new AnalysisException(
+      throw new UnsupportedOperationException(
         s"Text data source supports only a string column, but you have ${tpe.simpleString}.")
     }
   }
@@ -95,9 +95,10 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    assert(
-      requiredSchema.length <= 1,
-      "Text data source only produces a single data column named \"value\".")
+    if (requiredSchema.nonEmpty) {
+      // `requiredSchema` can be empty when the only projected columns are partition columns.
+      verifySchema(requiredSchema)
+    }

     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 7d63d31d9b97..095ae138cb50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -123,6 +123,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(null) :: Nil)
   }

+  test("from_json unsupported type") {
+    val df = Seq("""{"a" 1}""").toDS()
+    val schema = new StructType().add("a", CalendarIntervalType)
+
+    val e = intercept[AnalysisException] {
+      // Unsupported type throws an exception
+      df.select(from_json($"value", schema)).collect()
+    }
+    assert(e.getMessage.contains(
+      "Unsupported type calendarinterval of column a in JSON conversion"))
+  }
+
   test("to_json") {
     val df = Seq(Tuple1(Tuple1(1))).toDF("a")

@@ -139,7 +151,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df.select(to_json($"c")).collect()
     }
     assert(e.getMessage.contains(
-      "Unable to convert column a of type calendarinterval to JSON."))
+      "Unsupported type calendarinterval of column a in JSON conversion"))
   }

   test("roundtrip in to_json and from_json") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 491ff72337a8..47a1999496d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -688,13 +688,70 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .write.csv(csvDir)
       }.getMessage
       assert(msg.contains("CSV data source does not support array data type"))
+    }
+  }
+
+  test("Unsupported types - DROPMALFORMED mode") {
+    val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
+
+    withTempPath { path =>
+      spark.range(1).write.csv(path.getAbsolutePath)
+      val df = spark.read
+        .schema(schema)
+        .option("mode", "DROPMALFORMED")
+        .csv(path.getAbsolutePath)
+
+      assert(df.collect().isEmpty)
+    }
+
+    withTempPath { path =>
+      Seq(Tuple1("null")).toDF().write.csv(path.getAbsolutePath)
+      val nullDf = spark.read
+        .schema(schema)
+        .option("nullValue", "null")
+        .option("mode", "DROPMALFORMED")
+        .csv(path.getAbsolutePath)
+
+      // This succeeds in reading null even though the type is unsupported.
+      checkAnswer(nullDf, Row(null))
+    }
+  }

-      msg = intercept[SparkException] {
+  test("Unsupported types - FAILFAST mode") {
+    withTempPath { path =>
+      val msg = intercept[UnsupportedOperationException] {
         val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
-        spark.range(1).write.csv(csvDir)
-        spark.read.schema(schema).csv(csvDir).collect()
+        spark.range(1).write.csv(path.getAbsolutePath)
+        spark.read.schema(schema).option("mode", "FAILFAST").csv(path.getAbsolutePath).collect()
+      }.getMessage
+
+      assert(msg.contains("CSV data source does not support array data type"))
+    }
+  }
+
+  test("Unsupported types - PERMISSIVE mode") {
+    withTempDir { dir =>
+      // If the values are null, it is fine to read.
+ val schema = StructType(StructField("a", + StructType(StructField("b", StringType, true) :: Nil), true) :: Nil) + val path = s"${dir.getAbsolutePath}/tmp1" + Seq(Tuple1("null")).toDF().write.csv(path) + val df = spark.read + .schema(schema) + .option("nullValue", "null") + .option("mode", "PERMISSIVE") + .csv(path) + + checkAnswer(df, Row(null)) + + // If the values are non-null and the type is unsupported, it throws an exception. + val msg = intercept[SparkException] { + val path = s"${dir.getAbsolutePath}/tmp2" + spark.range(1).write.csv(path) + spark.read.schema(schema).option("mode", "PERMISSIVE").csv(path).collect() }.getCause.getMessage - assert(msg.contains("Unsupported type: array")) + + assert(msg.contains("Unsupported type: struct")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 456052f79afc..9792056dbdf7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1055,6 +1055,37 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {")) } + test("Unsupported types: FAILFAST mode") { + val schema = StructType( + StructField("a", CalendarIntervalType, true) :: Nil) + val exceptionOne = intercept[UnsupportedOperationException] { + // Read JSON data from RDD + spark.read + .option("mode", "FAILFAST") + .schema(schema) + .json(corruptRecords) + .collect() + } + + assert(exceptionOne.getMessage.contains( + "Unsupported type calendarinterval of column a in JSON conversion.")) + + val exceptionTwo = intercept[UnsupportedOperationException] { + // Read JSON data from files. + withTempDir { path => + spark.read + .option("mode", "FAILFAST") + .schema(schema) + .format("json") + .load(path.getAbsolutePath) + .collect() + } + } + + assert(exceptionTwo.getMessage.contains( + "Unsupported type calendarinterval of column a in JSON conversion.")) + } + test("Corrupt records: DROPMALFORMED mode") { val schemaOne = StructType( StructField("a", StringType, true) :: @@ -1082,6 +1113,41 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(jsonDFTwo.schema === schemaTwo) } + test("Unsupported types: DROPMALFORMED mode") { + val schema = StructType( + StructField("a", CalendarIntervalType, true) :: Nil) + val rdd = sparkContext.parallelize(Seq("""{"a": 1}""")) + // Read JSON data from RDD + val df = spark.read + .option("mode", "DROPMALFORMED") + .schema(schema) + .json(rdd) + + assert(df.collect().isEmpty) + + val nullRdd = sparkContext.parallelize(Seq("""{"a": null}""")) + // Read JSON data from RDD + val nullDf = spark.read + .option("mode", "DROPMALFORMED") + .schema(schema) + .json(nullRdd) + + // This succeeds to read null even thought it is unsupported. + checkAnswer(nullDf, Row(null)) + + withTempPath { path => + // Read JSON data from files. 
+      rdd.saveAsTextFile(path.getAbsolutePath)
+      val df = spark.read
+        .option("mode", "DROPMALFORMED")
+        .schema(schema)
+        .format("json")
+        .load(path.getAbsolutePath)
+
+      assert(df.collect().isEmpty)
+    }
+  }
+
   test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
     withTempView("jsonTable") {
       val schema = StructType(
@@ -1163,6 +1229,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }

+  test("Unsupported types: PERMISSIVE mode") {
+    val schema = StructType(
+      StructField("a", CalendarIntervalType, true) :: Nil)
+    val rdd = sparkContext.parallelize(Seq("""{"a": 1}"""))
+    // Read JSON data from RDD
+    val df = spark.read
+      .option("mode", "PERMISSIVE")
+      .schema(schema)
+      .json(rdd)
+
+    checkAnswer(df, Row(null))
+
+    withTempPath { path =>
+      // Read JSON data from files.
+      rdd.saveAsTextFile(path.getAbsolutePath)
+      val df = spark.read
+        .option("mode", "PERMISSIVE")
+        .schema(schema)
+        .format("json")
+        .load(path.getAbsolutePath)
+
+      checkAnswer(df, Row(null))
+    }
+  }
+
   test("SPARK-13953 Rename the corrupt record field via option") {
     val jsonDF = spark.read
       .option("columnNameOfCorruptRecord", "_malformed")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index d11c2acb815d..84a38cd2ba5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -22,10 +22,10 @@ import java.io.File
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec

-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

 class TextSuite extends QueryTest with SharedSQLContext {
@@ -51,16 +51,39 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }

   test("error handling for invalid schema") {
-    val tempFile = Utils.createTempDir()
-    tempFile.delete()
-
-    val df = spark.range(2)
-    intercept[AnalysisException] {
-      df.write.text(tempFile.getCanonicalPath)
+    withTempPath { path =>
+      var message = intercept[UnsupportedOperationException] {
+        spark.range(2).write.text(path.getCanonicalPath)
+      }.getMessage
+      assert(message.contains(
+        "Text data source supports only a string column, but you have bigint."))
+
+      message = intercept[UnsupportedOperationException] {
+        Seq(("a", "b")).toDF().write.text(path.getCanonicalPath)
+      }.getMessage
+      assert(message.contains(
+        "Text data source supports only a single column, and you have 2 columns."))
     }

-    intercept[AnalysisException] {
-      spark.range(2).select(df("id"), df("id") + 1).write.text(tempFile.getCanonicalPath)
+    withTempDir { dir =>
+      var message = intercept[UnsupportedOperationException] {
+        val path = s"${dir.getAbsolutePath}/text1"
+        val schema = StructType(StructField("a", LongType, true) :: Nil)
+        spark.range(1).write.text(path)
+        spark.read.schema(schema).text(path).collect()
+      }.getMessage
+      assert(message.contains(
+        "Text data source supports only a string column, but you have bigint."))
+
+      message = intercept[UnsupportedOperationException] {
s"${dir.getAbsolutePath}/text2" + val schema = StructType( + StructField("a", StringType, true) :: StructField("b", StringType, true) :: Nil) + Seq(Tuple1("a")).toDF().write.text(path) + spark.read.schema(schema).text(path).collect() + }.getMessage + assert(message.contains( + "Text data source supports only a single column, and you have 2 columns.")) } }