diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index f8d1095a493d..234b208166b5 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -720,8 +720,9 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
# Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}
#' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}
-#' @return \code{summary} returns a list containing \code{layers}, the label distribution, and
-#' \code{tables}, conditional probabilities given the target label.
+#' @return \code{summary} returns a list containing \code{labelCount}, \code{layers}, and
+#' \code{weights}. For \code{weights}, it is a numeric vector with length determined by
+#' the network architecture (e.g., for an 8-10-2 network, 112 connection weights).
#' @rdname spark.mlp
#' @export
#' @aliases summary,MultilayerPerceptronClassificationModel-method
@@ -732,7 +733,6 @@ setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel
labelCount <- callJMethod(jobj, "labelCount")
layers <- unlist(callJMethod(jobj, "layers"))
weights <- callJMethod(jobj, "weights")
- weights <- matrix(weights, nrow = length(weights))
list(labelCount = labelCount, layers = layers, weights = weights)
})
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index ac896cfbcfff..5b1404c621bd 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -369,6 +369,8 @@ test_that("spark.mlp", {
expect_equal(summary$labelCount, 3)
expect_equal(summary$layers, c(4, 5, 4, 3))
expect_equal(length(summary$weights), 64)
+ expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
+ tolerance = 1e-6)
# Test predict method
mlpTestDF <- df
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index df082e4a9274..43c8df721d5a 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
@@ -159,8 +160,7 @@ protected void serviceInit(Configuration conf) throws Exception {
// If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
- registeredExecutorFile =
- new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+ registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -196,7 +196,7 @@ protected void serviceInit(Configuration conf) throws Exception {
private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
- secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);
+ secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
@@ -328,37 +328,59 @@ public void setRecoveryPath(Path recoveryPath) {
}
/**
- * Get the recovery path, this will override the default one to get our own maintained
- * recovery path.
+ * Get the path specific to this auxiliary service to use for recovery.
+ */
+ protected Path getRecoveryPath(String fileName) {
+ return _recoveryPath;
+ }
+
+ /**
+ * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
+ * when it previously was not. If YARN NM recovery is enabled it uses that path; otherwise
+ * it will use a YARN local dir.
*/
- protected Path getRecoveryPath() {
+ protected File initRecoveryDb(String dbFileName) {
+ if (_recoveryPath != null) {
+ File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
+ if (recoveryFile.exists()) {
+ return recoveryFile;
+ }
+ }
+ // DB doesn't exist in the recovery path, so check the local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
- File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+ File f = new File(new Path(dir).toUri().getPath(), dbFileName);
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
+ return f;
} else {
- // If NM recovery is enabled and the recovery file exists in old NM local dirs, which
- // means old version of Spark already generated the recovery file, we should copy the
- // old file in to a new recovery path for the compatibility.
- if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
- // Fail to move recovery file to new path
- logger.error("Failed to move recovery file {} to the path {}",
- RECOVERY_FILE_NAME, _recoveryPath.toString());
+ // If the recovery path is set then either NM recovery is enabled or another recovery
+ // DB has been initialized. If NM recovery is enabled and set the recovery path, make
+ // sure to move all DBs to the recovery path from the old NM local dirs.
+ // If another DB was initialized first, just make sure all the DBs are in the same
+ // location.
+ File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
+ if (!newLoc.equals(f)) {
+ try {
+ Files.move(f.toPath(), newLoc.toPath());
+ } catch (Exception e) {
+ // Failed to move the recovery file to the new path; just continue on with the new DB location
+ logger.error("Failed to move recovery file {} to the path {}",
+ dbFileName, _recoveryPath.toString(), e);
+ }
}
+ return newLoc;
}
- break;
}
}
-
if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]);
}
- return _recoveryPath;
+ return new File(_recoveryPath.toUri().getPath(), dbFileName);
}
/**
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 98c3abe93b55..93dfbc0e6ed6 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,16 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
- val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
- for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
- return r.partitioner.get
- }
- if (rdd.context.conf.contains("spark.default.parallelism")) {
- new HashPartitioner(rdd.context.defaultParallelism)
+ val rdds = (Seq(rdd) ++ others)
+ val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
+ if (hasPartitioner.nonEmpty) {
+ hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
- new HashPartitioner(bySize.head.partitions.length)
+ if (rdd.context.conf.contains("spark.default.parallelism")) {
+ new HashPartitioner(rdd.context.defaultParallelism)
+ } else {
+ new HashPartitioner(rdds.map(_.partitions.length).max)
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 7d6a8805bc01..068f4ed8ad74 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -83,7 +83,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("Default partitioner cannot partition array keys.")
+ throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
@@ -530,7 +530,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("Default partitioner cannot partition array keys.")
+ throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
@@ -784,7 +784,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
- throw new SparkException("Default partitioner cannot partition array keys.")
+ throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
@@ -802,7 +802,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
- throw new SparkException("Default partitioner cannot partition array keys.")
+ throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
@@ -817,7 +817,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
- throw new SparkException("Default partitioner cannot partition array keys.")
+ throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 8171dcc04637..ad1fddbde7b0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag
@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag](
}
}
+object UnionRDD {
+ private[spark] lazy val partitionEvalTaskSupport =
+ new ForkJoinTaskSupport(new ForkJoinPool(8))
+}
+
@DeveloperApi
class UnionRDD[T: ClassTag](
sc: SparkContext,
@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag](
private[spark] val isPartitionListingParallel: Boolean =
rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
- @transient private lazy val partitionEvalTaskSupport =
- new ForkJoinTaskSupport(new ForkJoinPool(8))
-
override def getPartitions: Array[Partition] = {
val parRDDs = if (isPartitionListingParallel) {
val parArray = rdds.par
- parArray.tasksupport = partitionEvalTaskSupport
+ parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
parArray
} else {
rdds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
deleted file mode 100644
index f6e46ae9a481..000000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import org.apache.spark.SparkException
-
-private[spark]
-case class BlockFetchException(messages: String, throwable: Throwable)
- extends SparkException(messages, throwable)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0614646771bd..a724fdf00978 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -217,7 +217,7 @@ private[spark] class BlockManager(
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
- if (!tryToReportBlockStatus(blockId, info, status)) {
+ if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
+ * NOTE: This is mainly for testing.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
*/
private def reportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
- val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
+ val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
*/
private def tryToReportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
- if (info.tellMaster) {
- val storageLevel = status.storageLevel
- val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val onDiskSize = status.diskSize
- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
- } else {
- true
- }
+ val storageLevel = status.storageLevel
+ val inMemSize = Math.max(status.memSize, droppedMemorySize)
+ val onDiskSize = status.diskSize
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}
/**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
- BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
+ BlockStatus.empty
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -565,8 +559,9 @@ private[spark] class BlockManager(
// Give up trying anymore locations. Either we've tried all of the original locations,
// or we've refreshed the list of locations from the master, and have still
// hit failures after trying locations from the refreshed list.
- throw new BlockFetchException(s"Failed to fetch block after" +
- s" ${totalFailureCount} fetch failures. Most recent failure cause:", e)
+ logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
+ s"Most recent failure cause:", e)
+ return None
}
logWarning(s"Failed to fetch remote block $blockId " +
@@ -807,12 +802,10 @@ private[spark] class BlockManager(
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
@@ -961,15 +954,12 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // tell the master about it.
+ // Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
@@ -1271,12 +1261,10 @@ private[spark] class BlockManager(
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
- reportBlockStatus(blockId, info, status, droppedMemorySize)
+ reportBlockStatus(blockId, status, droppedMemorySize)
}
if (blockIsUpdated) {
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
- }
+ addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
@@ -1316,21 +1304,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning(s"Block $blockId could not be removed as it was not found in either " +
- "the disk, memory, or external block store")
- }
- blockInfoManager.removeBlock(blockId)
- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info, removeBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
- }
+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+ }
+ }
+
+ /**
+ * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
+ * lock on the block.
+ */
+ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+ }
+ blockInfoManager.removeBlock(blockId)
+ if (tellMaster) {
+ reportBlockStatus(blockId, BlockStatus.empty)
+ }
+ }
+
+ private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index d220ab51d115..1a3bf2bb672c 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -663,31 +663,43 @@ private[spark] class MemoryStore(
private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
unrollMemory: Long,
- unrolled: Iterator[T],
+ private[this] var unrolled: Iterator[T],
rest: Iterator[T])
extends Iterator[T] {
- private[this] var unrolledIteratorIsConsumed: Boolean = false
- private[this] var iter: Iterator[T] = {
- val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
- unrolledIteratorIsConsumed = true
- memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
- })
- completionIterator ++ rest
+ private def releaseUnrollMemory(): Unit = {
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
+ // SPARK-17503: Drop the reference so the unrolled contents can be garbage collected
+ // before the end of this PartiallyUnrolledIterator's lifetime.
+ unrolled = null
}
- override def hasNext: Boolean = iter.hasNext
- override def next(): T = iter.next()
+ override def hasNext: Boolean = {
+ if (unrolled == null) {
+ rest.hasNext
+ } else if (!unrolled.hasNext) {
+ releaseUnrollMemory()
+ rest.hasNext
+ } else {
+ true
+ }
+ }
+
+ override def next(): T = {
+ if (unrolled == null) {
+ rest.next()
+ } else {
+ unrolled.next()
+ }
+ }
/**
* Called to dispose of this iterator and free its memory.
*/
def close(): Unit = {
- if (!unrolledIteratorIsConsumed) {
- memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
- unrolledIteratorIsConsumed = true
+ if (unrolled != null) {
+ releaseUnrollMemory()
}
- iter = null
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 66b88129ee41..74bca9931acf 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.{HashMap, LinkedHashMap}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
-import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
@@ -145,7 +144,6 @@ private[spark] object UIData {
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
- updatedBlockStatuses = m.updatedBlockStatuses.toList,
inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
outputMetrics =
OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
@@ -193,7 +191,6 @@ private[spark] object UIData {
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
peakExecutionMemory: Long,
- updatedBlockStatuses: Seq[(BlockId, BlockStatus)],
inputMetrics: InputMetricsUIData,
outputMetrics: OutputMetricsUIData,
shuffleReadMetrics: ShuffleReadMetricsUIData,
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 84ca750e1a96..0e330879d50f 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
/**
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging {
* supporting in the future if we decide to group certain stages within the same job under
* a common scope (e.g. part of a SQL query).
*/
- def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+ def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = {
val edges = new ListBuffer[RDDOperationEdge]
val nodes = new mutable.HashMap[Int, RDDOperationNode]
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
@@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging {
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+ var rootNodeCount = 0
+ val addRDDIds = new mutable.HashSet[Int]()
+ val dropRDDIds = new mutable.HashSet[Int]()
+
// Find nodes, edges, and operation scopes that belong to this stage
- stage.rddInfos.foreach { rdd =>
- edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+ stage.rddInfos.sortBy(_.id).foreach { rdd =>
+ val parentIds = rdd.parentIds
+ val isAllowed =
+ if (parentIds.isEmpty) {
+ rootNodeCount += 1
+ rootNodeCount <= retainedNodes
+ } else {
+ parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
+ }
+
+ if (isAllowed) {
+ addRDDIds += rdd.id
+ edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id))
+ } else {
+ dropRDDIds += rdd.id
+ }
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
-
if (rdd.scope.isEmpty) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
// This should happen only if an RDD is instantiated outside of a public RDD API
- rootCluster.attachChildNode(node)
+ if (isAllowed) {
+ rootCluster.attachChildNode(node)
+ }
} else {
// Otherwise, this RDD belongs to an inner cluster,
// which may be nested inside of other clusters
@@ -154,7 +173,9 @@ private[ui] object RDDOperationGraph extends Logging {
rootCluster.attachChildCluster(cluster)
}
}
- rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ if (isAllowed) {
+ rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index bcae56e2f114..37a12a864693 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
private[ui] val stageIds = new mutable.ArrayBuffer[Int]
+ // How many root nodes to retain in DAG Graph
+ private[ui] val retainedNodes =
+ conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue)
+
// How many jobs or stages to retain graph metadata for
private val retainedJobs =
conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
@@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
val stageId = stageInfo.stageId
stageIds += stageId
stageIdToJobId(stageId) = jobId
- stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+ stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes)
trimStagesIfNecessary()
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 87c8628ce97e..fdf28b7dcbcf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -513,10 +513,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
store3.stop()
store3 = null
- // exception throw because there is no locations
- intercept[BlockFetchException] {
- store.getRemoteBytes("list1")
- }
+ // Should return None instead of throwing an exception:
+ assert(store.getRemoteBytes("list1").isEmpty)
}
test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
@@ -1186,9 +1184,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
- intercept[BlockFetchException] {
- store.getRemoteBytes("item")
- }
+ assert(store.getRemoteBytes("item").isEmpty)
}
test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") {
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala
new file mode 100644
index 000000000000..02c2331dc394
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.memory.MemoryMode.ON_HEAP
+import org.apache.spark.storage.memory.{MemoryStore, PartiallyUnrolledIterator}
+
+class PartiallyUnrolledIteratorSuite extends SparkFunSuite with MockitoSugar {
+ test("join two iterators") {
+ val unrollSize = 1000
+ val unroll = (0 until unrollSize).iterator
+ val restSize = 500
+ val rest = (unrollSize until restSize + unrollSize).iterator
+
+ val memoryStore = mock[MemoryStore]
+ val joinIterator = new PartiallyUnrolledIterator(memoryStore, unrollSize, unroll, rest)
+
+ // First, iterate over the unrolled iterator
+ (0 until unrollSize).foreach { value =>
+ assert(joinIterator.hasNext)
+ assert(joinIterator.hasNext)
+ assert(joinIterator.next() == value)
+ }
+
+ joinIterator.hasNext
+ joinIterator.hasNext
+ verify(memoryStore, times(1))
+ .releaseUnrollMemoryForThisTask(Matchers.eq(ON_HEAP), Matchers.eq(unrollSize.toLong))
+
+ // Then, iterate over the rest iterator
+ (unrollSize until unrollSize + restSize).foreach { value =>
+ assert(joinIterator.hasNext)
+ assert(joinIterator.hasNext)
+ assert(joinIterator.next() == value)
+ }
+
+ joinIterator.close()
+ // MemoryStore.releaseUnrollMemoryForThisTask is called only once
+ verifyNoMoreInteractions(memoryStore)
+ }
+}
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index c0e4f3b35afa..5392b4a9bcf4 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -2072,7 +2072,7 @@ unifiedStream.pprint()
-Another parameter that should be considered is the receiver's blocking interval,
+Another parameter that should be considered is the receiver's block interval,
which is determined by the [configuration parameter](configuration.html#spark-streaming)
`spark.streaming.blockInterval`. For most receivers, the received data is coalesced together into
blocks of data before storing inside Spark's memory. The number of blocks in each batch
diff --git a/examples/src/main/python/ml/quantile_discretizer_example.py b/examples/src/main/python/ml/quantile_discretizer_example.py
index 788a0baffebb..0fc1d1949a77 100644
--- a/examples/src/main/python/ml/quantile_discretizer_example.py
+++ b/examples/src/main/python/ml/quantile_discretizer_example.py
@@ -29,7 +29,7 @@
.getOrCreate()
# $example on$
- data = [(0, 18.0,), (1, 19.0,), (2, 8.0,), (3, 5.0,), (4, 2.2,)]
+ data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
# $example off$
diff --git a/examples/src/main/python/ml/vector_slicer_example.py b/examples/src/main/python/ml/vector_slicer_example.py
index d2f46b190f9a..68c8cfe27e37 100644
--- a/examples/src/main/python/ml/vector_slicer_example.py
+++ b/examples/src/main/python/ml/vector_slicer_example.py
@@ -32,8 +32,8 @@
# $example on$
df = spark.createDataFrame([
- Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3}),),
- Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]),)])
+ Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
+ Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py
index 9b2a2c4e6a16..98b48908b5a1 100644
--- a/examples/src/main/python/sql/hive.py
+++ b/examples/src/main/python/sql/hive.py
@@ -79,7 +79,7 @@
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
- recordsDF = spark.createDataFrame(map(lambda i: Record(i, "val_" + str(i)), range(1, 101)))
+ recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
diff --git a/graphx/pom.xml b/graphx/pom.xml
index bd4e53371b86..10d5ba93ebb8 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -46,6 +46,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib-local_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 868658dfe55e..90907300be97 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -20,9 +20,10 @@ package org.apache.spark.graphx
import scala.reflect.ClassTag
import scala.util.Random
-import org.apache.spark.SparkException
import org.apache.spark.graphx.lib._
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -391,6 +392,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
}
+ /**
+ * Run parallel personalized PageRank for a given array of source vertices, such
+ * that all random walks are started relative to the source vertices.
+ */
+ def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int,
+ resetProb: Double = 0.15) : Graph[Vector, Double] = {
+ PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources)
+ }
+
/**
* Run Personalized PageRank for a fixed number of iterations with
* with all iterations originating at the source node
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 2f5bd4ed4ff6..f4b00757a8b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -19,8 +19,11 @@ package org.apache.spark.graphx.lib
import scala.reflect.ClassTag
+import breeze.linalg.{Vector => BV}
+
import org.apache.spark.graphx._
import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.{Vector, Vectors}
/**
* PageRank algorithm implementation. There are two implementations of PageRank implemented.
@@ -162,6 +165,84 @@ object PageRank extends Logging {
rankGraph
}
+ /**
+ * Run Personalized PageRank for a fixed number of iterations, for a
+ * set of starting nodes in parallel. Returns a graph with vertex attributes
+ * containing the pagerank relative to all starting nodes (as a sparse vector) and
+ * edge attributes the normalized edge weight
+ *
+ * @tparam VD The original vertex attribute (not used)
+ * @tparam ED The original edge attribute (not used)
+ *
+ * @param graph The graph on which to compute personalized pagerank
+ * @param numIter The number of iterations to run
+ * @param resetProb The random reset probability
+ * @param sources The list of sources to compute personalized pagerank from
+ * @return the graph with vertex attributes
+ * containing the pagerank relative to all starting nodes (as a sparse vector) and
+ * edge attributes the normalized edge weight
+ */
+ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+ numIter: Int, resetProb: Double = 0.15,
+ sources: Array[VertexId]): Graph[Vector, Double] = {
+ // TODO if one source's vertex id is outside of the int range
+ // we won't be able to store its activations in a sparse vector
+ val zero = Vectors.sparse(sources.size, List()).asBreeze
+ val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
+ val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+ (vid, v)
+ }.toMap
+ val sc = graph.vertices.sparkContext
+ val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
+ // Initialize the PageRank graph with each edge attribute having
+ // weight 1/outDegree and each source vertex with attribute resetProb in its own coordinate.
+ var rankGraph = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
+ // Set the weight on the edges based on the degree
+ .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
+ .mapVertices { (vid, attr) =>
+ if (sourcesInitMapBC.value contains vid) {
+ sourcesInitMapBC.value(vid)
+ } else {
+ zero
+ }
+ }
+
+ var i = 0
+ while (i < numIter) {
+ val prevRankGraph = rankGraph
+ // Propagate the message along outbound edges,
+ // adding the start nodes back in with activation resetProb
+ val rankUpdates = rankGraph.aggregateMessages[BV[Double]](
+ ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
+ (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
+
+ rankGraph = rankGraph.joinVertices(rankUpdates) {
+ (vid, oldRank, msgSum) =>
+ val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
+ val resetActivations = if (sourcesInitMapBC.value contains vid) {
+ sourcesInitMapBC.value(vid)
+ } else {
+ zero
+ }
+ popActivations :+ resetActivations
+ }.cache()
+
+ rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
+ prevRankGraph.vertices.unpersist(false)
+ prevRankGraph.edges.unpersist(false)
+
+ logInfo(s"Parallel Personalized PageRank finished iteration $i.")
+
+ i += 1
+ }
+
+ rankGraph.mapVertices { (vid, attr) =>
+ Vectors.fromBreeze(attr)
+ }
+ }
+
/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index bdff31446f8e..b6305c8d00ab 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -118,11 +118,29 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ val parallelStaticRanks1 = starGraph
+ .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
+ case (vertexId, vector) => vector(0)
+ }.vertices.cache()
+ assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)
+
+ val parallelStaticRanks2 = starGraph
+ .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+ case (vertexId, vector) => vector(0)
+ }.vertices.cache()
+ assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)
+
// We have one outbound edge from 1 to 0
val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
.vertices.cache()
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
+ val otherParallelStaticRanks2 = starGraph
+ .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+ case (vertexId, vector) => vector(1)
+ }.vertices.cache()
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
+ assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
+ assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
}
} // end of test Star PersonalPageRank
@@ -177,6 +195,12 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+ val parallelStaticRanks = chain
+ .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
+ case (vertexId, vector) => vector(0)
+ }.vertices.cache()
+ assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
}
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 6c46be719674..b04e82838e71 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -69,7 +69,7 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
/**
* Param for the number of steps for the k-means|| initialization mode. This is an advanced
- * setting -- the default of 5 is almost always enough. Must be > 0. Default: 5.
+ * setting -- the default of 2 is almost always enough. Must be > 0. Default: 2.
* @group expertParam
*/
@Since("1.5.0")
@@ -262,7 +262,7 @@ class KMeans @Since("1.5.0") (
k -> 2,
maxIter -> 20,
initMode -> MLlibKMeans.K_MEANS_PARALLEL,
- initSteps -> 5,
+ initSteps -> 2,
tol -> 1e-4)
@Since("1.5.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 67d037ed6e02..bd965acf5694 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -99,6 +99,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val aft = new AFTSurvivalRegression()
.setCensorCol(censorCol)
.setFitIntercept(rFormula.hasIntercept)
+ .setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, aft))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
index b654233a8936..b70870295982 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -85,6 +85,7 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
.setK(k)
.setMaxIter(maxIter)
.setTol(tol)
+ .setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, gm))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 35313258f940..b1bb577e1ffe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -89,6 +89,7 @@ private[r] object GeneralizedLinearRegressionWrapper
.setMaxIter(maxIter)
.setWeightCol(weightCol)
.setRegParam(regParam)
+ .setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, glr))
.fit(data)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index 2ed7d7b770cc..48632316f395 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -75,6 +75,7 @@ private[r] object IsotonicRegressionWrapper
.setIsotonic(isotonic)
.setFeatureIndex(featureIndex)
.setWeightCol(weightCol)
+ .setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, isotonicRegression))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 8616a8c01e5a..ea9458525aa3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -86,6 +86,7 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
.setK(k)
.setMaxIter(maxIter)
.setInitMode(initMode)
+ .setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, kMeans))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index f2cb24b96404..d1a39fea76ef 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -73,6 +73,7 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
val naiveBayes = new NaiveBayes()
.setSmoothing(smoothing)
.setModelType("bernoulli")
+ .setFeaturesCol(rFormula.getFeaturesCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
index 6a435992e3b3..379007c4d948 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
@@ -19,14 +19,15 @@ package org.apache.spark.ml.r
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
object RWrapperUtils extends Logging {
/**
* DataFrame column check.
- * When loading data, default columns "features" and "label" will be added. And these two names
- * would conflict with RFormula default feature and label column names.
+ * When loading libsvm data, default columns "features" and "label" will be added,
+ * and "features" would conflict with the RFormula default feature column name.
* Here is to change the column name to avoid "column already exists" error.
*
* @param rFormula RFormula instance
@@ -34,38 +35,11 @@ object RWrapperUtils extends Logging {
* @return Unit
*/
def checkDataColumns(rFormula: RFormula, data: Dataset[_]): Unit = {
- if (data.schema.fieldNames.contains(rFormula.getLabelCol)) {
- val newLabelName = convertToUniqueName(rFormula.getLabelCol, data.schema.fieldNames)
- logWarning(
- s"data containing ${rFormula.getLabelCol} column, using new name $newLabelName instead")
- rFormula.setLabelCol(newLabelName)
- }
-
if (data.schema.fieldNames.contains(rFormula.getFeaturesCol)) {
- val newFeaturesName = convertToUniqueName(rFormula.getFeaturesCol, data.schema.fieldNames)
+ val newFeaturesName = s"${Identifiable.randomUID(rFormula.getFeaturesCol)}"
logWarning(s"data containing ${rFormula.getFeaturesCol} column, " +
s"using new name $newFeaturesName instead")
rFormula.setFeaturesCol(newFeaturesName)
}
}
-
- /**
- * Convert conflicting name to be an unique name.
- * Appending a sequence number, like originalName_output1
- * and incrementing until it is not already there
- *
- * @param originalName Original name
- * @param fieldNames Array of field names in existing schema
- * @return String
- */
- def convertToUniqueName(originalName: String, fieldNames: Array[String]): String = {
- var counter = 1
- var newName = originalName + "_output"
-
- while (fieldNames.contains(newName)) {
- newName = originalName + "_output" + counter
- counter += 1
- }
- newName
- }
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index de9fa4aebf48..23141aaf42b4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -51,10 +51,10 @@ class KMeans private (
/**
* Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
- * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.
+ * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}.
*/
@Since("0.8.0")
- def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
+ def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
/**
* Number of clusters to create (k).
@@ -134,7 +134,7 @@ class KMeans private (
/**
* Set the number of steps for the k-means|| initialization mode. This is an advanced
- * setting -- the default of 5 is almost always enough. Default: 5.
+ * setting -- the default of 2 is almost always enough. Default: 2.
*/
@Since("0.8.0")
def setInitializationSteps(initializationSteps: Int): this.type = {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 88f31a1cd26f..c9ba5a288aad 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -45,7 +45,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
assert(kmeans.getPredictionCol === "prediction")
assert(kmeans.getMaxIter === 20)
assert(kmeans.getInitMode === MLlibKMeans.K_MEANS_PARALLEL)
- assert(kmeans.getInitSteps === 5)
+ assert(kmeans.getInitSteps === 2)
assert(kmeans.getTol === 1e-4)
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
index ddc24cb3a648..27b03918d951 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
@@ -35,22 +35,14 @@ class RWrapperUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
// after checking, model build is ok
RWrapperUtils.checkDataColumns(rFormula, data)
- assert(rFormula.getLabelCol == "label_output")
- assert(rFormula.getFeaturesCol == "features_output")
+ assert(rFormula.getLabelCol == "label")
+ assert(rFormula.getFeaturesCol.startsWith("features_"))
val model = rFormula.fit(data)
assert(model.isInstanceOf[RFormulaModel])
- assert(model.getLabelCol == "label_output")
- assert(model.getFeaturesCol == "features_output")
- }
-
- test("generate unique name by appending a sequence number") {
- val originalName = "label"
- val fieldNames = Array("label_output", "label_output1", "label_output2")
- val newName = RWrapperUtils.convertToUniqueName(originalName, fieldNames)
-
- assert(newName === "label_output3")
+ assert(model.getLabelCol == "label")
+ assert(model.getFeaturesCol.startsWith("features_"))
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index 3d81d375c716..b33b86b39a42 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -49,7 +49,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
val r1 = 1.0
val n1 = 10
val r2 = 4.0
- val n2 = 40
+ val n2 = 10
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
@@ -83,7 +83,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
val r1 = 1.0
val n1 = 10
val r2 = 4.0
- val n2 = 40
+ val n2 = 10
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
@@ -91,11 +91,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
}
val edges = similarities.flatMap { case (i, j, s) =>
- if (i != j) {
- Seq(Edge(i, j, s), Edge(j, i, s))
- } else {
- None
- }
+ Seq(Edge(i, j, s), Edge(j, i, s))
}
val graph = Graph.fromEdges(sc.parallelize(edges, 2), 0.0)
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 2a989dd4f7a1..77397eab81ed 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -88,15 +88,8 @@ object MimaBuild {
def mimaSettings(sparkHome: File, projectRef: ProjectRef) = {
val organization = "org.apache.spark"
- val previousSparkVersion = "1.6.0"
- // This check can be removed post-2.0
- val project = if (previousSparkVersion == "1.6.0" &&
- projectRef.project == "streaming-kafka-0-8"
- ) {
- "streaming-kafka"
- } else {
- projectRef.project
- }
+ val previousSparkVersion = "2.0.0"
+ val project = projectRef.project
val fullId = "spark-" + project + "_2.11"
mimaDefaultSettings ++
Seq(previousArtifact := Some(organization % fullId % previousSparkVersion),
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 16f26e7d283b..fbd78aeb20dd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,6 +37,8 @@ object MimaExcludes {
// Exclude rules for 2.1.x
lazy val v21excludes = v20excludes ++ {
Seq(
+ // [SPARK-14743] Improve delegation token handling in secure cluster
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
// [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
// [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
@@ -51,12 +53,15 @@ object MimaExcludes {
Seq(
excludePackage("org.apache.spark.rpc"),
excludePackage("org.spark-project.jetty"),
+ excludePackage("org.spark_project.jetty"),
+ excludePackage("org.apache.spark.internal"),
excludePackage("org.apache.spark.unused"),
excludePackage("org.apache.spark.unsafe"),
excludePackage("org.apache.spark.memory"),
excludePackage("org.apache.spark.util.collection.unsafe"),
excludePackage("org.apache.spark.sql.catalyst"),
excludePackage("org.apache.spark.sql.execution"),
+ excludePackage("org.apache.spark.sql.internal"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.feature.PCAModel.this"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.StageData.this"),
ProblemFilters.exclude[MissingMethodProblem](
@@ -787,9 +792,10 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.parquetFile"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.applySchema")
) ++ Seq(
- // [SPARK-14743] Improve delegation token handling in secure cluster
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal")
- )
+ // SPARK-17096: Improve exception string reported through the StreamingQueryListener
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.stackTrace"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.this")
+ )
}
def excludes(version: String) = version match {
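Note on the MiMa changes above: once `previousSparkVersion` is bumped to 2.0.0 in MimaBuild, any intentional binary-incompatible change must get a matching filter in this file, as the two rules added for SPARK-14743 and SPARK-17096 do. A minimal sketch of what such an entry looks like; the problem class mirrors the ones used above, but the method name is purely illustrative and not part of this patch:

import com.typesafe.tools.mima.core._

// Hypothetical filter: excuse the removal of a method that MiMa would otherwise
// flag when comparing against the published 2.0.0 artifacts.
val exampleExcludes = Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.example.SomeClass.someRemovedMethod")
)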
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d164ead4ba73..a39c93e9574f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -353,7 +353,7 @@ object SparkBuild extends PomBuild {
val mimaProjects = allProjects.filterNot { x =>
Seq(
spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
- unsafe, tags, sketch, mllibLocal, streamingKafka010
+ unsafe, tags
).contains(x)
}
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 4dab83362a0a..7632f05c3b68 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -254,14 +254,14 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol
@keyword_only
def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
- initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None):
+ initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None):
"""
__init__(self, featuresCol="features", predictionCol="prediction", k=2, \
- initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None)
+ initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None)
"""
super(KMeans, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.KMeans", self.uid)
- self._setDefault(k=2, initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20)
+ self._setDefault(k=2, initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20)
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@@ -271,10 +271,10 @@ def _create_model(self, java_model):
@keyword_only
@since("1.5.0")
def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
- initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None):
+ initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None):
"""
setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
- initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None)
+ initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None)
Sets params for KMeans.
"""
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 29aa61512577..2036168e456f 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -306,7 +306,7 @@ class KMeans(object):
@classmethod
@since('0.9.0')
def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||",
- seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None):
+ seed=None, initializationSteps=2, epsilon=1e-4, initialModel=None):
"""
Train a k-means clustering model.
@@ -330,9 +330,9 @@ def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"
(default: None)
:param initializationSteps:
Number of steps for the k-means|| initialization mode.
- This is an advanced setting -- the default of 5 is almost
+ This is an advanced setting -- the default of 2 is almost
always enough.
- (default: 5)
+ (default: 2)
:param epsilon:
Distance threshold within which a center will be considered to
have converged. If all centers move less than this Euclidean
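Both Python KMeans wrappers above now default to 2 k-means|| initialization steps instead of 5. Callers who want the previous behaviour can still set the parameter explicitly; a minimal sketch using the Scala ml API, assuming a SparkSession is available and `data` is a DataFrame with a "features" vector column (both assumptions, not part of this patch):

import org.apache.spark.ml.clustering.KMeans

// Restore the old default of 5 k-means|| initialization steps explicitly.
val kmeans = new KMeans()
  .setK(2)
  .setInitMode("k-means||")
  .setInitSteps(5)   // the default is now 2, which is almost always enough
  .setMaxIter(20)
val model = kmeans.fit(data)
println(model.clusterCenters.mkString(", "))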
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fd8e9cec3e0b..769e4540720e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -376,6 +376,14 @@ def test_udf_in_generate(self):
row = df.select(explode(f(*df))).groupBy().sum().first()
self.assertEqual(row[0], 10)
+ def test_udf_with_order_by_and_limit(self):
+ from pyspark.sql.functions import udf
+ my_copy = udf(lambda x: x, IntegerType())
+ df = self.spark.range(10).orderBy("id")
+ res = df.select(df.id, my_copy(df.id).alias("copy")).limit(1)
+ res.explain(True)
+ self.assertEqual(res.collect(), [Row(id=0, copy=0)])
+
def test_basic_functions(self):
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = self.spark.read.json(rdd)
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index a7a44cdde6c7..b7284487c511 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -26,5 +26,8 @@ fi
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
-export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.3-src.zip:${PYTHONPATH}"
+if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
+ export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
+ export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.3-src.zip:${PYTHONPATH}"
+ export PYSPARK_PYTHONPATH_SET=1
+fi
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 7512ace18856..fd62bd511fac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.util
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
@@ -61,7 +61,12 @@ class QuantileSummaries(
def insert(x: Double): QuantileSummaries = {
headSampled += x
if (headSampled.size >= defaultHeadSize) {
- this.withHeadBufferInserted
+ val result = this.withHeadBufferInserted
+ if (result.sampled.length >= compressThreshold) {
+ result.compress()
+ } else {
+ result
+ }
} else {
this
}
@@ -236,7 +241,7 @@ object QuantileSummaries {
if (currentSamples.isEmpty) {
return Array.empty[Stats]
}
- val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+ val res = ListBuffer.empty[Stats]
// Start for the last element, which is always part of the set.
// The head contains the current new head, that may be merged with the current element.
var head = currentSamples.last
@@ -258,7 +263,10 @@ object QuantileSummaries {
}
res.prepend(head)
// If necessary, add the minimum element:
- res.prepend(currentSamples.head)
+ val currHead = currentSamples.head
+ if (currHead.value < head.value) {
+ res.prepend(currentSamples.head)
+ }
res.toArray
}
}
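With the change above, `insert` compresses the summary as soon as the head buffer has been merged and the retained sample reaches the compression threshold, and `compressImmut` no longer prepends a duplicate minimum element. A minimal usage sketch of the interleaved insert/compress pattern that the new test helper below exercises; the constructor arguments follow the `(compressThreshold, relativeError)` order used in the test, and the snippet is an illustration rather than part of the patch:

import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Interleave inserts and compressions; the retained sample stays bounded because
// insert() itself now compresses once the sample reaches the compression threshold.
var summary = new QuantileSummaries(10, 0.001)   // (compressThreshold, relativeError)
(1 to 1000).map(_.toDouble).foreach { x =>
  summary = summary.insert(x).compress()
}
// The approximation is within relativeError * count ranks of the exact quantile.
val approxMedian = summary.query(0.5)
println(s"count=${summary.count}, approx median=$approxMedian")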
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index 89b2a22a3de4..5e90970b1bb2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -40,6 +40,20 @@ class QuantileSummariesSuite extends SparkFunSuite {
summary.compress()
}
+ /**
+ * Interleaves compression and insertions.
+ */
+ private def buildCompressSummary(
+ data: Seq[Double],
+ epsi: Double,
+ threshold: Int): QuantileSummaries = {
+ var summary = new QuantileSummaries(threshold, epsi)
+ data.foreach { x =>
+ summary = summary.insert(x).compress()
+ }
+ summary
+ }
+
private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
val approx = summary.query(quant)
// The rank of the approximation.
@@ -54,8 +68,8 @@ class QuantileSummariesSuite extends SparkFunSuite {
for {
(seq_name, data) <- Seq(increasing, decreasing, random)
- epsi <- Seq(0.1, 0.0001)
- compression <- Seq(1000, 10)
+ epsi <- Seq(0.1, 0.0001) // A coarse relative error and a near-exact one
+ compression <- Seq(1000, 10) // Thresholds above and below the data size, so we test both without and with compression

} {
test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
@@ -75,6 +89,17 @@ class QuantileSummariesSuite extends SparkFunSuite {
checkQuantile(0.1, data, s)
checkQuantile(0.001, data, s)
}
+
+ test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression " +
+ s"(interleaved)") {
+ val s = buildCompressSummary(data, epsi, compression)
+ assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+ checkQuantile(0.9999, data, s)
+ checkQuantile(0.9, data, s)
+ checkQuantile(0.5, data, s)
+ checkQuantile(0.1, data, s)
+ checkQuantile(0.001, data, s)
+ }
}
// Tests for merging procedure
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 2fa476b9cfb7..900d7c431e72 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -86,8 +86,9 @@ public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
col.getChildColumn(0).putInts(0, capacity, c.months);
col.getChildColumn(1).putLongs(0, capacity, c.microseconds);
} else if (t instanceof DateType) {
- Date date = (Date)row.get(fieldIdx, t);
- col.putInts(0, capacity, DateTimeUtils.fromJavaDate(date));
+ col.putInts(0, capacity, row.getInt(fieldIdx));
+ } else if (t instanceof TimestampType) {
+ col.putLongs(0, capacity, row.getLong(fieldIdx));
}
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index f3afa8f938f8..62abc2a821a3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -137,6 +137,10 @@ public InternalRow copy() {
DataType dt = columns[i].dataType();
if (dt instanceof BooleanType) {
row.setBoolean(i, getBoolean(i));
+ } else if (dt instanceof ByteType) {
+ row.setByte(i, getByte(i));
+ } else if (dt instanceof ShortType) {
+ row.setShort(i, getShort(i));
} else if (dt instanceof IntegerType) {
row.setInt(i, getInt(i));
} else if (dt instanceof LongType) {
@@ -154,6 +158,8 @@ public InternalRow copy() {
row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
} else if (dt instanceof DateType) {
row.setInt(i, getInt(i));
+ } else if (dt instanceof TimestampType) {
+ row.setLong(i, getLong(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
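The two vectorized-reader changes above let byte, short, and timestamp values be copied out of a `ColumnarBatch` row, and let date/timestamp partition values be populated into the column vectors as their internal int/long encodings. From the user side this is what makes date- or timestamp-partitioned Parquet data readable with the vectorized reader, as the Hive test later in this patch verifies. A hedged end-to-end sketch; the output path and app name are illustrative:

import java.sql.Date
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("date-partition-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// Keep the vectorized Parquet reader enabled (it is the default).
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

// Write a small dataset partitioned by a DATE column and read it back; with the
// changes above the partition value is placed into the column vector as an int.
Seq((1, Date.valueOf("1990-02-24")), (2, Date.valueOf("2015-01-01")))
  .toDF("id", "pd")
  .write.mode("overwrite").partitionBy("pd").parquet("/tmp/date_partitioned")

spark.read.parquet("/tmp/date_partitioned").show()
spark.stop()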
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9597bdf34b71..6cdba406937d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
false
}
- override val outputPartitioning: Partitioning = {
+ @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+ override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
None
}
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.flatMap { n =>
- output.find(_.name == n)
- }
- if (bucketColumns.size == spec.bucketColumnNames.size) {
- HashPartitioning(bucketColumns, numBuckets)
- } else {
- UnknownPartitioning(0)
- }
- }.getOrElse {
- UnknownPartitioning(0)
+ bucketSpec match {
+ case Some(spec) =>
+ // For bucketed columns:
+ // -----------------------
+ // `HashPartitioning` would be used only when:
+ // 1. ALL the bucketing columns are being read from the table
+ //
+ // For sorted columns:
+ // ---------------------
+ // Sort ordering should be used when ALL these criteria match:
+ // 1. `HashPartitioning` is being used
+ // 2. A prefix (or all) of the sort columns are being read from the table.
+ //
+ // Sort ordering would be over the prefix subset of `sort columns` being read
+ // from the table.
+ // e.g.
+ // Assume (col0, col2, col3) are the columns read from the table
+ // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+ // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+ // above
+
+ def toAttribute(colName: String): Option[Attribute] =
+ output.find(_.name == colName)
+
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ if (bucketColumns.size == spec.bucketColumnNames.size) {
+ val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+ val sortColumns =
+ spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+
+ val sortOrder = if (sortColumns.nonEmpty) {
+ // In case of bucketing, it's possible to have multiple files belonging to the
+ // same bucket in a given relation. Each of these files is locally sorted, but
+ // the files combined together are not globally sorted. Given that, the RDD
+ // partition will not be sorted even if the relation has sort columns set.
+ // The current solution is to check that every bucket contains a single file.
+
+ val files = selectedPartitions.flatMap(partition => partition.files)
+ val bucketToFilesGrouping =
+ files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+ val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+ if (singleFilePartitions) {
+ // TODO Currently Spark does not support writing columns sorted in descending order,
+ // so Ascending order is used here. This can be fixed in the future.
+ sortColumns.map(attribute => SortOrder(attribute, Ascending))
+ } else {
+ Nil
+ }
+ } else {
+ Nil
+ }
+ (partitioning, sortOrder)
+ } else {
+ (UnknownPartitioning(0), Nil)
+ }
+ case _ =>
+ (UnknownPartitioning(0), Nil)
}
}
@@ -187,8 +234,6 @@ case class FileSourceScanExec(
"InputPaths" -> relation.location.paths.mkString(", "))
private lazy val inputRDD: RDD[InternalRow] = {
- val selectedPartitions = relation.location.listFiles(partitionFilters)
-
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
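The comment block above spells out the rule: the scan reports a `HashPartitioning` only when every bucketing column is read, and reports a sort order only when, in addition, a prefix of the sort columns is read and each bucket holds a single file. A hedged sketch of the writer-side setup that lets a sort-merge join skip both the exchange and the sort, assuming `spark` is an existing SparkSession and `df` a DataFrame with columns i, j, k; the table name is illustrative:

// Bucket and sort the table on the join keys.
df.write
  .format("parquet")
  .bucketBy(8, "i", "j")
  .sortBy("i", "j")
  .saveAsTable("bucketed_sorted")

val t = spark.table("bucketed_sorted")
// With one file per bucket, the scan reports HashPartitioning(i, j) plus an ascending
// sort order on (i, j), so this self-join needs neither an exchange nor a sort.
t.join(t, Seq("i", "j")).explain()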
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c389593b4f76..3441ccf53b45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,22 +66,22 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ReturnAnswer(rootPlan) => rootPlan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
- execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
+ execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit),
logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
- limit, order, Some(projectList), planLater(child)) :: Nil
+ limit, order, projectList, planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
- execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
+ execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
- limit, order, Some(projectList), planLater(child)) :: Nil
+ limit, order, projectList, planLater(child)) :: Nil
case _ => Nil
}
}
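`TakeOrderedAndProjectExec` now always receives an explicit project list: the child's output when there is no `Project` above the `Sort`, or the projection's expressions otherwise (see the operator change in limit.scala below). A small sketch of query shapes that hit this code path, assuming `spark` is an existing SparkSession:

import org.apache.spark.sql.functions.col

val df = spark.range(0, 1000).toDF("id").withColumn("copy", col("id"))

// ORDER BY ... LIMIT n is planned as TakeOrderedAndProject; without a projection the
// operator now carries child.output as its project list instead of None.
df.orderBy(col("id").desc).limit(10).explain()

// With a projection above the sort, the projection's expressions are passed through.
df.orderBy(col("id")).select(col("copy")).limit(10).explain()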
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index a809076de541..7be5d31d4a76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -21,6 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -28,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ThreadUtils
/**
@@ -70,38 +72,47 @@ case class BroadcastExchangeExec(
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
- val beforeCollect = System.nanoTime()
- // Note that we use .executeCollect() because we don't want to convert data to Scala types
- val input: Array[InternalRow] = child.executeCollect()
- if (input.length >= 512000000) {
- throw new SparkException(
- s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+ try {
+ val beforeCollect = System.nanoTime()
+ // Note that we use .executeCollect() because we don't want to convert data to Scala types
+ val input: Array[InternalRow] = child.executeCollect()
+ if (input.length >= 512000000) {
+ throw new SparkException(
+ s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+ }
+ val beforeBuild = System.nanoTime()
+ longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+ val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+ longMetric("dataSize") += dataSize
+ if (dataSize >= (8L << 30)) {
+ throw new SparkException(
+ s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
+ }
+
+ // Construct and broadcast the relation.
+ val relation = mode.transform(input)
+ val beforeBroadcast = System.nanoTime()
+ longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+ val broadcasted = sparkContext.broadcast(relation)
+ longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+ // There are some cases where we don't care about the metrics and call `SparkPlan.doExecute`
+ // directly without setting an execution id. We should be tolerant of that.
+ if (executionId != null) {
+ sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+ executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+ }
+
+ broadcasted
+ } catch {
+ case oe: OutOfMemoryError =>
+ throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
+ s"all worker nodes. As a workaround, you can either disable broadcast by setting " +
+ s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark driver " +
+ s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
+ .initCause(oe.getCause)
}
- val beforeBuild = System.nanoTime()
- longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
- val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
- longMetric("dataSize") += dataSize
- if (dataSize >= (8L << 30)) {
- throw new SparkException(
- s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
- }
-
- // Construct and broadcast the relation.
- val relation = mode.transform(input)
- val beforeBroadcast = System.nanoTime()
- longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
-
- val broadcasted = sparkContext.broadcast(relation)
- longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
-
- // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
- // directly without setting an execution id. We should be tolerant to it.
- if (executionId != null) {
- sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
- executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
- }
-
- broadcasted
}
}(BroadcastExchangeExec.executionContext)
}
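The new catch block above converts an `OutOfMemoryError` raised while building or broadcasting the relation into one whose message names two workarounds. A minimal sketch of acting on that advice, assuming `spark` is an existing SparkSession; the memory value and class name are illustrative:

// Workaround 1: disable automatic broadcast joins so the planner falls back to a
// shuffle-based join instead of building the whole table on the driver.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Workaround 2: give the driver more memory when submitting the application, e.g.
//   spark-submit --driver-memory 8g --class com.example.App app.jar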
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 781c01609542..86a877071560 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -39,9 +39,10 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
protected override def doExecute(): RDD[InternalRow] = {
+ val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
val shuffled = new ShuffledRowRDD(
ShuffleExchange.prepareShuffleDependency(
- child.execute(), child.output, SinglePartition, serializer))
+ locallyLimited, child.output, SinglePartition, serializer))
shuffled.mapPartitionsInternal(_.take(limit))
}
}
@@ -114,11 +115,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
case class TakeOrderedAndProjectExec(
limit: Int,
sortOrder: Seq[SortOrder],
- projectList: Option[Seq[NamedExpression]],
+ projectList: Seq[NamedExpression],
child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = {
- projectList.map(_.map(_.toAttribute)).getOrElse(child.output)
+ projectList.map(_.toAttribute)
}
override def outputPartitioning: Partitioning = SinglePartition
@@ -126,8 +127,8 @@ case class TakeOrderedAndProjectExec(
override def executeCollect(): Array[InternalRow] = {
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
- if (projectList.isDefined) {
- val proj = UnsafeProjection.create(projectList.get, child.output)
+ if (projectList != child.output) {
+ val proj = UnsafeProjection.create(projectList, child.output)
data.map(r => proj(r).copy())
} else {
data
@@ -148,8 +149,8 @@ case class TakeOrderedAndProjectExec(
localTopK, child.output, SinglePartition, serializer))
shuffled.mapPartitions { iter =>
val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
- if (projectList.isDefined) {
- val proj = UnsafeProjection.create(projectList.get, child.output)
+ if (projectList != child.output) {
+ val proj = UnsafeProjection.create(projectList, child.output)
topK.map(r => proj(r))
} else {
topK
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eac266cba55b..a2164f9ae3d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2661,4 +2661,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
data.selectExpr("`part.col1`", "`col.1`"))
}
}
+
+ test("SPARK-17515: CollectLimit.execute() should perform per-partition limits") {
+ val numRecordsRead = spark.sparkContext.longAccumulator
+ spark.range(1, 100, 1, numPartitions = 10).map { x =>
+ numRecordsRead.add(1)
+ x
+ }.limit(1).queryExecution.toRdd.count()
+ assert(numRecordsRead.value === 10)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 3217e34bd8ad..7e317a4d8026 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -59,7 +59,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
checkThatPlansAgree(
generateRandomInputData(),
input =>
- noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, None, input)),
+ noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, input.output, input)),
input =>
GlobalLimitExec(limit,
LocalLimitExec(limit,
@@ -74,7 +74,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
generateRandomInputData(),
input =>
noOpFilter(
- TakeOrderedAndProjectExec(limit, sortOrder, Some(Seq(input.output.last)), input)),
+ TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last), input)),
input =>
GlobalLimitExec(limit,
LocalLimitExec(limit,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 05f826a11b58..95672e01f554 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -43,6 +43,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// drop all databases, tables and functions after each test
spark.sessionState.catalog.reset()
} finally {
+ val path = System.getProperty("user.dir") + "/spark-warehouse"
+ Utils.deleteRecursively(new File(path))
super.afterEach()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4aa046bd91e0..3161a630af0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,11 +38,12 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is after all not intended to be used in this way?)
@@ -689,6 +690,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
}
+
+ test("VectorizedParquetRecordReader - partition column types") {
+ withTempPath { dir =>
+ Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
+
+ val dataTypes =
+ Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+ FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+ val constantValues =
+ Seq(
+ UTF8String.fromString("a string"),
+ true,
+ 1.toByte,
+ 2.toShort,
+ 3,
+ Long.MaxValue,
+ 0.25.toFloat,
+ 0.75D,
+ Decimal("1234.23456"),
+ DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+ DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+ dataTypes.zip(constantValues).foreach { case (dt, v) =>
+ val schema = StructType(StructField("pcol", dt) :: Nil)
+ val vectorizedReader = new VectorizedParquetRecordReader
+ val partitionValues = new GenericMutableRow(Array(v))
+ val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+ try {
+ vectorizedReader.initialize(file, null)
+ vectorizedReader.initBatch(schema, partitionValues)
+ vectorizedReader.nextKeyValue()
+ val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
+
+ // Use a copied row (a `GenericMutableRow`) rather than the `ColumnarBatch` row itself,
+ // so that the generic get(...) method, which `ColumnarBatch` does not implement, can be used.
+ val actual = row.copy().get(1, dt)
+ val expected = v
+ assert(actual == expected)
+ } finally {
+ vectorizedReader.close()
+ }
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index af282866669b..29317e288786 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -590,7 +590,9 @@ object SparkSubmitClassLoaderTest extends Logging {
def main(args: Array[String]) {
Utils.configTestLog4j("INFO")
val conf = new SparkConf()
+ val hiveWarehouseLocation = Utils.createTempDir()
conf.set("spark.ui.enabled", "false")
+ conf.set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString)
val sc = new SparkContext(conf)
val hiveContext = new TestHiveContext(sc)
val df = hiveContext.createDataFrame((1 to 100).map(i => (i, i))).toDF("i", "j")
@@ -699,11 +701,13 @@ object SPARK_9757 extends QueryTest {
def main(args: Array[String]): Unit = {
Utils.configTestLog4j("INFO")
+ val hiveWarehouseLocation = Utils.createTempDir()
val sparkContext = new SparkContext(
new SparkConf()
.set("spark.sql.hive.metastore.version", "0.13.1")
.set("spark.sql.hive.metastore.jars", "maven")
- .set("spark.ui.enabled", "false"))
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString))
val hiveContext = new TestHiveContext(sparkContext)
spark = hiveContext.sparkSession
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 05d0687fb7e4..dc4d099f0f66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1787,6 +1787,27 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ sql(
+ """CREATE TABLE order(id INT)
+ |PARTITIONED BY (pd DATE, pt TIMESTAMP)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ sql("set hive.exec.dynamic.partition.mode=nonstrict")
+ sql(
+ """INSERT INTO TABLE order PARTITION(pd, pt)
+ |SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt
+ """.stripMargin)
+ val actual = sql("SELECT * FROM order")
+ val expected = sql(
+ "SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt")
+ checkAnswer(actual, expected)
+ sql("DROP TABLE order")
+ }
+ }
+
def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index ca2ec9f6a5ed..3ff85176de10 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
bucketSpecRight: Option[BucketSpec],
joinColumns: Seq[String],
shuffleLeft: Boolean,
- shuffleRight: Boolean): Unit = {
+ shuffleRight: Boolean,
+ sortLeft: Boolean = true,
+ sortRight: Boolean = true): Unit = {
withTable("bucketed_table1", "bucketed_table2") {
def withBucket(
writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
spec.numBuckets,
spec.bucketColumnNames.head,
spec.bucketColumnNames.tail: _*)
+
+ if (spec.sortColumnNames.nonEmpty) {
+ writer.sortBy(
+ spec.sortColumnNames.head,
+ spec.sortColumnNames.tail: _*
+ )
+ } else {
+ writer
+ }
}.getOrElse(writer)
}
@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+ // check existence of shuffle
assert(
joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
+
+ // check existence of sort
+ assert(
+ joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
+ s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+ assert(
+ joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
+ s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
}
}
}
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
}
}
+ test("avoid shuffle and sort when bucket and sort columns are join keys") {
+ val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ testBucketing(
+ bucketSpec, bucketSpec, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = false
+ )
+ }
+
+ test("avoid shuffle and sort when sort columns are a super set of join keys") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = false
+ )
+ }
+
+ test("only sort one side when sort columns are different") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = true
+ )
+ }
+
+ test("only sort one side when sort columns are same but their ordering is different") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = true
+ )
+ }
+
test("avoid shuffle when grouping keys are equal to bucket keys") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 9a071862bbdb..c86bf7f70c98 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.stop()
}
- test("moving recovery file form NM local dir to recovery path") {
+ test("moving recovery file from NM local dir to recovery path") {
// This is to test that when Hadoop is upgraded to 2.5+ and NM recovery is enabled, we should
// move the old recovery file to the new path to keep compatibility
// Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
// dir.
s1 = new YarnShuffleService
+ // set auth to true to test the secrets recovery
+ yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
val app1Data: ApplicationInitializationContext =
@@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
+ val secretsFile = s1.secretsFile
+ secretsFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
@@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.init(yarnConfig)
val execStateFile2 = s2.registeredExecutorFile
+ val secretsFile2 = s2.secretsFile
+
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+ recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
eventually(timeout(10 seconds), interval(5 millis)) {
assert(!execStateFile.exists())
}
+ eventually(timeout(10 seconds), interval(5 millis)) {
+ assert(!secretsFile.exists())
+ }
val handler2 = s2.blockHandler
val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)