Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ exportMethods("%in%",
"var_samp",
"weekofyear",
"when",
"window",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to check that `window` from the stats package can still be called:
https://stat.ethz.ch/R-manual/R-devel/library/stats/html/window.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The stats version can still be called as `stats::window`.

"year")

exportClasses("GroupedData")
Expand Down
63 changes: 63 additions & 0 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
column(jc)
})

#' window
#'
#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
#' the order of months are not supported.
#'
#' The time column must be of TimestampType.
#'
#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
#' If the `slideDuration` is not provided, the windows will be tumbling windows.
#'
#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
#'
#' The output column will be a struct called 'window' by default with the nested columns 'start'
#' and 'end'.
#'
#' @param x a Column holding the time values to bucketize (must be of TimestampType, see above).
#' @param windowDuration a character string giving the width of each window, e.g. '1 minute'.
#'                       Must be a character value; any other type raises an error.
#' @param slideDuration an optional character string giving the interval between consecutive
#'                      window starts. Defaults to NULL, in which case the windows are tumbling
#'                      windows.
#' @param startTime an optional character string giving the offset, relative to
#'                  1970-01-01 00:00:00 UTC, at which window intervals start. Defaults to NULL.
#' @return a Column wrapping the result of the JVM-side \code{window} function; by default a
#'         struct named 'window' with nested columns 'start' and 'end'.
#' @family datetime_funcs
#' @rdname window
#' @name window
#' @export
#' @examples
#'\dontrun{
#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
#' window(df$time, "1 minute", "15 seconds", "10 seconds")
#'
#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
#' # 09:01:15-09:02:15...
#' window(df$time, "1 minute", startTime = "15 seconds")
#'
#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
#' window(df$time, "30 seconds", "10 seconds")
#'}
setMethod("window", signature(x = "Column"),
function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
# windowDuration is mandatory and must be a character value.
stopifnot(is.character(windowDuration))
# The optional arguments select which argument list is sent to the static JVM method
# org.apache.spark.sql.functions.window; each branch type-checks only what it forwards.
if (!is.null(slideDuration) && !is.null(startTime)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check we check the type of windowDuration, slideDuration and startTime?

# Both optional arguments supplied: forward window, slide and start.
stopifnot(is.character(slideDuration) && is.character(startTime))
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration, slideDuration, startTime)
} else if (!is.null(slideDuration)) {
# Only a slide duration supplied: forward window and slide, no start offset.
stopifnot(is.character(slideDuration))
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration, slideDuration)
} else if (!is.null(startTime)) {
# Only a start offset supplied: windowDuration is passed a second time, in the slide
# position, so the slide equals the window width (tumbling windows) with the given offset.
stopifnot(is.character(startTime))
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration, windowDuration, startTime)
} else {
# Neither optional argument supplied: forward the window duration alone.
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration)
}
# Wrap the returned JVM reference as a Column.
column(jc)
})

#' locate
#'
#' Locate the position of the first occurrence of substr.
Expand Down
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
#' @export
setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })

# S4 generic for window(): takes `x` plus any further arguments through `...`.
#' @rdname window
#' @export
setGeneric(
  "window",
  function(x, ...) {
    standardGeneric("window")
  }
)

# S4 generic for year(): takes the single argument `x`.
#' @rdname year
#' @export
setGeneric(
  "year",
  function(x) {
    standardGeneric("year")
  }
)
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_context.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop")
"summary", "transform", "drop", "window")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
Expand Down
36 changes: 36 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", {
expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
})

test_that("time windowing (window()) with all inputs", {
  # Single-row fixture: one timestamp string and one value.
  events <- data.frame(t = "2016-03-11 09:00:07", v = 1)
  sdf <- createDataFrame(sqlContext, events)
  # Window duration, slide duration and start time are all passed explicitly.
  sdf$window <- window(sdf$t, "5 seconds", "5 seconds", "0 seconds")
  collected <- collect(sdf)
  # The window bounds themselves are not asserted because of possible time zone issues;
  # this only verifies that the call works and the value column comes back.
  expect_equal(collected$v, 1)
})

test_that("time windowing (window()) with slide duration", {
  # Single-row fixture: one timestamp string and one value.
  events <- data.frame(t = "2016-03-11 09:00:07", v = 1)
  sdf <- createDataFrame(sqlContext, events)
  # Window duration plus a shorter slide duration; no start time.
  sdf$window <- window(sdf$t, "5 seconds", "2 seconds")
  collected <- collect(sdf)
  # The window bounds themselves are not asserted because of possible time zone issues.
  # Two values are expected back, presumably because the single timestamp lands in two
  # overlapping windows.
  expect_equal(collected$v, c(1, 1))
})

test_that("time windowing (window()) with start time", {
  # Single-row fixture: one timestamp string and one value.
  events <- data.frame(t = "2016-03-11 09:00:07", v = 1)
  sdf <- createDataFrame(sqlContext, events)
  # Window duration plus a named start time; slide duration is left at its default.
  sdf$window <- window(sdf$t, "5 seconds", startTime = "2 seconds")
  collected <- collect(sdf)
  # The window bounds themselves are not asserted because of possible time zone issues;
  # this only verifies that the call works and the value column comes back.
  expect_equal(collected$v, 1)
})

test_that("time windowing (window()) with just window duration", {
  # Single-row fixture: one timestamp string and one value.
  events <- data.frame(t = "2016-03-11 09:00:07", v = 1)
  sdf <- createDataFrame(sqlContext, events)
  # Only the window duration is supplied.
  sdf$window <- window(sdf$t, "5 seconds")
  collected <- collect(sdf)
  # The window bounds themselves are not asserted because of possible time zone issues;
  # this only verifies that the call works and the value column comes back.
  expect_equal(collected$v, 1)
})

test_that("when(), otherwise() and ifelse() on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
Expand Down