
Commit

Merge remote-tracking branch 'upstream/branch-2.1' into snappy/branch-2.1
sumwale committed Oct 13, 2021
2 parents d4f987d + 4d2d3d4 commit 927d41e
Showing 333 changed files with 4,355 additions and 941 deletions.
12 changes: 6 additions & 6 deletions LICENSE
@@ -249,11 +249,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala)
-(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.7 - http://www.scala-lang.org/)
-(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.7 - http://www.scala-lang.org/)
-(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.7 - http://www.scala-lang.org/)
-(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.7 - http://www.scala-lang.org/)
-(BSD-like) Scalap (org.scala-lang:scalap:2.11.7 - http://www.scala-lang.org/)
+(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.8 - http://www.scala-lang.org/)
+(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.8 - http://www.scala-lang.org/)
+(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.8 - http://www.scala-lang.org/)
+(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.8 - http://www.scala-lang.org/)
+(BSD-like) Scalap (org.scala-lang:scalap:2.11.8 - http://www.scala-lang.org/)
(BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org)
(BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org)
(BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org)
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.7 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
1 change: 1 addition & 0 deletions R/install-dev.sh
@@ -28,6 +28,7 @@

set -o pipefail
set -e
+set -x

FWDIR="$(cd `dirname $0`; pwd)"
LIB_DIR="$FWDIR/lib"
2 changes: 2 additions & 0 deletions R/pkg/.Rbuildignore
@@ -6,3 +6,5 @@
^README\.Rmd$
^src-native$
^html$
+^tests/fulltests/*

4 changes: 2 additions & 2 deletions R/pkg/DESCRIPTION
@@ -1,8 +1,8 @@
Package: SparkR
Type: Package
-Version: 2.1.1
+Version: 2.1.3
Title: R Frontend for Apache Spark
-Description: The SparkR package provides an R Frontend for Apache Spark.
+Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
email = "shivaram@cs.berkeley.edu"),
person("Xiangrui", "Meng", role = "aut",
3 changes: 3 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -1173,6 +1173,9 @@ setMethod("collect",
vec <- do.call(c, col)
stopifnot(class(vec) != "list")
class(vec) <- PRIMITIVE_TYPES[[colType]]
+if (is.character(vec) && stringsAsFactors) {
+vec <- as.factor(vec)
+}
df[[colIndex]] <- vec
} else {
df[[colIndex]] <- col
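The hunk above makes collect() honour its stringsAsFactors argument for string columns, mirroring base R's data.frame() behaviour. A minimal usage sketch, not part of this commit and assuming a local SparkR session is available:

# Illustrative only: collect a string column as a factor.
library(SparkR)
sparkR.session(master = "local[1]")
df <- suppressWarnings(createDataFrame(iris))  # iris column names get sanitized
ldf <- collect(df, stringsAsFactors = TRUE)
class(ldf$Species)                             # "factor" rather than "character"
sparkR.session.stop()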
4 changes: 2 additions & 2 deletions R/pkg/R/client.R
@@ -19,7 +19,7 @@

# Creates a SparkR client connection object
# if one doesn't already exist
-connectBackend <- function(hostname, port, timeout) {
+connectBackend <- function(hostname, port, timeout, authSecret) {
if (exists(".sparkRcon", envir = .sparkREnv)) {
if (isOpen(.sparkREnv[[".sparkRCon"]])) {
cat("SparkRBackend client connection already exists\n")
@@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) {

con <- socketConnection(host = hostname, port = port, server = FALSE,
blocking = TRUE, open = "wb", timeout = timeout)

+doServerAuth(con, authSecret)
assign(".sparkRCon", con, envir = .sparkREnv)
con
}
10 changes: 7 additions & 3 deletions R/pkg/R/deserialize.R
@@ -60,14 +60,18 @@ readTypedObject <- function(con, type) {
stop(paste("Unsupported type for deserialization", type)))
}

-readString <- function(con) {
-stringLen <- readInt(con)
-raw <- readBin(con, raw(), stringLen, endian = "big")
+readStringData <- function(con, len) {
+raw <- readBin(con, raw(), len, endian = "big")
string <- rawToChar(raw)
Encoding(string) <- "UTF-8"
string
}

+readString <- function(con) {
+stringLen <- readInt(con)
+readStringData(con, stringLen)
+}

readInt <- function(con) {
readBin(con, integer(), n = 1, endian = "big")
}
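The refactor above splits readString() so that callers can read the length prefix separately and report a clearer error (sparkR.R uses this for the auth secret below). From these functions, the wire format is evidently a 4-byte big-endian integer length followed by that many bytes of UTF-8 text. A self-contained round trip over an in-memory connection, for illustration only:

# Encode: length prefix, then raw UTF-8 bytes.
payload <- charToRaw(enc2utf8("hello"))
out <- rawConnection(raw(0), open = "wb")
writeBin(length(payload), out, endian = "big")
writeBin(payload, out)
bytes <- rawConnectionValue(out)
close(out)

# Decode: mirrors readInt() followed by readStringData().
inp <- rawConnection(bytes, open = "rb")
len <- readBin(inp, integer(), n = 1, endian = "big")
s <- rawToChar(readBin(inp, raw(), len))
Encoding(s) <- "UTF-8"
close(inp)
stopifnot(identical(s, "hello"))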
6 changes: 5 additions & 1 deletion R/pkg/R/install.R
@@ -270,7 +270,11 @@ sparkCachePath <- function() {
if (.Platform$OS.type == "windows") {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
if (is.na(winAppPath)) {
stop(paste("%LOCALAPPDATA% not found.",
message("%LOCALAPPDATA% not found. Falling back to %USERPROFILE%.")
winAppPath <- Sys.getenv("USERPROFILE", unset = NA)
}
if (is.na(winAppPath)) {
stop(paste("%LOCALAPPDATA% and %USERPROFILE% not found.",
"Please define the environment variable",
"or restart and enter an installation path in localDir."))
} else {
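The change above turns a hard failure into a fallback chain: try %LOCALAPPDATA% first, then %USERPROFILE%, and only stop when both are unset. The same lookup pattern, condensed into a generic helper (an illustrative sketch, not code from this commit):

# Illustrative only: return the first environment variable that is set.
firstEnv <- function(...) {
  for (name in c(...)) {
    value <- Sys.getenv(name, unset = NA)
    if (!is.na(value)) return(value)
  }
  NA_character_
}
firstEnv("LOCALAPPDATA", "USERPROFILE")  # NA if neither is defined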
39 changes: 34 additions & 5 deletions R/pkg/R/sparkR.R
@@ -161,6 +161,10 @@ sparkR.sparkContext <- function(
" please use the --packages commandline instead", sep = ","))
}
backendPort <- existingPort
+authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
+if (nchar(authSecret) == 0) {
+stop("Auth secret not provided in environment.")
+}
} else {
path <- tempfile(pattern = "backend_port")
submitOps <- getClientModeSparkSubmitOpts(
@@ -189,16 +193,27 @@ sparkR.sparkContext <- function(
monitorPort <- readInt(f)
rLibPath <- readString(f)
connectionTimeout <- readInt(f)

+# Don't use readString() so that we can provide a useful
+# error message if the R and Java versions are mismatched.
+authSecretLen <- readInt(f)
+if (length(authSecretLen) == 0 || authSecretLen == 0) {
+stop("Unexpected EOF in JVM connection data. Mismatched versions?")
+}
+authSecret <- readStringData(f, authSecretLen)
close(f)
file.remove(path)
if (length(backendPort) == 0 || backendPort == 0 ||
length(monitorPort) == 0 || monitorPort == 0 ||
-length(rLibPath) != 1) {
+length(rLibPath) != 1 || length(authSecret) == 0) {
stop("JVM failed to launch")
}
assign(".monitorConn",
socketConnection(port = monitorPort, timeout = connectionTimeout),
envir = .sparkREnv)

+monitorConn <- socketConnection(port = monitorPort, blocking = TRUE,
+timeout = connectionTimeout, open = "wb")
+doServerAuth(monitorConn, authSecret)

+assign(".monitorConn", monitorConn, envir = .sparkREnv)
assign(".backendLaunched", 1, envir = .sparkREnv)
if (rLibPath != "") {
assign(".libPath", rLibPath, envir = .sparkREnv)
Expand All @@ -208,7 +223,7 @@ sparkR.sparkContext <- function(

.sparkREnv$backendPort <- backendPort
tryCatch({
connectBackend("localhost", backendPort, timeout = connectionTimeout)
connectBackend("localhost", backendPort, timeout = connectionTimeout, authSecret = authSecret)
},
error = function(err) {
stop("Failed to connect JVM\n")
@@ -620,3 +635,17 @@ sparkCheckInstall <- function(sparkHome, master, deployMode) {
NULL
}
}

+# Utility function for sending auth data over a socket and checking the server's reply.
+doServerAuth <- function(con, authSecret) {
+if (nchar(authSecret) == 0) {
+stop("Auth secret not provided.")
+}
+writeString(con, authSecret)
+flush(con)
+reply <- readString(con)
+if (reply != "ok") {
+close(con)
+stop("Unexpected reply from server.")
+}
+}
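Taken together, these changes implement a simple shared-secret handshake: the client sends the secret as a length-prefixed string and expects the literal reply "ok"; anything else aborts the connection. A hedged usage sketch, where backendPort, authSecret, and connectionTimeout stand in for the values read from the JVM handshake above:

# Illustrative only: authenticate a freshly opened backend connection.
con <- socketConnection(host = "localhost", port = backendPort,
                        blocking = TRUE, open = "wb",
                        timeout = connectionTimeout)
doServerAuth(con, authSecret)  # stops unless the server replies "ok"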
74 changes: 74 additions & 0 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)

i <- 4
df <- createDataFrame(data.frame(dummy = 1:i))
expect_equal(count(df), i)

l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(l)
expect_equal(columns(df), c("a", "b"))

a <- 1:3
b <- c("a", "b", "c")
ldf <- data.frame(a, b)
df <- createDataFrame(ldf)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)

mtcarsdf <- createDataFrame(mtcars)
expect_equivalent(collect(mtcarsdf), mtcars)

bytes <- as.raw(c(1, 2, 3))
df <- createDataFrame(list(list(bytes)))
expect_equal(collect(df)[[1]][[1]], bytes)

sparkR.session.stop()
})

test_that("spark.glm and predict", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)

training <- suppressWarnings(createDataFrame(iris))
# gaussian family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

# Gamma family
x <- runif(100, -1, 1)
y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
model <- glm(y ~ x, family = Gamma, df)
out <- capture.output(print(summary(model)))
expect_true(any(grepl("Dispersion parameter for gamma family", out)))

sparkR.session.stop()
})
4 changes: 3 additions & 1 deletion R/pkg/inst/worker/daemon.R
@@ -28,7 +28,9 @@ suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
port = port, open = "wb", blocking = TRUE, timeout = connectionTimeout)

+SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

while (TRUE) {
ready <- socketSelect(list(inputCon))
5 changes: 4 additions & 1 deletion R/pkg/inst/worker/worker.R
@@ -100,9 +100,12 @@ suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

outputCon <- socketConnection(
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
+SparkR:::doServerAuth(outputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

# read the index of the current partition inside the RDD
partition <- SparkR:::readInt(inputCon)
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -89,6 +89,10 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

+if (.Platform$OS.type == "windows") {
+Sys.setenv(TZ = "GMT")
+}

test_that("calling sparkRSQL.init returns existing SQL context", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
@@ -413,6 +417,12 @@ test_that("create DataFrame with different data types", {
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})

test_that("SPARK-17902: collect() with stringsAsFactors enabled", {
df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = TRUE))
expect_equal(class(iris$Species), class(df$Species))
expect_equal(iris$Species, df$Species)
})

test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
df <- data.frame(
id = 1:2,
@@ -2552,6 +2562,11 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
expect_identical(df1Collect, expected)

+# gapply on empty grouping columns.
+df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+actual <- collect(df1)
+expect_identical(actual, expected)

# Computes the sum of second column by grouping on the first and third columns
# and checks if the sum is larger than 2
schema <- structType(structField("a", "integer"), structField("e", "boolean"))
File renamed without changes.
File renamed without changes.
File renamed without changes.
31 changes: 29 additions & 2 deletions R/pkg/tests/run-all.R
@@ -21,13 +21,40 @@ library(SparkR)
# Turn all warnings into errors
options("warn" = 2)

-install.spark()
+if (.Platform$OS.type == "windows") {
+Sys.setenv(TZ = "GMT")
+}

+# Setup global test environment
+# Install Spark first to set SPARK_HOME
+install.spark()

sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
-sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
+sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)

+sparkRTestMaster <- "local[1]"
+sparkRTestConfig <- list()
+if (identical(Sys.getenv("NOT_CRAN"), "true")) {
+sparkRTestMaster <- ""
+} else {
+# Disable hsperfdata on CRAN
+old_java_opt <- Sys.getenv("_JAVA_OPTIONS")
+Sys.setenv("_JAVA_OPTIONS" = paste("-XX:-UsePerfData", old_java_opt))
+tmpDir <- tempdir()
+tmpArg <- paste0("-Djava.io.tmpdir=", tmpDir)
+sparkRTestConfig <- list(spark.driver.extraJavaOptions = tmpArg,
+spark.executor.extraJavaOptions = tmpArg)
+}

test_package("SparkR")

+if (identical(Sys.getenv("NOT_CRAN"), "true")) {
+# for testthat 1.0.2 later, change reporter from "summary" to default_reporter()
+testthat:::run_tests("SparkR",
+file.path(sparkRDir, "pkg", "tests", "fulltests"),
+NULL,
+"summary")
+}
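With this gating, a CRAN check runs only the lightweight test_basic.R suite via test_package("SparkR"), while the full suite under tests/fulltests runs only when NOT_CRAN=true. A sketch of driving both modes from an R session (the path handling here is an assumption, not part of the commit):

# Illustrative only: exercise both test modes locally.
runAll <- file.path(Sys.getenv("SPARK_HOME"), "R", "pkg", "tests", "run-all.R")

Sys.setenv(NOT_CRAN = "true")  # full suite, including tests/fulltests
source(runAll)

Sys.unsetenv("NOT_CRAN")       # CRAN mode: test_package("SparkR") only
source(runAll)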