Skip to content

Commit 247e5fa

Browse files
committed
Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
2 parents c57a954 + 8650596 commit 247e5fa

File tree

180 files changed

+5448
-1974
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

180 files changed

+5448
-1974
lines changed

R/pkg/R/client.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
4848
jars <- paste("--jars", jars)
4949
}
5050

51-
if (packages != "") {
51+
if (!identical(packages, "")) {
5252
packages <- paste("--packages", packages)
5353
}
5454

R/pkg/R/mllib.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ setClass("PipelineModel", representation(model = "jobj"))
2727
#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
2828
#'
2929
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
30-
#' operators are supported, including '~' and '+'.
30+
#' operators are supported, including '~', '+', '-', and '.'.
3131
#' @param data DataFrame for training
3232
#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
3333
#' @param lambda Regularization parameter

R/pkg/inst/tests/test_client.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,7 @@ test_that("no package specified doesn't add packages flag", {
3030
expect_equal(gsub("[[:space:]]", "", args),
3131
"")
3232
})
33+
34+
test_that("multiple packages don't produce a warning", {
35+
expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
36+
})

R/pkg/inst/tests/test_mllib.R

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,16 @@ test_that("glm and predict", {
3535

3636
test_that("predictions match with native glm", {
3737
training <- createDataFrame(sqlContext, iris)
38-
model <- glm(Sepal_Width ~ Sepal_Length, data = training)
38+
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
3939
vals <- collect(select(predict(model, training), "prediction"))
40-
rVals <- predict(glm(Sepal.Width ~ Sepal.Length, data = iris), iris)
41-
expect_true(all(abs(rVals - vals) < 1e-9), rVals - vals)
40+
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
41+
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
42+
})
43+
44+
test_that("dot minus and intercept vs native glm", {
45+
training <- createDataFrame(sqlContext, iris)
46+
model <- glm(Sepal_Width ~ . - Species + 0, data = training)
47+
vals <- collect(select(predict(model, training), "prediction"))
48+
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
49+
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
4250
})

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
<name>Spark Project Core</name>
3535
<url>http://spark.apache.org/</url>
3636
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.avro</groupId>
39+
<artifactId>avro-mapred</artifactId>
40+
<classifier>${avro.mapred.classifier}</classifier>
41+
</dependency>
3742
<dependency>
3843
<groupId>com.google.guava</groupId>
3944
<artifactId>guava</artifactId>

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ final class UnsafeShuffleExternalSorter {
5959

6060
private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
6161

62-
private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
6362
@VisibleForTesting
6463
static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
65-
@VisibleForTesting
66-
static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
6764

6865
private final int initialSize;
6966
private final int numPartitions;
67+
private final int pageSizeBytes;
68+
@VisibleForTesting
69+
final int maxRecordSizeBytes;
7070
private final TaskMemoryManager memoryManager;
7171
private final ShuffleMemoryManager shuffleMemoryManager;
7272
private final BlockManager blockManager;
@@ -109,7 +109,10 @@ public UnsafeShuffleExternalSorter(
109109
this.numPartitions = numPartitions;
110110
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
111111
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
112-
112+
this.pageSizeBytes = (int) Math.min(
113+
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
114+
conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
115+
this.maxRecordSizeBytes = pageSizeBytes - 4;
113116
this.writeMetrics = writeMetrics;
114117
initializeForWriting();
115118
}
@@ -272,7 +275,11 @@ void spill() throws IOException {
272275
}
273276

274277
private long getMemoryUsage() {
275-
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
278+
long totalPageSize = 0;
279+
for (MemoryBlock page : allocatedPages) {
280+
totalPageSize += page.size();
281+
}
282+
return sorter.getMemoryUsage() + totalPageSize;
276283
}
277284

278285
private long freeMemory() {
@@ -346,23 +353,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
346353
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
347354
// without using the free space at the end of the current page. We should also do this for
348355
// BytesToBytesMap.
349-
if (requiredSpace > PAGE_SIZE) {
356+
if (requiredSpace > pageSizeBytes) {
350357
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
351-
PAGE_SIZE + ")");
358+
pageSizeBytes + ")");
352359
} else {
353-
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
354-
if (memoryAcquired < PAGE_SIZE) {
360+
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
361+
if (memoryAcquired < pageSizeBytes) {
355362
shuffleMemoryManager.release(memoryAcquired);
356363
spill();
357-
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
358-
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
364+
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
365+
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
359366
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
360-
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
367+
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
361368
}
362369
}
363-
currentPage = memoryManager.allocatePage(PAGE_SIZE);
370+
currentPage = memoryManager.allocatePage(pageSizeBytes);
364371
currentPagePosition = currentPage.getBaseOffset();
365-
freeSpaceInCurrentPage = PAGE_SIZE;
372+
freeSpaceInCurrentPage = pageSizeBytes;
366373
allocatedPages.add(currentPage);
367374
}
368375
}

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ public UnsafeShuffleWriter(
129129
open();
130130
}
131131

132+
@VisibleForTesting
133+
public int maxRecordSizeBytes() {
134+
return sorter.maxRecordSizeBytes;
135+
}
136+
132137
/**
133138
* This convenience method should only be called in test code.
134139
*/

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ public final class UnsafeExternalSorter {
4141

4242
private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
4343

44-
private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
45-
@VisibleForTesting
46-
static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
47-
44+
private final long pageSizeBytes;
4845
private final PrefixComparator prefixComparator;
4946
private final RecordComparator recordComparator;
5047
private final int initialSize;
@@ -91,6 +88,7 @@ public UnsafeExternalSorter(
9188
this.initialSize = initialSize;
9289
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
9390
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
91+
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
9492
initializeForWriting();
9593
}
9694

@@ -147,7 +145,11 @@ public void spill() throws IOException {
147145
}
148146

149147
private long getMemoryUsage() {
150-
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
148+
long totalPageSize = 0;
149+
for (MemoryBlock page : allocatedPages) {
150+
totalPageSize += page.size();
151+
}
152+
return sorter.getMemoryUsage() + totalPageSize;
151153
}
152154

153155
@VisibleForTesting
@@ -214,23 +216,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
214216
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
215217
// without using the free space at the end of the current page. We should also do this for
216218
// BytesToBytesMap.
217-
if (requiredSpace > PAGE_SIZE) {
219+
if (requiredSpace > pageSizeBytes) {
218220
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
219-
PAGE_SIZE + ")");
221+
pageSizeBytes + ")");
220222
} else {
221-
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
222-
if (memoryAcquired < PAGE_SIZE) {
223+
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
224+
if (memoryAcquired < pageSizeBytes) {
223225
shuffleMemoryManager.release(memoryAcquired);
224226
spill();
225-
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
226-
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
227+
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
228+
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
227229
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
228-
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
230+
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
229231
}
230232
}
231-
currentPage = memoryManager.allocatePage(PAGE_SIZE);
233+
currentPage = memoryManager.allocatePage(pageSizeBytes);
232234
currentPagePosition = currentPage.getBaseOffset();
233-
freeSpaceInCurrentPage = PAGE_SIZE;
235+
freeSpaceInCurrentPage = pageSizeBytes;
234236
allocatedPages.add(currentPage);
235237
}
236238
}

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,23 @@ package org.apache.spark
2424
private[spark] trait ExecutorAllocationClient {
2525

2626
/**
27-
* Express a preference to the cluster manager for a given total number of executors.
28-
* This can result in canceling pending requests or filing additional requests.
27+
* Update the cluster manager on our scheduling needs. Three bits of information are included
28+
* to help it make decisions.
29+
* @param numExecutors The total number of executors we'd like to have. The cluster manager
30+
* shouldn't kill any running executor to reach this number, but,
31+
* if all existing executors were to die, this is the number of executors
32+
* we'd want to be allocated.
33+
* @param localityAwareTasks The number of tasks in all active stages that have a locality
34+
* preference. This includes running, pending, and completed tasks.
35+
* @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
36+
* that would like to run on that host.
37+
* This includes running, pending, and completed tasks.
2938
* @return whether the request is acknowledged by the cluster manager.
3039
*/
31-
private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
40+
private[spark] def requestTotalExecutors(
41+
numExecutors: Int,
42+
localityAwareTasks: Int,
43+
hostToLocalTaskCount: Map[String, Int]): Boolean
3244

3345
/**
3446
* Request an additional number of executors from the cluster manager.

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ private[spark] class ExecutorAllocationManager(
161161
// (2) an executor idle timeout has elapsed.
162162
@volatile private var initializing: Boolean = true
163163

164+
// Number of locality aware tasks, used for executor placement.
165+
private var localityAwareTasks = 0
166+
167+
// Map from host to the number of tasks that would like to run on it, used for executor placement.
168+
private var hostToLocalTaskCount: Map[String, Int] = Map.empty
169+
164170
/**
165171
* Verify that the settings specified through the config are valid.
166172
* If not, throw an appropriate exception.
@@ -295,7 +301,7 @@ private[spark] class ExecutorAllocationManager(
295301

296302
// If the new target has not changed, avoid sending a message to the cluster manager
297303
if (numExecutorsTarget < oldNumExecutorsTarget) {
298-
client.requestTotalExecutors(numExecutorsTarget)
304+
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
299305
logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
300306
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
301307
}
@@ -349,7 +355,8 @@ private[spark] class ExecutorAllocationManager(
349355
return 0
350356
}
351357

352-
val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
358+
val addRequestAcknowledged = testing ||
359+
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
353360
if (addRequestAcknowledged) {
354361
val executorsString = "executor" + { if (delta > 1) "s" else "" }
355362
logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
@@ -519,13 +526,37 @@ private[spark] class ExecutorAllocationManager(
519526
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
520527
private var numRunningTasks: Int = _
521528

529+
// stageId to tuple (the number of tasks with locality preferences, a map where each pair is a
530+
// node and the number of tasks that would like to be scheduled on that node) map,
531+
// maintain the executor placement hints for each stage Id used by resource framework to better
532+
// place the executors.
533+
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
534+
522535
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
523536
initializing = false
524537
val stageId = stageSubmitted.stageInfo.stageId
525538
val numTasks = stageSubmitted.stageInfo.numTasks
526539
allocationManager.synchronized {
527540
stageIdToNumTasks(stageId) = numTasks
528541
allocationManager.onSchedulerBacklogged()
542+
543+
// Compute the number of tasks requested by the stage on each host
544+
var numTasksPending = 0
545+
val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
546+
stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
547+
if (!locality.isEmpty) {
548+
numTasksPending += 1
549+
locality.foreach { location =>
550+
val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
551+
hostToLocalTaskCountPerStage(location.host) = count
552+
}
553+
}
554+
}
555+
stageIdToExecutorPlacementHints.put(stageId,
556+
(numTasksPending, hostToLocalTaskCountPerStage.toMap))
557+
558+
// Update the executor placement hints
559+
updateExecutorPlacementHints()
529560
}
530561
}
531562

@@ -534,6 +565,10 @@ private[spark] class ExecutorAllocationManager(
534565
allocationManager.synchronized {
535566
stageIdToNumTasks -= stageId
536567
stageIdToTaskIndices -= stageId
568+
stageIdToExecutorPlacementHints -= stageId
569+
570+
// Update the executor placement hints
571+
updateExecutorPlacementHints()
537572

538573
// If this is the last stage with pending tasks, mark the scheduler queue as empty
539574
// This is needed in case the stage is aborted for any reason
@@ -637,6 +672,29 @@ private[spark] class ExecutorAllocationManager(
637672
def isExecutorIdle(executorId: String): Boolean = {
638673
!executorIdToTaskIds.contains(executorId)
639674
}
675+
676+
/**
677+
* Update the Executor placement hints (the number of tasks with locality preferences,
678+
* a map where each pair is a node and the number of tasks that would like to be scheduled
679+
* on that node).
680+
*
681+
* These hints are updated when stages arrive and complete, so are not up-to-date at task
682+
* granularity within stages.
683+
*/
684+
def updateExecutorPlacementHints(): Unit = {
685+
var localityAwareTasks = 0
686+
val localityToCount = new mutable.HashMap[String, Int]()
687+
stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
688+
localityAwareTasks += numTasksPending
689+
localities.foreach { case (hostname, count) =>
690+
val updatedCount = localityToCount.getOrElse(hostname, 0) + count
691+
localityToCount(hostname) = updatedCount
692+
}
693+
}
694+
695+
allocationManager.localityAwareTasks = localityAwareTasks
696+
allocationManager.hostToLocalTaskCount = localityToCount.toMap
697+
}
640698
}
641699

642700
/**

0 commit comments

Comments
 (0)