diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fb434f488361..3b409fa2f6a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, LocalTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Tabl
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsTriggerAvailableNow}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
@@ -577,15 +578,23 @@ class MicroBatchExecution(
       // For v1 sources.
       case StreamingExecutionRelation(source, output) =>
         newData.get(source).map { dataPlan =>
+          val hasFileMetadata = output.exists {
+            case FileSourceMetadataAttribute(_) => true
+            case _ => false
+          }
+          val finalDataPlan = dataPlan match {
+            case l: LogicalRelation if hasFileMetadata => l.withMetadataColumns()
+            case _ => dataPlan
+          }
           val maxFields = SQLConf.get.maxToStringFields
-          assert(output.size == dataPlan.output.size,
+          assert(output.size == finalDataPlan.output.size,
             s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
-              s"${truncatedString(dataPlan.output, ",", maxFields)}")
+              s"${truncatedString(finalDataPlan.output, ",", maxFields)}")
 
-          val aliases = output.zip(dataPlan.output).map { case (to, from) =>
+          val aliases = output.zip(finalDataPlan.output).map { case (to, from) =>
             Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
           }
-          Project(aliases, dataPlan)
+          Project(aliases, finalDataPlan)
         }.getOrElse {
           LocalRelation(output, isStreaming = true)
         }
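Why the planner change is needed: once a streaming query selects `_metadata` from a file source, the analyzed plan's `output` carries a `FileSourceMetadataAttribute`, but the per-batch `LogicalRelation` built by the source does not, so the `output.size == dataPlan.output.size` assertion failed. Expanding the relation with `withMetadataColumns()` lines the two sides up again before the aliasing `Project` is built. The snippet below is a minimal sketch of a query shape that exercises this path; the session setup, schema, and paths are assumptions for illustration and are not part of this patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Minimal sketch (assumed paths and schema): a streaming file source that selects the
// hidden _metadata column, which is what routes each micro-batch through the
// hasFileMetadata / withMetadataColumns() branch added above.
object MetadataStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    val schema = new StructType().add("name", StringType).add("age", IntegerType)

    val query = spark.readStream
      .format("json")
      .schema(schema)
      .load("/tmp/metadata-stream/input")                        // hypothetical input dir
      .select("*", "_metadata")                                  // pulls in the file metadata struct
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/metadata-stream/ckpt") // hypothetical checkpoint
      .start()

    query.processAllAvailable()
    query.stop()
  }
}
```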
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 5d4b811defee..00962a4f4cdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -21,12 +21,12 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
@@ -43,7 +43,7 @@ object StreamingRelation {
  * passing to [[StreamExecution]] to run a query.
  */
 case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
-  extends LeafNode with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
@@ -56,6 +56,31 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
   )
 
   override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))
+
+  override lazy val metadataOutput: Seq[AttributeReference] = {
+    dataSource.providingClass match {
+      // If the class provided by the dataSource is FileFormat or a subclass of it
+      case f if classOf[FileFormat].isAssignableFrom(f) =>
+        val resolve = conf.resolver
+        val outputNames = outputSet.map(_.name)
+        def isOutputColumn(col: AttributeReference): Boolean = {
+          outputNames.exists(name => resolve(col.name, name))
+        }
+        // Filter out the metadata struct column if its name conflicts with an output column:
+        // if the file already has a "_metadata" column,
+        // the data column should be returned, not the metadata struct column.
+        Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+      case _ => Nil
+    }
+  }
+
+  override def withMetadataColumns(): LogicalPlan = {
+    if (metadataOutput.nonEmpty) {
+      this.copy(output = output ++ metadataOutput)
+    } else {
+      this
+    }
+  }
 }
 
 /**
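The conflict rule in `metadataOutput` above: the hidden metadata struct is exposed only when no data column already claims its name under the session resolver; otherwise the data's own `_metadata` column wins. Below is a standalone sketch of that rule over plain column-name strings rather than Spark's `AttributeReference`s, assuming the default case-insensitive resolver; it is illustrative only and not Spark internals.

```scala
// Standalone illustration of the name-conflict rule, not Spark code.
object MetadataConflictSketch {
  // Mirrors the default resolver (spark.sql.caseSensitive = false).
  private val resolve: (String, String) => Boolean = _.equalsIgnoreCase(_)

  // Expose the metadata column only if no data column already claims its name.
  def metadataOutput(dataColumns: Seq[String], metadataCol: String = "_metadata"): Seq[String] =
    Seq(metadataCol).filterNot(m => dataColumns.exists(resolve(m, _)))

  def main(args: Array[String]): Unit = {
    println(metadataOutput(Seq("name", "age")))        // List(_metadata): exposed
    println(metadataOutput(Seq("name", "_METADATA")))  // List(): the data column wins
  }
}
```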
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 175b42083f26..410fc985dd3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
+      // Verify the data is as expected
+      checkAnswer(
+        newDF.select(col("name"), col("age"), col("info"),
+          col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+          // since we are writing _metadata to a json file,
+          // we should explicitly cast the column to timestamp type
+          col(METADATA_FILE_SIZE), to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+        Seq(
+          Row(
+            "jack", 24, Row(12345L, "uom"),
+            sourceFileMetadata(METADATA_FILE_PATH),
+            sourceFileMetadata(METADATA_FILE_NAME),
+            sourceFileMetadata(METADATA_FILE_SIZE),
+            sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+          Row(
+            "lily", 31, Row(54321L, "ucb"),
+            sourceFileMetadata(METADATA_FILE_PATH),
+            sourceFileMetadata(METADATA_FILE_NAME),
+            sourceFileMetadata(METADATA_FILE_SIZE),
+            sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+        )
+      )
+    }
+  }
 }
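A usage note on the test above, as a sketch rather than part of the patch: the metadata subfields can also be selected individually, and, as the test comment notes, `file_modification_time` comes back as a string after the struct has been written to a JSON sink, so it needs `to_timestamp` (or a cast) when read back. The object name, helper names, and paths below are hypothetical; the subfield names match the constants used in the test.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, to_timestamp}
import org.apache.spark.sql.types.StructType

// Sketch: selecting individual _metadata subfields on a streaming file source, and
// restoring the timestamp type after the struct has round-tripped through a JSON sink.
// Assumes the supplied schema contains a "name" column; paths are placeholders.
object MetadataSubfieldSketch {
  def perFileColumns(spark: SparkSession, schema: StructType): DataFrame =
    spark.readStream
      .format("json")
      .schema(schema)
      .load("/tmp/metadata-stream/input")
      .select(
        col("name"),
        col("_metadata.file_path"),
        col("_metadata.file_name"),
        col("_metadata.file_size"),
        col("_metadata.file_modification_time"))

  def readBack(spark: SparkSession): DataFrame =
    spark.read.format("json").load("/tmp/metadata-stream/output")
      .withColumn("modification_time",
        to_timestamp(col("_metadata.file_modification_time")))
}
```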