@@ -45,7 +45,7 @@ class FileStreamSource(
 
   private val qualifiedBasePath: Path = {
     val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
-    fs.makeQualified(new Path(path)) // can contains glob patterns
+    fs.makeQualified(new Path(path)) // can contain glob patterns
   }
 
   private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
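
A note on the comment fixed above: Hadoop's `makeQualified` only resolves the scheme, authority, and working directory of a path; it does not expand globs, so a pattern survives qualification and is only expanded later. A minimal standalone sketch of that behavior (the relative path and glob below are made-up examples, not taken from this PR):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object QualifyGlobExample {
  def main(args: Array[String]): Unit = {
    // A relative path that still contains glob characters.
    val raw = new Path("data/year=*/month=*")
    val fs = raw.getFileSystem(new Configuration())
    // makeQualified prepends the scheme and working directory but leaves the
    // glob intact, e.g. file:/home/user/data/year=*/month=*
    println(fs.makeQualified(raw))
    // The pattern is only expanded when globStatus is called explicitly
    // (globStatus may return null when nothing matches).
    Option(fs.globStatus(raw)).getOrElse(Array.empty[FileStatus])
      .foreach(s => println(s.getPath))
  }
}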
@@ -408,16 +408,38 @@ class StreamExecution(
       case StreamingExecutionRelation(source, output) =>
         newData.get(source).map { data =>
           val newPlan = data.logicalPlan
-          assert(output.size == newPlan.output.size,
-            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
-            s"${Utils.truncatedString(newPlan.output, ",")}")
-          replacements ++= output.zip(newPlan.output)
+          val newPlanOutput = newPlan.output
+          output.foreach { out =>
+            val outputInNewPlan = newPlanOutput.find { newPlanOut =>
+              // We can't use semanticEquals here: `AttributeReference.semanticEquals`
+              // compares expression IDs, and those are guaranteed to differ between
+              // the original plan and the new batch.
+              out.name.toLowerCase == newPlanOut.name.toLowerCase &&
+                // The line below means that we don't support schema evolution for now.
+                out.dataType == newPlanOut.dataType
+            }.getOrElse {
+              val availableColumns = newPlan.output.map(a => s"${a.name} of ${a.dataType}")
+              throw new AnalysisException(
+                s"""
+                   |Batch does not have expected schema
+                   |Missing column: ${out.name} of ${out.dataType}
+                   |Available columns: ${availableColumns.mkString(", ")}
+                   |
+                   |== Original ==
+                   |$logicalPlan
+                   |
+                   |== Batch ==
+                   |$newPlan
+                 """.stripMargin)
+            }
+            replacements += out -> outputInNewPlan
+          }
           newPlan
         }.getOrElse {
           LocalRelation(output)
         }
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.
     val replacementMap = AttributeMap(replacements)
     val triggerLogicalPlan = withNewSources transformAllExpressions {
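
The heart of this hunk: old and new outputs are no longer zipped positionally; each expected attribute is looked up in the batch's output by case-insensitive name plus exact data type, so extra columns in the batch (such as inferred partition columns a user's schema omits) are simply ignored. A standalone sketch of the same matching idea, using a hypothetical `Attr` case class rather than Spark's `AttributeReference`:

// Hypothetical stand-in for Spark's AttributeReference: just a name and a type tag.
case class Attr(name: String, dataType: String)

object SchemaMatchSketch {
  /** Pair each expected attribute with the batch attribute that has the same
    * (case-insensitive) name and an identical type; fail fast otherwise. */
  def matchOutputs(expected: Seq[Attr], batch: Seq[Attr]): Seq[(Attr, Attr)] =
    expected.map { out =>
      val found = batch.find { b =>
        b.name.equalsIgnoreCase(out.name) && b.dataType == out.dataType
      }.getOrElse {
        val available = batch.map(a => s"${a.name} of ${a.dataType}").mkString(", ")
        throw new IllegalArgumentException(
          s"Missing column: ${out.name} of ${out.dataType}; available: $available")
      }
      out -> found
    }

  def main(args: Array[String]): Unit = {
    val expected = Seq(Attr("id", "LongType"))
    val batch = Seq(Attr("ID", "LongType"), Attr("part", "LongType"))
    // Extra batch columns (like pruned partition columns) are simply ignored.
    println(matchOutputs(expected, batch))
  }
}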
@@ -1009,6 +1009,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("SPARK-18407: Prune inferred partition columns from execution if not specified in schema") {
+    withTempDirs { case (src, tmp) =>
+      spark.range(4).select('id, 'id % 4 as 'part).coalesce(1).write
+        .partitionBy("part")
+        .mode("overwrite")
+        .parquet(src.toString)
+      val sdf = spark.readStream
+        .schema(StructType(Seq(StructField("id", LongType)))) // omit 'part'
+        .format("parquet")
+        .load(src.toString)
+      try {
+        val sq = sdf.writeStream
+          .queryName("assertion_test")
+          .format("memory")
+          .start()
+        // used to throw an assertion error
+        sq.processAllAvailable()
+        checkAnswer(
+          spark.table("assertion_test"),
+          Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil
+        )
+      } finally {
+        spark.streams.active.foreach(_.stop())
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
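
For anyone wanting to reproduce the failure outside the test harness, a sketch of a minimal application follows; the temp-directory handling, master URL, and query name are illustrative assumptions, not part of this PR:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object PrunedPartitionColumnRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("SPARK-18407-repro").getOrCreate()
    import spark.implicits._

    val src = java.nio.file.Files.createTempDirectory("spark-18407-").toString
    // 'part' becomes a directory-level partition column in the written layout.
    spark.range(4).select('id, ('id % 4).as("part")).write
      .partitionBy("part").mode("overwrite").parquet(src)

    // Stream the directory back with a user schema that omits 'part'; the
    // source still infers the partition column, which used to break the
    // positional output check in StreamExecution.
    val sdf = spark.readStream
      .schema(StructType(Seq(StructField("id", LongType))))
      .format("parquet")
      .load(src)

    val sq = sdf.writeStream.queryName("repro").format("memory").start()
    sq.processAllAvailable() // threw an assertion error before this change
    spark.table("repro").show()
    sq.stop()
    spark.stop()
  }
}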