@@ -21,7 +21,7 @@ import scala.collection.mutable.{Map => MutableMap}

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, LocalTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Tabl
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsTriggerAvailableNow}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
import org.apache.spark.sql.internal.SQLConf
@@ -577,15 +578,23 @@ class MicroBatchExecution(
// For v1 sources.
case StreamingExecutionRelation(source, output) =>
newData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
}
val finalDataPlan = dataPlan match {
case l: LogicalRelation if hasFileMetadata => l.withMetadataColumns()
case _ => dataPlan
}
val maxFields = SQLConf.get.maxToStringFields
-assert(output.size == dataPlan.output.size,
+assert(output.size == finalDataPlan.output.size,
s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
-s"${truncatedString(dataPlan.output, ",", maxFields)}")
+s"${truncatedString(finalDataPlan.output, ",", maxFields)}")

-val aliases = output.zip(dataPlan.output).map { case (to, from) =>
+val aliases = output.zip(finalDataPlan.output).map { case (to, from) =>
Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
}
-Project(aliases, dataPlan)
+Project(aliases, finalDataPlan)
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
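
For context, a minimal usage sketch of what this hunk enables: a v1 file-source stream whose plan explicitly selects the hidden _metadata column. The format, schema, and paths below are illustrative and not taken from this PR; an active SparkSession named spark is assumed.

  val stream = spark.readStream
    .format("json")
    .schema("name STRING, age INT")                  // assumed schema for the sketch
    .load("/tmp/source")                             // illustrative source path
    .select("*", "_metadata")                        // explicitly request the file metadata column
    .writeStream
    .format("json")
    .option("checkpointLocation", "/tmp/checkpoint") // illustrative checkpoint path
    .start("/tmp/target")                            // illustrative sink path
  stream.processAllAvailable()
  stream.stop()

With the change above, when the batch's output carries a FileSourceMetadataAttribute, each micro-batch's LogicalRelation gets its metadata columns attached via withMetadataColumns(), so the projected _metadata values reach the sink.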
@@ -21,12 +21,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}

object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -43,7 +43,7 @@ object StreamingRelation {
* passing to [[StreamExecution]] to run a query.
*/
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
-extends LeafNode with MultiInstanceRelation {
+extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
override def isStreaming: Boolean = true
override def toString: String = sourceName

@@ -56,6 +56,31 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
)

override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))

override lazy val metadataOutput: Seq[AttributeReference] = {
dataSource.providingClass match {
// If the data source's providing class is FileFormat or a subclass of it
case f if classOf[FileFormat].isAssignableFrom(f) =>
val resolve = conf.resolver
val outputNames = outputSet.map(_.name)
def isOutputColumn(col: AttributeReference): Boolean = {
outputNames.exists(name => resolve(col.name, name))
}
// Filter out the metadata struct column if its name conflicts with an output column:
// if the file already has a column named "_metadata",
// the data column should be returned, not the metadata struct column.
Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
case _ => Nil
}
}

override def withMetadataColumns(): LogicalPlan = {
if (metadataOutput.nonEmpty) {
this.copy(output = output ++ metadataOutput)
} else {
this
}
}
}

/**
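
A hypothetical illustration of the conflict rule in metadataOutput above (the schema and path are assumptions, and spark is an active SparkSession): when the data itself already has a "_metadata" field, the relation exposes no extra metadata column, so "_metadata" resolves to the user's data column.

  val conflicting = spark.readStream
    .format("json")
    .schema("id BIGINT, _metadata STRING")   // user data column already named "_metadata"
    .load("/tmp/conflicting-source")         // illustrative path
  conflicting.select("_metadata")            // resolves to the STRING data column,
                                             // not the hidden file metadata struct
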
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
)
}
}

metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
withTempDir { dir =>
df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")

val stream = spark.readStream.format("json")
.schema(schema)
.load(dir.getCanonicalPath + "/source/new-streaming-data")
.select("*", "_metadata")
.writeStream.format("json")
.option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
.start(dir.getCanonicalPath + "/target/new-streaming-data")

stream.processAllAvailable()
stream.stop()

val newDF = spark.read.format("json")
.load(dir.getCanonicalPath + "/target/new-streaming-data")

val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
.filter(_.getName.endsWith(".json")).head
val sourceFileMetadata = Map(
METADATA_FILE_PATH -> sourceFile.toURI.toString,
METADATA_FILE_NAME -> sourceFile.getName,
METADATA_FILE_SIZE -> sourceFile.length(),
METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
)

// SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
[Review thread on the assertion above]

HeartSaVioR (Contributor), Mar 3, 2022:

@Yaohua628 Sorry for the late review; I didn't get to this in time.

Just to be clear, select("*").show() should not expose a hidden column, right? Since "_metadata" is included in the expected list of columns here, I'd like to double-confirm that it is not user-facing.

Also, given we include the new column, dropDuplicates() without explicitly listing columns in a streaming query would be broken: the state schema would include the hidden column, whereas the state schema from an older checkpoint does not (as the column didn't exist then).

We should test it, and if it falls into that case, we should mention it in the migration guide, or make it configurable and turned off by default. (We did this when adding Kafka headers, #22282.)

cc @cloud-fan

Yaohua628 (Contributor, Author):

Yep, select("*") won't expose the hidden file metadata column.

But what I did here is readStream with an explicit select of * and _metadata (above), then writeStream to a target table /target/new-streaming-data, so the target table itself has a column called _metadata.

HeartSaVioR (Contributor):

Ah OK, my bad; that is just checking the output.

dropDuplicates() still remains a question, though: how do we deal with it? We removed the Kafka header column by default to avoid breaking compatibility with dropDuplicates(), but I'm not sure we want to add yet another config.

Yaohua628 (Contributor, Author):

I see! I can test with dropDuplicates() and see how it goes.
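
As a sketch of the dropDuplicates() scenario under discussion (illustrative only; the schema, paths, and dedup step are assumptions, not code from this PR): if _metadata is explicitly selected, a dropDuplicates() call with no column list takes every column, including _metadata, into the streaming deduplication state.

  val deduped = spark.readStream
    .format("json")
    .schema("name STRING, age INT")   // assumed schema
    .load("/tmp/source")              // illustrative path
    .select("*", "_metadata")         // hidden column made explicit
    .dropDuplicates()                 // no columns given: every column, _metadata included,
                                      // participates in deduplication state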

// Verify the data is expected
checkAnswer(
newDF.select(col("name"), col("age"), col("info"),
col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
// since we are writing _metadata to a json file,
// we should explicitly cast the column to timestamp type
col(METADATA_FILE_SIZE), to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
Seq(
Row(
"jack", 24, Row(12345L, "uom"),
sourceFileMetadata(METADATA_FILE_PATH),
sourceFileMetadata(METADATA_FILE_NAME),
sourceFileMetadata(METADATA_FILE_SIZE),
sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
Row(
"lily", 31, Row(54321L, "ucb"),
sourceFileMetadata(METADATA_FILE_PATH),
sourceFileMetadata(METADATA_FILE_NAME),
sourceFileMetadata(METADATA_FILE_SIZE),
sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
)
)
}
}
}