1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
@@ -1805,6 +1805,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see

- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, `toPandas` previously just failed when Arrow optimization could not be used, whereas `createDataFrame` from a Pandas DataFrame allowed falling back to the non-optimized path. Now, both `toPandas` and `createDataFrame` from a Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
- Since Spark 2.4, writing an empty DataFrame to a directory launches at least one write task, even if the DataFrame physically has no partitions. This introduces a small behavior change: for self-describing file formats like Parquet and ORC, Spark creates a metadata-only file in the target directory when writing a 0-partition DataFrame, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent when writing empty DataFrames; a short usage sketch follows below.
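A minimal sketch of the user-facing behavior described in the last note above, assuming a local session and an illustrative output path (neither comes from this patch): a 0-partition DataFrame written with a self-describing format still leaves a schema-bearing, metadata-only file behind, so a later read can recover the schema.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("empty-write-demo").getOrCreate()

// A DataFrame with a schema but zero rows and zero partitions.
val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

// Since Spark 2.4 this write launches one task and produces a metadata-only file.
emptyDf.write.mode("overwrite").parquet("/tmp/empty-output")  // illustrative path

// The schema survives the round trip even though no data rows were written.
spark.read.parquet("/tmp/empty-output").printSchema()

spark.stop()
```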

## Upgrading From Spark SQL 2.2 to 2.3

@@ -190,9 +190,18 @@ object FileFormatWriter extends Logging {
           global = false,
           child = plan).execute()
       }
-      val ret = new Array[WriteTaskResult](rdd.partitions.length)
+
+      // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
+      // partition rdd to make sure we at least set up one write task to write the metadata.
+      val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
+        sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
+      } else {
+        rdd
+      }
+
+      val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
       sparkSession.sparkContext.runJob(
-        rdd,
+        rddWithNonEmptyPartitions,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
@@ -202,7 +211,7 @@
             committer,
             iterator = iter)
         },
-        0 until rdd.partitions.length,
+        rddWithNonEmptyPartitions.partitions.indices,
         (index, res: WriteTaskResult) => {
           committer.onTaskCommit(res.commitMsg)
           ret(index) = res
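A small, self-contained sketch of the scheduling idea behind the change above, using the plain RDD API rather than the SQL write path (the object name and local master are made up for the example): swapping a zero-partition RDD for a one-partition empty RDD is what guarantees that `runJob` launches at least one task, which in turn gives the write path a chance to emit the metadata file.

```scala
import org.apache.spark.sql.SparkSession

object ZeroPartitionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("zero-partition-demo").getOrCreate()
    val sc = spark.sparkContext

    val emptyRdd = sc.emptyRDD[Int]  // zero partitions: runJob would launch no tasks at all
    val rddWithNonEmptyPartitions =
      if (emptyRdd.partitions.isEmpty) sc.parallelize(Seq.empty[Int], numSlices = 1)
      else emptyRdd

    // Exactly one task runs even though there is no data to process,
    // mirroring how the patched FileFormatWriter guarantees one write task.
    val sizes = sc.runJob(rddWithNonEmptyPartitions, (iter: Iterator[Int]) => iter.size)
    println(s"tasks launched: ${sizes.length}, rows seen: ${sizes.sum}")  // 1 task, 0 rows

    spark.stop()
  }
}
```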
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.SparkException
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext

@@ -89,6 +90,23 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}

Seq("orc", "parquet").foreach { format =>
test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
withTempPath { outputPath =>
val df = spark.emptyDataFrame.select(lit(1).as("i"))
df.write.format(format).save(outputPath.toString)
val partFiles = outputPath.listFiles()
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(partFiles.length === 1)

// Now read the file.
val df1 = spark.read.format(format).load(outputPath.toString)
checkAnswer(df1, Seq.empty[Row])
assert(df1.schema.equals(df.schema.asNullable))
}
}
}

allFileBasedDataSources.foreach { format =>
test(s"SPARK-22146 read files containing special characters using $format") {
withTempDir { dir =>
@@ -301,7 +301,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     intercept[AnalysisException] {
       spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
     }
-    spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
Contributor (@cloud-fan):

How does it fail? If it's a runtime error we should fail earlier, during analysis. This is worth a new JIRA.

Contributor Author (@dilipbiswal):

@cloud-fan I forgot :-) I will double check and get back.

Contributor Author (@dilipbiswal):

@cloud-fan It fails in the executor like this:

org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message spark_schema {
}

	at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
	at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
	at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
	at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:225)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:376)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:387)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:276)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:281)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:206)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.

Contributor Author (@dilipbiswal):

Let me study the code to see how we can fail earlier.

Contributor (@cloud-fan):

Let's open a JIRA. We can fix it in another PR.

Contributor Author (@dilipbiswal), Feb 9, 2018:

@cloud-fan OK Wenchen. Created SPARK-23372 - FYI

}
Contributor Author (@dilipbiswal):

It's not legal to write an empty struct in Parquet; this is explained by Herman in SPARK-20593. Previously we didn't set up a write task for this case, whereas now with this fix we do.

}
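A short sketch of the two cases discussed in this review thread, assuming a local session and illustrative paths: a zero-column DataFrame cannot be written to Parquet at all (the empty-group failure shown in the stack trace above, tracked in SPARK-23372 and explained in SPARK-20593), while a zero-row DataFrame that does have a column now writes a metadata-only file.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Fails in the write task: Parquet rejects a schema with no fields
// ("Cannot write a schema with an empty group"), as shown above.
// spark.emptyDataFrame.write.mode("overwrite").parquet("/tmp/no-columns")

// Succeeds: zero rows but one column, so Spark 2.4 writes a metadata-only file.
spark.emptyDataFrame.select(lit(1).as("i"))
  .write.mode("overwrite").parquet("/tmp/zero-rows")  // illustrative path

spark.read.parquet("/tmp/zero-rows").printSchema()    // recovers the schema

spark.stop()
```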
