Closed
Commits
108 commits
a7250b4
added Multivariate gaussian in ML Pyspark
praveendareddy21 May 22, 2016
0c58e88
added testcase for python multivariate
praveendareddy21 May 22, 2016
5fb7804
[SPARK-16114][SQL] structured streaming network word count examples
jjthomas Jun 28, 2016
52c9d69
[MINOR][DOCS][STRUCTURED STREAMING] Minor doc fixes around `DataFrame…
brkyvz Jun 29, 2016
d7a59f1
[SPARKR] add csv tests
felixcheung Jun 29, 2016
835c5a3
[SPARK-16268][PYSPARK] SQLContext should import DataStreamReader
zsxwing Jun 29, 2016
dd70a11
[SPARK-16248][SQL] Whitelist the list of Hive fallback functions
rxin Jun 29, 2016
22b4072
[SPARK-16245][ML] model loading backward compatibility for ml.feature…
yanboliang Jun 29, 2016
345212b
[SPARK-16259][PYSPARK] cleanup options in DataFrame read/write API
Jun 28, 2016
6650c05
[SPARK-16266][SQL][STREAING] Moved DataStreamReader/Writer from pyspa…
tdas Jun 29, 2016
9041223
[TRIVIAL][DOCS][STREAMING][SQL] The return type mentioned in the Java…
holdenk Jun 29, 2016
1b4d63f
[SPARK-16291][SQL] CheckAnalysis should capture nested aggregate func…
liancheng Jun 29, 2016
ba71cf4
[SPARK-16261][EXAMPLES][ML] Fixed incorrect appNames in ML Examples
BryanCutler Jun 29, 2016
d96e8c2
[MINOR][SPARKR] Fix arguments of survreg in SparkR
yanboliang Jun 29, 2016
1cde325
[SPARK-16140][MLLIB][SPARKR][DOCS] Group k-means method in generated …
keypointt Jun 29, 2016
edd1905
[SPARK-16236][SQL][FOLLOWUP] Add Path Option back to Load API in Data…
gatorsmile Jun 29, 2016
3cc258e
[SPARK-16256][SQL][STREAMING] Added Structured Streaming Programming …
tdas Jun 29, 2016
809af6d
[TRIVIAL] [PYSPARK] Clean up orc compression option as well
HyukjinKwon Jun 29, 2016
a7f66ef
[SPARK-16301] [SQL] The analyzer rule for resolving using joins shoul…
yhuai Jun 29, 2016
ef0253f
[SPARK-16006][SQL] Attemping to write empty DataFrame with no fields …
dongjoon-hyun Jun 29, 2016
c4cebd5
[SPARK-16238] Metrics for generated method and class bytecode size
ericl Jun 29, 2016
011befd
[SPARK-16228][SQL] HiveSessionCatalog should return `double`-param fu…
dongjoon-hyun Jun 29, 2016
8da4314
[SPARK-16134][SQL] optimizer rules for typed filter
cloud-fan Jun 30, 2016
e1bdf1e
Revert "[SPARK-16134][SQL] optimizer rules for typed filter"
liancheng Jun 30, 2016
b52bd80
[SPARK-16267][TEST] Replace deprecated `CREATE TEMPORARY TABLE ... US…
dongjoon-hyun Jun 30, 2016
a548523
[SPARK-16294][SQL] Labelling support for the include_example Jekyll p…
liancheng Jun 30, 2016
3134f11
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.1…
koeninger Jun 30, 2016
c8a7c23
[SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programmi…
tdas Jun 30, 2016
1d27445
[SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes
zlpmichelle Jun 30, 2016
6a4f4c1
[SPARK-12177][TEST] Removed test to avoid compilation issue in scala …
tdas Jun 30, 2016
56207fc
[SPARK-16071][SQL] Checks size limit when doubling the array size in …
clockfly Jun 30, 2016
98056a1
[BUILD] Fix version in poms related to kafka-0-10
tdas Jun 30, 2016
f17ffef
[SPARK-13850] Force the sorter to Spill when number of elements in th…
Jun 30, 2016
03008e0
[SPARK-16256][DOCS] Fix window operation diagram
tdas Jun 30, 2016
4dc7d37
[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundExcep…
petermaxlee Jun 30, 2016
17c7522
[SPARK-16313][SQL] Spark should not silently drop exceptions in file …
rxin Jun 30, 2016
d3027c4
[SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion …
Jul 1, 2016
79c96c9
[SPARK-15643][DOC][ML] Add breaking changes to ML migration guide
Jul 1, 2016
94d61de
[SPARK-15954][SQL] Disable loading test tables in Python tests
rxin Jul 1, 2016
80a7bff
[SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API
WeichenXu123 Jun 30, 2016
cc3c44b
[SPARK-14608][ML] transformSchema needs better documentation
hhbyyh Jul 1, 2016
1932bb6
[SPARK-12177][STREAMING][KAFKA] limit api surface area
koeninger Jul 1, 2016
972106d
[SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Pro…
srowen Jul 1, 2016
0b64543
[SPARK-15761][MLLIB][PYSPARK] Load ipython when default python is Pyt…
MechCoder Jul 1, 2016
3665927
[SPARK-16222][SQL] JDBC Sources - Handling illegal input values for `…
gatorsmile Jul 1, 2016
4c96ded
[SPARK-16012][SPARKR] Implement gapplyCollect which will apply a R fu…
Jul 1, 2016
d658811
[SPARK-16299][SPARKR] Capture errors from R workers in daemon.R to av…
sun-rui Jul 1, 2016
78387ce
[SPARK-16335][SQL] Structured streaming should fail if source directo…
rxin Jul 1, 2016
794d099
[SPARK-16233][R][TEST] ORC test should be enabled only when HiveConte…
dongjoon-hyun Jul 1, 2016
ab43038
[SPARK-16095][YARN] Yarn cluster mode should report correct state to …
renozhang Jul 1, 2016
f3a3599
[GRAPHX][EXAMPLES] move graphx test data directory and update graphx …
WeichenXu123 Jul 2, 2016
0d0b416
[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx program…
WeichenXu123 Jul 2, 2016
0c6fd03
[MINOR][BUILD] Fix Java linter errors
dongjoon-hyun Jul 2, 2016
3ecee57
[SPARK-16260][ML][EXAMPLE] PySpark ML Example Improvements and Cleanup
wangmiao1981 Jul 4, 2016
ecbb447
[MINOR][DOCS] Remove unused images; crush PNGs that could use it for …
srowen Jul 4, 2016
d5683a7
[SPARK-16329][SQL][BACKPORT-2.0] Star Expansion over Table Containing…
gatorsmile Jul 4, 2016
cc100ab
[SPARK-16353][BUILD][DOC] Missing javadoc options for java unidoc
Jul 4, 2016
0754ccb
[SPARK-16311][SQL] Metadata refresh should work on temporary views
rxin Jul 5, 2016
cabee23
[SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka
koeninger Jul 5, 2016
9c1596b
[SPARK-15730][SQL] Respect the --hiveconf in the spark-sql command line
chenghao-intel Jul 5, 2016
801fb79
[SPARK-16359][STREAMING][KAFKA] unidoc skip kafka 0.10
koeninger Jul 5, 2016
a2ef13a
[SPARK-16385][CORE] Catch correct exception when calling method via r…
Jul 5, 2016
0fe2a8c
[SPARK-16348][ML][MLLIB][PYTHON] Use full classpaths for pyspark ML J…
jkbradley Jul 6, 2016
4a55b23
Preparing Spark release v2.0.0-rc2
pwendell Jul 6, 2016
6e8fa86
Preparing development version 2.0.1-SNAPSHOT
pwendell Jul 6, 2016
521fc71
[SPARK-16339][CORE] ScriptTransform does not print stderr when outstr…
tejasapatil Jul 6, 2016
25006c8
[SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to pu…
YY-OnCall Jul 6, 2016
d5d2457
[SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached
rxin Jul 6, 2016
e956bd7
[SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails
gatorsmile Jul 6, 2016
091cd5f
[DOC][SQL] update out-of-date code snippets using SQLContext in all d…
WeichenXu123 Jul 6, 2016
03f336d
[MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark docum…
HyukjinKwon Jul 6, 2016
2465f07
[SPARK-16371][SQL] Do not push down filters incorrectly when inner na…
HyukjinKwon Jul 6, 2016
d7926da
[SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM…
tmnd1991 Jul 6, 2016
88be66b
[SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condi…
srowen Jul 6, 2016
2c2b8f1
[MESOS] expand coarse-grained mode docs
Jul 6, 2016
05ddc75
[SPARK-16371][SQL] Two follow-up tasks
rxin Jul 6, 2016
920162a
[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as…
koeninger Jul 6, 2016
d63428a
[SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatch…
gatorsmile Jul 7, 2016
2493335
[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix
yinxusen Jul 7, 2016
cbfd94e
[SPARK-16350][SQL] Fix support for incremental planning in wirteStrea…
lw-lin Jul 7, 2016
30cb3f1
[SPARK-16415][SQL] fix catalog string error
adrian-wang Jul 7, 2016
5828da4
[SPARK-16310][SPARKR] R na.string-like default for csv source
felixcheung Jul 7, 2016
73c764a
[SPARK-16425][R] `describe()` should not fail with non-numeric columns
dongjoon-hyun Jul 8, 2016
88603bd
[SPARK-16276][SQL] Implement elt SQL function
petermaxlee Jun 30, 2016
7ef1d1c
[SPARK-16278][SPARK-16279][SQL] Implement map_keys/map_values SQL fun…
dongjoon-hyun Jul 3, 2016
a049754
[SPARK-16289][SQL] Implement posexplode table generating function
dongjoon-hyun Jun 30, 2016
144aa84
[SPARK-16271][SQL] Implement Hive's UDFXPathUtil
petermaxlee Jun 29, 2016
bb4b041
[SPARK-16274][SQL] Implement xpath_boolean
petermaxlee Jun 30, 2016
e32c29d
[SPARK-16288][SQL] Implement inline table generating function
dongjoon-hyun Jul 3, 2016
565e18c
[SPARK-16286][SQL] Implement stack table generating function
dongjoon-hyun Jul 6, 2016
18ace01
[SPARK-16430][SQL][STREAMING] Add option maxFilesPerTrigger
tdas Jul 8, 2016
221a4a7
[SPARK-16285][SQL] Implement sentences SQL functions
dongjoon-hyun Jul 8, 2016
8c81806
[SPARK-16369][MLLIB] tallSkinnyQR of RowMatrix should aware of empty …
yinxusen Jul 8, 2016
8dee2ec
[SPARK-13638][SQL] Add quoteAll option to CSV DataFrameWriter
jurriaan Jul 8, 2016
0e9333b
[SPARK-16420] Ensure compression streams are closed.
rdblue Jul 8, 2016
e3424fd
[SPARK-16281][SQL] Implement parse_url SQL function
janplus Jul 8, 2016
07f562f
[SPARK-16453][BUILD] release-build.sh is missing hive-thriftserver fo…
yhuai Jul 8, 2016
463cbf7
[SPARK-16387][SQL] JDBC Writer should use dialect to quote field names.
dongjoon-hyun Jul 8, 2016
c425230
[SPARK-13569][STREAMING][KAFKA] pattern based topic subscription
koeninger Jul 9, 2016
16202ba
[SPARK-16376][WEBUI][SPARK WEB UI][APP-ID] HTTP ERROR 500 when using …
srowen Jul 9, 2016
5024c4c
[SPARK-16432] Empty blocks fail to serialize due to assert in Chunked…
ericl Jul 9, 2016
50d7002
[SPARK-11857][MESOS] Deprecate fine grained
Jul 9, 2016
a33643c
[SPARK-16401][SQL] Data Source API: Enable Extending RelationProvider…
gatorsmile Jul 9, 2016
b254fba
added Multivariate gaussian in ML Pyspark
praveendareddy21 May 22, 2016
ff450f5
added testcase for python multivariate
praveendareddy21 May 22, 2016
8968543
ran pep8 and other style changes
praveendareddy21 Jul 11, 2016
39b3952
resolved merge conflicts in local branch
praveendareddy21 Jul 11, 2016
901d2b0
resolved merge conflicts on local branch
praveendareddy21 Jul 11, 2016
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -69,6 +69,7 @@ exportMethods("arrange",
"first",
"freqItems",
"gapply",
"gapplyCollect",
"group_by",
"groupBy",
"head",
@@ -234,6 +235,7 @@ exportMethods("%in%",
"over",
"percent_rank",
"pmod",
"posexplode",
"quarter",
"rand",
"randn",
114 changes: 103 additions & 11 deletions R/pkg/R/DataFrame.R
@@ -1339,7 +1339,7 @@ setMethod("dapplyCollect",

#' gapply
#'
#' Group the SparkDataFrame using the specified columns and apply the R function to each
#' Groups the SparkDataFrame using the specified columns and applies the R function to each
#' group.
#'
#' @param x A SparkDataFrame
@@ -1351,9 +1351,11 @@ setMethod("dapplyCollect",
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match to output of `func`. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
#' @return a SparkDataFrame
#' @family SparkDataFrame functions
#' @rdname gapply
#' @name gapply
#' @seealso \link{gapplyCollect}
#' @export
#' @examples
#'
@@ -1369,14 +1371,22 @@ setMethod("dapplyCollect",
#' columns with data types integer and string and the mean which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(
#' result <- gapply(
#' df,
#' list("a", "c"),
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' },
#' schema)
#' collect(df1)
#' }, schema)
#'
#' We can also group the data and afterwards call gapply on GroupedData.
#' For Example:
#' gdf <- group_by(df, "a", "c")
#' result <- gapply(
#' gdf,
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#' collect(result)
#'
#' Result
#' ------
@@ -1394,16 +1404,16 @@ setMethod("dapplyCollect",
#' structField("Petal_Width", "double"))
#' df1 <- gapply(
#' df,
#' list(df$"Species"),
#' df$"Species",
#' function(key, x) {
#' m <- suppressWarnings(lm(Sepal_Length ~
#' Sepal_Width + Petal_Length + Petal_Width, x))
#' data.frame(t(coef(m)))
#' }, schema)
#' collect(df1)
#'
#'Result
#'---------
#' Result
#' ---------
#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
@@ -1418,6 +1428,89 @@ setMethod("gapply",
gapply(grouped, func, schema)
})

#' gapplyCollect
#'
#' Groups the SparkDataFrame using the specified columns, applies the R function to each
#' group and collects the result back to R as data.frame.
#'
#' @param x A SparkDataFrame
#' @param cols Grouping columns
#' @param func A function to be applied to each group partition specified by grouping
#' column of the SparkDataFrame. The function `func` takes as argument
#' a key - grouping columns and a data frame - a local R data.frame.
#' The output of `func` is a local R data.frame.
#' @return a data.frame
#' @family SparkDataFrame functions
#' @rdname gapplyCollect
#' @name gapplyCollect
#' @seealso \link{gapply}
#' @export
#' @examples
#'
#' \dontrun{
#' Computes the arithmetic mean of the second column by grouping
#' on the first and third columns. Output the grouping values and the average.
#'
#' df <- createDataFrame (
#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
#' c("a", "b", "c", "d"))
#'
#' result <- gapplyCollect(
#' df,
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' colnames(y) <- c("key_a", "key_c", "mean_b")
#' y
#' })
#'
#' We can also group the data and afterwards call gapply on GroupedData.
#' For Example:
#' gdf <- group_by(df, "a", "c")
#' result <- gapplyCollect(
#' gdf,
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' colnames(y) <- c("key_a", "key_c", "mean_b")
#' y
#' })
#'
#' Result
#' ------
#' key_a key_c mean_b
#' 3 3 3.0
#' 1 1 1.5
#'
#' Fits linear models on iris dataset by grouping on the 'Species' column and
#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
#' and 'Petal_Width' as training features.
#'
#' df <- createDataFrame (iris)
#' result <- gapplyCollect(
#' df,
#' df$"Species",
#' function(key, x) {
#' m <- suppressWarnings(lm(Sepal_Length ~
#' Sepal_Width + Petal_Length + Petal_Width, x))
#' data.frame(t(coef(m)))
#' })
#'
#' Result
#'---------
#' Model X.Intercept. Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 2.351890 0.6548350 0.2375602 0.2521257
#'
#'}
#' @note gapplyCollect(SparkDataFrame) since 2.0.0
setMethod("gapplyCollect",
signature(x = "SparkDataFrame"),
function(x, cols, func) {
grouped <- do.call("groupBy", c(x, cols))
gapplyCollect(grouped, func)
})

############################## RDD Map Functions ##################################
# All of the following functions mirror the existing RDD map functions, #
# but allow for use with DataFrames by first converting to an RRDD before calling #
@@ -2524,8 +2617,7 @@ setMethod("describe",
setMethod("describe",
signature(x = "SparkDataFrame"),
function(x) {
colList <- as.list(c(columns(x)))
sdf <- callJMethod(x@sdf, "describe", colList)
sdf <- callJMethod(x@sdf, "describe", list())
dataFrame(sdf)
})

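The hunk above (SPARK-16425) stops describe() from forwarding every R column name and instead passes an empty list, delegating column selection to the Scala side. A minimal SparkR sketch of the resulting call, assuming a local session started with sparkR.session(); the data frame and column names are illustrative:

library(SparkR)
sparkR.session()

# A data frame that mixes numeric and string columns.
df <- createDataFrame(
  list(list(1L, "a", 0.5), list(2L, "b", 1.5)),
  c("id", "label", "score"))

# With the empty-list change, a non-numeric column such as `label`
# no longer makes describe() fail.
stats <- describe(df)
head(stats)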
10 changes: 8 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -714,11 +714,14 @@ dropTempView <- function(viewName) {
#'
#' The data source is specified by the `source` and a set of options(...).
#' If `source` is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#' "spark.sql.sources.default" will be used. \cr
#' Similar to R read.csv, when `source` is "csv", by default, a value of "NA" will be interpreted
#' as NA.
#'
#' @param path The path of files to load
#' @param source The name of external data source
#' @param schema The data schema defined in structType
#' @param na.strings Default string value for NA when source is "csv"
#' @return SparkDataFrame
#' @rdname read.df
#' @name read.df
@@ -735,7 +738,7 @@ dropTempView <- function(viewName) {
#' @name read.df
#' @method read.df default
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
if (!is.null(path)) {
@@ -744,6 +747,9 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
if (is.null(source)) {
source <- getDefaultSqlSource()
}
if (source == "csv" && is.null(options[["nullValue"]])) {
options[["nullValue"]] <- na.strings
}
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source,
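A short usage sketch of the csv-specific default added above (the file path is hypothetical; assumes an active SparkR session). As the hunk shows, `na.strings` is forwarded to the source as the "nullValue" option only when that option has not been set explicitly:

library(SparkR)
sparkR.session()

# "NA" cells in the file come back as NA, mirroring R's read.csv default.
df1 <- read.df("/tmp/people.csv", source = "csv", header = "true")

# An explicit na.strings overrides the default null marker; a user-supplied
# nullValue option would take precedence over both.
df2 <- read.df("/tmp/people.csv", source = "csv", header = "true",
               na.strings = "NULL")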
17 changes: 17 additions & 0 deletions R/pkg/R/functions.R
@@ -2934,3 +2934,20 @@
jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", x@jc, asc)
column(jc)
})

#' posexplode
#'
#' Creates a new row for each element with position in the given array or map column.
#'
#' @rdname posexplode
#' @name posexplode
#' @family collection_funcs
#' @export
#' @examples \dontrun{posexplode(df$c)}
#' @note posexplode since 2.1.0
setMethod("posexplode",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", x@jc)
column(jc)
})
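A small usage sketch for the new posexplode() column function (assumes an active SparkR session on a build that includes the change above; the query is illustrative):

library(SparkR)
sparkR.session()

# Build a one-row data frame with an array column via a SQL literal.
df <- sql("SELECT 1 AS id, array(10, 20, 30) AS values")

# posexplode() yields one row per array element plus its 0-based position,
# exposed as the generated columns `pos` and `col`.
exploded <- select(df, df$id, posexplode(df$values))
head(exploded)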
10 changes: 10 additions & 0 deletions R/pkg/R/generics.R
@@ -469,6 +469,10 @@ setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect")
#' @export
setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })

#' @rdname gapplyCollect
#' @export
setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })

#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
@@ -1050,6 +1054,10 @@ setGeneric("percent_rank", function(x) { standardGeneric("percent_rank") })
#' @export
setGeneric("pmod", function(y, x) { standardGeneric("pmod") })

#' @rdname posexplode
#' @export
setGeneric("posexplode", function(x) { standardGeneric("posexplode") })

#' @rdname quarter
#' @export
setGeneric("quarter", function(x) { standardGeneric("quarter") })
@@ -1247,6 +1255,7 @@ setGeneric("spark.glm", function(data, formula, ...) { standardGeneric("spark.gl
#' @export
setGeneric("glm")

#' predict
#' @rdname predict
#' @export
setGeneric("predict", function(object, ...) { standardGeneric("predict") })
@@ -1271,6 +1280,7 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s
#' @export
setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })

#' write.ml
#' @rdname write.ml
#' @export
setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
93 changes: 40 additions & 53 deletions R/pkg/R/group.R
@@ -196,64 +196,51 @@ createMethods()

#' gapply
#'
#' Applies a R function to each group in the input GroupedData
#'
#' @param x a GroupedData
#' @param func A function to be applied to each group partition specified by GroupedData.
#' The function `func` takes as argument a key - grouping columns and
#' a data frame - a local R data.frame.
#' The output of `func` is a local R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match to output of `func`. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
#' @return a SparkDataFrame
#' @param x A GroupedData
#' @rdname gapply
#' @name gapply
#' @export
#' @examples
#' \dontrun{
#' Computes the arithmetic mean of the second column by grouping
#' on the first and third columns. Output the grouping values and the average.
#'
#' df <- createDataFrame (
#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
#' c("a", "b", "c", "d"))
#'
#' Here our output contains three columns, the key which is a combination of two
#' columns with data types integer and string and the mean which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(
#' df,
#' list("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' },
#' schema)
#' collect(df1)
#'
#' Result
#' ------
#' a c avg
#' 3 3 3.0
#' 1 1 1.5
#' }
#' @note gapply(GroupedData) since 2.0.0
setMethod("gapply",
signature(x = "GroupedData"),
function(x, func, schema) {
try(if (is.null(schema)) stop("schema cannot be NULL"))
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
sdf <- callJStatic(
"org.apache.spark.sql.api.r.SQLUtils",
"gapply",
x@sgd,
serialize(cleanClosure(func), connection = NULL),
packageNamesArr,
broadcastArr,
schema$jobj)
dataFrame(sdf)
if (is.null(schema)) stop("schema cannot be NULL")
gapplyInternal(x, func, schema)
})

#' gapplyCollect
#'
#' @param x A GroupedData
#' @rdname gapplyCollect
#' @name gapplyCollect
#' @export
#' @note gapplyCollect(GroupedData) since 2.0.0
setMethod("gapplyCollect",
signature(x = "GroupedData"),
function(x, func) {
gdf <- gapplyInternal(x, func, NULL)
content <- callJMethod(gdf@sdf, "collect")
# content is a list of items of struct type. Each item has a single field
# which is a serialized data.frame corresponds to one group of the
# SparkDataFrame.
ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
ldf <- do.call(rbind, ldfs)
row.names(ldf) <- NULL
ldf
})

gapplyInternal <- function(x, func, schema) {
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
sdf <- callJStatic(
"org.apache.spark.sql.api.r.SQLUtils",
"gapply",
x@sgd,
serialize(cleanClosure(func), connection = NULL),
packageNamesArr,
broadcastArr,
if (class(schema) == "structType") { schema$jobj } else { NULL })
dataFrame(sdf)
}
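A minimal end-to-end sketch of the GroupedData path added above, adapted from the roxygen example earlier in this diff (assumes an active SparkR session):

library(SparkR)
sparkR.session()

df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))

gdf <- group_by(df, "a", "c")

# gapplyCollect() calls gapplyInternal() with a NULL schema, then deserializes
# each group's data.frame and rbinds them into one local result.
result <- gapplyCollect(gdf, function(key, x) {
  y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  colnames(y) <- c("key_a", "key_c", "mean_b")
  y
})
print(result)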