Merged
26 commits
7098a12
Streaming doc correction.
Sep 9, 2016
a3981c2
[SPARK-17433] YarnShuffleService doesn't handle moving credentials le…
Sep 9, 2016
f7d2143
[SPARK-17354] [SQL] Partitioning by dates/timestamps should work with…
HyukjinKwon Sep 9, 2016
3354917
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` inf…
tejasapatil Sep 10, 2016
1fec3ce
[SPARK-11496][GRAPHX] Parallel implementation of personalized pagerank
Sep 10, 2016
bcdd259
[SPARK-15509][FOLLOW-UP][ML][SPARKR] R MLlib algorithms should suppor…
yanboliang Sep 10, 2016
6ea5055
[SPARK-17396][CORE] Share the task support between UnionRDD instances.
rdblue Sep 10, 2016
71b7d42
[SPARK-16445][MLLIB][SPARKR] Fix @return description for sparkR mlp s…
keypointt Sep 10, 2016
29ba957
[SPARK-17389][ML][MLLIB] KMeans speedup with better choice of k-means…
srowen Sep 11, 2016
180796e
[SPARK-17439][SQL] Fixing compression issues with approximate quantil…
thunterdb Sep 11, 2016
bf22217
[SPARK-17330][SPARK UT] Clean up spark-warehouse in UT
Sep 11, 2016
c76baff
[SPARK-17336][PYSPARK] Fix appending multiple times to PYTHONPATH fro…
BryanCutler Sep 11, 2016
883c763
[SPARK-17389][FOLLOW-UP][ML] Change KMeans k-means|| default init ste…
yanboliang Sep 11, 2016
767d480
[SPARK-17415][SQL] Better error message for driver-side broadcast joi…
sameeragarwal Sep 11, 2016
72eec70
[SPARK-17486] Remove unused TaskMetricsUIData.updatedBlockStatuses field
JoshRosen Sep 12, 2016
cc87280
[SPARK-17171][WEB UI] DAG will list all partitions in the graph
cenyuhai Sep 12, 2016
4efcdb7
[SPARK-17447] Performance improvement in Partitioner.defaultPartition…
codlife Sep 12, 2016
b3c2291
[SPARK-16992][PYSPARK] use map comprehension in doc
gsemet Sep 12, 2016
8087ecf
[SPARK CORE][MINOR] fix "default partitioner cannot partition array k…
WeichenXu123 Sep 12, 2016
1742c3a
[SPARK-17503][CORE] Fix memory leak in Memory store when unable to ca…
clockfly Sep 12, 2016
3d40896
[SPARK-17483] Refactoring in BlockManager status reporting and block …
JoshRosen Sep 12, 2016
7c51b99
[SPARK-14818] Post-2.0 MiMa exclusion and build changes
JoshRosen Sep 12, 2016
f9c580f
[SPARK-17485] Prevent failed remote reads of cached blocks from faili…
JoshRosen Sep 12, 2016
a91ab70
[SPARK-17474] [SQL] fix python udf in TakeOrderedAndProjectExec
Sep 12, 2016
46f5c20
[BUILD] Closing some stale PRs and ones suggested to be closed by com…
HyukjinKwon Sep 13, 2016
3f6a2bb
[SPARK-17515] CollectLimit.execute() should perform per-partition limits
JoshRosen Sep 13, 2016
6 changes: 3 additions & 3 deletions R/pkg/R/mllib.R
@@ -720,8 +720,9 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
# Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}

#' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}
#' @return \code{summary} returns a list containing \code{layers}, the label distribution, and
#' \code{tables}, conditional probabilities given the target label.
#' @return \code{summary} returns a list containing \code{labelCount}, \code{layers}, and
#' \code{weights}. For \code{weights}, it is a numeric vector with length equal to
#' the number expected given the architecture (i.e., for an 8-10-2 network, 112 connection weights).
#' @rdname spark.mlp
#' @export
#' @aliases summary,MultilayerPerceptronClassificationModel-method
@@ -732,7 +733,6 @@ setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel
labelCount <- callJMethod(jobj, "labelCount")
layers <- unlist(callJMethod(jobj, "layers"))
weights <- callJMethod(jobj, "weights")
weights <- matrix(weights, nrow = length(weights))
list(labelCount = labelCount, layers = layers, weights = weights)
})

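A quick sanity check on the numbers above (not part of the PR): the 4-5-4-3 test expectation of 64 weights below is consistent with Spark's MLP storing one weight per connection plus one bias weight per non-input neuron, and the same formula gives 112 for the 8-10-2 network cited in the @return text. The helper is a hypothetical Scala sketch used only to show the arithmetic:

// Each adjacent layer pair (in, out) contributes (in + 1 bias) * out connection weights.
def mlpWeightCount(layers: Seq[Int]): Int =
  layers.sliding(2).map { case Seq(in, out) => (in + 1) * out }.sum

mlpWeightCount(Seq(4, 5, 4, 3)) // 64, matching expect_equal(length(summary$weights), 64) in the test below
mlpWeightCount(Seq(8, 10, 2))   // 112 for the 8-10-2 example in the @return documentation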
2 changes: 2 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -369,6 +369,8 @@ test_that("spark.mlp", {
expect_equal(summary$labelCount, 3)
expect_equal(summary$layers, c(4, 5, 4, 3))
expect_equal(length(summary$weights), 64)
expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
tolerance = 1e-6)

# Test predict method
mlpTestDF <- df
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;

@@ -159,8 +160,7 @@ protected void serviceInit(Configuration conf) throws Exception {
// If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -196,7 +196,7 @@ protected void serviceInit(Configuration conf) throws Exception {

private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);
secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);

// Make sure this is protected in case it's not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
@@ -328,37 +328,59 @@ public void setRecoveryPath(Path recoveryPath) {
}

/**
* Get the recovery path, this will override the default one to get our own maintained
* recovery path.
* Get the path specific to this auxiliary service to use for recovery.
*/
protected Path getRecoveryPath(String fileName) {
return _recoveryPath;
}

/**
* Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
* when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
* it will use a YARN local dir.
*/
protected Path getRecoveryPath() {
protected File initRecoveryDb(String dbFileName) {
if (_recoveryPath != null) {
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
if (recoveryFile.exists()) {
return recoveryFile;
}
}
// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
File f = new File(new Path(dir).toUri().getPath(), dbFileName);
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
return f;
} else {
// If NM recovery is enabled and the recovery file exists in old NM local dirs, which
// means old version of Spark already generated the recovery file, we should copy the
// old file in to a new recovery path for the compatibility.
if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
// Fail to move recovery file to new path
logger.error("Failed to move recovery file {} to the path {}",
RECOVERY_FILE_NAME, _recoveryPath.toString());
// If the recovery path is set then either NM recovery is enabled or another recovery
// DB has been initialized. If NM recovery is enabled and had set the recovery path
// make sure to move all DBs to the recovery path from the old NM local dirs.
// If another DB was initialized first just make sure all the DBs are in the same
// location.
File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
if (!newLoc.equals(f)) {
try {
Files.move(f.toPath(), newLoc.toPath());
} catch (Exception e) {
// Fail to move recovery file to new path, just continue on with new DB location
logger.error("Failed to move recovery file {} to the path {}",
dbFileName, _recoveryPath.toString(), e);
}
}
return newLoc;
}
break;
}
}

if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]);
}

return _recoveryPath;
return new File(_recoveryPath.toUri().getPath(), dbFileName);
}

/**
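For readers skimming the Java above, the lookup-and-migration order that the new initRecoveryDb method implements can be condensed as follows. This is a simplified Scala sketch of the idea, not the actual service code: the real method mutates the service's _recoveryPath field, reads yarn.nodemanager.local-dirs itself, and only logs a failed move instead of propagating it.

import java.io.File
import java.nio.file.Files

def initRecoveryDb(dbName: String, recoveryPath: Option[File], localDirs: Seq[File]): File = {
  // 1. Prefer a DB that already lives under the NM recovery path.
  recoveryPath.map(dir => new File(dir, dbName)).filter(_.exists).getOrElse {
    // 2. Otherwise look for a DB left in the YARN local dirs by an older shuffle service.
    localDirs.map(dir => new File(dir, dbName)).find(_.exists) match {
      case Some(old) if recoveryPath.isDefined =>
        // NM recovery was enabled after the fact: move the old DB so all DBs share one location.
        val target = new File(recoveryPath.get, dbName)
        if (!target.equals(old)) Files.move(old.toPath, target.toPath)
        target
      case Some(old) =>
        old // no NM recovery configured: keep using the local dir, as the old code did
      case None =>
        // 3. Nothing found anywhere: create the DB under the recovery path, else the first local dir.
        new File(recoveryPath.getOrElse(localDirs.head), dbName)
    }
  }
}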
16 changes: 9 additions & 7 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,16 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
new HashPartitioner(bySize.head.partitions.length)
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}
}
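The rewrite preserves the original selection rule (the partitioner of the input with the most partitions, among inputs that already have one, wins) while avoiding a full sort of the inputs just to find it. A short usage sketch, assuming a live SparkContext named sc and made-up partition counts:

import org.apache.spark.{HashPartitioner, Partitioner}

val keyed   = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(4))
val unkeyed = sc.parallelize(Seq((1, 1.0), (2, 2.0)), numSlices = 16) // more partitions, no partitioner

// keyed's HashPartitioner(4) is chosen because it is the only defined partitioner; only if
// neither input had one would a new HashPartitioner be built, sized by spark.default.parallelism
// when that is set and by the largest partition count (16 here) otherwise.
val chosen = Partitioner.defaultPartitioner(keyed, unkeyed)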
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -83,7 +83,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
@@ -530,7 +530,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
@@ -784,7 +784,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
@@ -802,7 +802,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
@@ -817,7 +817,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
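The new message matters because the restriction is specific to HashPartitioner: Scala/Java arrays hash by reference, so keys with identical contents can land in different partitions and silently corrupt key-based operations. A minimal illustration, again assuming a SparkContext named sc:

import org.apache.spark.HashPartitioner

val byArrayKey = sc.parallelize(Seq((Array(1, 2), "x"), (Array(1, 2), "y")))

// The two keys hold equal contents but have different identity hash codes, so Spark rejects
// hash partitioning up front rather than shuffling them inconsistently:
// byArrayKey.partitionBy(new HashPartitioner(2))
// => org.apache.spark.SparkException: HashPartitioner cannot partition array keys.
// Keying by a collection with value equality (e.g. mapping each Array key to a Seq) avoids the check.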
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag

@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag](
}
}

object UnionRDD {
private[spark] lazy val partitionEvalTaskSupport =
new ForkJoinTaskSupport(new ForkJoinPool(8))
}

@DeveloperApi
class UnionRDD[T: ClassTag](
sc: SparkContext,
@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag](
private[spark] val isPartitionListingParallel: Boolean =
rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)

@transient private lazy val partitionEvalTaskSupport =
new ForkJoinTaskSupport(new ForkJoinPool(8))

override def getPartitions: Array[Partition] = {
val parRDDs = if (isPartitionListingParallel) {
val parArray = rdds.par
parArray.tasksupport = partitionEvalTaskSupport
parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
parArray
} else {
rdds
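The fix replaces the per-instance pool with a single lazily created ForkJoinTaskSupport on the UnionRDD companion object, so repeated unions reuse one 8-thread pool instead of each leaking its own. The same sharing pattern, reduced to a self-contained sketch outside Spark (SharedEval and listLengths are illustrative names, not Spark API):

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object SharedEval {
  // Created once, on first use, and reused by every caller thereafter.
  lazy val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
}

def listLengths(xss: Seq[Seq[Int]]): Seq[Int] = {
  val par = xss.par
  par.tasksupport = SharedEval.taskSupport // reuse the shared pool rather than allocating one here
  par.map(_.length).seq
}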

This file was deleted.
