
Conversation

@lw-lin
Contributor

@lw-lin lw-lin commented Feb 19, 2017

What changes were proposed in this pull request?

Right now, the file source always uses InMemoryFileIndex to scan files from a given path.

But when reading the outputs from another streaming query, the file source should use MetadataFileIndex to list files from the sink log. This patch adds this support.

MetadataFileIndex or InMemoryFileIndex

spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
                      //   - use `MetadataFileIndex` when `/some/path/_spark_metadata` exists
                      //   - fall back to `InMemoryFileIndex` otherwise
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
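The decision above can be sketched standalone as follows. This is a simplified illustration only: it probes the local filesystem, whereas the real implementation goes through Hadoop's FileSystem API, and the names `SourceIndexChoice`, `hasGlob`, and `metadataDirName` are assumptions of this sketch, not identifiers from the patch.

```scala
import java.nio.file.{Files, Paths}

object SourceIndexChoice {
  // Name of the file sink's metadata directory.
  val metadataDirName = "_spark_metadata"

  // Crude glob detection, for illustration only.
  def hasGlob(path: String): Boolean = path.exists(c => "*?[{".contains(c))

  /** True when the source should list files via the sink's metadata log. */
  def useMetadataLog(path: String): Boolean =
    !hasGlob(path) && Files.isDirectory(Paths.get(path, metadataDirName))
}
```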

How was this patch tested?

Two newly added tests.

@SparkQA

SparkQA commented Feb 19, 2017

Test build #73124 has started for PR 16987 at commit b66d2cc.

@lw-lin
Contributor Author

lw-lin commented Feb 19, 2017

Jenkins retest this please

@SparkQA

SparkQA commented Feb 19, 2017

Test build #73126 has finished for PR 16987 at commit b66d2cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin lw-lin changed the title [WIP][SPARK-][SS] FileSource read from FileSink [SPARK-19633][SS] FileSource read from FileSink Feb 19, 2017
Contributor Author

this is to keep track of the file name for later checking

@lw-lin
Contributor Author

lw-lin commented Feb 20, 2017

@marmbrus @zsxwing would you take a look at this? thanks!

@marmbrus
Contributor

Thanks for working on this, however I'm not sure if we want to go with this approach. In Spark 2.2, I think we should consider deprecating the manifest files and instead use deterministic file names to get exactly once semantics.

@lw-lin
Contributor Author

lw-lin commented Feb 23, 2017

Using deterministic file names sounds great. Thanks! I'm closing this.

@lw-lin lw-lin closed this Feb 23, 2017
@marmbrus
Contributor

I spoke too soon, sorry! Thinking about it more, the deterministic filename solution is not great, as the number of partitions could change for several reasons.

Given that would you mind reopening this?

/cc @zsxwing do you have time to review?

@lw-lin
Contributor Author

lw-lin commented Feb 24, 2017

Reopening :-)

@lw-lin lw-lin reopened this Feb 24, 2017
@SparkQA

SparkQA commented Feb 24, 2017

Test build #73374 has finished for PR 16987 at commit b66d2cc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Feb 24, 2017

retest this please

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73382 has finished for PR 16987 at commit b66d2cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing zsxwing left a comment

Overall looks good. Could you rewrite the tests to use real streaming queries rather than modifying the log manually? It's better to have two queries, one is writing to FileSink, the other is reading from the same folder using FileSource.

Member

I guess sourceHasMetadata is computed here because of hasMetadata. Could you move hasMetadata into object FileStreamSink? Then you can do it inside FileStreamSource.

Contributor Author

Yea hasMetadata was the reason! Now it lives in object FileStreamSink :-D

Member

nit: you can merge the latest master and use test directly. No need to use testWithUninterruptibleThread after #16947

Contributor Author

done; thanks! And good job on #16947!

Member

nit: same as above

Contributor Author

done

@lw-lin lw-lin force-pushed the source-read-from-sink branch from b66d2cc to d31cb76 Compare February 26, 2017 12:49
@SparkQA

SparkQA commented Feb 26, 2017

Test build #73492 has finished for PR 16987 at commit d31cb76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Contributor Author

lw-lin commented Feb 27, 2017

Rebased to master and tests updated. @zsxwing would you take another look when you've got a minute?

/**
* If the source has a metadata log indicating which files should be read, then we should use it.
* We check whether such a metadata log exists only when the user gives a non-glob path.
*/
Member

Just found one corner case: if the query that writes the files has not yet started, the current folder will contain no files even if it's an output folder of the file sink. I think we should keep calling sourceHasMetadata until the folder is no longer empty.

Member

Actually, why not just change sourceHasMetadata to a method? sparkSession.sessionState.newHadoopConf() seems expensive but we can save it into a field.

Contributor Author

@lw-lin lw-lin Feb 28, 2017

ah thanks! I was about to change it to a method which would stop detecting once we know for sure whether to use a MetadataLogFileIndex or an InMemoryFileIndex, and remember this information. Will push an update soon.

Contributor Author

and add a dedicated test case of course

*
* None means we don't know at the moment
* Some(true) means we know for sure the source DOES have metadata
* Some(false) means we know for sure the source DOES NOT have metadata
Contributor Author

(some notes here since the changes are not trivial)

here we're using this sourceHasMetadata to indicate whether we know for sure the source has metadata, as stated in the source file comments:

  • None means we don't know at the moment
  • Some(true) means we know for sure the source DOES have metadata
  • Some(false) means we know for sure the source DOES NOT have metadata
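The sticky tri-state described above can be sketched outside of Spark like this. It is a minimal standalone illustration: the `probe` function stands in for the real filesystem check, and `MetadataDetector` is a name invented for this sketch.

```scala
// None = undecided; Some(true)/Some(false) = decided, and the decision sticks.
final class MetadataDetector(probe: () => Option[Boolean]) {
  private var sourceHasMetadata: Option[Boolean] = None

  /** Re-probe only while undecided; once decided, return the cached answer. */
  def resolve(): Option[Boolean] = {
    if (sourceHasMetadata.isEmpty) {
      sourceHasMetadata = probe()
    }
    sourceHasMetadata
  }
}
```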

// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
// non-glob path
new MetadataLogFileIndex(sparkSession, qualifiedBasePath)

Contributor Author

@lw-lin lw-lin Feb 28, 2017

then based on sourceHasMetadata's value, we can choose which FileIndex should be used. As shown below, case None requires most of the care.

Contributor Author

seems like sourceHasMetadata match { case ... } is more appropriate here

val sources = query.get.logicalPlan.collect {
case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
source.asInstanceOf[FileStreamSource]
}
Contributor Author

this common logic is extracted out

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73578 has finished for PR 16987 at commit eed1c04.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing zsxwing left a comment

Looks good overall. Left some style comments.

}

/** Execute arbitrary code */
case class Execute(val func: StreamExecution => Any) extends StreamAction {
Member

How about just make this extend AssertOnQuery to avoid adding new case clause to testStream which is already pretty long?
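The suggestion can be sketched outside the test framework like this. It is a standalone simplification: `StreamAction` and `AssertOnQuery` here are minimal stand-ins for the real streaming test DSL, and `String` stands in for `StreamExecution`.

```scala
// Minimal stand-ins for the streaming test DSL (assumptions of this sketch).
sealed trait StreamAction
case class AssertOnQuery(condition: String => Boolean, message: String)
  extends StreamAction

// Execute as a factory over AssertOnQuery: it runs arbitrary code and always
// "passes", so testStream's existing AssertOnQuery branch covers it and no
// new case clause is needed.
object Execute {
  def apply(func: String => Any): AssertOnQuery =
    AssertOnQuery(q => { func(q); true }, "Execute")
}
```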

Contributor Author

fixed, thanks!

withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withTempDirs { case (dir, tmp) =>
// q1 is a streaming query that reads from memory and writes to text files
val q1_source = MemoryStream[String]
Member

nit: please don't use _ in a variable name.

Contributor Author

fixed


test("read data from outputs of another streaming query") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withTempDirs { case (dir, tmp) =>
Member

tmp is not used. Why not just name them as (outputDir, checkpointDir)? Same for other tests.

Contributor Author

fixed

val q1_source = MemoryStream[String]
val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
val q1_outputDir = new File(dir, "q1_outputDir")
assert(q1_outputDir.mkdir()) // prepare the output dir for q2 to read
Member

nit: just put the comment after the statement with 1 space. The current format is hard to maintain in the future because it requires aligning comments. Same for other comments.

Contributor Author

understood & fixed

testStream(q2)(
AssertOnQuery { q2 =>
val fileSource = getSourcesFromStreamingQuery(q2).head
fileSource.sourceHasMetadata === None // q1 has not started yet, verify that q2
Member

nit: put the comment above this line. Same for other comments

// q1 has not started yet, verify that q2 doesn't know whether q1 has metadata
fileSource.sourceHasMetadata === None 

Contributor Author

fixed

fileSource.sourceHasMetadata === Some(true) // q1 has started, verify that q2 knows q1 has
// metadata by now
},
CheckAnswer("keep2"), // answer should be correct
Member

nit: // answer should be correct is obvious. Don't add such comments.

Contributor Author

fixed

// doesn't know whether q1 has metadata
},
Execute { _ =>
q1 = q1_write.start(q1_outputDir.getCanonicalPath) // start q1 !!!
Member

nit: // start q1 !!! is obvious. Don't add such comments.

Contributor Author

fixed

q2ProcessAllAvailable(),
CheckAnswer("keep2", "keep3", "keep4"),

// stop q1 manually
Member

nit: // stop q1 manually is obvious. Don't add such comments.

Contributor Author

fixed

}
}

test("read partitioned data from outputs of another streaming query") {
Member

This test seems unnecessary. It will pass even if the source doesn't use the partition information.

Member

In the long term, we should write the partition information to the file sink log, then we can read it in the file source. However, it's out of scope. If you have time, you can think about it and submit a new PR after this one.

Contributor Author

test removed -- let me think about this write-partition-information thing :)
thanks!

allFiles = allFilesUsingInMemoryFileIndex()
if (allFiles.isEmpty) {
// we still cannot decide
sourceHasMetadata match {
Contributor Author

simply switched to sourceHasMetadata match { case... case ... case ...}
actual diff is quite small
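The resulting decision, including the empty-folder corner case raised earlier, can be sketched as follows. This is a standalone simplification: `hasMetadataDir` and `plainFiles` stand in for the real Hadoop-based checks, and `IndexDecision` is a name invented for this sketch.

```scala
object IndexDecision {
  // None = folder still empty, keep checking on the next batch;
  // Some(true) = use MetadataLogFileIndex; Some(false) = use InMemoryFileIndex.
  def decide(hasMetadataDir: Boolean, plainFiles: Seq[String]): Option[Boolean] =
    if (hasMetadataDir) Some(true)
    else if (plainFiles.nonEmpty) Some(false)
    else None
}
```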

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73653 has finished for PR 16987 at commit 62fd518.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 1, 2017

LGTM. Thanks! Merging to master.
