File tree Expand file tree Collapse file tree 4 files changed +27
-4
lines changed
catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis
main/scala/org/apache/spark/sql/execution/streaming
test/scala/org/apache/spark/sql/streaming Expand file tree Collapse file tree 4 files changed +27
-4
lines changed Original file line number Diff line number Diff line change @@ -155,6 +155,16 @@ trait CheckAnalysis extends PredicateHelper {
155155 }
156156
157157 operator match {
158+ case etw : EventTimeWatermark =>
159+ etw.eventTime.dataType match {
160+ case s : StructType
161+ if s.find(_.name == " start" ).map(_.dataType).contains(TimestampType ) =>
162+ case _ : TimestampType =>
163+ case _ =>
164+ failAnalysis(
165+ s " Event time must be defined on a window or a timestamp, but " +
166+ s " ${etw.eventTime.name} is of type ${etw.eventTime.dataType.simpleString}" )
167+ }
158168 case f : Filter if f.condition.dataType != BooleanType =>
159169 failAnalysis(
160170 s " filter expression ' ${f.condition.sql}' " +
Original file line number Diff line number Diff line change @@ -26,9 +26,9 @@ import org.apache.spark.sql.types.MetadataBuilder
2626import org .apache .spark .unsafe .types .CalendarInterval
2727import org .apache .spark .util .AccumulatorV2
2828
29+ /** Tracks the maximum positive long seen. */
2930class MaxLong (protected var currentValue : Long = 0 )
30- extends AccumulatorV2 [Long , Long ]
31- with Serializable {
31+ extends AccumulatorV2 [Long , Long ] {
3232
3333 override def isZero : Boolean = value == 0
3434 override def value : Long = currentValue
Original file line number Diff line number Diff line change @@ -450,11 +450,14 @@ class StreamExecution(
450450 }.headOption.foreach { newWatermark =>
451451 if (newWatermark > currentEventTimeWatermark) {
452452 logInfo(s " Updating eventTime watermark to: $newWatermark ms " )
453- streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK , newWatermark)
454453 currentEventTimeWatermark = newWatermark
455454 } else {
456455 logTrace(s " Event time didn't move: $newWatermark < $currentEventTimeWatermark" )
457456 }
457+
458+ if (newWatermark != 0 ) {
459+ streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK , newWatermark)
460+ }
458461 }
459462
460463 awaitBatchLock.lock()
Original file line number Diff line number Diff line change @@ -40,6 +40,16 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
4040 assert(e.getMessage contains " badColumn" )
4141 }
4242
43+ test(" error on wrong type" ) {
44+ val inputData = MemoryStream [Int ].toDF()
45+ val e = intercept[AnalysisException ] {
46+ inputData.withWatermark(" value" , " 1 minute" )
47+ }
48+ assert(e.getMessage contains " value" )
49+ assert(e.getMessage contains " int" )
50+ }
51+
52+
4353 test(" watermark metric" ) {
4454 val inputData = MemoryStream [Int ]
4555
@@ -129,7 +139,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
129139 CheckAnswer ((10 , 3 )),
130140 AddData (inputData, 10 ), // 10 is later than 15 second watermark
131141 CheckAnswer ((10 , 3 )),
132- AddData (inputData, 25 ), // 10 is later than 15 second watermark
142+ AddData (inputData, 25 ),
133143 CheckAnswer ((10 , 3 )) // Should not emit an incorrect partial result.
134144 )
135145 }
You can’t perform that action at this time.
0 commit comments