@@ -496,6 +496,21 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:

override def dataType: DataType = schema

override def checkInputDataTypes(): TypeCheckResult = {
if (StringType.acceptsType(child.dataType)) {
try {
JacksonUtils.verifySchema(schema)
Member Author:
In this case, we don't have to worry about the parsing mode, because from_json produces null with the default parse mode, FAILFAST.

TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
} else {
TypeCheckResult.TypeCheckFailure(
s"$prettyName requires that the expression is a string expression.")
}
}

override def nullSafeEval(json: Any): Any = {
try parser.parse(json.toString).head catch {
case _: SparkSQLJsonProcessingException => null
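
A minimal usage sketch of the behaviour the author's comment above describes, not taken from the diff and assuming a running SparkSession named `spark`: malformed input still yields null at runtime, while an unsupported type in the schema is now rejected at analysis time.

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// Malformed JSON: the parse error inside from_json is caught and the result is null.
val input = Seq("""{"a" 1}""").toDF("value")
input.select(from_json($"value", new StructType().add("a", IntegerType))).collect()
// => Array([null])

// Unsupported schema type: checkInputDataTypes now fails the query during analysis.
// input.select(from_json($"value", new StructType().add("a", CalendarIntervalType)))
// => AnalysisException: ... Unsupported type calendarinterval of column a in JSON conversion ...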
@@ -50,7 +50,7 @@ object JacksonUtils {

case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
s"Unsupported type ${dataType.simpleString} of column $name in JSON conversion.")
}

schema.foreach(field => verifyType(field.name, field.dataType))
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.{JacksonParser, JacksonUtils, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc._
@@ -329,6 +329,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
columnNameOfCorruptRecord,
parsedOptions)
}
if (parsedOptions.failFast) {
// We can fail before starting to parse in "FAILFAST" mode. In "PERMISSIVE" mode, values of
// unsupported types are read as null. In "DROPMALFORMED" mode, only the records that contain
// non-null values for the unsupported types are dropped.
JacksonUtils.verifySchema(schema)
}
val parsed = jsonRDD.mapPartitions { iter =>
val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions)
iter.flatMap(parser.parse)
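
A hedged sketch of the mode-dependent behaviour the comment above describes, assuming a SparkSession named `spark` (illustrative only, not part of the change): FAILFAST now rejects the schema before any record is parsed, PERMISSIVE reads the unsupported column as null, and DROPMALFORMED drops records that hold a non-null value for it.

import org.apache.spark.sql.types._

val schema = new StructType().add("a", CalendarIntervalType)
val rdd = spark.sparkContext.parallelize(Seq("""{"a": 1}"""))

// FAILFAST: UnsupportedOperationException is thrown before parsing starts.
// spark.read.option("mode", "FAILFAST").schema(schema).json(rdd).collect()

// PERMISSIVE (the default): the unsupported column comes back as null.
spark.read.option("mode", "PERMISSIVE").schema(schema).json(rdd).collect()  // Array([null])

// DROPMALFORMED: the record is dropped because it carries a non-null value for `a`.
spark.read.option("mode", "DROPMALFORMED").schema(schema).json(rdd).count() // 0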
@@ -143,6 +143,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

if (csvOptions.failFast) {
// We can fail before starting to parse in "FAILFAST" mode. In "PERMISSIVE" mode, values of
// unsupported types are read as null. In "DROPMALFORMED" mode, only the records that contain
// non-null values for the unsupported types are dropped. We should use `requiredSchema`
// instead of the whole schema `dataSchema` here so as not to break the original behaviour.
verifySchema(requiredSchema)
Member Author:
Here, it only checks the projected columns so as not to change the existing behaviour (we are not really checking the other columns during parsing anyway).

}

(file: PartitionedFile) => {
val lineIterator = {
val conf = broadcastedHadoopConf.value.value
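
A hypothetical sketch of the projected-column behaviour the author's comment above describes, assuming a SparkSession named `spark` and a made-up CSV path: only the columns that end up in `requiredSchema` are verified, so a query that never touches the unsupported column still runs.

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("a", CalendarIntervalType) // unsupported by the CSV source
  .add("b", StringType)

val df = spark.read.schema(schema).option("mode", "FAILFAST").csv("/tmp/example_csv")

df.select("b").collect()   // fine: only `b` is pushed into `requiredSchema`
// df.select("a").collect() // fails fast with UnsupportedOperationException before parsing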
@@ -223,15 +232,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {

private def verifySchema(schema: StructType): Unit = {
def verifyType(dataType: DataType): Unit = dataType match {
case ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | BooleanType | _: DecimalType | TimestampType |
DateType | StringType =>

case udt: UserDefinedType[_] => verifyType(udt.sqlType)

case _ =>
throw new UnsupportedOperationException(
s"CSV data source does not support ${dataType.simpleString} data type.")
Member Author:
CSV currently throws UnsupportedOperationException but the text datasource throws AnalysisException. I just matched this to UnsupportedOperationException. I am happy to match it to AnalysisException instead if anyone prefers that.

}

schema.foreach(field => verifyType(field.dataType))
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JacksonUtils, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
@@ -75,6 +75,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
JacksonUtils.verifySchema(dataSchema)
val conf = job.getConfiguration
val parsedOptions: JSONOptions = new JSONOptions(options)
parsedOptions.compressionCodec.foreach { codec =>
@@ -110,6 +111,15 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)

if (parsedOptions.failFast) {
// We can fail before starting to parse in "FAILFAST" mode. In "PERMISSIVE" mode, values of
// unsupported types are read as null. In "DROPMALFORMED" mode, only the records that contain
// non-null values for the unsupported types are dropped. We should use `requiredSchema`
// instead of the whole schema `dataSchema` here so as not to break the original behaviour.
JacksonUtils.verifySchema(requiredSchema)
}

(file: PartitionedFile) => {
val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
@@ -45,12 +45,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {

private def verifySchema(schema: StructType): Unit = {
if (schema.size != 1) {
throw new AnalysisException(
throw new UnsupportedOperationException(
s"Text data source supports only a single column, and you have ${schema.size} columns.")
}
val tpe = schema(0).dataType
if (tpe != StringType) {
throw new AnalysisException(
throw new UnsupportedOperationException(
s"Text data source supports only a string column, but you have ${tpe.simpleString}.")
}
}
@@ -95,9 +95,10 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
assert(
requiredSchema.length <= 1,
"Text data source only produces a single data column named \"value\".")
if (requiredSchema.nonEmpty) {
// `requiredSchema` can be empty when only the partition column is projected.
verifySchema(requiredSchema)
}

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
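
A small illustration of the `requiredSchema.nonEmpty` guard above, with hypothetical paths and assuming a SparkSession named `spark`: when only the partition column is selected, no data column is required from the text files, so the single-string-column check has to be skipped.

// Write a partitioned text dataset; the single data column `value` is a string.
spark.range(3).selectExpr("CAST(id AS STRING) AS value", "id % 2 AS part")
  .write.partitionBy("part").text("/tmp/text_partitioned")

// Selecting only the partition column leaves `requiredSchema` empty for the text files.
spark.read.text("/tmp/text_partitioned").select("part").collect()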
@@ -123,6 +123,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(null) :: Nil)
}

test("from_json unsupported type") {
val df = Seq("""{"a" 1}""").toDS()
val schema = new StructType().add("a", CalendarIntervalType)

val e = intercept[AnalysisException] {
// Unsupported type throws an exception
df.select(from_json($"value", schema)).collect()
}
assert(e.getMessage.contains(
"Unsupported type calendarinterval of column a in JSON conversion"))
}

test("to_json") {
val df = Seq(Tuple1(Tuple1(1))).toDF("a")

@@ -139,7 +151,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
df.select(to_json($"c")).collect()
}
assert(e.getMessage.contains(
"Unable to convert column a of type calendarinterval to JSON."))
"Unsupported type calendarinterval of column a in JSON conversion"))
}

test("roundtrip in to_json and from_json") {
@@ -688,13 +688,70 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.write.csv(csvDir)
}.getMessage
assert(msg.contains("CSV data source does not support array<double> data type"))
}
}

test("Unsupported types - DROPMALFORMED mode") {
val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)

withTempPath { path =>
spark.range(1).write.csv(path.getAbsolutePath)
val df = spark.read
.schema(schema)
.option("mode", "DROPMALFORMED")
.csv(path.getAbsolutePath)

assert(df.collect().isEmpty)
}

withTempPath { path =>
Seq(Tuple1("null")).toDF().write.csv(path.getAbsolutePath)
val nullDf = spark.read
.schema(schema)
.option("nullValue", "null")
.option("mode", "DROPMALFORMED")
.csv(path.getAbsolutePath)

// Reading null still succeeds even though the type is unsupported.
checkAnswer(nullDf, Row(null))
}
}

test("Unsupported types - FAILFAST mode") {
withTempPath { path =>
val msg = intercept[UnsupportedOperationException] {
val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
spark.range(1).write.csv(path.getAbsolutePath)
spark.read.schema(schema).option("mode", "FAILFAST").csv(path.getAbsolutePath).collect()
}.getMessage

assert(msg.contains("CSV data source does not support array<double> data type"))
}
}

test("Unsupported types - PERMISSIVE mode") {
withTempDir { dir =>
// If the values are null, it is fine to read.
val schema = StructType(StructField("a",
StructType(StructField("b", StringType, true) :: Nil), true) :: Nil)
val path = s"${dir.getAbsolutePath}/tmp1"
Seq(Tuple1("null")).toDF().write.csv(path)
val df = spark.read
.schema(schema)
.option("nullValue", "null")
.option("mode", "PERMISSIVE")
.csv(path)

checkAnswer(df, Row(null))

// If the values are non-null and the type is unsupported, it throws an exception.
val msg = intercept[SparkException] {
val path = s"${dir.getAbsolutePath}/tmp2"
spark.range(1).write.csv(path)
spark.read.schema(schema).option("mode", "PERMISSIVE").csv(path).collect()
}.getCause.getMessage
assert(msg.contains("Unsupported type: array"))

assert(msg.contains("Unsupported type: struct"))
}
}

@@ -1055,6 +1055,37 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {"))
}

test("Unsupported types: FAILFAST mode") {
val schema = StructType(
StructField("a", CalendarIntervalType, true) :: Nil)
val exceptionOne = intercept[UnsupportedOperationException] {
// Read JSON data from RDD
spark.read
.option("mode", "FAILFAST")
.schema(schema)
.json(corruptRecords)
.collect()
}

assert(exceptionOne.getMessage.contains(
"Unsupported type calendarinterval of column a in JSON conversion."))

val exceptionTwo = intercept[UnsupportedOperationException] {
// Read JSON data from files.
withTempDir { path =>
spark.read
.option("mode", "FAILFAST")
.schema(schema)
.format("json")
.load(path.getAbsolutePath)
.collect()
}
}

assert(exceptionTwo.getMessage.contains(
"Unsupported type calendarinterval of column a in JSON conversion."))
}

test("Corrupt records: DROPMALFORMED mode") {
val schemaOne = StructType(
StructField("a", StringType, true) ::
@@ -1082,6 +1113,41 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}

test("Unsupported types: DROPMALFORMED mode") {
val schema = StructType(
StructField("a", CalendarIntervalType, true) :: Nil)
val rdd = sparkContext.parallelize(Seq("""{"a": 1}"""))
// Read JSON data from RDD
val df = spark.read
.option("mode", "DROPMALFORMED")
.schema(schema)
.json(rdd)

assert(df.collect().isEmpty)

val nullRdd = sparkContext.parallelize(Seq("""{"a": null}"""))
// Read JSON data from RDD
val nullDf = spark.read
.option("mode", "DROPMALFORMED")
.schema(schema)
.json(nullRdd)

// Reading null still succeeds even though the type is unsupported.
checkAnswer(nullDf, Row(null))

withTempPath { path =>
// Read JSON data from files.
rdd.saveAsTextFile(path.getAbsolutePath)
val df = spark.read
.option("mode", "DROPMALFORMED")
.schema(schema)
.format("json")
.load(path.getAbsolutePath)

assert(df.collect().isEmpty)
}
}

test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
withTempView("jsonTable") {
val schema = StructType(
@@ -1163,6 +1229,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}

test("Unsupported types: PERMISSIVE mode") {
val schema = StructType(
StructField("a", CalendarIntervalType, true) :: Nil)
val rdd = sparkContext.parallelize(Seq("""{"a": 1}"""))
// Read JSON data from RDD
val df = spark.read
.option("mode", "PERMISSIVE")
.schema(schema)
.json(rdd)

checkAnswer(df, Row(null))

withTempPath { path =>
// Read JSON data from files.
rdd.saveAsTextFile(path.getAbsolutePath)
val df = spark.read
.option("mode", "PERMISSIVE")
.schema(schema)
.format("json")
.load(path.getAbsolutePath)

checkAnswer(df, Row(null))
}
}

test("SPARK-13953 Rename the corrupt record field via option") {
val jsonDF = spark.read
.option("columnNameOfCorruptRecord", "_malformed")