diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 5a64845598ea5..37ddbaf1673d7 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -142,6 +142,7 @@ Datetime Functions window session_window timestamp_seconds + window_time Collection Functions diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index f01379afd6ef9..ad1bc488e876d 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -4884,6 +4884,52 @@ def check_string_field(field, fieldName): # type: ignore[no-untyped-def] return _invoke_function("window", time_col, windowDuration) +def window_time( + windowColumn: "ColumnOrName", +) -> Column: + """Computes the event time from a window column. The column window values are produced + by window aggregating operators and are of type `STRUCT<start: TIMESTAMP, end: TIMESTAMP>` + where start is inclusive and end is exclusive. The event time of records produced by window + aggregating operators can be computed as ``window_time(window)`` and is + ``window.end - lit(1).alias("microsecond")`` (as microsecond is the minimal supported event + time precision). The window column must be one produced by a window aggregating operator. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + windowColumn : :class:`~pyspark.sql.Column` + The window column produced by a window aggregating operator. + + Returns + ------- + :class:`~pyspark.sql.Column` + the column for computed results. + + Examples + -------- + >>> import datetime + >>> df = spark.createDataFrame( + ... [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], + ... ).toDF("date", "val") + + Group the data into 5 second time windows and aggregate as sum. + + >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) + + Extract the window event time using the window_time function. + + >>> w.select( + ... w.window.end.cast("string").alias("end"), + ... window_time(w.window).cast("string").alias("window_time"), + ... "sum" + ... ).collect() + [Row(end='2016-03-11 09:00:10', window_time='2016-03-11 09:00:09.999999', sum=1)] + """ + window_col = _to_java_column(windowColumn) + return _invoke_function("window_time", window_col) + + def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column: """ Generates session window given a timestamp specifying column. 
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 32cc77e11155b..55ef012b6d021 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -894,6 +894,22 @@ def test_window_functions_cumulative_sum(self): for r, ex in zip(rs, expected): self.assertEqual(tuple(r), ex[: len(r)]) + def test_window_time(self): + df = self.spark.createDataFrame( + [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["date", "val"] + ) + from pyspark.sql import functions as F + + w = df.groupBy(F.window("date", "5 seconds")).agg(F.sum("val").alias("sum")) + r = w.select( + w.window.end.cast("string").alias("end"), + F.window_time(w.window).cast("string").alias("window_time"), + "sum", + ).collect() + self.assertEqual( + r[0], Row(end="2016-03-11 09:00:10", window_time="2016-03-11 09:00:09.999999", sum=1) + ) + def test_collect_functions(self): df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) from pyspark.sql import functions diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b185b38797bb9..fc12b6522b41b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -56,7 +56,6 @@ import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types._ import org.apache.spark.sql.types.DayTimeIntervalType.DAY import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} -import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.Utils import org.apache.spark.util.collection.{Utils => CUtils} @@ -313,6 +312,7 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveAggregateFunctions :: TimeWindowing :: SessionWindowing :: + ResolveWindowTime :: ResolveDefaultColumns(v1SessionCatalog) :: ResolveInlineTables :: ResolveLambdaVariables :: @@ -3965,242 +3965,6 @@ object EliminateEventTimeWatermark extends Rule[LogicalPlan] { } } -/** - * Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to - * figure out how many windows a time column can map to, we over-estimate the number of windows and - * filter out the rows where the time column is not inside the time window. - */ -object TimeWindowing extends Rule[LogicalPlan] { - import org.apache.spark.sql.catalyst.dsl.expressions._ - - private final val WINDOW_COL_NAME = "window" - private final val WINDOW_START = "start" - private final val WINDOW_END = "end" - - /** - * Generates the logical plan for generating window ranges on a timestamp column. Without - * knowing what the timestamp value is, it's non-trivial to figure out deterministically how many - * window ranges a timestamp will map to given all possible combinations of a window duration, - * slide duration and start time (offset). Therefore, we express and over-estimate the number of - * windows there may be, and filter the valid windows. We use last Project operator to group - * the window columns into a struct so they can be accessed as `window.start` and `window.end`. 
- * - * The windows are calculated as below: - * maxNumOverlapping <- ceil(windowDuration / slideDuration) - * for (i <- 0 until maxNumOverlapping) - * lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration - * windowStart <- lastStart - i * slideDuration - * windowEnd <- windowStart + windowDuration - * return windowStart, windowEnd - * - * This behaves as follows for the given parameters for the time: 12:05. The valid windows are - * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the - * Filter operator. - * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m - * 11:55 - 12:07 + 11:52 - 12:04 x - * 12:00 - 12:12 + 11:57 - 12:09 + - * 12:05 - 12:17 + 12:02 - 12:14 + - * - * @param plan The logical plan - * @return the logical plan that will generate the time windows using the Expand operator, with - * the Filter operator for correctness and Project for usability. - */ - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( - _.containsPattern(TIME_WINDOW), ruleId) { - case p: LogicalPlan if p.children.size == 1 => - val child = p.children.head - val windowExpressions = - p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet - - val numWindowExpr = p.expressions.flatMap(_.collect { - case s: SessionWindow => s - case t: TimeWindow => t - }).toSet.size - - // Only support a single window expression for now - if (numWindowExpr == 1 && windowExpressions.nonEmpty && - windowExpressions.head.timeColumn.resolved && - windowExpressions.head.checkInputDataTypes().isSuccess) { - - val window = windowExpressions.head - - val metadata = window.timeColumn match { - case a: Attribute => a.metadata - case _ => Metadata.empty - } - - def getWindow(i: Int, dataType: DataType): Expression = { - val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) - val lastStart = timestamp - (timestamp - window.startTime - + window.slideDuration) % window.slideDuration - val windowStart = lastStart - i * window.slideDuration - val windowEnd = windowStart + window.windowDuration - - // We make sure value fields are nullable since the dataType of TimeWindow defines them - // as nullable. 
- CreateNamedStruct( - Literal(WINDOW_START) :: - PreciseTimestampConversion(windowStart, LongType, dataType).castNullable() :: - Literal(WINDOW_END) :: - PreciseTimestampConversion(windowEnd, LongType, dataType).castNullable() :: - Nil) - } - - val windowAttr = AttributeReference( - WINDOW_COL_NAME, window.dataType, metadata = metadata)() - - if (window.windowDuration == window.slideDuration) { - val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)( - exprId = windowAttr.exprId, explicitMetadata = Some(metadata)) - - val replacedPlan = p transformExpressions { - case t: TimeWindow => windowAttr - } - - // For backwards compatibility we add a filter to filter out nulls - val filterExpr = IsNotNull(window.timeColumn) - - replacedPlan.withNewChildren( - Project(windowStruct +: child.output, - Filter(filterExpr, child)) :: Nil) - } else { - val overlappingWindows = - math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt - val windows = - Seq.tabulate(overlappingWindows)(i => - getWindow(i, window.timeColumn.dataType)) - - val projections = windows.map(_ +: child.output) - - // When the condition windowDuration % slideDuration = 0 is fulfilled, - // the estimation of the number of windows becomes exact one, - // which means all produced windows are valid. - val filterExpr = - if (window.windowDuration % window.slideDuration == 0) { - IsNotNull(window.timeColumn) - } else { - window.timeColumn >= windowAttr.getField(WINDOW_START) && - window.timeColumn < windowAttr.getField(WINDOW_END) - } - - val substitutedPlan = Filter(filterExpr, - Expand(projections, windowAttr +: child.output, child)) - - val renamedPlan = p transformExpressions { - case t: TimeWindow => windowAttr - } - - renamedPlan.withNewChildren(substitutedPlan :: Nil) - } - } else if (numWindowExpr > 1) { - throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p) - } else { - p // Return unchanged. Analyzer will throw exception later - } - } -} - -/** Maps a time column to a session window. */ -object SessionWindowing extends Rule[LogicalPlan] { - import org.apache.spark.sql.catalyst.dsl.expressions._ - - private final val SESSION_COL_NAME = "session_window" - private final val SESSION_START = "start" - private final val SESSION_END = "end" - - /** - * Generates the logical plan for generating session window on a timestamp column. - * Each session window is initially defined as [timestamp, timestamp + gap). - * - * This also adds a marker to the session column so that downstream can easily find the column - * on session window. 
- */ - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case p: LogicalPlan if p.children.size == 1 => - val child = p.children.head - val sessionExpressions = - p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet - - val numWindowExpr = p.expressions.flatMap(_.collect { - case s: SessionWindow => s - case t: TimeWindow => t - }).toSet.size - - // Only support a single session expression for now - if (numWindowExpr == 1 && sessionExpressions.nonEmpty && - sessionExpressions.head.timeColumn.resolved && - sessionExpressions.head.checkInputDataTypes().isSuccess) { - - val session = sessionExpressions.head - - val metadata = session.timeColumn match { - case a: Attribute => a.metadata - case _ => Metadata.empty - } - - val newMetadata = new MetadataBuilder() - .withMetadata(metadata) - .putBoolean(SessionWindow.marker, true) - .build() - - val sessionAttr = AttributeReference( - SESSION_COL_NAME, session.dataType, metadata = newMetadata)() - - val sessionStart = - PreciseTimestampConversion(session.timeColumn, session.timeColumn.dataType, LongType) - val gapDuration = session.gapDuration match { - case expr if Cast.canCast(expr.dataType, CalendarIntervalType) => - Cast(expr, CalendarIntervalType) - case other => - throw QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType) - } - val sessionEnd = PreciseTimestampConversion(session.timeColumn + gapDuration, - session.timeColumn.dataType, LongType) - - // We make sure value fields are nullable since the dataType of SessionWindow defines them - // as nullable. - val literalSessionStruct = CreateNamedStruct( - Literal(SESSION_START) :: - PreciseTimestampConversion(sessionStart, LongType, session.timeColumn.dataType) - .castNullable() :: - Literal(SESSION_END) :: - PreciseTimestampConversion(sessionEnd, LongType, session.timeColumn.dataType) - .castNullable() :: - Nil) - - val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)( - exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata)) - - val replacedPlan = p transformExpressions { - case s: SessionWindow => sessionAttr - } - - val filterByTimeRange = session.gapDuration match { - case Literal(interval: CalendarInterval, CalendarIntervalType) => - interval == null || interval.months + interval.days + interval.microseconds <= 0 - case _ => true - } - - // As same as tumbling window, we add a filter to filter out nulls. - // And we also filter out events with negative or zero or invalid gap duration. - val filterExpr = if (filterByTimeRange) { - IsNotNull(session.timeColumn) && - (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START)) - } else { - IsNotNull(session.timeColumn) - } - - replacedPlan.withNewChildren( - Filter(filterExpr, - Project(sessionStruct +: child.output, child)) :: Nil) - } else if (numWindowExpr > 1) { - throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p) - } else { - p // Return unchanged. Analyzer will throw exception later - } - } -} - /** * Resolve expressions if they contains [[NamePlaceholder]]s. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index ef8ce3f48d5a5..f5e494e909671 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -639,6 +639,7 @@ object FunctionRegistry { expression[Year]("year"), expression[TimeWindow]("window"), expression[SessionWindow]("session_window"), + expression[WindowTime]("window_time"), expression[MakeDate]("make_date"), expression[MakeTimestamp]("make_timestamp"), // We keep the 2 expression builders below to have different function docs. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala new file mode 100644 index 0000000000000..fd5da3ff13d88 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} +import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_WINDOW +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, Metadata, MetadataBuilder, StructType} +import org.apache.spark.unsafe.types.CalendarInterval + +/** + * Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to + * figure out how many windows a time column can map to, we over-estimate the number of windows and + * filter out the rows where the time column is not inside the time window. + */ +object TimeWindowing extends Rule[LogicalPlan] { + import org.apache.spark.sql.catalyst.dsl.expressions._ + + private final val WINDOW_COL_NAME = "window" + private final val WINDOW_START = "start" + private final val WINDOW_END = "end" + + /** + * Generates the logical plan for generating window ranges on a timestamp column. 
Without + * knowing what the timestamp value is, it's non-trivial to figure out deterministically how many + * window ranges a timestamp will map to given all possible combinations of a window duration, + * slide duration and start time (offset). Therefore, we express and over-estimate the number of + * windows there may be, and filter the valid windows. We use last Project operator to group + * the window columns into a struct so they can be accessed as `window.start` and `window.end`. + * + * The windows are calculated as below: + * maxNumOverlapping <- ceil(windowDuration / slideDuration) + * for (i <- 0 until maxNumOverlapping) + * lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration + * windowStart <- lastStart - i * slideDuration + * windowEnd <- windowStart + windowDuration + * return windowStart, windowEnd + * + * This behaves as follows for the given parameters for the time: 12:05. The valid windows are + * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the + * Filter operator. + * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m + * 11:55 - 12:07 + 11:52 - 12:04 x + * 12:00 - 12:12 + 11:57 - 12:09 + + * 12:05 - 12:17 + 12:02 - 12:14 + + * + * @param plan The logical plan + * @return the logical plan that will generate the time windows using the Expand operator, with + * the Filter operator for correctness and Project for usability. + */ + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(TIME_WINDOW), ruleId) { + case p: LogicalPlan if p.children.size == 1 => + val child = p.children.head + val windowExpressions = + p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet + + val numWindowExpr = p.expressions.flatMap(_.collect { + case s: SessionWindow => s + case t: TimeWindow => t + }).toSet.size + + // Only support a single window expression for now + if (numWindowExpr == 1 && windowExpressions.nonEmpty && + windowExpressions.head.timeColumn.resolved && + windowExpressions.head.checkInputDataTypes().isSuccess) { + + val window = windowExpressions.head + + if (StructType.acceptsType(window.timeColumn.dataType)) { + return p.transformExpressions { + case t: TimeWindow => t.copy(timeColumn = WindowTime(window.timeColumn)) + } + } + + val metadata = window.timeColumn match { + case a: Attribute => a.metadata + case _ => Metadata.empty + } + + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .putBoolean(TimeWindow.marker, true) + .build() + + def getWindow(i: Int, dataType: DataType): Expression = { + val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) + val lastStart = timestamp - (timestamp - window.startTime + + window.slideDuration) % window.slideDuration + val windowStart = lastStart - i * window.slideDuration + val windowEnd = windowStart + window.windowDuration + + // We make sure value fields are nullable since the dataType of TimeWindow defines them + // as nullable. 
+ CreateNamedStruct( + Literal(WINDOW_START) :: + PreciseTimestampConversion(windowStart, LongType, dataType).castNullable() :: + Literal(WINDOW_END) :: + PreciseTimestampConversion(windowEnd, LongType, dataType).castNullable() :: + Nil) + } + + val windowAttr = AttributeReference( + WINDOW_COL_NAME, window.dataType, metadata = newMetadata)() + + if (window.windowDuration == window.slideDuration) { + val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)( + exprId = windowAttr.exprId, explicitMetadata = Some(newMetadata)) + + val replacedPlan = p transformExpressions { + case t: TimeWindow => windowAttr + } + + // For backwards compatibility we add a filter to filter out nulls + val filterExpr = IsNotNull(window.timeColumn) + + replacedPlan.withNewChildren( + Project(windowStruct +: child.output, + Filter(filterExpr, child)) :: Nil) + } else { + val overlappingWindows = + math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt + val windows = + Seq.tabulate(overlappingWindows)(i => + getWindow(i, window.timeColumn.dataType)) + + val projections = windows.map(_ +: child.output) + + // When the condition windowDuration % slideDuration = 0 is fulfilled, + // the estimation of the number of windows becomes exact one, + // which means all produced windows are valid. + val filterExpr = + if (window.windowDuration % window.slideDuration == 0) { + IsNotNull(window.timeColumn) + } else { + window.timeColumn >= windowAttr.getField(WINDOW_START) && + window.timeColumn < windowAttr.getField(WINDOW_END) + } + + val substitutedPlan = Filter(filterExpr, + Expand(projections, windowAttr +: child.output, child)) + + val renamedPlan = p transformExpressions { + case t: TimeWindow => windowAttr + } + + renamedPlan.withNewChildren(substitutedPlan :: Nil) + } + } else if (numWindowExpr > 1) { + throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p) + } else { + p // Return unchanged. Analyzer will throw exception later + } + } +} + +/** Maps a time column to a session window. */ +object SessionWindowing extends Rule[LogicalPlan] { + import org.apache.spark.sql.catalyst.dsl.expressions._ + + private final val SESSION_COL_NAME = "session_window" + private final val SESSION_START = "start" + private final val SESSION_END = "end" + + /** + * Generates the logical plan for generating session window on a timestamp column. + * Each session window is initially defined as [timestamp, timestamp + gap). + * + * This also adds a marker to the session column so that downstream can easily find the column + * on session window. 
+ */ + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case p: LogicalPlan if p.children.size == 1 => + val child = p.children.head + val sessionExpressions = + p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet + + val numWindowExpr = p.expressions.flatMap(_.collect { + case s: SessionWindow => s + case t: TimeWindow => t + }).toSet.size + + // Only support a single session expression for now + if (numWindowExpr == 1 && sessionExpressions.nonEmpty && + sessionExpressions.head.timeColumn.resolved && + sessionExpressions.head.checkInputDataTypes().isSuccess) { + + val session = sessionExpressions.head + + if (StructType.acceptsType(session.timeColumn.dataType)) { + return p transformExpressions { + case t: SessionWindow => t.copy(timeColumn = WindowTime(session.timeColumn)) + } + } + + val metadata = session.timeColumn match { + case a: Attribute => a.metadata + case _ => Metadata.empty + } + + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .putBoolean(SessionWindow.marker, true) + .build() + + val sessionAttr = AttributeReference( + SESSION_COL_NAME, session.dataType, metadata = newMetadata)() + + val sessionStart = + PreciseTimestampConversion(session.timeColumn, session.timeColumn.dataType, LongType) + val gapDuration = session.gapDuration match { + case expr if Cast.canCast(expr.dataType, CalendarIntervalType) => + Cast(expr, CalendarIntervalType) + case other => + throw QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType) + } + val sessionEnd = PreciseTimestampConversion(session.timeColumn + gapDuration, + session.timeColumn.dataType, LongType) + + // We make sure value fields are nullable since the dataType of SessionWindow defines them + // as nullable. + val literalSessionStruct = CreateNamedStruct( + Literal(SESSION_START) :: + PreciseTimestampConversion(sessionStart, LongType, session.timeColumn.dataType) + .castNullable() :: + Literal(SESSION_END) :: + PreciseTimestampConversion(sessionEnd, LongType, session.timeColumn.dataType) + .castNullable() :: + Nil) + + val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)( + exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata)) + + val replacedPlan = p transformExpressions { + case s: SessionWindow => sessionAttr + } + + val filterByTimeRange = session.gapDuration match { + case Literal(interval: CalendarInterval, CalendarIntervalType) => + interval == null || interval.months + interval.days + interval.microseconds <= 0 + case _ => true + } + + // As same as tumbling window, we add a filter to filter out nulls. + // And we also filter out events with negative or zero or invalid gap duration. + val filterExpr = if (filterByTimeRange) { + IsNotNull(session.timeColumn) && + (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START)) + } else { + IsNotNull(session.timeColumn) + } + + replacedPlan.withNewChildren( + Filter(filterExpr, + Project(sessionStruct +: child.output, child)) :: Nil) + } else if (numWindowExpr > 1) { + throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p) + } else { + p // Return unchanged. Analyzer will throw exception later + } + } +} + +/** + * Resolves the window_time expression which extracts the correct window time from the + * window column generated as the output of the window aggregating operators. The + * window column is of type struct { start: TimestampType, end: TimestampType }. + * The correct representative event time of a window is ``window.end - 1``. 
+ */ +object ResolveWindowTime extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case p: LogicalPlan if p.children.size == 1 => + val child = p.children.head + val windowTimeExpressions = + p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet + + if (windowTimeExpressions.size == 1 && + windowTimeExpressions.head.windowColumn.resolved && + windowTimeExpressions.head.checkInputDataTypes().isSuccess) { + + val windowTime = windowTimeExpressions.head + + val metadata = windowTime.windowColumn match { + case a: Attribute => a.metadata + case _ => Metadata.empty + } + + if (!metadata.contains(TimeWindow.marker) && + !metadata.contains(SessionWindow.marker)) { + // FIXME: error framework? + throw new AnalysisException( + s"The input is not a correct window column: $windowTime", plan = Some(p)) + } + + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .remove(TimeWindow.marker) + .remove(SessionWindow.marker) + .build() + + val attr = AttributeReference( + "window_time", windowTime.dataType, metadata = newMetadata)() + + // NOTE: "window.end" is the "exclusive" upper bound of the window, so if we use this value + // as it is, it is going to be bound to a different window even if we apply the same window + // spec. Subtract 1 microsecond from window.end to let the window_time be bound to the + // correct window range. + val subtractExpr = + PreciseTimestampConversion( + Subtract(PreciseTimestampConversion( + GetStructField(windowTime.windowColumn, 1), + windowTime.dataType, LongType), Literal(1L)), + LongType, + windowTime.dataType) + + val newColumn = Alias(subtractExpr, "window_time")( + exprId = attr.exprId, explicitMetadata = Some(newMetadata)) + + val replacedPlan = p transformExpressions { + case w: WindowTime => attr + } + + replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil) + } else { + p // Return unchanged. Analyzer will throw exception later + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index d7deca2f7b765..53c79d1fd54bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -136,6 +136,8 @@ case class TimeWindow( } object TimeWindow { + val marker = "spark.timeWindow" + /** * Parses the interval string for a valid time duration. CalendarInterval expects interval * strings to start with the string `interval`. For usability, we prepend `interval` to the string diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala new file mode 100644 index 0000000000000..effc1506d741a --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit line.contains.tab +@ExpressionDescription( + usage = """ + _FUNC_(window_column) - Extract the time value from a time/session window column, which can be used as the event time value of the window. + The extracted time is (window.end - 1), which reflects the fact that the aggregating + windows have an exclusive upper bound - [start, end) + See 'Window Operations on Event Time' in the Structured Streaming guide doc for detailed explanation and examples. + """, + arguments = """ + Arguments: + * window_column - The column representing time/session window. + """, + examples = """ + Examples: + > SELECT a, window.start as start, window.end as end, _FUNC_(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start); + A1 2021-01-01 00:00:00 2021-01-01 00:05:00 2021-01-01 00:04:59.999999 2 + A1 2021-01-01 00:05:00 2021-01-01 00:10:00 2021-01-01 00:09:59.999999 1 + A2 2021-01-01 00:00:00 2021-01-01 00:05:00 2021-01-01 00:04:59.999999 1 + """, + group = "datetime_funcs", + since = "3.4.0") +// scalastyle:on line.size.limit line.contains.tab +case class WindowTime(windowColumn: Expression) + extends UnaryExpression + with ImplicitCastInputTypes + with Unevaluable + with NonSQLExpression { + + override def child: Expression = windowColumn + override def inputTypes: Seq[AbstractDataType] = Seq(StructType) + + override def dataType: DataType = child.dataType.asInstanceOf[StructType].head.dataType + + override def prettyName: String = "window_time" + + // This expression is replaced in the analyzer. + override lazy val resolved = false + + override protected def withNewChildInternal(newChild: Expression): WindowTime = + copy(windowColumn = newChild) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 620e1c6072172..780bf925ad7e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3777,6 +3777,23 @@ object functions { window(timeColumn, windowDuration, windowDuration, "0 second") } + /** + * Extracts the event time from the window column. + * + * The window column is of StructType { start: Timestamp, end: Timestamp } where start is + * inclusive and end is exclusive. Since event time can support microsecond precision, + * window_time(window) = window.end - 1 microsecond. + * + * @param windowColumn The window column (typically produced by window aggregation) of type + * StructType { start: Timestamp, end: Timestamp } + * + * @group datetime_funcs + * @since 3.4.0 + */ + def window_time(windowColumn: Column): Column = withExpr { + WindowTime(windowColumn.expr) + } + /** * Generates session window given a timestamp specifying column. 
* diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 4ce4f1225ce64..6f111b777a6d0 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -345,6 +345,7 @@ | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct | | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct | | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct | +| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct | | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct | | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct | | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct>> | diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index bd39453f5120e..f775eb9ecfc0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -575,4 +575,66 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession { validateWindowColumnInSchema(schema2, "window") } } + + test("window_time function on raw window column") { + val df = Seq( + ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25") + ).toDF("time") + + checkAnswer( + df.select(window($"time", "10 seconds").as("window")) + .select( + $"window.end".cast("string"), + window_time($"window").cast("string") + ), + Seq( + Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"), + Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999") + ) + ) + } + + test("2 window_time functions on raw window column") { + val df = Seq( + ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25") + ).toDF("time") + + val e = intercept[AnalysisException] { + df + .withColumn("time2", expr("time - INTERVAL 5 minutes")) + .select( + window($"time", "10 seconds").as("window1"), + window($"time2", "10 seconds").as("window2") + ) + .select( + $"window1.end".cast("string"), + window_time($"window1").cast("string"), + $"window2.end".cast("string"), + window_time($"window2").cast("string") + ) + } + assert(e.getMessage.contains( + "Multiple time/session window expressions would result in a cartesian product of rows, " + + "therefore they are currently not supported")) + } + + test("window_time function on agg output") { + val df = Seq( + ("2016-03-27 19:38:19", 1), ("2016-03-27 19:39:25", 2) + ).toDF("time", "value") + checkAnswer( + df.groupBy(window($"time", "10 seconds")) + .agg(count("*").as("counts")) + .orderBy($"window.start".asc) + .select( + $"window.start".cast("string"), + $"window.end".cast("string"), + window_time($"window").cast("string"), + $"counts"), + 
Seq( + Row("2016-03-27 19:38:10", "2016-03-27 19:38:20", "2016-03-27 19:38:19.999999", 1), + Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", "2016-03-27 19:39:29.999999", 1) + ) + ) + } }
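
For quick reference, a minimal self-contained Scala sketch of how the new `window_time` function is exercised through the DataFrame API; it mirrors the "window_time function on agg output" test above and assumes a local SparkSession (the object and app names are illustrative only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window, window_time}

object WindowTimeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("window_time-sketch").getOrCreate()
    import spark.implicits._

    // Two events that land in different 10-second tumbling windows.
    val df = Seq("2016-03-27 19:38:19", "2016-03-27 19:39:25").toDF("time")

    df.groupBy(window($"time", "10 seconds"))
      .agg(count("*").as("counts"))
      .select(
        $"window.end".cast("string"),           // exclusive upper bound of the window
        window_time($"window").cast("string"),  // window.end minus 1 microsecond
        $"counts")
      .show(truncate = false)
    // Expected window_time values: 2016-03-27 19:38:19.999999 and 2016-03-27 19:39:29.999999

    spark.stop()
  }
}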