2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -35,7 +35,7 @@ setOldClass("structType")
#' @slot env An R environment that stores bookkeeping states of the SparkDataFrame
#' @slot sdf A Java object reference to the backing Scala DataFrame
#' @seealso \link{createDataFrame}, \link{read.json}, \link{table}
-#' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkdataframe}
+#' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes}
#' @export
#' @examples
#'\dontrun{
16 changes: 8 additions & 8 deletions R/pkg/R/sparkR.R
@@ -28,14 +28,6 @@ connExists <- function(env) {
})
}

-#' @rdname sparkR.session.stop
-#' @name sparkR.stop
-#' @export
-#' @note sparkR.stop since 1.4.0
-sparkR.stop <- function() {
-  sparkR.session.stop()
-}
-
#' Stop the Spark Session and Spark Context
#'
#' Stop the Spark Session and Spark Context.
@@ -90,6 +82,14 @@ sparkR.session.stop <- function() {
clearJobjs()
}

+#' @rdname sparkR.session.stop
+#' @name sparkR.stop
+#' @export
+#' @note sparkR.stop since 1.4.0
+sparkR.stop <- function() {
+  sparkR.session.stop()
+}
+
#' (Deprecated) Initialize a new Spark Context
#'
#' This function initializes a new SparkContext.
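Aside: the sparkR.R hunks above are a pure move. `sparkR.stop` is relocated below `sparkR.session.stop` so the deprecated alias is documented after the function it delegates to; behavior is unchanged. A minimal sketch of the intended call pattern (a hypothetical session, assuming SparkR is installed and attached):

```r
library(SparkR)

# Start a session, do some work, then shut it down.
sparkR.session()
df <- as.DataFrame(faithful)  # `faithful` ships with base R
head(df)

sparkR.session.stop()  # the primary API
# sparkR.stop()        # deprecated alias; it simply calls sparkR.session.stop()
```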
107 changes: 53 additions & 54 deletions docs/sparkr.md
@@ -322,8 +322,59 @@ head(ldf, 3)
Apply a function to each group of a `SparkDataFrame`. The function is applied to each group of the `SparkDataFrame` and should take only two parameters: a grouping key and an R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
-`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below is the data type mapping between R
-and Spark.
+`SparkDataFrame`. It must represent the R function's output schema on the basis of Spark [data types](#data-type-mapping-between-r-and-spark). The column names of the returned `data.frame` are set by the user.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Determine six waiting times with the largest eruption time in minutes.
+schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
+result <- gapply(
+    df,
+    "waiting",
+    function(key, x) {
+        y <- data.frame(key, max(x$eruptions))
+    },
+    schema)
+head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
+
+##    waiting   max_eruption
+##1      64         5.100
+##2      69         5.067
+##3      71         5.033
+##4      87         5.000
+##5      63         4.933
+##6      89         4.900
+{% endhighlight %}
+</div>
+
+##### gapplyCollect
+Like `gapply`, `gapplyCollect` applies a function to each group of a `SparkDataFrame` and collects the result back into an R `data.frame`. The output of the function should be a `data.frame`, but no schema needs to be passed. Note that `gapplyCollect` can fail if the UDF's output on all groups cannot be pulled to the driver and fit in driver memory.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Determine six waiting times with the largest eruption time in minutes.
+result <- gapplyCollect(
+    df,
+    "waiting",
+    function(key, x) {
+        y <- data.frame(key, max(x$eruptions))
+        colnames(y) <- c("waiting", "max_eruption")
+        y
+    })
+head(result[order(result$max_eruption, decreasing = TRUE), ])
+
+##    waiting   max_eruption
+##1      64         5.100
+##2      69         5.067
+##3      71         5.033
+##4      87         5.000
+##5      63         4.933
+##6      89         4.900
+
+{% endhighlight %}
+</div>
+
#### Data type mapping between R and Spark
<table class="table">
@@ -394,58 +445,6 @@ and Spark.
</tr>
</table>

-<div data-lang="r" markdown="1">
-{% highlight r %}
-
-# Determine six waiting times with the largest eruption time in minutes.
-schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
-result <- gapply(
-    df,
-    "waiting",
-    function(key, x) {
-        y <- data.frame(key, max(x$eruptions))
-    },
-    schema)
-head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
-
-##    waiting   max_eruption
-##1      64         5.100
-##2      69         5.067
-##3      71         5.033
-##4      87         5.000
-##5      63         4.933
-##6      89         4.900
-{% endhighlight %}
-</div>
-
-##### gapplyCollect
-Like `gapply`, applies a function to each partition of a `SparkDataFrame` and collect the result back to R data.frame. The output of the function should be a `data.frame`. But, the schema is not required to be passed. Note that `gapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
-
-<div data-lang="r" markdown="1">
-{% highlight r %}
-
-# Determine six waiting times with the largest eruption time in minutes.
-result <- gapplyCollect(
-    df,
-    "waiting",
-    function(key, x) {
-        y <- data.frame(key, max(x$eruptions))
-        colnames(y) <- c("waiting", "max_eruption")
-        y
-    })
-head(result[order(result$max_eruption, decreasing = TRUE), ])
-
-##    waiting   max_eruption
-##1      64         5.100
-##2      69         5.067
-##3      71         5.033
-##4      87         5.000
-##5      63         4.933
-##6      89         4.900
-
-{% endhighlight %}
-</div>
-
#### Run local R functions distributed using `spark.lapply`

##### spark.lapply
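The collapsed `spark.lapply` section above covers running a local R function over a list of elements, with the computation distributed across Spark executors and the results collected back as a local R list. A hedged sketch of typical usage, assuming an active SparkR session (the `families` and `train` names are illustrative, not taken from this diff):

```r
# Fit one GLM per family, distributing the fits with spark.lapply.
# Each element of `families` is passed to `train` on an executor;
# the summaries come back as an ordinary R list on the driver.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
model.summaries <- spark.lapply(families, train)
print(model.summaries)
```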