diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
index 52c200796ce4..623a1b6f854c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
@@ -22,20 +22,22 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
 class ProcessingTimeSuite extends SparkFunSuite {
 
   test("create") {
-    assert(ProcessingTime(10.seconds).intervalMs === 10 * 1000)
-    assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs === 10 * 1000)
-    assert(ProcessingTime("1 minute").intervalMs === 60 * 1000)
-    assert(ProcessingTime("interval 1 minute").intervalMs === 60 * 1000)
-
-    intercept[IllegalArgumentException] { ProcessingTime(null: String) }
-    intercept[IllegalArgumentException] { ProcessingTime("") }
-    intercept[IllegalArgumentException] { ProcessingTime("invalid") }
-    intercept[IllegalArgumentException] { ProcessingTime("1 month") }
-    intercept[IllegalArgumentException] { ProcessingTime("1 year") }
+    def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs
+
+    assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000)
+    assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000)
+    assert(getIntervalMs(Trigger.ProcessingTime("1 minute")) === 60 * 1000)
+    assert(getIntervalMs(Trigger.ProcessingTime("interval 1 minute")) === 60 * 1000)
+
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime(null: String) }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("") }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("invalid") }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("1 month") }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("1 year") }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 9f2f0d195de9..a5399cdb6e5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -664,7 +664,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       .flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
 
     testStream(result, Update)(
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
       AddData(inputData, "a"),
       AdvanceManualClock(1 * 1000),
       CheckLastBatch(("a", "1")),
@@ -729,7 +729,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
      .flatMapGroupsWithState(Update, EventTimeTimeout)(stateFunc)
 
     testStream(result, Update)(
-      StartStream(ProcessingTime("1 second")),
+      StartStream(Trigger.ProcessingTime("1 second")),
       AddData(inputData, ("a", 11), ("a", 13), ("a", 15)), // Set timeout timestamp of ...
       CheckLastBatch(("a", 15)), // "a" to 15 + 5 = 20s, watermark to 5s
       AddData(inputData, ("a", 4)), // Add data older than watermark for "a"
@@ -901,7 +901,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       .flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
 
     testStream(result, Update)(
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
       AddData(inputData, ("a", 1L)),
       AdvanceManualClock(1 * 1000),
       CheckLastBatch(("a", "1"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 4345a70601c3..b6e82b621c8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -267,7 +267,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         .where('value >= current_timestamp().cast("long") - 10L)
 
     testStream(aggregated, Complete)(
-      StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
 
       // advance clock to 10 seconds, all keys retained
       AddData(inputData, 0L, 5L, 5L, 10L),
@@ -294,7 +294,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         clock.advance(60 * 1000L)
         true
       },
-      StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
       // The commit log blown, causing the last batch to re-run
       CheckLastBatch((20L, 1), (85L, 1)),
       AssertOnQuery { q =>
@@ -322,7 +322,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         .where($"value".cast("date") >= date_sub(current_date(), 10))
         .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
     testStream(aggregated, Complete)(
-      StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
       // advance clock to 10 days, should retain all keys
       AddData(inputData, 0L, 5L, 5L, 10L),
       AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
@@ -346,7 +346,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
         true
       },
-      StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
      // Commit log blown, causing a re-run of the last batch
      CheckLastBatch((20L, 1), (85L, 1)),