diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ed304c2a481b..ffb6dbd702cc 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1352,9 +1352,10 @@ setMethod("where",
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
-#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
+#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
#' @param joinType The type of join to perform. The following join types are available:
-#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
+#' 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
+#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
#' @return A DataFrame containing the result of the join operation.
#' @rdname join
#' @name join
@@ -1379,11 +1380,15 @@ setMethod("join",
if (is.null(joinType)) {
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
} else {
- if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
+ if (joinType %in% c("inner", "outer", "full", "fullouter",
+ "leftouter", "left_outer", "left",
+ "rightouter", "right_outer", "right", "leftsemi")) {
+ joinType <- gsub("_", "", joinType)
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
} else {
stop("joinType must be one of the following types: ",
- "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
+ "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
+ 'rightouter', 'right_outer', 'right', 'leftsemi'")
}
}
}
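Not part of the patch — a minimal Scala sketch of the equivalent join-type strings on the JVM DataFrame API that the R wrapper above delegates to (the data, variable names, and local master are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("join-types"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df  = sc.parallelize(Seq((30, "Andy"), (19, "Justin"))).toDF("age", "name")
val df2 = sc.parallelize(Seq(("Andy", "yes"), ("Bob", "no"))).toDF("name", "test")

// The R wrapper normalizes the joinType string (dropping underscores) and passes
// it through to this join(right, joinExprs, joinType) overload.
val inner = df.join(df2, df("name") === df2("name"), "inner")
val left  = df.join(df2, df("name") === df2("name"), "left_outer")
val semi  = df.join(df2, df("name") === df2("name"), "leftsemi")
```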
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index aedc385c4ff4..63513d9f380c 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -952,7 +952,7 @@ test_that("join() and merge() on a DataFrame", {
expect_equal(names(joined2), c("age", "name", "name", "test"))
expect_equal(count(joined2), 3)
- joined3 <- join(df, df2, df$name == df2$name, "right_outer")
+ joined3 <- join(df, df2, df$name == df2$name, "rightouter")
expect_equal(names(joined3), c("age", "name", "name", "test"))
expect_equal(count(joined3), 4)
expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
@@ -963,11 +963,34 @@ test_that("join() and merge() on a DataFrame", {
expect_equal(count(joined4), 4)
expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
+ joined5 <- join(df, df2, df$name == df2$name, "leftouter")
+ expect_equal(names(joined5), c("age", "name", "name", "test"))
+ expect_equal(count(joined5), 3)
+ expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1]))
+
+ joined6 <- join(df, df2, df$name == df2$name, "inner")
+ expect_equal(names(joined6), c("age", "name", "name", "test"))
+ expect_equal(count(joined6), 3)
+
+ joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
+ expect_equal(names(joined7), c("age", "name"))
+ expect_equal(count(joined7), 3)
+
+ joined8 <- join(df, df2, df$name == df2$name, "left_outer")
+ expect_equal(names(joined8), c("age", "name", "name", "test"))
+ expect_equal(count(joined8), 3)
+ expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1]))
+
+ joined9 <- join(df, df2, df$name == df2$name, "right_outer")
+ expect_equal(names(joined9), c("age", "name", "name", "test"))
+ expect_equal(count(joined9), 4)
+ expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2]))
+
merged <- select(merge(df, df2, df$name == df2$name, "outer"),
alias(df$age + 5, "newAge"), df$name, df2$test)
expect_equal(names(merged), c("newAge", "name", "test"))
expect_equal(count(merged), 4)
- expect_equal(collect(orderBy(merged, joined4$name))$newAge[3], 24)
+ expect_equal(collect(orderBy(merged, merged$name))$newAge[3], 24)
})
test_that("toJSON() returns an RDD of the correct values", {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 21dc8f0b6548..68a9f912a5d2 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph
private[spark] object UIUtils extends Logging {
val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed"
val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped"
+ val TABLE_CLASS_STRIPED_SORTABLE = TABLE_CLASS_STRIPED + " sortable"
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 01cddda4c62c..1a29b0f41260 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -62,7 +62,7 @@ private[ui] class ExecutorsPage(
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
val execTable =
-      <table class={UIUtils.TABLE_CLASS_STRIPED}>
+      <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
          <th>Executor ID</th>
          <th>Address</th>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index d5cdbfac104f..be144f6065ba 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -50,7 +50,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
hasBytesSpilled = data.hasBytesSpilled
})
-    <table class={UIUtils.TABLE_CLASS_STRIPED}>
+    <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
        <th>Executor ID</th>
        <th>Address</th>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 831331222671..2dd2c036c930 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1751,6 +1751,13 @@ private[spark] object Utils extends Logging {
if (uri.getScheme() != null) {
return uri
}
+ // Make sure to handle the case where the path has a fragment (applies to the
+ // YARN distributed cache)
+ if (uri.getFragment() != null) {
+ val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
+ return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
+ uri.getFragment())
+ }
} catch {
case e: URISyntaxException =>
}
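Not part of the patch — a quick Scala illustration of why the fragment needs special handling (paths are illustrative): falling straight through to `File.toURI` percent-encodes a `#` left in the file name, which is exactly what the old test expectation below encoded.

```scala
import java.io.File
import java.net.URI

// Treating "#app.jar" as part of the file name percent-encodes it,
// e.g. file:/cwd/spark.jar%23app.jar -- the fragment is lost.
val encoded = new File("spark.jar#app.jar").getAbsoluteFile.toURI

// Parsing the string as a URI first keeps the fragment separate so it can be
// re-attached intact, e.g. file:/cwd/spark.jar#app.jar.
val parsed = new URI("spark.jar#app.jar")   // path = "spark.jar", fragment = "app.jar"
val abs = new File(parsed.getPath).getAbsoluteFile.toURI
val resolved = new URI(abs.getScheme, abs.getHost, abs.getPath, parsed.getFragment)

println(encoded)
println(resolved)
```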
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1fb81ad565b4..68b0da76bc13 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -384,7 +384,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
assertResolves("spark.jar", s"file:$cwd/spark.jar")
- assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar%23app.jar")
+ assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
assertResolves("path to/file.txt", s"file:$cwd/path%20to/file.txt")
if (Utils.isWindows) {
assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt")
@@ -414,10 +414,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6",
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4%23jar5,file:$cwd/path%20to/jar6")
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
if (Utils.isWindows) {
assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
}
}
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 9dac43ce5442..cb79e9eba06e 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -70,7 +70,7 @@ GIT_REF=${GIT_REF:-master}
# Destination directory parent on remote server
REMOTE_PARENT_DIR=${REMOTE_PARENT_DIR:-/home/$ASF_USERNAME/public_html}
-SSH="ssh -o StrictHostKeyChecking=no -i $ASF_RSA_KEY"
+SSH="ssh -o ConnectTimeout=300 -o StrictHostKeyChecking=no -i $ASF_RSA_KEY"
GPG="gpg --no-tty --batch"
NEXUS_ROOT=https://repository.apache.org/service/local/staging
NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
@@ -141,8 +141,12 @@ if [[ "$1" == "package" ]]; then
export ZINC_PORT=$ZINC_PORT
echo "Creating distribution: $NAME ($FLAGS)"
- ./make-distribution.sh --name $NAME --tgz $FLAGS -DzincPort=$ZINC_PORT 2>&1 > \
- ../binary-release-$NAME.log
+
+ # Get the Maven home directory used by $MVN
+ MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'`
+
+ ./make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \
+ -DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
cd ..
cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz .
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 4db32cfd628b..de9729f0c00a 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -205,6 +205,11 @@ can be set to control the SBT build. For example:
build/sbt -Pyarn -Phadoop-2.3 assembly
+To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt
+in interactive mode by running `build/sbt`, and then run all build commands at the command
+prompt. For more recommendations on reducing build time, refer to the
+[wiki page](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-ReducingBuildTimes).
+
# Testing with SBT
Some of the tests require Spark to be packaged first, so always run `build/sbt assembly` the first time. The following is an example of a correct (build, test) sequence:
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index a302ed8ec243..e4500a0bc831 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -633,7 +633,7 @@ def update(rdd):
self._model = LogisticRegressionWithSGD.train(
rdd, self.numIterations, self.stepSize,
self.miniBatchFraction, self._model.weights,
- regParam=self.regParam, convergenceTol=self.convergenceTol)
+ regParam=self.regParam)
dstream.foreachRDD(update)
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index 30d05d0ec360..2db2dec275bd 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -732,6 +732,9 @@ def __getitem__(self, index):
raise ValueError("Index %d out of bounds." % index)
insert_index = np.searchsorted(inds, index)
+ if insert_index >= inds.size:
+ return 0.
+
row_ind = inds[insert_index]
if row_ind == index:
return vals[insert_index]
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 562fb3615804..6bbac026507c 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -669,7 +669,7 @@ def update(rdd):
self._model = LinearRegressionWithSGD.train(
rdd, self.numIterations, self.stepSize,
self.miniBatchFraction, self._model.weights,
- intercept=self._model.intercept, convergenceTol=self.convergenceTol)
+ intercept=self._model.intercept)
dstream.foreachRDD(update)
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 5097c5e8ba4c..4d471443d192 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -205,15 +205,17 @@ def test_conversion(self):
self.assertTrue(dv.array.dtype == 'float64')
def test_sparse_vector_indexing(self):
- sv = SparseVector(4, {1: 1, 3: 2})
- self.assertEquals(sv[0], 0.)
- self.assertEquals(sv[3], 2.)
- self.assertEquals(sv[1], 1.)
- self.assertEquals(sv[2], 0.)
- self.assertEquals(sv[-1], 2)
- self.assertEquals(sv[-2], 0)
- self.assertEquals(sv[-4], 0)
- for ind in [4, -5]:
+ sv = SparseVector(5, {1: 1, 3: 2})
+ self.assertEqual(sv[0], 0.)
+ self.assertEqual(sv[3], 2.)
+ self.assertEqual(sv[1], 1.)
+ self.assertEqual(sv[2], 0.)
+ self.assertEqual(sv[4], 0.)
+ self.assertEqual(sv[-1], 0.)
+ self.assertEqual(sv[-2], 2.)
+ self.assertEqual(sv[-3], 0.)
+ self.assertEqual(sv[-5], 0.)
+ for ind in [5, -6]:
self.assertRaises(ValueError, sv.__getitem__, ind)
for ind in [7.8, '1']:
self.assertRaises(TypeError, sv.__getitem__, ind)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 4b74a501521a..3c631a0328de 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -30,6 +30,7 @@
from pyspark.sql import since
from pyspark.sql.types import StringType
from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.dataframe import DataFrame
def _create_function(name, doc=""):
@@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None):
return Column(jc)
+@since(1.6)
+def broadcast(df):
+ """Marks a DataFrame as small enough for use in broadcast joins."""
+
+ sc = SparkContext._active_spark_context
+ return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)
+
+
@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6b647f3aacfa..14414b3d566f 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1056,6 +1056,33 @@ def test_with_column_with_existing_name(self):
keys = self.df.withColumn("key", self.df.key).select("key").collect()
self.assertEqual([r.key for r in keys], list(range(100)))
+ # regression test for SPARK-10417
+ def test_column_iterator(self):
+
+ def foo():
+ for x in self.df.key:
+ break
+
+ self.assertRaises(TypeError, foo)
+
+ # regression test for SPARK-10577 (broadcast join hint)
+ def test_functions_broadcast(self):
+ from pyspark.sql.functions import broadcast
+
+ df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+ df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+
+ # equijoin - should be converted into broadcast join
+ plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
+ self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+
+ # no join key -- should not be a broadcast join
+ plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
+ self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+
+ # planner should not crash without a join
+ broadcast(df1)._jdf.queryExecution().executedPlan()
+
class HiveContextSQLTests(ReusedPySparkTestCase):
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aacfbf9af760..47db44880f53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -825,6 +825,10 @@ class Analyzer(
val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
extractedExprBuffer += withName
withName.toAttribute
+
+ // Extracts other attributes
+ case attr: Attribute => extractExpr(attr)
+
}.asInstanceOf[NamedExpression]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f024db7ae2cd..3fce47fa655d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1625,7 +1625,7 @@ class DataFrame private[sql](
*/
@deprecated("Use write.jdbc()", "1.4.0")
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
- val w = if (overwrite) write.mode(SaveMode.Overwrite) else write
+ val w = if (overwrite) write.mode(SaveMode.Overwrite) else write.mode(SaveMode.Append)
w.jdbc(url, table, new Properties)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 7e204ec88aa1..55035f4bc5f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -145,13 +145,10 @@ case class Window(
// Construct the ordering. This is used to compare the result of current value projection
// to the result of bound value projection. This is done manually because we want to use
// Code Generation (if it is enabled).
- val (sortExprs, schema) = exprs.map { case e =>
- // This AttributeReference does not need to have unique IDs, it's OK to be called
- // in executor.
- val ref = AttributeReference("ordExpr", e.dataType, e.nullable)()
- (SortOrder(ref, e.direction), ref)
- }.unzip
- val ordering = newOrdering(sortExprs, schema)
+ val sortExprs = exprs.zipWithIndex.map { case (e, i) =>
+ SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction)
+ }
+ val ordering = newOrdering(sortExprs, Nil)
RangeBoundOrdering(ordering, current, bound)
case RowFrame => RowBoundOrdering(offset)
}
@@ -203,17 +200,19 @@ case class Window(
* This method uses Code Generation. It can only be used on the executor side.
*
* @param expressions unbound ordered function expressions.
- * @param attributes output attributes
* @return the final resulting projection.
*/
private[this] def createResultProjection(
- expressions: Seq[Expression],
- attributes: Seq[Attribute]): MutableProjection = {
- val unboundToAttrMap = expressions.zip(attributes).toMap
- val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap))
+ expressions: Seq[Expression]): MutableProjection = {
+ val references = expressions.zipWithIndex.map { case (e, i) =>
+ // Results of window expressions will be on the right side of child's output
+ BoundReference(child.output.size + i, e.dataType, e.nullable)
+ }
+ val unboundToRefMap = expressions.zip(references).toMap
+ val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
newMutableProjection(
projectList ++ patchedWindowExpression,
- child.output ++ attributes)()
+ child.output)()
}
protected override def doExecute(): RDD[InternalRow] = {
@@ -248,17 +247,12 @@ case class Window(
factories(index) = () => createFrameProcessor(frame, functions, ordinal)
}
- // AttributeReference can only be created in driver, or the id will not be unique
- val outputAttributes = unboundExpressions.map {
- e => AttributeReference("windowResult", e.dataType, e.nullable)()
- }
-
// Start processing.
child.execute().mapPartitions { stream =>
new Iterator[InternalRow] {
// Get all relevant projections.
- val result = createResultProjection(unboundExpressions, outputAttributes)
+ val result = createResultProjection(unboundExpressions)
val grouping = if (child.outputsUnsafeRows) {
UnsafeProjection.create(partitionSpec, child.output)
} else {
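Not part of the patch — a small Scala sketch of the idea behind this change: a `BoundReference` reads its value by ordinal from the input row, so it carries no expression ID and is safe to construct on executors, unlike `AttributeReference` (row contents are illustrative only):

```scala
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
import org.apache.spark.sql.types.LongType

// A row as the Window operator sees it: child output on the left,
// window-function results appended on the right.
val row = new GenericInternalRow(Array[Any](10L, 20L, 99L))

// Bind to the appended result by position (child.output.size + i).
val ref = BoundReference(2, LongType, nullable = false)
println(ref.eval(row))  // 99
```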
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eb5bffc18f84..223a2fbfbe02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1756,4 +1756,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
df1.withColumn("diff", lit(0)))
}
}
+
+ test("SPARK-10389: order by non-attribute grouping expression on Aggregate") {
+ withTempTable("src") {
+ Seq((1, 1), (-1, 1)).toDF("key", "value").registerTempTable("src")
+ checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1"),
+ Seq(Row(1), Row(1)))
+ checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY (key + 1) * 2"),
+ Seq(Row(1), Row(1)))
+ }
+ }
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index a0643cec0fb7..a4fd0c3ce970 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -55,7 +55,6 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: HiveContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
server.init(sqlContext.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b72249b3bf8c..19b2f24456ab 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.net.URL
import java.sql.{Date, DriverManager, SQLException, Statement}
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
@@ -431,6 +432,32 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
)
}
+
+ test("Checks Hive version via SET -v") {
+ withJdbcStatement { statement =>
+ val resultSet = statement.executeQuery("SET -v")
+
+ val conf = mutable.Map.empty[String, String]
+ while (resultSet.next()) {
+ conf += resultSet.getString(1) -> resultSet.getString(2)
+ }
+
+ assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+ }
+ }
+
+ test("Checks Hive version via SET") {
+ withJdbcStatement { statement =>
+ val resultSet = statement.executeQuery("SET")
+
+ val conf = mutable.Map.empty[String, String]
+ while (resultSet.next()) {
+ conf += resultSet.getString(1) -> resultSet.getString(2)
+ }
+
+ assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+ }
+ }
}
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7af84057cee3..8a6236ce018f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -648,6 +648,11 @@ private[hive] object HiveContext {
doc = "Version of the Hive metastore. Available options are " +
s"0.12.0 through $hiveExecutionVersion.")
+ val HIVE_EXECUTION_VERSION = stringConf(
+ key = "spark.sql.hive.version",
+ defaultValue = Some(hiveExecutionVersion),
+ doc = "Version of Hive used internally by Spark SQL.")
+
val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
defaultValue = Some("builtin"),
doc = s"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 1d5ee22e99e0..788118b6e308 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -29,7 +29,8 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
import org.apache.spark._
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.sql.types.DecimalType
@@ -107,6 +108,16 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
+ test("SPARK-11009 fix wrong result of Window function in cluster mode") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SPARK_11009.getClass.getName.stripSuffix("$"),
+ "--name", "SparkSQLConfTest",
+ "--master", "local-cluster[2,1,1024]",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -314,3 +325,33 @@ object SPARK_9757 extends QueryTest with Logging {
}
}
}
+
+object SPARK_11009 extends QueryTest {
+ import org.apache.spark.sql.functions._
+
+ protected var sqlContext: SQLContext = _
+
+ def main(args: Array[String]): Unit = {
+ Utils.configTestLog4j("INFO")
+
+ val sparkContext = new SparkContext(
+ new SparkConf()
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.shuffle.partitions", "100"))
+
+ val hiveContext = new TestHiveContext(sparkContext)
+ sqlContext = hiveContext
+
+ try {
+ val df = sqlContext.range(1 << 20)
+ val df2 = df.select((df("id") % 1000).alias("A"), (df("id") / 1000).alias("B"))
+ val ws = Window.partitionBy(df2("A")).orderBy(df2("B"))
+ val df3 = df2.select(df2("A"), df2("B"), rowNumber().over(ws).alias("rn")).filter("rn < 0")
+ if (df3.rdd.count() != 0) {
+ throw new Exception("df3 should have 0 output rows.")
+ }
+ } finally {
+ sparkContext.stop()
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 72038f70df07..c98fddb3cb47 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -835,6 +835,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils {
).map(i => Row(i._1, i._2, i._3)))
}
+ test("window function: refer column in inner select block") {
+ val data = Seq(
+ WindowData(1, "a", 5),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 10)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql(
+ """
+ |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
+ |from (select month, area, product, 1 as tmp1 from windowData) tmp
+ """.stripMargin),
+ Seq(
+ ("a", 2),
+ ("a", 3),
+ ("b", 2),
+ ("b", 3),
+ ("c", 2),
+ ("c", 3)
+ ).map(i => Row(i._1, i._2)))
+ }
+
test("window function: partition and order expressions") {
val data = Seq(
WindowData(1, "a", 5),
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 1b717b64542d..a19b85a51d28 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -443,7 +443,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): Seq[Node] = {
-    <table class={UIUtils.TABLE_CLASS_STRIPED}>
+    <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
        <th>Input</th>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 28228c2656a5..f21f5efbfb19 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -318,7 +318,8 @@ private[spark] class Client(
destName: Option[String] = None,
targetDir: Option[String] = None,
appMasterOnly: Boolean = false): (Boolean, String) = {
- val localURI = new URI(path.trim())
+ val trimmedPath = path.trim()
+ val localURI = Utils.resolveURI(trimmedPath)
if (localURI.getScheme != LOCAL_SCHEME) {
if (addDistributedUri(localURI)) {
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
@@ -334,7 +335,7 @@ private[spark] class Client(
(false, null)
}
} else {
- (true, path.trim())
+ (true, trimmedPath)
}
}
@@ -555,10 +556,10 @@ private[spark] class Client(
LOCALIZED_PYTHON_DIR)
}
(pySparkArchives ++ pyArchives).foreach { path =>
- val uri = new URI(path)
+ val uri = Utils.resolveURI(path)
if (uri.getScheme != LOCAL_SCHEME) {
pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- new Path(path).getName())
+ new Path(uri).getName())
} else {
pythonPath += uri.getPath()
}
@@ -1143,7 +1144,7 @@ object Client extends Logging {
} else {
getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
}
- mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env))
+ mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
val secondaryJars =
if (args != null) {
@@ -1152,10 +1153,10 @@ object Client extends Logging {
getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
}
secondaryJars.foreach { x =>
- addFileToClasspath(sparkConf, x, null, env)
+ addFileToClasspath(sparkConf, conf, x, null, env)
}
}
- addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
populateHadoopClasspath(conf, env)
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1175,7 +1176,7 @@ object Client extends Logging {
private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
mainJar.flatMap { path =>
- val uri = new URI(path)
+ val uri = Utils.resolveURI(path)
if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
}.orElse(Some(new URI(APP_JAR)))
}
@@ -1190,15 +1191,17 @@ object Client extends Logging {
* If an alternate name for the file is given, and it's not a "local:" file, the alternate
* name will be added to the classpath (relative to the job's work directory).
*
- * If not a "local:" file and no alternate name, the environment is not modified.
+ * If not a "local:" file and no alternate name, the linkName will be added to the classpath.
*
- * @param conf Spark configuration.
- * @param uri URI to add to classpath (optional).
- * @param fileName Alternate name for the file (optional).
- * @param env Map holding the environment variables.
+ * @param conf Spark configuration.
+ * @param hadoopConf Hadoop configuration.
+ * @param uri URI to add to classpath (optional).
+ * @param fileName Alternate name for the file (optional).
+ * @param env Map holding the environment variables.
*/
private def addFileToClasspath(
conf: SparkConf,
+ hadoopConf: Configuration,
uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
@@ -1207,6 +1210,11 @@ object Client extends Logging {
} else if (fileName != null) {
addClasspathEntry(buildPath(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+ } else if (uri != null) {
+ val localPath = getQualifiedLocalPath(uri, hadoopConf)
+ val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
+ addClasspathEntry(buildPath(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
}
}