Merge remote-tracking branch 'upstream/master' into refactor-collectlimit

viirya committed Oct 25, 2016
2 parents 58e8383 + 483c37c commit fbf4fd6
Showing 185 changed files with 3,515 additions and 1,501 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -3,7 +3,7 @@
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
importFrom("methods", "is", "new", "signature", "show")
importFrom("stats", "gaussian", "setNames")
importFrom("utils", "download.file", "packageVersion", "untar")
importFrom("utils", "download.file", "object.size", "packageVersion", "untar")

# Disable native libraries till we figure out how to package it
# See SPARKR-7839
@@ -71,6 +71,7 @@ exportMethods("arrange",
"covar_samp",
"covar_pop",
"createOrReplaceTempView",
"crossJoin",
"crosstab",
"dapply",
"dapplyCollect",
61 changes: 47 additions & 14 deletions R/pkg/R/DataFrame.R
@@ -365,7 +365,7 @@ setMethod("colnames<-",

# Check if the column names have . in it
if (any(regexec(".", value, fixed = TRUE)[[1]][1] != -1)) {
stop("Colum names cannot contain the '.' symbol.")
stop("Column names cannot contain the '.' symbol.")
}

sdf <- callJMethod(x@sdf, "toDF", as.list(value))
@@ -2271,12 +2271,13 @@ setMethod("dropDuplicates",

#' Join
#'
#' Join two SparkDataFrames based on the given join expression.
#' Joins two SparkDataFrames based on the given join expression.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
#' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is
#' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead.
#' @param joinType The type of join to perform. The following join types are available:
#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left',
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
@@ -2285,23 +2286,24 @@ setMethod("dropDuplicates",
#' @aliases join,SparkDataFrame,SparkDataFrame-method
#' @rdname join
#' @name join
#' @seealso \link{merge}
#' @seealso \link{merge} \link{crossJoin}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' join(df1, df2) # Performs a Cartesian
#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
#' join(df1, df2) # Attempts an inner join
#' }
#' @note join since 1.4.0
setMethod("join",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y, joinExpr = NULL, joinType = NULL) {
if (is.null(joinExpr)) {
sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
# this may not fail until the planner checks for Cartesian join later on.
sdf <- callJMethod(x@sdf, "join", y@sdf)
} else {
if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
if (is.null(joinType)) {
@@ -2322,22 +2324,52 @@ setMethod("join",
dataFrame(sdf)
})

#' CrossJoin
#'
#' Returns Cartesian Product on two SparkDataFrames.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @return A SparkDataFrame containing the result of the join operation.
#' @family SparkDataFrame functions
#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
#' @rdname crossJoin
#' @name crossJoin
#' @seealso \link{merge} \link{join}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' crossJoin(df1, df2) # Performs a Cartesian
#' }
#' @note crossJoin since 2.1.0
setMethod("crossJoin",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
dataFrame(sdf)
})
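
To make the behavior change concrete, here is a minimal SparkR sketch of the new join()/crossJoin() split (illustrative only: path, path2 and the column names are placeholders, and it assumes a running SparkR session with the planner's default Cartesian-product check in effect):

sparkR.session()
df1 <- read.json(path)    # placeholder paths
df2 <- read.json(path2)

# With an explicit join expression nothing changes: an inner join is performed.
joined <- join(df1, df2, df1$col1 == df2$col2)

# Without a join expression, join() now attempts an inner join, and the planner
# rejects the implicit Cartesian product ("Detected cartesian product for INNER
# join between logical plans") once the result is evaluated, e.g. by count().
# count(join(df1, df2))    # errors under the new behavior

# A Cartesian product must now be requested explicitly:
cartesian <- crossJoin(df1, df2)

The test_sparkSQL.R changes further down in this commit assert exactly this pair of behaviors.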

#' Merges two data frames
#'
#' @name merge
#' @param x the first data frame to be joined
#' @param y the second data frame to be joined
#' @param x the first data frame to be joined.
#' @param y the second data frame to be joined.
#' @param by a character vector specifying the join columns. If by is not
#' specified, the common column names in \code{x} and \code{y} will be used.
#' If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
#' Product of x and y will be returned.
#' @param by.x a character vector specifying the joining columns for x.
#' @param by.y a character vector specifying the joining columns for y.
#' @param all a boolean value setting \code{all.x} and \code{all.y}
#' if any of them are unset.
#' @param all.x a boolean value indicating whether all the rows in x should
#' be including in the join
#' be including in the join.
#' @param all.y a boolean value indicating whether all the rows in y should
#' be including in the join
#' @param sort a logical argument indicating whether the resulting columns should be sorted
#' be including in the join.
#' @param sort a logical argument indicating whether the resulting columns should be sorted.
#' @param suffixes a string vector of length 2 used to make colnames of
#' \code{x} and \code{y} unique.
#' The first element is appended to each colname of \code{x}.
@@ -2351,20 +2383,21 @@ setMethod("join",
#' @family SparkDataFrame functions
#' @aliases merge,SparkDataFrame,SparkDataFrame-method
#' @rdname merge
#' @seealso \link{join}
#' @seealso \link{join} \link{crossJoin}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' merge(df1, df2) # Performs a Cartesian
#' merge(df1, df2) # Performs an inner join by common columns
#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
#' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
#' merge(df1, df2, by = NULL) # Performs a Cartesian join
#' }
#' @note merge since 1.5.0
setMethod("merge",
@@ -2401,7 +2434,7 @@ setMethod("merge",
joinY <- by
} else {
# if by or both by.x and by.y have length 0, use Cartesian Product
joinRes <- join(x, y)
joinRes <- crossJoin(x, y)
return (joinRes)
}

4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -468,6 +468,10 @@ setGeneric("createOrReplaceTempView",
standardGeneric("createOrReplaceTempView")
})

# @rdname crossJoin
# @export
setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })

#' @rdname dapply
#' @export
setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
28 changes: 24 additions & 4 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -212,7 +212,7 @@ test_that("createDataFrame uses files for large objects", {
# To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
df <- createDataFrame(iris)
df <- suppressWarnings(createDataFrame(iris))

# Resetting the conf back to default value
callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
@@ -390,6 +390,19 @@ test_that("create DataFrame with different data types", {
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})

test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
df <- data.frame(
id = 1:2,
time = c(as.POSIXlt("2016-01-10"), NA),
date = c(as.Date("2016-10-01"), NA))

DF <- collect(createDataFrame(df))
expect_true(is.na(DF$date[2]))
expect_equal(DF$date[1], as.Date("2016-10-01"))
expect_true(is.na(DF$time[2]))
expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
})

test_that("create DataFrame with complex types", {
e <- new.env()
assign("n", 3L, envir = e)
@@ -832,7 +845,7 @@ test_that("names() colnames() set the column names", {
expect_equal(names(df)[1], "col3")

expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
"Colum names cannot contain the '.' symbol.")
"Column names cannot contain the '.' symbol.")
expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
expect_error(colnames(df) <- c("a"),
"Column names must have the same length as the number of columns in the dataset.")
@@ -1572,7 +1585,7 @@ test_that("filter() on a DataFrame", {
#expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
})

test_that("join() and merge() on a DataFrame", {
test_that("join(), crossJoin() and merge() on a DataFrame", {
df <- read.json(jsonPath)

mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
@@ -1583,7 +1596,14 @@
writeLines(mockLines2, jsonPath2)
df2 <- read.json(jsonPath2)

joined <- join(df, df2)
# inner join, not cartesian join
expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
# cartesian join
expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
" INNER join between logical plans).*"))

joined <- crossJoin(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
expect_equal(count(joined), 12)
expect_equal(names(collect(joined)), c("age", "name", "name", "test"))
6 changes: 4 additions & 2 deletions R/run-tests.sh
@@ -26,6 +26,8 @@ rm -f $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"

# Also run the documentation tests for CRAN
CRAN_CHECK_LOG_FILE=$FWDIR/cran-check.out
rm -f $CRAN_CHECK_LOG_FILE
@@ -37,10 +39,10 @@ NUM_CRAN_WARNING="$(grep -c WARNING$ $CRAN_CHECK_LOG_FILE)"
NUM_CRAN_ERROR="$(grep -c ERROR$ $CRAN_CHECK_LOG_FILE)"
NUM_CRAN_NOTES="$(grep -c NOTE$ $CRAN_CHECK_LOG_FILE)"

if [[ $FAILED != 0 ]]; then
if [[ $FAILED != 0 || $NUM_TEST_WARNING != 0 ]]; then
cat $LOGFILE
echo -en "\033[31m" # Red
echo "Had test failures; see logs."
echo "Had test warnings or failures; see logs."
echo -en "\033[0m" # No color
exit -1
else
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -57,7 +57,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.3-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.3-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.4-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
1 change: 1 addition & 0 deletions conf/spark-env.sh.template
@@ -63,3 +63,4 @@
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -331,7 +331,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.3</version>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -24,6 +24,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime, offset) {
return a.value - b.value
},
editable: false,
align: 'left',
showCurrentTime: false,
min: startTime,
zoomable: false,
@@ -99,6 +100,7 @@ function drawJobTimeline(groupArray, eventObjArray, startTime, offset) {
return a.value - b.value;
},
editable: false,
align: 'left',
showCurrentTime: false,
min: startTime,
zoomable: false,
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.10.3-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.10.4-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
@@ -144,7 +144,7 @@ object WriteInputFormatTestDataGenerator {

// Create test data for ArrayWritable
val data = Seq(
(1, Array()),
(1, Array.empty[Double]),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -125,15 +125,34 @@ private[spark] object SerDe {
}

def readDate(in: DataInputStream): Date = {
Date.valueOf(readString(in))
try {
val inStr = readString(in)
if (inStr == "NA") {
null
} else {
Date.valueOf(inStr)
}
} catch {
// TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
case _: NegativeArraySizeException => null
}
}

def readTime(in: DataInputStream): Timestamp = {
val seconds = in.readDouble()
val sec = Math.floor(seconds).toLong
val t = new Timestamp(sec * 1000L)
t.setNanos(((seconds - sec) * 1e9).toInt)
t
try {
val seconds = in.readDouble()
if (java.lang.Double.isNaN(seconds)) {
null
} else {
val sec = Math.floor(seconds).toLong
val t = new Timestamp(sec * 1000L)
t.setNanos(((seconds - sec) * 1e9).toInt)
t
}
} catch {
// TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
case _: NegativeArraySizeException => null
}
}
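
As a user-facing illustration of the readDate/readTime changes above (a sketch only, assuming a running SparkR session; it mirrors the SPARK-17811 test added earlier in this commit), NA dates and timestamps sent from R now deserialize to null on the JVM side instead of failing:

sparkR.session()
local <- data.frame(id = 1:2,
                    d  = c(as.Date("2016-10-01"), NA),
                    t  = c(as.POSIXct("2016-01-10"), NA))
df <- createDataFrame(local)   # previously the NA entries could break deserialization
collect(df)$d[2]               # NA
collect(df)$t[2]               # NA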

def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -80,15 +80,22 @@ object HiveCatalogMetrics extends Source {
*/
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))

/**
* Tracks the total number of files served from the file status cache instead of discovered.
*/
val METRIC_FILE_CACHE_HITS = metricRegistry.counter(MetricRegistry.name("fileCacheHits"))

/**
* Resets the values of all metrics to zero. This is useful in tests.
*/
def reset(): Unit = {
METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
}

// clients can use these to avoid classloader issues with the codahale classes
def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
}