diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index f7ca1eae0ef2..621b652f36bb 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2959,6 +2959,11 @@
         "Unsupported dtype: <dtype>. Valid values: float64, float32."
       ]
     },
+    "EXTENSION" : {
+      "message" : [
+        "Invalid extension: <fileExtension>. Extension is limited to exactly 3 letters (e.g. csv, tsv)."
+      ]
+    },
     "INTEGER" : {
       "message" : [
         "expects an integer literal, but got <invalidValue>."
diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 97a7065e0598..8008bc562082 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -60,6 +60,12 @@ Data source options of CSV can be set via:
     <td>Sets a separator for each field and value. This separator can be one or more characters.</td>
     <td>read/write</td>
   </tr>
+  <tr>
+    <td><code>extension</code></td>
+    <td>csv</td>
+    <td>Sets the file extension for the output files. The extension must be exactly 3 letters (e.g. csv, tsv).</td>
+    <td>write</td>
+  </tr>
   <tr>
     <td><code>encoding</code><br><code>charset</code></td>
     <td>UTF-8</td>
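For reference, a minimal usage sketch of the option documented above. It assumes an active `SparkSession` named `spark`; the output path and data are illustrative, while `sep`, `extension`, and `.csv(...)` come from this patch and the existing API:

```scala
// Sketch: write CSV-formatted output whose part files end in ".tsv".
val df = spark.range(3).selectExpr("id", "id * 2 AS doubled")
df.write
  .option("sep", "\t")          // tab-separated fields, to match the extension
  .option("extension", "tsv")   // the write-only option added by this patch
  .csv("/tmp/tsv-output")       // part files end in .tsv (plus any codec suffix)
```

Note that `extension` only changes the file name suffix; the records are still written by the CSV data source with whatever separator is configured.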
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 5a23d6f7a3cc..6c68bc1aa589 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -103,6 +103,16 @@ class CSVOptions(
 
   val delimiter = CSVExprUtils.toDelimiterStr(
     parameters.getOrElse(SEP, parameters.getOrElse(DELIMITER, ",")))
+
+  val extension = {
+    val ext = parameters.getOrElse(EXTENSION, "csv")
+    if (ext.size != 3 || !ext.forall(_.isLetter)) {
+      throw QueryExecutionErrors.invalidFileExtensionError(EXTENSION, ext)
+    }
+
+    ext
+  }
+
   val parseMode: ParseMode =
     parameters.get(MODE).map(ParseMode.fromString).getOrElse(PermissiveMode)
   val charset = parameters.get(ENCODING).orElse(parameters.get(CHARSET))
@@ -385,6 +395,7 @@ object CSVOptions extends DataSourceOptions {
   val NEGATIVE_INF = newOption("negativeInf")
   val TIME_ZONE = newOption("timeZone")
   val UNESCAPED_QUOTE_HANDLING = newOption("unescapedQuoteHandling")
+  val EXTENSION = newOption("extension")
   // Options with alternative
   val ENCODING = "encoding"
   val CHARSET = "charset"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index e500f5e3cbd7..1ae2e5445c0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2786,6 +2786,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
     Map.empty
   )
 
+  def invalidFileExtensionError(functionName: String, extension: String): RuntimeException = {
+    new SparkIllegalArgumentException(
+      errorClass = "INVALID_PARAMETER_VALUE.EXTENSION",
+      messageParameters = Map(
+        "functionName" -> toSQLId(functionName),
+        "parameter" -> toSQLId("extension"),
+        "fileExtension" -> toSQLId(extension),
+        "acceptable" -> "Extension is limited to exactly 3 letters (e.g. csv, tsv)."))
+  }
+
   def invalidCharsetError(functionName: String, charset: String): RuntimeException = {
     new SparkIllegalArgumentException(
       errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 8ef85ee91aa8..b2b99e2d0f4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -86,7 +86,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
 
     override def getFileExtension(context: TaskAttemptContext): String = {
-      ".csv" + CodecStreams.getCompressionExtension(context)
+      "." + csvOptions.extension + CodecStreams.getCompressionExtension(context)
     }
   }
 }
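The validation in `CSVOptions.extension` boils down to "exactly 3 letters". A standalone sketch of the same rule; the helper name is hypothetical, not part of Spark:

```scala
// Hypothetical helper mirroring the CSVOptions check:
// an extension is accepted iff it is exactly 3 letters.
def isValidExtension(ext: String): Boolean =
  ext.length == 3 && ext.forall(_.isLetter)

assert(isValidExtension("csv"))
assert(isValidExtension("tsv"))
assert(!isValidExtension("csvv")) // 4 characters
assert(!isValidExtension("c1v"))  // contains a digit
assert(!isValidExtension(""))     // empty
```

The guard in `CSVOptions` throws on the negation of this predicate: by De Morgan, `!(length == 3 && allLetters)` is `length != 3 || !allLetters`, which is why the two checks in the `if` are joined with `||` rather than `&&`.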
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
index f38a1d385a39..7011fea77d88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
@@ -58,7 +58,7 @@ case class CSVWrite(
     }
 
     override def getFileExtension(context: TaskAttemptContext): String = {
-      ".csv" + CodecStreams.getCompressionExtension(context)
+      "." + csvOptions.extension + CodecStreams.getCompressionExtension(context)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7cacd8ea2dc5..850e887ac8e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -3078,6 +3078,23 @@ abstract class CSVSuite
     }
   }
 
+  test("SPARK-50616: We can write with a tsv file extension") {
+    withTempPath { path =>
+      val input = Seq(
+        "1423-11-12T23:41:00",
+        "1765-03-28",
+        "2016-01-28T20:00:00"
+      ).toDF().repartition(1)
+      input.write.option("extension", "tsv").csv(path.getAbsolutePath)
+
+      val files = Files.list(path.toPath)
+        .iterator().asScala.map(_.getFileName.toString)
+        .toList.filter(_.endsWith("tsv"))
+
+      assert(files.size == 1)
+    }
+  }
+
   test("SPARK-39904: Parse incorrect timestamp values") {
     withTempPath { path =>
       Seq(
@@ -3308,7 +3325,7 @@ abstract class CSVSuite
   }
 
   test("SPARK-40667: validate CSV Options") {
-    assert(CSVOptions.getAllOptions.size == 39)
+    assert(CSVOptions.getAllOptions.size == 40)
     // Please add validation on any new CSV options here
     assert(CSVOptions.isValidOption("header"))
     assert(CSVOptions.isValidOption("inferSchema"))
@@ -3347,6 +3364,7 @@ abstract class CSVSuite
     assert(CSVOptions.isValidOption("compression"))
     assert(CSVOptions.isValidOption("codec"))
     assert(CSVOptions.isValidOption("sep"))
+    assert(CSVOptions.isValidOption("extension"))
     assert(CSVOptions.isValidOption("delimiter"))
     assert(CSVOptions.isValidOption("columnPruning"))
     // Please add validation on any new parquet options with alternative here
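Finally, a hedged sketch of the failure path. Since the option is validated when `CSVOptions` is constructed, an invalid value should fail at write time; the path and value are illustrative and an active `SparkSession` named `spark` is assumed:

```scala
// Sketch: "html" has 4 letters, so option parsing rejects it with
// INVALID_PARAMETER_VALUE.EXTENSION (a SparkIllegalArgumentException,
// which extends IllegalArgumentException).
val df = spark.range(1).toDF("id")
try {
  df.write.option("extension", "html").csv("/tmp/bad-output")
} catch {
  case e: IllegalArgumentException =>
    // The message names the invalid value, e.g. "Invalid extension: `html`. ..."
    println(e.getMessage)
}
```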