
Commit 2acc53a

Remove ProcessingTime entirely, move ProcessingTimeTrigger to Triggers.scala

1 parent 20059fa commit 2acc53a

File tree

7 files changed (+38, -196 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 1 addition & 3 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchSt
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock

 class MicroBatchExecution(
@@ -51,8 +51,6 @@ class MicroBatchExecution(
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty

   private val triggerExecutor = trigger match {
-    case ProcessingTime(interval) => ProcessingTimeExecutor(
-      ProcessingTimeTrigger(interval), triggerClock)
     case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
     case OneTimeTrigger => OneTimeExecutor()
     case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
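
With the deprecated ProcessingTime case removed, each trigger type now maps to exactly one executor case. A minimal, self-contained sketch of that dispatch shape, using stand-in types (TriggerDispatchSketch, describeExecutor) rather than the real Spark classes:

    // Illustration only: stand-in types, not the real Spark classes.
    object TriggerDispatchSketch extends App {
      sealed trait Trigger
      case object OneTimeTrigger extends Trigger
      case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger

      // Mirrors the match in MicroBatchExecution: one case per trigger type,
      // now that the redundant ProcessingTime(interval) case is gone.
      def describeExecutor(trigger: Trigger): String = trigger match {
        case ProcessingTimeTrigger(ms) => s"ProcessingTimeExecutor(every $ms ms)"
        case OneTimeTrigger => "OneTimeExecutor()"
        case other => throw new IllegalStateException(s"Unknown type of trigger: $other")
      }

      println(describeExecutor(ProcessingTimeTrigger(1000))) // ProcessingTimeExecutor(every 1000 ms)
      println(describeExecutor(OneTimeTrigger))              // OneTimeExecutor()
    }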

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeTrigger.scala

Lines changed: 0 additions & 57 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala

Lines changed: 0 additions & 1 deletion
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.streaming

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, SystemClock}

 trait TriggerExecutor {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala

Lines changed: 36 additions & 0 deletions
@@ -17,8 +17,13 @@

 package org.apache.spark.sql.execution.streaming

+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval

 /**
  * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
@@ -27,3 +32,34 @@ import org.apache.spark.sql.streaming.Trigger
 @Experimental
 @Evolving
 case object OneTimeTrigger extends Trigger
+
+/**
+ * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
+ * the query will run as fast as possible.
+ */
+@Evolving
+private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
+  require(intervalMs >= 0, "the interval of trigger should not be negative")
+}
+
+private[sql] object ProcessingTimeTrigger {
+  def apply(interval: String): ProcessingTimeTrigger = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    new ProcessingTimeTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
+  }
+
+  def apply(interval: Duration): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(interval.toMillis)
+  }
+
+  def create(interval: String): ProcessingTimeTrigger = {
+    apply(interval)
+  }
+
+  def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(unit.toMillis(interval))
+  }
+}
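
The create overloads mirror the apply methods so Java callers can construct instances without Scala sugar; presumably the public Trigger.ProcessingTime factories are what route through them after this refactor. A usage sketch through that public API (object name TriggerUsageSketch is illustrative; assumes a Spark 3.x artifact on the classpath):

    import java.util.concurrent.TimeUnit

    import scala.concurrent.duration._

    import org.apache.spark.sql.streaming.Trigger

    // Hypothetical usage sketch; requires a Spark dependency.
    object TriggerUsageSketch extends App {
      // Three equivalent requests for a 10-second processing-time trigger.
      val byString   = Trigger.ProcessingTime("10 seconds")
      val byUnit     = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
      val byDuration = Trigger.ProcessingTime(10.seconds)

      // Month/year intervals are rejected by the String parser above because
      // they have no fixed length in milliseconds:
      //   Trigger.ProcessingTime("1 month")  // IllegalArgumentException
      println(Seq(byString, byUnit, byDuration))
    }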

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock

 class ContinuousExecution(

sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala

Lines changed: 0 additions & 133 deletions
This file was deleted.
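
This was the long-deprecated public ProcessingTime case class, deprecated since Spark 2.2 in favor of Trigger.ProcessingTime. A migration sketch for user code that still constructed it directly; the object name, local master, rate source, and timeout below are illustrative choices, not part of the commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    // Hypothetical migration example, assuming a build containing this commit.
    object ProcessingTimeMigration extends App {
      val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()

      val query = spark.readStream
        .format("rate")   // built-in source that emits rows on a timer
        .load()
        .writeStream
        .format("console")
        .trigger(Trigger.ProcessingTime("10 seconds")) // was: ProcessingTime("10 seconds")
        .start()

      query.awaitTermination(30000) // run for ~30s, then shut down
      spark.stop()
    }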

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -24,7 +24,6 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.sql.streaming.util.StreamManualClock

 class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
