diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 8494aef004bb..655a61655e39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -45,7 +45,7 @@ class FileStreamSource(
 
   private val qualifiedBasePath: Path = {
     val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
-    fs.makeQualified(new Path(path)) // can contains glob patterns
+    fs.makeQualified(new Path(path)) // can contain glob patterns
   }
 
   private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3ca6feac05ce..5d18194ee90f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -408,16 +408,38 @@ class StreamExecution(
       case StreamingExecutionRelation(source, output) =>
         newData.get(source).map { data =>
           val newPlan = data.logicalPlan
-          assert(output.size == newPlan.output.size,
-            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
-              s"${Utils.truncatedString(newPlan.output, ",")}")
-          replacements ++= output.zip(newPlan.output)
+          val newPlanOutput = newPlan.output
+          output.foreach { out =>
+            val outputInNewPlan = newPlanOutput.find { newPlanOut =>
+              // we can't use semanticEquals here because `AttributeReference.semanticEquals`
+              // checks the equality of expression id's, but we're guaranteed that they will be
+              // different here
+              out.name.toLowerCase == newPlanOut.name.toLowerCase &&
+                // the line below means that we don't support schema evolution for now
+                out.dataType == newPlanOut.dataType
+            }.getOrElse {
+              val availableColumns = newPlan.output.map(a => s"${a.name} of ${a.dataType}")
+              throw new AnalysisException(
+                s"""
+                   |Batch does not have expected schema
+                   |Missing Column: ${out.name} of ${out.dataType}
+                   |Available Columns: ${availableColumns.mkString(", ")}
+                   |
+                   |== Original ==
+                   |$logicalPlan
+                   |
+                   |== Batch ==
+                   |$newPlan
+                 """.stripMargin
+              )
+            }
+            replacements += out -> outputInNewPlan
+          }
           newPlan
         }.getOrElse {
           LocalRelation(output)
         }
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.
     val replacementMap = AttributeMap(replacements)
     val triggerLogicalPlan = withNewSources transformAllExpressions {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b365af76c379..f47dc76b99b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1009,6 +1009,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("SPARK-18407: Prune inferred partition columns from execution if not specified in schema") {
+    withTempDirs { case (src, tmp) =>
+      spark.range(4).select('id, 'id % 4 as 'part).coalesce(1).write
+        .partitionBy("part")
+        .mode("overwrite")
+        .parquet(src.toString)
+      val sdf = spark.readStream
+        .schema(StructType(Seq(StructField("id", LongType)))) // omit 'part'
+        .format("parquet")
+        .load(src.toString)
+      try {
+        val sq = sdf.writeStream
+          .queryName("assertion_test")
+          .format("memory")
+          .start()
+        // used to throw an assertion error
+        sq.processAllAvailable()
+        checkAnswer(
+          spark.table("assertion_test"),
+          Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil
+        )
+      } finally {
+        spark.streams.active.foreach(_.stop())
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
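
Note for reviewers: the following is a minimal spark-shell sketch of the same repro outside the test harness, mirroring the test above. The output path and query name are hypothetical; `spark` and `spark.implicits._` are the shell's. Before this patch, the partition column "part" was inferred from the directory layout even though it was absent from the user-specified schema, so the batch plan's output was wider than the source's declared output and tripped the `output.size == newPlan.output.size` assertion in StreamExecution.

import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Write a small partitioned Parquet dataset; "part" becomes an inferred partition column.
spark.range(4).select('id, 'id % 4 as 'part).write
  .partitionBy("part")
  .mode("overwrite")
  .parquet("/tmp/spark-18407-src") // hypothetical path

// Read it back as a stream, deliberately omitting "part" from the schema.
val sdf = spark.readStream
  .schema(StructType(Seq(StructField("id", LongType))))
  .format("parquet")
  .load("/tmp/spark-18407-src")

// Before this patch the first batch failed with an assertion error; with it,
// the query runs and returns only "id" (0..3), per the test above.
val sq = sdf.writeStream.queryName("repro").format("memory").start()
sq.processAllAvailable()
spark.table("repro").show()
sq.stop()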