SPARK-17829 [SQL] Stable format for offset log #15626
Conversation
jenkins, add to whitelist

Test build #67524 has finished for PR 15626 at commit
tdas left a comment
Overall, this is a good start, but I think there is one critical piece missing (or maybe I am not understanding the PR correctly).
The OffsetSeq is finally written to the log by HDFSMetadataLog. Currently, HDFSMetadataLog serializes the offset using Java serialization and writes the bytes to the file. The goal of this PR is to ultimately not use Java serialization, and rather use a JSON text representation in the file. In this PR I don't see any changes to HDFSMetadataLog, so the serialization format hasn't changed. All it is doing is Java-serializing OffsetSeq instead of CompositeOffset.
I strongly suggest writing a unit test where you take an OffsetSeq, use HDFSMetadataLog to write it to a file, and check whether the contents of the file are valid JSON.
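A minimal sketch of such a test, assuming the OffsetSeqLog constructor and OffsetSeq.fill helper used elsewhere in this PR, a suite that provides spark and withTempDir, and that HDFSMetadataLog writes each batch to a file named by its batch id:

```scala
import java.io.File
import org.json4s.jackson.JsonMethods

test("OffsetSeq is written to the offset log as JSON") {
  withTempDir { temp =>
    val log = new OffsetSeqLog("v1", spark, temp.getAbsolutePath)
    log.add(0, OffsetSeq.fill(LongOffset(1), LongOffset(2)))

    // Read the raw batch file back; every offset line after the version header
    // should parse as JSON (JsonMethods.parse throws on malformed input).
    val lines = scala.io.Source.fromFile(new File(temp, "0")).getLines().toSeq
    lines.drop(1).foreach(line => JsonMethods.parse(line))
  }
}
```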
 */
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
  import org.json4s.jackson.Serialization.{write}
We usually put imports at the top, especially important ones like this.
And when it is just one component, it's just import org.json4s.jackson.Serialization.write
Actually it's easier to read the code later if you import Serialization, and then use Serialization.write. Same for read.
Sounds good.
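For illustration, a minimal sketch of the agreed style (the map literal here is just a stand-in for real offset data):

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

implicit val formats = Serialization.formats(NoTypeHints)

// Import the object and keep its name at the call site, so it is obvious
// where write/read come from when scanning the code.
val json: String = Serialization.write(Map("t-0" -> 23L))
val restored: Map[String, Long] = Serialization.read[Map[String, Long]](json)
```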
/** Companion object of the [[KafkaSourceOffset]] */
private[kafka010] object KafkaSourceOffset {
  import org.json4s.jackson.Serialization.{read}
same as above
}

object LongOffset {

nit: extra line

  def apply(serialized: Offset) : LongOffset = new LongOffset(serialized.json.toLong)

}
nit: extra lines above and below.
Can you please clarify best practices w.r.t. extra lines? I didn't see anything in the code style guide.
Not a strict guideline I guess, it's a matter of the taste of the reviewer quite a bit. For me, where there is no need to visually introduce a gap, there is no need for an empty line. For example: between def apply and } there is no need to introduce a separation.

  def apply(json: String): SerializedOffset = new SerializedOffset(json)

}
nit: extra lines.
 * vector clock that must progress linearly forward.
 */
case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
case class OffsetSeq(offsets: Seq[Option[Offset]]) {
Why was this renamed?
I renamed it to avoid the class being confused with a type of Offset. Does that make sense, or should I take an alternative approach?
Aah right. Of course. I didn't notice that. That makes sense.
    KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
  }

  def apply(serialized: Offset): KafkaSourceOffset = {
If I get this right, this can be called only on SerializedOffset. In that case, best to type the param as SerializedOffset.
Sounds good.
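Roughly the signature change being suggested, sketched here with the JsonUtils helper that the Kafka offset serialization switches to later in this thread:

```scala
def apply(offset: SerializedOffset): KafkaSourceOffset =
  KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
```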
}

/** Used when loading */
class SerializedOffset(override val json: String) extends Offset
This can be a case class, in which case the apply below won't be needed.
Makes sense.
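i.e. something along these lines, which gives the companion apply for free:

```scala
/** Wraps the raw JSON string read back from the offset log. */
case class SerializedOffset(override val json: String) extends Offset
```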
  def json: String
}

/** Used when loading */
Improve the docs to something like "A JSON-serialized representation of an Offset that is used for saving offsets to the offset log"
Got it.
object LongOffset {

  def apply(serialized: Offset) : LongOffset = new LongOffset(serialized.json.toLong)
Same comment as with SerializedOffset.apply: make the param SerializedOffset instead of Offset.
Test build #67541 has finished for PR 15626 at commit
So it looks like this is the JSON format being used for Kafka offsets:
[{"_1":{"hash":0,"partition":0,"topic":"t"},"_2":2},{"_1":{"hash":0,"partition":1,"topic":"t"},"_2":2}]
My main concerns are that it is unnecessarily verbose, and that it is completely different from the format used to specify per-TopicPartition offsets in org.apache.spark.sql.kafka010.JsonUtils. I see no reason why those formats should be different - a user should be able to take offsets directly from the checkpoint and use them to start a job, for instance.
@koeninger Thanks for pointing this out. I have revised the serialization procedure to use org.apache.spark.sql.kafka010.JsonUtils.
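With that change, checkpointed Kafka offsets should use the same compact topic -> partition -> offset shape that JsonUtils already defines for user-supplied offsets. A hedged sketch for the example above (JsonUtils is package-private to kafka010):

```scala
import org.apache.kafka.common.TopicPartition

val offsets = Map(new TopicPartition("t", 0) -> 2L, new TopicPartition("t", 1) -> 2L)
JsonUtils.partitionOffsets(offsets)  // {"t":{"0":2,"1":2}}
```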
Test build #67605 has finished for PR 15626 at commit

Test build #67685 has finished for PR 15626 at commit

Test build #67684 has finished for PR 15626 at commit

Test build #67690 has finished for PR 15626 at commit
    one = KafkaSourceOffset(("t", 0, 1L)),
    two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))

  compare(
I would make this a separate test("json serialization") { ... } just to verify whether json ser --> deser gives back the same results.
I have extended the KafkaSourceOffsetSuite to include more thorough evaluations of json serialization/deserialization, including to/from OffsetSeqLog.
Test build #67711 has finished for PR 15626 at commit
tdas left a comment
This is looking good, pending some public API issues. In addition, can you also add to the Scala docs of Source.getBatch that the user should not assume the start and end offsets to be of any particular type, and that the safe way to read the offset details is to parse Offset.json.
 * doing a compaction, it will read all old log files and merge them with the new batch.
 */
abstract class CompactibleFileStreamLog[T: ClassTag](
abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
Why is this AnyRef needed?
It's required by json4s that T be an AnyRef.
  private implicit val formats = Serialization.formats(NoTypeHints)

  /** Needed to serialize type T into JSON */
  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
Why is this needed now? It wasn't needed earlier when we were using JSON serialization in FileStreamSinkLog, so why is the manifest needed now?
I see why this is needed now.
Can you add "Needed by Jackson to serialize/deserialize ... "?
Done.
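For context on both points above: json4s declares write with a reference-type bound, which is why T gets the AnyRef constraint, and read needs an implicit Manifest so Jackson knows which class to instantiate. A hedged sketch with a hypothetical entry type:

```scala
import scala.reflect.ClassTag
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Hypothetical stand-in for the log's entry type T.
case class DemoLogEntry(path: String, size: Long)

implicit val formats = Serialization.formats(NoTypeHints)
// Needed by Jackson to serialize/deserialize DemoLogEntry: read[T] requires a Manifest[T].
implicit val manifest =
  Manifest.classType[DemoLogEntry](implicitly[ClassTag[DemoLogEntry]].runtimeClass)

val line: String = Serialization.write(DemoLogEntry("/data/part-00000", 1024L)) // write[A <: AnyRef]
val back: DemoLogEntry = Serialization.read[DemoLogEntry](line)
```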
  def json: String
}

/** Used when loading */
Needs better docs. I think I had commented on that earlier.
Added.
  override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
    // called inside a try-finally where the underlying stream is closed in the caller

nit: extra line
right.
  override protected def deserialize(in: InputStream): OffsetSeq = {
    // called inside a try-finally where the underlying stream is closed in the caller

nit: extra line.
right.
    assert(metadataLog.get(1) === Some(batch1Serialized))
    assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
    assert(metadataLog.get(None, Some(1)) ===
            Array(0 -> batch0Serialized, 1 -> batch1Serialized))
Incorrect continuation indent, should be 2 not 8.
Got it.
    assert(metadataLog.get(1) === Some(batch1Serialized))
    assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
    assert(metadataLog.get(None, Some(1)) ===
            Array(0 -> batch0Serialized, 1 -> batch1Serialized))
Incorrect continuation indent, should be 2 not 8.
Got it.
    val batch0Serialized = OffsetSeq.fill(batch0.offsets.map(_.map(o =>
      SerializedOffset(o.json))).flatten: _*)

    val batch1Serialized = OffsetSeq.fill(batch1.offsets.map(_.map(o =>
Simpler to read if you use flatMap instead of map...map...flatten.
indeed!
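The flatMap form of the quoted expression would look roughly like:

```scala
val batch0Serialized =
  OffsetSeq.fill(batch0.offsets.flatMap(_.map(o => SerializedOffset(o.json))): _*)
```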
    val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
    val metadataLog = new OffsetSeqLog("v1", spark, dir.getAbsolutePath)
    val batch0 = OffsetSeq.fill(LongOffset(0))
    val batch1 = OffsetSeq.fill(LongOffset(1), LongOffset(2))
Would like to see this test with mixed offset types. How about defining a custom offset and using that?
I added a StringOffset type. Let me know if you'd like to see something more complex.
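Presumably something as small as this is enough for the mixed-type case (hypothetical shape, mirroring SerializedOffset above):

```scala
/** Trivial custom offset used only by the test to mix offset types in one OffsetSeq. */
case class StringOffset(override val json: String) extends Offset
```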
  /**
   * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
   */
  def apply(serialized: SerializedOffset): KafkaSourceOffset = {
This does not seem to be used anywhere. I think the code in KafkaSourceOffset.getPartitionOffsets needs to change, because it does not take into account that the offset param can be a SerializedOffset.
Great catch, thanks! I also added a test for this case.
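A sketch of the kind of fix being discussed, so that getPartitionOffsets tolerates a SerializedOffset read back from the log (method shape assumed from the surrounding discussion):

```scala
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = offset match {
  case o: KafkaSourceOffset => o.partitionToOffsets
  case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
  case _ =>
    throw new IllegalArgumentException(
      s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
}
```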
Thanks for working on this! Could you include examples of the various logs, since we are committing to this specific JSON?
All logs now uniformly marshal their entries to JSON. Each log has a particular way of organizing its JSON entries: OffsetSeqLog on one hand, and CompactibleFileStreamLog with its subclasses FileStreamSinkLog and FileStreamSourceLog on the other. Offsets are meant to be implemented by the Source; we currently have two Offset implementations, LongOffset and KafkaSourceOffset.
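For illustration, a sketch of what a single OffsetSeqLog batch file might contain under this format (assuming a version header line followed by one JSON-serialized offset per source; the Kafka and Long offsets below are made up):

```
v1
{"t":{"0":2,"1":2}}
1
```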
Test build #67725 has finished for PR 15626 at commit

Test build #67739 has finished for PR 15626 at commit

Test build #67822 has finished for PR 15626 at commit

Test build #67829 has finished for PR 15626 at commit

Test build #67831 has finished for PR 15626 at commit

Test build #67845 has finished for PR 15626 at commit

Test build #67940 has finished for PR 15626 at commit

Test build #67994 has finished for PR 15626 at commit
zsxwing left a comment
Looks pretty good. Just some minor issues.
    case _ => false
  }

  override def hashCode(): Int = this.json.hashCode
need to add final
 * new data has arrived.
 */
trait Offset extends Serializable {}
abstract class Offset {
nit: could you document that we only compare the json string, please?
We should also document in Source.getBatch that start and end may be semantically the same but have different JSON representations, and that the Source should handle this case.
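Putting the two comments above together, the public Offset class ends up looking roughly like this (a sketch; the final doc wording may differ):

```scala
/**
 * An offset is a monotonically increasing metric used to track progress in the
 * computation of a stream. Equality is defined purely in terms of the JSON string,
 * since that is what gets persisted to the offset log.
 */
abstract class Offset {

  /** A JSON-serialized representation of this offset, used for saving it to the offset log. */
  def json: String

  override final def equals(obj: Any): Boolean = obj match {
    case o: Offset => this.json == o.json
    case _ => false
  }

  override final def hashCode(): Int = this.json.hashCode
}
```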
    // called inside a try-finally where the underlying stream is closed in the caller
    val inStream = serializer.deserializeStream(in)
    inStream.readObject[T]()
    val reader = new InputStreamReader(in)
nit: should specify the charset like new InputStreamReader(in, StandardCharsets.UTF_8)
  /**
   * Returns the data that is between the offsets (`start`, `end`].
   */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
Given this change, the start and end Offset will not necessarily be the custom Offset class defined by the source, so I think it is important to document in Source.getBatch that the user should not make that assumption.
Test build #68008 has finished for PR 15626 at commit

Test build #68010 has finished for PR 15626 at commit

retest this please

LGTM. Pending tests.

Test build #68417 has finished for PR 15626 at commit

Thanks! Merging to master and 2.1.
This PR adds a new method `withWatermark` to the `Dataset` API, which can be used to specify an _event time watermark_. An event time watermark allows the streaming engine to reason about the point in time after which we no longer expect to see late data. This PR also augments `StreamExecution` to use this watermark for several purposes:
- To know when a given time window aggregation is finalized and thus results can be emitted when using output modes that do not allow updates (e.g. `Append` mode).
- To minimize the amount of state that we need to keep for on-going aggregations, by evicting state for groups that are no longer expected to change. We do still maintain all state if the query requires it (i.e. if the event time is not present in the `groupBy` or when running in `Complete` mode).
An example that emits windowed counts of records, waiting up to 5 minutes for late data to arrive.
```scala
df.withWatermark("eventTime", "5 minutes")
.groupBy(window($"eventTime", "1 minute") as 'window)
.count()
.writeStream
.format("console")
.mode("append") // In append mode, we only output finalized aggregations.
.start()
```
### Calculating the watermark.
The current event time is computed by looking at the `MAX(eventTime)` seen this epoch across all of the partitions in the query minus some user defined _delayThreshold_. An additional constraint is that the watermark must increase monotonically.
Note that since we must coordinate this value across partitions occasionally, the actual watermark used is only guaranteed to be at least `delay` behind the actual event time. In some cases we may still process records that arrive more than delay late.
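In pseudocode, the update rule described above amounts to the following (variable names are illustrative):

```scala
// Advance the watermark to the max event time seen this trigger minus the user-specified
// delay, but never let it move backwards.
val newWatermarkMs = math.max(currentWatermarkMs, maxEventTimeMsThisBatch - delayThresholdMs)
```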
This mechanism was chosen for the initial implementation over processing time for two reasons:
- it is robust to downtime that could affect processing delay
- it does not require syncing of time or timezones between the producer and the processing engine.
### Other notable implementation details
- A new trigger metric `eventTimeWatermark` outputs the current value of the watermark.
- We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelay`. This allows downstream operations to know which column holds the event time. Operations like `window` propagate this metadata.
- `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease debugging of how this information is propagated.
- Currently, we don't filter out late records, but instead rely on the state store to avoid emitting records that are both added and filtered in the same epoch.
### Remaining in this PR
- [ ] The test for recovery is currently failing as we don't record the watermark used in the offset log. We will need to do so to ensure determinism, but this is deferred until apache#15626 is merged.
### Other follow-ups
There are some natural additional features that we should consider for future work:
- Ability to write records that arrive too late to some external store in case any out-of-band remediation is required.
- `Update` mode so you can get partial results before a group is evicted.
- Other mechanisms for calculating the watermark. In particular a watermark based on quantiles would be more robust to outliers.
Author: Michael Armbrust <michael@databricks.com>
Closes apache#15702 from marmbrus/watermarks.
What changes were proposed in this pull request?
Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:
It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query)
It is unnecessarily opaque to the user.
I'd propose we require offsets to provide a user readable serialization and use that instead. JSON is probably a good option.
How was this patch tested?
Tests were added for KafkaSourceOffset in KafkaSourceOffsetSuite and for LongOffset in OffsetSuite
Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.
@zsxwing @marmbrus