-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-14078] Streaming Parquet Based FileSink #11897
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| val startOffset: Option[Offset] = None, | ||
| val endOffset: Option[Offset] = None | ||
| ) extends Exception(message, cause) { | ||
| ) extends Exception(message, cause) with Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Exception has already extended Serializable. Not need to add it again.
|
Test build #53807 has finished for PR 11897 at commit
|
| path: String, | ||
| fileFormat: FileFormat) extends Sink with Logging { | ||
|
|
||
| val basePath = new Path(path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: private
| .startStream(outputDir) | ||
|
|
||
| inputData.addData(1, 2, 3) | ||
| failAfter(streamingTimeout) { query.processAllAvailable() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a race condition here: noNewData may become true before processAllAvailable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never mind. I just realized it will continue to set noNewData to true
|
Looks pretty good. Just some nits. |
|
Test build #53841 has finished for PR 11897 at commit
|
|
Test build #53847 has finished for PR 11897 at commit
|
|
|
||
| override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) | ||
|
|
||
| override def toString: String = s"FileSink[$path]" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the file source :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha, yes. I'll fix this in a follow up.
This PR adds a new
Sinkimplementation that writes out Parquet files. In order to correctly handle partial failures while maintaining exactly once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a parquet basedDataSourceis initialized for reading, we first check for this log directory and use it instead of file listing when present.Unit tests are added, as well as a stress test that checks the answer after non-deterministic injected failures.