From ca0bf68e7269bc74da923a2f228bdf43b1bc868c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 18 Nov 2016 11:55:42 -0800 Subject: [PATCH 1/7] save try fix fix --- .../streaming/FileStreamSource.scala | 2 +- .../execution/streaming/StreamExecution.scala | 25 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 8494aef004bb5..655a61655e395 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -45,7 +45,7 @@ class FileStreamSource( private val qualifiedBasePath: Path = { val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) - fs.makeQualified(new Path(path)) // can contains glob patterns + fs.makeQualified(new Path(path)) // can contain glob patterns } private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3ca6feac05cef..ed57289f55ba7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -408,10 +408,27 @@ class StreamExecution( case StreamingExecutionRelation(source, output) => newData.get(source).map { data => val newPlan = data.logicalPlan - assert(output.size == newPlan.output.size, - s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + - s"${Utils.truncatedString(newPlan.output, ",")}") - replacements ++= output.zip(newPlan.output) + val newPlanOutput = newPlan.output + output.foreach { out => + val outputInNewPlan = newPlanOutput.find { newPlanOut => + out.name == newPlanOut.name && out.dataType == newPlanOut.dataType + }.getOrElse { + throw new AnalysisException( + s""" + |Batch does not have expected schema + |Expected: ${output.mkString(",")} + |Actual: ${newPlan.output.mkString(",")} + | + |== Original == + |$logicalPlan + | + |== Batch == + |$newPlan + """.stripMargin + ) + } + replacements += out -> outputInNewPlan + } newPlan }.getOrElse { LocalRelation(output) From 6578cc34cd9f6938a98361047bee61d1ab4e08fb Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 18 Nov 2016 17:43:13 -0800 Subject: [PATCH 2/7] fixed --- .../spark/sql/execution/streaming/StreamExecution.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ed57289f55ba7..bdf4a8a7bacce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -411,7 +411,9 @@ class StreamExecution( val newPlanOutput = newPlan.output output.foreach { out => val outputInNewPlan = newPlanOutput.find { newPlanOut => - out.name == newPlanOut.name && out.dataType == newPlanOut.dataType + out.name.toLowerCase == newPlanOut.name.toLowerCase && + // the line below means that we don't support schema evolution for now + out.dataType == newPlanOut.dataType }.getOrElse { throw new AnalysisException( s""" From ed2c3f92d45d5075a475d83c79e45672b3aad794 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 18 Nov 2016 19:06:25 -0800 Subject: [PATCH 3/7] better debug message --- .../spark/sql/execution/streaming/StreamExecution.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bdf4a8a7bacce..a313db7f63118 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -411,15 +411,19 @@ class StreamExecution( val newPlanOutput = newPlan.output output.foreach { out => val outputInNewPlan = newPlanOutput.find { newPlanOut => + // we can't use semanticEquals here because `AttributeReference.semanticEquals` + // checks the equality of expression id's, but we're guaranteed that they will be + // different here out.name.toLowerCase == newPlanOut.name.toLowerCase && // the line below means that we don't support schema evolution for now out.dataType == newPlanOut.dataType }.getOrElse { + val availableColumns = newPlan.output.map(a => s"${a.name} of ${a.dataType}") throw new AnalysisException( s""" |Batch does not have expected schema - |Expected: ${output.mkString(",")} - |Actual: ${newPlan.output.mkString(",")} + |Missing Column: ${out.name} of ${out.dataType} + |Available Columns: ${availableColumns.mkString(", ")} | |== Original == |$logicalPlan From 8465aca7dfce72f4141e4bec241bc833a2e4a83c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 19 Nov 2016 19:23:54 -0800 Subject: [PATCH 4/7] ready for review --- .../sql/streaming/FileStreamSourceSuite.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b365af76c3795..f47dc76b99b64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1009,6 +1009,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1")) assert(options.maxFilesPerTrigger == Some(1)) } + + test("SPARK-18407: Prune inferred partition columns from execution if not specified in schema") { + withTempDirs { case (src, tmp) => + spark.range(4).select('id, 'id % 4 as 'part).coalesce(1).write + .partitionBy("part") + .mode("overwrite") + .parquet(src.toString) + val sdf = spark.readStream + .schema(StructType(Seq(StructField("id", LongType)))) // omit 'part' + .format("parquet") + .load(src.toString) + try { + val sq = sdf.writeStream + .queryName("assertion_test") + .format("memory") + .start() + // used to throw an assertion error + sq.processAllAvailable() + checkAnswer( + spark.table("assertion_test"), + Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil + ) + } finally { + spark.streams.active.foreach(_.stop()) + } + } + } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From c2c2cd5890a38ac3848d724fbe10b24a7cd44ad6 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 19 Nov 2016 19:25:22 -0800 Subject: [PATCH 5/7] make test a bit more complex --- .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index f47dc76b99b64..a1a66ccb89582 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1017,7 +1017,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .mode("overwrite") .parquet(src.toString) val sdf = spark.readStream - .schema(StructType(Seq(StructField("id", LongType)))) // omit 'part' + .schema(StructType(Seq(StructField("ID", LongType)))) // omit 'part' also change case .format("parquet") .load(src.toString) try { From 879c6e1449074badeb6da73fb10fdd6efcb5838c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 19 Nov 2016 19:29:55 -0800 Subject: [PATCH 6/7] make test a bit more complex --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 2 +- .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a313db7f63118..0812c973a7714 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -440,7 +440,7 @@ class StreamExecution( LocalRelation(output) } } - + println(replacements) // Rewire the plan to use the new attributes that were returned by the source. val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index a1a66ccb89582..f47dc76b99b64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1017,7 +1017,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .mode("overwrite") .parquet(src.toString) val sdf = spark.readStream - .schema(StructType(Seq(StructField("ID", LongType)))) // omit 'part' also change case + .schema(StructType(Seq(StructField("id", LongType)))) // omit 'part' .format("parquet") .load(src.toString) try { From b4efee97ab5b89526ec515735de32a3fde969c72 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 19 Nov 2016 19:35:15 -0800 Subject: [PATCH 7/7] remove debug --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 0812c973a7714..5d18194ee90f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -440,7 +440,6 @@ class StreamExecution( LocalRelation(output) } } - println(replacements) // Rewire the plan to use the new attributes that were returned by the source. val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions {