
Commit da09d9f

Merge remote-tracking branch 'upstream/master' into add-flag-disable-constraint-propagation
2 parents: d4c9a5e + 478fbc8

219 files changed, +9622 -2880 lines


R/pkg/DESCRIPTION

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ Collate:
     'serialize.R'
     'sparkR.R'
     'stats.R'
+    'streaming.R'
     'types.R'
     'utils.R'
     'window.R'

R/pkg/NAMESPACE

Lines changed: 15 additions & 0 deletions
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "checkpoint",
               "coalesce",
               "collect",
               "colnames",
@@ -121,6 +122,7 @@ exportMethods("arrange",
               "insertInto",
               "intersect",
               "isLocal",
+              "isStreaming",
               "join",
               "limit",
               "merge",
@@ -169,6 +171,7 @@ exportMethods("arrange",
               "write.json",
               "write.orc",
               "write.parquet",
+              "write.stream",
               "write.text",
               "write.ml")

@@ -365,7 +368,9 @@ export("as.DataFrame",
        "read.json",
        "read.orc",
        "read.parquet",
+       "read.stream",
        "read.text",
+       "setCheckpointDir",
        "spark.lapply",
        "spark.addFile",
        "spark.getSparkFilesRootDirectory",
@@ -402,6 +407,16 @@ export("partitionBy",
 export("windowPartitionBy",
        "windowOrderBy")

+exportClasses("StreamingQuery")
+
+export("awaitTermination",
+       "isActive",
+       "lastProgress",
+       "queryName",
+       "status",
+       "stopQuery")
+
+
 S3method(print, jobj)
 S3method(print, structField)
 S3method(print, structType)
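Taken together, the new exports above introduce a StreamingQuery handle plus a small set of query-management verbs. A minimal usage sketch (not part of the commit; it assumes a running SparkR 2.2 session and the socket source used in the roxygen examples below):

sparkR.session()
df <- read.stream("socket", host = "localhost", port = 9999)
q <- write.stream(df, "console")

isActive(q)        # TRUE while the query is running
status(q)          # current status of the streaming query
lastProgress(q)    # most recent progress report
stopQuery(q)       # stop the query; isActive(q) returns FALSE afterwards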

R/pkg/R/DataFrame.R

Lines changed: 130 additions & 3 deletions
@@ -133,9 +133,6 @@ setMethod("schema",
 #'
 #' Print the logical and physical Catalyst plans to the console for debugging.
 #'
-#' @param x a SparkDataFrame.
-#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan.
-#' @param ... further arguments to be passed to or from other methods.
 #' @family SparkDataFrame functions
 #' @aliases explain,SparkDataFrame-method
 #' @rdname explain
@@ -3515,3 +3512,133 @@ setMethod("getNumPartitions",
           function(x) {
             callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
           })
+
+#' isStreaming
+#'
+#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
+#' as it arrives.
+#'
+#' @param x A SparkDataFrame
+#' @return TRUE if this SparkDataFrame is from a streaming source
+#' @family SparkDataFrame functions
+#' @aliases isStreaming,SparkDataFrame-method
+#' @rdname isStreaming
+#' @name isStreaming
+#' @seealso \link{read.stream} \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' }
+#' @note isStreaming since 2.2.0
+#' @note experimental
+setMethod("isStreaming",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(x@sdf, "isStreaming")
+          })
+
+#' Write the streaming SparkDataFrame to a data source.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to
+#' an output data source. There are three modes:
+#' \itemize{
+#'   \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
+#'                 output mode can only be used in queries that do not contain any aggregation.
+#'   \item complete: All the rows in the streaming SparkDataFrame will be written out every time
+#'                   there are some updates. This output mode can only be used in queries that
+#'                   contain aggregations.
+#'   \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
+#'                 out every time there are some updates. If the query doesn't contain aggregations,
+#'                 it will be equivalent to \code{append} mode.
+#' }
+#'
+#' @param df a streaming SparkDataFrame.
+#' @param source a name for external data source.
+#' @param outputMode one of 'append', 'complete', 'update'.
+#' @param ... additional argument(s) passed to the method.
+#'
+#' @family SparkDataFrame functions
+#' @seealso \link{read.stream}
+#' @aliases write.stream,SparkDataFrame-method
+#' @rdname write.stream
+#' @name write.stream
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' wordCounts <- count(group_by(df, "value"))
+#'
+#' # console
+#' q <- write.stream(wordCounts, "console", outputMode = "complete")
+#' # text stream
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' # memory stream
+#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
+#' head(sql("SELECT * from outs"))
+#' queryName(q)
+#'
+#' stopQuery(q)
+#' }
+#' @note write.stream since 2.2.0
+#' @note experimental
setMethod("write.stream",
+          signature(df = "SparkDataFrame"),
+          function(df, source = NULL, outputMode = NULL, ...) {
+            if (!is.null(source) && !is.character(source)) {
+              stop("source should be character, NULL or omitted. It is the data source specified ",
+                   "in 'spark.sql.sources.default' configuration by default.")
+            }
+            if (!is.null(outputMode) && !is.character(outputMode)) {
+              stop("outputMode should be character or omitted.")
+            }
+            if (is.null(source)) {
+              source <- getDefaultSqlSource()
+            }
+            options <- varargsToStrEnv(...)
+            write <- handledCallJMethod(df@sdf, "writeStream")
+            write <- callJMethod(write, "format", source)
+            if (!is.null(outputMode)) {
+              write <- callJMethod(write, "outputMode", outputMode)
+            }
+            write <- callJMethod(write, "options", options)
+            ssq <- handledCallJMethod(write, "start")
+            streamingQuery(ssq)
+          })
+
+#' checkpoint
+#'
+#' Returns a checkpointed version of this SparkDataFrame. Checkpointing can be used to truncate the
+#' logical plan, which is especially useful in iterative algorithms where the plan may grow
+#' exponentially. It will be saved to files inside the checkpoint directory set with
+#' \code{setCheckpointDir}.
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to checkpoint this SparkDataFrame immediately
+#' @return a new checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases checkpoint,SparkDataFrame-method
+#' @rdname checkpoint
+#' @name checkpoint
+#' @seealso \link{setCheckpointDir}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#' df <- checkpoint(df)
+#' }
+#' @note checkpoint since 2.2.0
+setMethod("checkpoint",
+          signature(x = "SparkDataFrame"),
+          function(x, eager = TRUE) {
+            df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
+            dataFrame(df)
+          })
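The checkpoint() method above pairs with the setCheckpointDir() helper added in R/pkg/R/context.R further down. A rough sketch of the intended workflow (not from the commit; the path and the toy loop are illustrative only):

sparkR.session()
setCheckpointDir("/tmp/spark-checkpoints")      # use an HDFS path when running on a cluster

df <- createDataFrame(data.frame(id = 1:1000))
for (i in 1:10) {
  df <- withColumn(df, "id", df$id + 1)         # each iteration grows the logical plan
}
df <- checkpoint(df)                            # eager = TRUE by default; lineage is truncated here
head(df)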

R/pkg/R/RDD.R

Lines changed: 1 addition & 1 deletion
@@ -291,7 +291,7 @@ setMethod("unpersistRDD",
 #' @rdname checkpoint-methods
 #' @aliases checkpoint,RDD-method
 #' @noRd
-setMethod("checkpoint",
+setMethod("checkpointRDD",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)

R/pkg/R/SQLContext.R

Lines changed: 50 additions & 0 deletions
@@ -937,3 +937,53 @@ read.jdbc <- function(url, tableName,
   }
   dataFrame(sdf)
 }
+
+#' Load a streaming SparkDataFrame
+#'
+#' Returns the dataset in a data source as a SparkDataFrame.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param source The name of the external data source
+#' @param schema The data schema defined in structType; this is required for file-based streaming
+#'               data sources
+#' @param ... additional external data source specific named options, for instance \code{path} for
+#'        file-based streaming data sources
+#' @return SparkDataFrame
+#' @rdname read.stream
+#' @name read.stream
+#' @seealso \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#'
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' }
+#' @note read.stream since 2.2.0
+#' @note experimental
+read.stream <- function(source = NULL, schema = NULL, ...) {
+  sparkSession <- getSparkSession()
+  if (!is.null(source) && !is.character(source)) {
+    stop("source should be character, NULL or omitted. It is the data source specified ",
+         "in 'spark.sql.sources.default' configuration by default.")
+  }
+  if (is.null(source)) {
+    source <- getDefaultSqlSource()
+  }
+  options <- varargsToStrEnv(...)
+  read <- callJMethod(sparkSession, "readStream")
+  read <- callJMethod(read, "format", source)
+  if (!is.null(schema)) {
+    stopifnot(class(schema) == "structType")
+    read <- callJMethod(read, "schema", schema$jobj)
+  }
+  read <- callJMethod(read, "options", options)
+  sdf <- handledCallJMethod(read, "load")
+  dataFrame(callJMethod(sdf, "toDF"))
+}
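As the roxygen above notes, file-based streaming sources need an explicit schema, unlike the socket source. A hedged sketch (the directory path and column names are hypothetical):

sparkR.session()
jsonDir <- "/tmp/streaming-json"                # directory new JSON files are dropped into
peopleSchema <- structType(structField("name", "string"), structField("age", "integer"))

people <- read.stream("json", path = jsonDir, schema = peopleSchema, maxFilesPerTrigger = 1)
isStreaming(people)                             # TRUE
q <- write.stream(people, "console")
stopQuery(q)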

R/pkg/R/context.R

Lines changed: 34 additions & 3 deletions
@@ -291,7 +291,7 @@ broadcast <- function(sc, object) {
 #' rdd <- parallelize(sc, 1:2, 2L)
 #' checkpoint(rdd)
 #'}
-setCheckpointDir <- function(sc, dirName) {
+setCheckpointDirSC <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }

@@ -330,7 +330,13 @@ spark.addFile <- function(path, recursive = FALSE) {
 #'}
 #' @note spark.getSparkFilesRootDirectory since 2.1.0
 spark.getSparkFilesRootDirectory <- function() {
-  callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+  if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+    # Running on driver.
+    callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+  } else {
+    # Running on worker.
+    Sys.getenv("SPARKR_SPARKFILES_ROOT_DIR")
+  }
 }

 #' Get the absolute path of a file added through spark.addFile.
@@ -345,7 +351,13 @@ spark.getSparkFilesRootDirectory <- function() {
 #'}
 #' @note spark.getSparkFiles since 2.1.0
 spark.getSparkFiles <- function(fileName) {
-  callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+  if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+    # Running on driver.
+    callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+  } else {
+    # Running on worker.
+    file.path(spark.getSparkFilesRootDirectory(), as.character(fileName))
+  }
 }

 #' Run a function over a list of elements, distributing the computations with Spark
@@ -410,3 +422,22 @@ setLogLevel <- function(level) {
   sc <- getSparkContext()
   invisible(callJMethod(sc, "setLogLevel", level))
 }
+
+#' Set checkpoint directory
+#'
+#' Set the directory under which SparkDataFrames are going to be checkpointed. The directory must
+#' be an HDFS path if running on a cluster.
+#'
+#' @rdname setCheckpointDir
+#' @param directory Directory path to checkpoint to
+#' @seealso \link{checkpoint}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#'}
+#' @note setCheckpointDir since 2.2.0
+setCheckpointDir <- function(directory) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(directory))))
+}
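The SPARKR_IS_RUNNING_ON_WORKER branches above let spark.getSparkFiles() and spark.getSparkFilesRootDirectory() resolve paths on executors as well as on the driver. A sketch of the pattern this enables (the file contents and names are made up):

sparkR.session()
path <- tempfile(fileext = ".txt")
writeLines("hello from the driver", path)
spark.addFile(path)
fileName <- basename(path)

spark.getSparkFiles(fileName)                   # on the driver: resolved via SparkFiles in the JVM

firstLines <- spark.lapply(seq_len(2), function(i) {
  # on a worker: falls back to SPARKR_SPARKFILES_ROOT_DIR instead of a JVM call
  readLines(spark.getSparkFiles(fileName))[1]
})
unlist(firstLines)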

R/pkg/R/functions.R

Lines changed: 12 additions & 6 deletions
@@ -1795,10 +1795,10 @@ setMethod("to_date",

 #' to_json
 #'
-#' Converts a column containing a \code{structType} into a Column of JSON string.
-#' Resolving the Column can fail if an unsupported type is encountered.
+#' Converts a column containing a \code{structType} or an array of \code{structType} into a Column
+#' of JSON string. Resolving the Column can fail if an unsupported type is encountered.
 #'
-#' @param x Column containing the struct
+#' @param x Column containing the struct or an array of structs
 #' @param ... additional named properties to control how it is converted, accepts the same options
 #'            as the JSON data source.
 #'
@@ -1809,8 +1809,13 @@ setMethod("to_date",
 #' @export
 #' @examples
 #' \dontrun{
-#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
-#' select(df, to_json(df$t))
+#' # Converts a struct into a JSON object
+#' df <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
+#' select(df, to_json(df$d, dateFormat = 'dd/MM/yyyy'))
+#'
+#' # Converts an array of structs into a JSON array
+#' df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' select(df, to_json(df$people))
 #'}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),
@@ -2433,7 +2438,8 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' from_json
 #'
 #' Parses a column containing a JSON string into a Column of \code{structType} with the specified
-#' \code{schema}. If the string is unparseable, the Column will contains the value NA.
+#' \code{schema} or array of \code{structType} if \code{asJsonArray} is set to \code{TRUE}.
+#' If the string is unparseable, the Column will contain the value NA.
 #'
 #' @param x Column containing the JSON string.
 #' @param schema a structType object to use as the schema to use when parsing the JSON string.
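The doc changes above indicate that to_json() now handles arrays of structs and that from_json() can parse a JSON array when asJsonArray is TRUE. A tentative round-trip sketch (it assumes the asJsonArray argument behaves as described and that a SparkR session is active; the column alias is illustrative):

df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
asJson <- select(df, alias(to_json(df$people), "people_json"))
head(asJson)                                    # a single column holding a JSON array string

personSchema <- structType(structField("name", "string"))
parsed <- select(asJson, from_json(asJson$people_json, personSchema, asJsonArray = TRUE))
head(parsed)                                    # parsed back into an array of structs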
