Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ trait StateStoreWriter extends StatefulOperator {
}

/** An operator that supports watermark. */
trait WatermarkSupport extends SparkPlan {
trait WatermarkSupport extends UnaryExecNode {

/** The keys that may have a watermark attribute. */
def keyExpressions: Seq[Attribute]

/** The watermark value. */
def eventTimeWatermark: Option[Long]

/** Generate a predicate that matches data older than the watermark */
lazy val watermarkPredicate: Option[Predicate] = {
/** Generate an expression that matches data older than the watermark */
lazy val watermarkExpression: Option[Expression] = {
val optionalWatermarkAttribute =
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))

Expand All @@ -96,9 +96,19 @@ trait WatermarkSupport extends SparkPlan {
}

logInfo(s"Filtering state store on: $evictionExpression")
newPredicate(evictionExpression, keyExpressions)
evictionExpression
}
}

/**
 * Generate a predicate based on keys that matches data older than the watermark.
 * The expression is bound against `keyExpressions`, so the resulting predicate can be
 * evaluated directly on state store key rows (e.g. passed to `store.remove` to evict
 * expired state). `None` when no watermark is defined on any key attribute.
 */
lazy val watermarkPredicateForKeys: Option[Predicate] =
  watermarkExpression.map(newPredicate(_, keyExpressions))

/**
 * Generate a predicate based on the child output that matches data older than the watermark.
 * Unlike `watermarkPredicateForKeys`, the expression is bound against `child.output`, so the
 * resulting predicate is meant to be evaluated on full input rows (e.g. to filter late rows
 * out of the incoming iterator). `None` when no watermark is defined on any key attribute.
 */
lazy val watermarkPredicate: Option[Predicate] =
  watermarkExpression.map(newPredicate(_, child.output))
}

/**
Expand Down Expand Up @@ -192,7 +202,7 @@ case class StateStoreSaveExec(
}

// Assumption: Append mode can be done only when watermark has been specified
store.remove(watermarkPredicate.get.eval _)
store.remove(watermarkPredicateForKeys.get.eval _)
store.commit()

numTotalStateRows += store.numKeys()
Expand All @@ -215,7 +225,9 @@ case class StateStoreSaveExec(
override def hasNext: Boolean = {
if (!baseIterator.hasNext) {
// Remove old aggregates if watermark specified
if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval _)
if (watermarkPredicateForKeys.nonEmpty) {
store.remove(watermarkPredicateForKeys.get.eval _)
}
store.commit()
numTotalStateRows += store.numKeys()
false
Expand Down Expand Up @@ -361,7 +373,7 @@ case class StreamingDeduplicateExec(
val numUpdatedStateRows = longMetric("numUpdatedStateRows")

val baseIterator = watermarkPredicate match {
case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case None => iter
}

Expand All @@ -381,7 +393,7 @@ case class StreamingDeduplicateExec(
}

CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
watermarkPredicate.foreach(f => store.remove(f.eval _))
watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))
store.commit()
numTotalStateRows += store.numKeys()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,23 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
}
}
}

test("SPARK-19841: watermarkPredicate should filter based on keys") {
  val inputData = MemoryStream[(Int, Int)]
  // Event-time column with a 1 second watermark over (time, id) pairs.
  val withEventTime = inputData.toDS.toDF("time", "id")
    .withColumn("time", $"time".cast("timestamp"))
    .withWatermark("time", "1 second")
  // Deduplicate with the key columns listed in a different order than the child
  // output, so the key-bound predicate and the row-bound predicate must differ.
  val deduplicated = withEventTime
    .dropDuplicates("id", "time")
    .select($"id")
  testStream(deduplicated)(
    AddData(inputData, (1, 1), (1, 1), (1, 2)),
    CheckLastBatch(1, 2),
    AddData(inputData, (1, 1), (2, 3), (2, 4)),
    CheckLastBatch(3, 4),
    // (1, 0) and (1, 1) fall behind the watermark and must be dropped.
    AddData(inputData, (1, 0), (1, 1), (3, 5), (3, 6)),
    CheckLastBatch(5, 6),
    // (1, 0) falls behind the watermark and must be dropped.
    AddData(inputData, (1, 0), (4, 7)),
    CheckLastBatch(7)
  )
}
}