Skip to content

Commit 084c8d5

Browse files
committed
Set default daysPerMonth to 31
1 parent 6afa006 commit 084c8d5

File tree

5 files changed

+16
-17
lines changed

5 files changed

+16
-17
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ object EventTimeWatermark {
3030

3131
def getDelayMs(delay: CalendarInterval): Long = {
3232
// We define month as `31 days` to simplify calculation.
33-
IntervalUtils.getDuration(delay, 31, TimeUnit.MILLISECONDS)
33+
IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS)
3434
}
3535
}
3636

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,14 @@ object IntervalUtils {
9595
* Gets interval duration
9696
*
9797
* @param cal - the interval to get duration
98-
* @param daysPerMonth - the number of days per one month
9998
* @param targetUnit - time units of the result
99+
* @param daysPerMonth - the number of days per one month
100100
* @return duration in the specified time units
101101
*/
102102
def getDuration(
103103
cal: CalendarInterval,
104-
daysPerMonth: Int,
105-
targetUnit: TimeUnit): Long = {
104+
targetUnit: TimeUnit,
105+
daysPerMonth: Int = 31): Long = {
106106
val monthsDuration = Math.multiplyExact(daysPerMonth * DateTimeUtils.MICROS_PER_DAY, cal.months)
107107
val result = Math.addExact(cal.microseconds, monthsDuration)
108108
targetUnit.convert(result, TimeUnit.MICROSECONDS)
@@ -115,7 +115,7 @@ object IntervalUtils {
115115
* @param daysPerMonth - the number of days per one month
116116
* @return true if duration of the given interval is less than 0 otherwise false
117117
*/
118-
def isNegative(cal: CalendarInterval, daysPerMonth: Int): Boolean = {
119-
getDuration(cal, daysPerMonth, TimeUnit.MICROSECONDS) < 0
118+
def isNegative(cal: CalendarInterval, daysPerMonth: Int = 31): Boolean = {
119+
getDuration(cal, TimeUnit.MICROSECONDS, daysPerMonth) < 0
120120
}
121121
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@ import org.apache.spark.unsafe.types.CalendarInterval
2727

2828
class IntervalUtilsSuite extends SparkFunSuite with Matchers {
2929
test("interval duration") {
30-
def duration(s: String, daysPerMonth: Int, unit: TimeUnit): Long = {
31-
getDuration(CalendarInterval.fromString(s), daysPerMonth, unit)
30+
def duration(s: String, unit: TimeUnit, daysPerMonth: Int): Long = {
31+
getDuration(CalendarInterval.fromString(s), unit, daysPerMonth)
3232
}
3333

34-
assert(duration("0 seconds", 31, TimeUnit.MILLISECONDS) === 0)
35-
assert(duration("1 month", 31, TimeUnit.DAYS) === 31)
36-
assert(duration("1 microsecond", 30, TimeUnit.MICROSECONDS) === 1)
37-
assert(duration("1 month -30 days", 31, TimeUnit.DAYS) === 1)
34+
assert(duration("0 seconds", TimeUnit.MILLISECONDS, 31) === 0)
35+
assert(duration("1 month", TimeUnit.DAYS, 31) === 31)
36+
assert(duration("1 microsecond", TimeUnit.MICROSECONDS, 30) === 1)
37+
assert(duration("1 month -30 days", TimeUnit.DAYS, 31) === 1)
3838

3939
try {
40-
duration(Integer.MAX_VALUE + " month", 31, TimeUnit.SECONDS)
40+
duration(Integer.MAX_VALUE + " month", TimeUnit.SECONDS, 31)
4141
fail("Expected to throw an exception for the invalid input")
4242
} catch {
4343
case e: ArithmeticException =>

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ class Dataset[T] private[sql](
732732
s"Unable to parse time delay '$delayThreshold'",
733733
cause = Some(e))
734734
}
735-
require(!IntervalUtils.isNegative(parsedDelay, 31),
735+
require(!IntervalUtils.isNegative(parsedDelay),
736736
s"delay threshold ($delayThreshold) should not be negative.")
737737
EliminateEventTimeWatermark(
738738
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,11 @@ private[sql] class GroupStateImpl[S] private(
161161

162162
private def parseDuration(duration: String): Long = {
163163
val cal = CalendarInterval.fromCaseInsensitiveString(duration)
164-
val daysPerMonth = 31
165-
if (IntervalUtils.isNegative(cal, daysPerMonth)) {
164+
if (IntervalUtils.isNegative(cal)) {
166165
throw new IllegalArgumentException(s"Provided duration ($duration) is negative")
167166
}
168167

169-
IntervalUtils.getDuration(cal, daysPerMonth, TimeUnit.MILLISECONDS)
168+
IntervalUtils.getDuration(cal, TimeUnit.MILLISECONDS)
170169
}
171170

172171
private def checkTimeoutTimestampAllowed(): Unit = {

0 commit comments

Comments
 (0)