@@ -18,9 +18,8 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.TaskContext
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
- import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde
+ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+ import org.apache.spark.sql.catalyst.encoders.encoderFor

/**
* A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
@@ -32,46 +31,26 @@ import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde
class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {

override def addBatch(batchId: Long, data: DataFrame): Unit = {
- // TODO: Refine this method when SPARK-16264 is resolved; see comments below.
-
// This logic should've been as simple as:
// ```
Member Author: SPARK-16264 was resolved as Won't Fix, so I removed it from the comment.

// data.as[T].foreachPartition { iter => ... }
// ```
//
// Unfortunately, doing that would just break the incremental planning. The reason is,
- // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just
- // does not support `IncrementalExecution`.
+ // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
+ // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
+ // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
+ // updated in the new plan, and StreamExecution cannot retrieve them.
//
- // So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()`
- // method supporting incremental planning. But in the long run, we should generally make newly
- // created Datasets use `IncrementalExecution` where necessary (which is SPARK-16264 tries to
- // resolve).
- val incrementalExecution = data.queryExecution.asInstanceOf[IncrementalExecution]
- val datasetWithIncrementalExecution =
-   new Dataset(data.sparkSession, incrementalExecution, implicitly[Encoder[T]]) {
-     override lazy val rdd: RDD[T] = {
-       val objectType = exprEnc.deserializer.dataType
-       val deserialized = CatalystSerde.deserialize[T](logicalPlan)
-
-       // was originally: sparkSession.sessionState.executePlan(deserialized) ...
-       val newIncrementalExecution = new IncrementalExecution(
-         this.sparkSession,
-         deserialized,
-         incrementalExecution.outputMode,
-         incrementalExecution.checkpointLocation,
-         incrementalExecution.currentBatchId,
-         incrementalExecution.currentEventTimeWatermark)
-       newIncrementalExecution.toRdd.mapPartitions { rows =>
-         rows.map(_.get(0, objectType))
-       }.asInstanceOf[RDD[T]]
-     }
-   }
- datasetWithIncrementalExecution.foreachPartition { iter =>
+ // Hence, we need to manually convert internal rows to objects using encoder.
+ val encoder = encoderFor[T].resolveAndBind(
+   data.logicalPlan.output,
+   data.sparkSession.sessionState.analyzer)
+ data.queryExecution.toRdd.foreachPartition { iter =>
if (writer.open(TaskContext.getPartitionId(), batchId)) {
try {
while (iter.hasNext) {
- writer.process(iter.next())
+ writer.process(encoder.fromRow(iter.next()))
}
} catch {
case e: Throwable =>
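To make the reasoning in the new comment concrete: instead of going through `Dataset.rdd()` (which would build a fresh plan, so StreamExecution could no longer read metrics or the event-time watermark from the plan it is tracking), the sink walks the already-planned output and deserializes each InternalRow with an encoder bound to that plan's attributes. Below is a minimal sketch of that pattern, assuming Spark 2.1-era internals (`encoderFor`, `ExpressionEncoder.resolveAndBind`/`fromRow`) and package-private access to `Dataset.logicalPlan` and `SparkSession.sessionState`; `processBatch` and `handleRow` are illustrative names only, not part of this PR.

```scala
import org.apache.spark.sql.{DataFrame, Encoder}
import org.apache.spark.sql.catalyst.encoders.encoderFor

// Sketch: apply a per-row handler to a streaming batch without re-planning it.
def processBatch[T: Encoder](data: DataFrame)(handleRow: T => Unit): Unit = {
  // Bind the encoder to the existing plan's output attributes so it can
  // deserialize the InternalRows that this exact plan produces.
  val encoder = encoderFor[T].resolveAndBind(
    data.logicalPlan.output,
    data.sparkSession.sessionState.analyzer)
  // Reuse the plan StreamExecution already knows about (data.queryExecution)
  // rather than Dataset.rdd(), which would create a new plan and detach
  // metrics/watermark updates from the running query.
  data.queryExecution.toRdd.foreachPartition { iter =>
    iter.foreach(row => handleRow(encoder.fromRow(row)))
  }
}
```

Because the RDD comes from the sink's own `data.queryExecution`, StreamExecution can still read input-row metrics and the watermark from that same plan after the batch completes, which is what the metrics test added below checks.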
@@ -171,7 +171,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
}
}

test("foreach with watermark") {
test("foreach with watermark: complete") {
val inputData = MemoryStream[Int]

val windowedAggregation = inputData.toDF()
@@ -204,6 +204,72 @@
query.stop()
}
}

test("foreach with watermark: append") {
Contributor: What does this test cover that is not covered by the previous test, "watermark + complete"?

Member Author: @tdas As there is no eviction in complete mode, it will always output all data. So basically, the "watermark + complete" test is not super helpful.

Contributor: Actually, complete mode should NOT work when watermark is enabled!! Why does this query still work? That's material for a different PR, so I approve this change in this PR.

Contributor: NVM. The watermark is a no-op in complete mode. False alarm.

val inputData = MemoryStream[Int]

val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"count".as[Long])
.map(_.toInt)
.repartition(1)

val query = windowedAggregation
.writeStream
.outputMode(OutputMode.Append)
.foreach(new TestForeachWriter())
.start()
try {
inputData.addData(10, 11, 12)
query.processAllAvailable()
inputData.addData(25) // Advance watermark to 15 seconds
query.processAllAvailable()
inputData.addData(25) // Evict items less than previous watermark
query.processAllAvailable()

// There should be 3 batches, and only the last batch contains a value.
val allEvents = ForeachSinkSuite.allEvents()
assert(allEvents.size === 3)
val expectedEvents = Seq(
Seq(
ForeachSinkSuite.Open(partition = 0, version = 0),
ForeachSinkSuite.Close(None)
),
Seq(
ForeachSinkSuite.Open(partition = 0, version = 1),
ForeachSinkSuite.Close(None)
),
Seq(
ForeachSinkSuite.Open(partition = 0, version = 2),
ForeachSinkSuite.Process(value = 3),
ForeachSinkSuite.Close(None)
)
)
assert(allEvents === expectedEvents)
} finally {
query.stop()
}
}
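As the reviewer exchange above notes, the watermark is effectively a no-op in complete mode: no state is ever evicted, so every batch re-emits counts for all windows seen so far, and only this append-mode test exercises watermark-based eviction. For contrast, a rough sketch of the complete-mode counterpart, reusing the `windowedAggregation` and `TestForeachWriter` from the test above (illustrative only; the suite's actual complete-mode test is collapsed in this view):

```scala
// Sketch: same aggregation, but OutputMode.Complete instead of Append.
// With no eviction, each batch produces Process events for every window
// accumulated so far, regardless of the 10-second watermark.
val completeModeQuery = windowedAggregation
  .writeStream
  .outputMode(OutputMode.Complete)
  .foreach(new TestForeachWriter())
  .start()
```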
Contributor: I don't see a test that verifies whether the metrics are correct.

Member Author: Added a simple test for metrics.


test("foreach sink should support metrics") {
val inputData = MemoryStream[Int]
val query = inputData.toDS()
.writeStream
.foreach(new TestForeachWriter())
.start()
try {
inputData.addData(10, 11, 12)
query.processAllAvailable()
val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption
assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3,
s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics")
} finally {
query.stop()
}
}
}

/** A global object to collect events in the executor */
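The code under this doc comment, the event-collecting object behind `ForeachSinkSuite.allEvents()`, `Open`, `Process` and `Close`, together with the `TestForeachWriter` used throughout the tests, is collapsed in this view. A plausible minimal sketch of that machinery, using hypothetical names (`ForeachWriterEvents`, `TestForeachWriterSketch`) rather than the suite's actual definitions:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.ForeachWriter

// Hypothetical stand-in for the suite's global event collector.
object ForeachWriterEvents {
  sealed trait Event
  case class Open(partition: Int, version: Long) extends Event
  case class Process[T](value: T) extends Event
  case class Close(error: Option[Throwable]) extends Event

  // One Seq[Event] per writer lifecycle (open ... close). A JVM-global,
  // thread-safe queue works because the tests run in local mode.
  private val events = new ConcurrentLinkedQueue[Seq[Event]]()
  def addEvents(e: Seq[Event]): Unit = events.add(e)
  def allEvents(): Seq[Seq[Event]] = events.asScala.toSeq
  def clear(): Unit = events.clear()
}

// Hypothetical stand-in for TestForeachWriter: records its lifecycle as events.
class TestForeachWriterSketch extends ForeachWriter[Int] {
  import ForeachWriterEvents._

  private val buffer = new ArrayBuffer[Event]()

  override def open(partitionId: Long, version: Long): Boolean = {
    buffer += Open(partitionId.toInt, version)
    true // accept every partition so process() is always called
  }

  override def process(value: Int): Unit = {
    buffer += Process(value)
  }

  override def close(errorOrNull: Throwable): Unit = {
    buffer += Close(Option(errorOrNull))
    addEvents(buffer) // flush this lifecycle's events to the global collector
  }
}
```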