Skip to content

Commit 538463a

Browse files
committed
Merge remote-tracking branch 'apache/master' into SPARK-35780-full-range-datetime
2 parents 4723f8e + 103d16e commit 538463a

File tree

29 files changed

+766
-159
lines changed

29 files changed

+766
-159
lines changed

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark._
3535
import org.apache.spark.broadcast.BroadcastManager
3636
import org.apache.spark.executor.ExecutorMetrics
3737
import org.apache.spark.internal.config
38+
import org.apache.spark.internal.config.Tests
3839
import org.apache.spark.rdd.{DeterministicLevel, RDD}
3940
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
4041
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
@@ -3427,6 +3428,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
34273428
conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
34283429
conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
34293430
conf.set("spark.master", "pushbasedshuffleclustermanager")
3431+
// Needed to run push-based shuffle tests in ad-hoc manner through IDE
3432+
conf.set(Tests.IS_TESTING, true)
34303433
}
34313434

34323435
test("SPARK-32920: shuffle merge finalization") {

dev/deps/spark-deps-hadoop-2.7-hive-2.3

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ commons-cli/1.2//commons-cli-1.2.jar
3838
commons-codec/1.15//commons-codec-1.15.jar
3939
commons-collections/3.2.2//commons-collections-3.2.2.jar
4040
commons-compiler/3.0.16//commons-compiler-3.0.16.jar
41-
commons-compress/1.20//commons-compress-1.20.jar
41+
commons-compress/1.21//commons-compress-1.21.jar
4242
commons-configuration/1.6//commons-configuration-1.6.jar
4343
commons-crypto/1.1.0//commons-crypto-1.1.0.jar
4444
commons-dbcp/1.4//commons-dbcp-1.4.jar

dev/deps/spark-deps-hadoop-3.2-hive-2.3

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ commons-cli/1.2//commons-cli-1.2.jar
3232
commons-codec/1.15//commons-codec-1.15.jar
3333
commons-collections/3.2.2//commons-collections-3.2.2.jar
3434
commons-compiler/3.0.16//commons-compiler-3.0.16.jar
35-
commons-compress/1.20//commons-compress-1.20.jar
35+
commons-compress/1.21//commons-compress-1.21.jar
3636
commons-crypto/1.1.0//commons-crypto-1.1.0.jar
3737
commons-dbcp/1.4//commons-dbcp-1.4.jar
3838
commons-io/2.8.0//commons-io-2.8.0.jar

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
<netlib.java.version>1.1.2</netlib.java.version>
173173
<netlib.ludovic.dev.version>2.2.0</netlib.ludovic.dev.version>
174174
<commons-codec.version>1.15</commons-codec.version>
175-
<commons-compress.version>1.20</commons-compress.version>
175+
<commons-compress.version>1.21</commons-compress.version>
176176
<commons-io.version>2.8.0</commons-io.version>
177177
<!-- org.apache.commons/commons-lang/-->
178178
<commons-lang2.version>2.6</commons-lang2.version>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ object FunctionRegistry {
519519
expression[CurrentDate]("current_date"),
520520
expression[CurrentTimestamp]("current_timestamp"),
521521
expression[CurrentTimeZone]("current_timezone"),
522+
expression[LocalTimestamp]("localtimestamp"),
522523
expression[DateDiff]("datediff"),
523524
expression[DateAdd]("date_add"),
524525
expression[DateFormatClass]("date_format"),

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.sql.AnalysisException
22-
import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestamp, GroupingSets, MonotonicallyIncreasingID, Now}
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestampLike, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID}
2323
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
2424
import org.apache.spark.sql.catalyst.plans._
2525
import org.apache.spark.sql.catalyst.plans.logical._
@@ -417,7 +417,7 @@ object UnsupportedOperationChecker extends Logging {
417417

418418
subPlan.expressions.foreach { e =>
419419
if (e.collectLeaves().exists {
420-
case (_: CurrentTimestamp | _: Now | _: CurrentDate) => true
420+
case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) => true
421421
case _ => false
422422
}) {
423423
throwError(s"Continuous processing does not support current time operations.")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst.expressions
1919

2020
import java.text.ParseException
21-
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
21+
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
2222
import java.time.format.DateTimeParseException
2323
import java.util.Locale
2424

@@ -200,6 +200,44 @@ case class Now() extends CurrentTimestampLike {
200200
override def prettyName: String = "now"
201201
}
202202

203+
/**
204+
* Returns the current timestamp without time zone at the start of query evaluation.
205+
* There is no code generation since this expression should get constant folded by the optimizer.
206+
*/
207+
// scalastyle:off line.size.limit
208+
@ExpressionDescription(
209+
usage = """
210+
_FUNC_() - Returns the current timestamp without time zone at the start of query evaluation. All calls of localtimestamp within the same query return the same value.
211+
212+
_FUNC_ - Returns the current local date-time at the session time zone at the start of query evaluation.
213+
""",
214+
examples = """
215+
Examples:
216+
> SELECT _FUNC_();
217+
2020-04-25 15:49:11.914
218+
""",
219+
group = "datetime_funcs",
220+
since = "3.2.0")
221+
case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
222+
with TimeZoneAwareExpression with CodegenFallback {
223+
224+
def this() = this(None)
225+
226+
override def foldable: Boolean = true
227+
override def nullable: Boolean = false
228+
229+
override def dataType: DataType = TimestampNTZType
230+
231+
final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
232+
233+
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
234+
copy(timeZoneId = Option(timeZoneId))
235+
236+
override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId))
237+
238+
override def prettyName: String = "localtimestamp"
239+
}
240+
203241
/**
204242
* Expression representing the current batch time, which is used by StreamExecution to
205243
* 1. prevent optimizer from pushing this expression below a stateful operator
@@ -236,6 +274,8 @@ case class CurrentBatchTimestamp(
236274
val timestampUs = millisToMicros(timestampMs)
237275
dataType match {
238276
case _: TimestampType => Literal(timestampUs, TimestampType)
277+
case _: TimestampNTZType =>
278+
Literal(convertTz(timestampUs, ZoneOffset.UTC, zoneId), TimestampNTZType)
239279
case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
240280
}
241281
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,9 @@ object UnwrapCastInBinaryComparison extends Rule[LogicalPlan] {
141141
// values.
142142
// 2. this rule only handles the case when both `fromExp` and value in `in.list` are of numeric
143143
// type.
144+
// 3. this rule doesn't optimize In when `in.list` contains an expression that is not literal.
144145
case in @ In(Cast(fromExp, toType: NumericType, _, _), list @ Seq(firstLit, _*))
145-
if canImplicitlyCast(fromExp, toType, firstLit.dataType) =>
146+
if canImplicitlyCast(fromExp, toType, firstLit.dataType) && in.inSetConvertible =>
146147

147148
// There are 3 kinds of literals in the list:
148149
// 1. null literals

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
2525
import org.apache.spark.sql.catalyst.plans.logical._
2626
import org.apache.spark.sql.catalyst.rules._
2727
import org.apache.spark.sql.catalyst.trees.TreePattern._
28-
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2928
import org.apache.spark.sql.connector.catalog.CatalogManager
3029
import org.apache.spark.sql.types._
3130
import org.apache.spark.util.Utils
@@ -81,16 +80,19 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
8180
val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
8281
val currentTime = Literal.create(timestamp, timeExpr.dataType)
8382
val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
83+
val localTimestamps = mutable.Map.empty[String, Literal]
8484

8585
plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
8686
case currentDate @ CurrentDate(Some(timeZoneId)) =>
8787
currentDates.getOrElseUpdate(timeZoneId, {
88-
Literal.create(
89-
DateTimeUtils.microsToDays(timestamp, currentDate.zoneId),
90-
DateType)
88+
Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
9189
})
9290
case CurrentTimestamp() | Now() => currentTime
9391
case CurrentTimeZone() => timezone
92+
case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
93+
localTimestamps.getOrElseUpdate(timeZoneId, {
94+
Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
95+
})
9496
}
9597
}
9698
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
9696
assert(math.abs(t1 - ct.getTime) < 5000)
9797
}
9898

99+
test("datetime function localtimestamp") {
100+
outstandingTimezonesIds.foreach { zid =>
101+
val ct = LocalTimestamp(Some(zid)).eval(EmptyRow).asInstanceOf[Long]
102+
val t1 = DateTimeUtils.localDateTimeToMicros(
103+
LocalDateTime.now(DateTimeUtils.getZoneId(zid)))
104+
assert(math.abs(t1 - ct) < 5000)
105+
}
106+
}
107+
99108
test("DayOfYear") {
100109
val sdfDay = new SimpleDateFormat("D", Locale.US)
101110

0 commit comments

Comments
 (0)