
Commit 049f428

Merge branch 'master' into interval-mul-div
2 parents 014cde5 + 6d4cc7b commit 049f428

File tree: 135 files changed, +2339 -1129 lines


common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java

Lines changed: 9 additions & 4 deletions
@@ -53,7 +53,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
 
-  protected TransportClientFactory clientFactory;
+  protected volatile TransportClientFactory clientFactory;
   protected String appId;
 
   /**
@@ -102,9 +102,14 @@ public void fetchBlocks(
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
           (blockIds1, listener1) -> {
-            TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockFetcher(client, appId, execId,
-              blockIds1, listener1, conf, downloadFileManager).start();
+            // Unless this client is closed.
+            if (clientFactory != null) {
+              TransportClient client = clientFactory.createClient(host, port);
+              new OneForOneBlockFetcher(client, appId, execId,
+                blockIds1, listener1, conf, downloadFileManager).start();
+            } else {
+              logger.info("This clientFactory was closed. Skipping further block fetch retries.");
+            }
           };
 
       int maxRetries = conf.maxIORetries();
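
Note on this hunk: the field becomes volatile and the retry path null-checks it because close() can run concurrently with an in-flight fetch retry. The close() side is not shown in this diff; the following standalone sketch (illustrative names, not Spark's code) shows the pattern the hunk relies on:

public class ClosableFetcherSketch {
  // A volatile handle, so a null written by close() is visible to retry threads.
  private volatile AutoCloseable clientFactory = () -> { };

  void fetchWithRetry() throws Exception {
    if (clientFactory != null) {
      // ... create a client from the factory and start the fetch ...
    } else {
      System.out.println("clientFactory closed; skipping further retries.");
    }
  }

  void close() throws Exception {
    AutoCloseable factory = clientFactory;
    clientFactory = null;  // retry threads now take the skip branch
    if (factory != null) {
      factory.close();
    }
  }
}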

core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java

Lines changed: 6 additions & 0 deletions
@@ -46,4 +46,10 @@ public interface ShuffleDataIO {
    * are only invoked on the executors.
    */
   ShuffleExecutorComponents executor();
+
+  /**
+   * Called once on driver process to bootstrap the shuffle metadata modules that
+   * are maintained by the driver.
+   */
+  ShuffleDriverComponents driver();
 }
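
The new driver() hook completes the plugin lifecycle: the driver bootstraps first, and its extra configs are handed to every executor, as the diffs below show. A hedged sketch of that flow; the reflective loading glue and the config key are assumptions, not code from this commit:

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.api.ShuffleDataIO;

public class ShuffleDataIOLifecycleSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf();
    // Config key assumed for illustration; plugins expose a SparkConf constructor,
    // as LocalDiskShuffleDataIO below does.
    String pluginClass = conf.get("spark.shuffle.sort.io.plugin.class",
        "org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO");
    ShuffleDataIO io = (ShuffleDataIO) Class.forName(pluginClass)
        .getConstructor(SparkConf.class)
        .newInstance(conf);

    // Driver side: bootstrap once, before executor requests go to the cluster manager.
    Map<String, String> extraConfigs = io.driver().initializeApplication();

    // Executor side, once per executor, after the driver ships extraConfigs.
    // (In a real application Spark makes this call, with a live SparkEnv.)
    io.executor().initializeExecutor("app-1", "exec-0", extraConfigs);
  }
}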

core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * :: Private ::
+ * An interface for building shuffle support modules for the Driver.
+ */
+@Private
+public interface ShuffleDriverComponents {
+
+  /**
+   * Called once in the driver to bootstrap this module that is specific to this application.
+   * This method is called before submitting executor requests to the cluster manager.
+   *
+   * This method should prepare the module with its shuffle components i.e. registering against
+   * an external file servers or shuffle services, or creating tables in a shuffle
+   * storage data database.
+   *
+   * @return additional SparkConf settings necessary for initializing the executor components.
+   * This would include configurations that cannot be statically set on the application, like
+   * the host:port of external services for shuffle storage.
+   */
+  Map<String, String> initializeApplication();
+
+  /**
+   * Called once at the end of the Spark application to clean up any existing shuffle state.
+   */
+  void cleanupApplication();
+
+  /**
+   * Called once per shuffle id when the shuffle id is first generated for a shuffle stage.
+   *
+   * @param shuffleId The unique identifier for the shuffle stage.
+   */
+  default void registerShuffle(int shuffleId) {}
+
+  /**
+   * Removes shuffle data associated with the given shuffle.
+   *
+   * @param shuffleId The unique identifier for the shuffle stage.
+   * @param blocking Whether this call should block on the deletion of the data.
+   */
+  default void removeShuffle(int shuffleId, boolean blocking) {}
+}
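
To make the driver-side contract concrete, here is a hedged sketch of a third-party implementation; the class, the external store, and the shuffle.service.address key are hypothetical, not part of this commit:

package com.example.shuffle;  // hypothetical plugin package

import java.util.Collections;
import java.util.Map;

import org.apache.spark.shuffle.api.ShuffleDriverComponents;

// Hypothetical driver components registering the application with an external
// shuffle store and publishing its address to the executors.
public class RemoteStoreDriverComponents implements ShuffleDriverComponents {

  @Override
  public Map<String, String> initializeApplication() {
    // e.g. open a session against the external store here.
    // Entries returned here reach ShuffleExecutorComponents#initializeExecutor().
    return Collections.singletonMap("shuffle.service.address", "shuffle-store:7337");
  }

  @Override
  public void cleanupApplication() {
    // e.g. drop this application's shuffle data in the external store.
  }

  @Override
  public void removeShuffle(int shuffleId, boolean blocking) {
    // e.g. issue a delete for this shuffle's data; honor `blocking` where possible.
  }
}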

core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java

Lines changed: 9 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.api;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.spark.annotation.Private;
@@ -34,21 +35,26 @@ public interface ShuffleExecutorComponents {
   /**
    * Called once per executor to bootstrap this module with state that is specific to
    * that executor, specifically the application ID and executor ID.
+   *
+   * @param appId The Spark application id
+   * @param execId The unique identifier of the executor being initialized
+   * @param extraConfigs Extra configs that were returned by
+   *                     {@link ShuffleDriverComponents#initializeApplication()}
    */
-  void initializeExecutor(String appId, String execId);
+  void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);
 
   /**
    * Called once per map task to create a writer that will be responsible for persisting all the
    * partitioned bytes written by that map task.
    *
    * @param shuffleId Unique identifier for the shuffle the map task is a part of
-   * @param mapId An ID of the map task. The ID is unique within this Spark application.
+   * @param mapTaskId An ID of the map task. The ID is unique within this Spark application.
    * @param numPartitions The number of partitions that will be written by the map task. Some of
    *                      these partitions may be empty.
    */
   ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) throws IOException;
 
   /**
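
A matching executor-side sketch that consumes the value published by the hypothetical driver components above (names are illustrative; interface methods with default implementations are elided):

package com.example.shuffle;  // hypothetical plugin package

import java.io.IOException;
import java.util.Map;

import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;

public class RemoteStoreExecutorComponents implements ShuffleExecutorComponents {
  private String storeAddress;

  @Override
  public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
    // Produced by ShuffleDriverComponents#initializeApplication() on the driver.
    this.storeAddress = extraConfigs.get("shuffle.service.address");
  }

  @Override
  public ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId, long mapTaskId, int numPartitions) throws IOException {
    // A real implementation would return a writer backed by storeAddress;
    // elided in this sketch.
    throw new UnsupportedOperationException("writer elided in this sketch");
  }
}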

core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java

Lines changed: 7 additions & 1 deletion
@@ -18,8 +18,9 @@
 package org.apache.spark.shuffle.sort.io;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 
 /**
  * Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
@@ -37,4 +38,9 @@ public LocalDiskShuffleDataIO(SparkConf sparkConf) {
   public ShuffleExecutorComponents executor() {
     return new LocalDiskShuffleExecutorComponents(sparkConf);
   }
+
+  @Override
+  public ShuffleDriverComponents driver() {
+    return new LocalDiskShuffleDriverComponents();
+  }
 }
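
If the config key assumed in the lifecycle sketch above holds, an application would swap the default local-disk plugin for a custom one like so (class name hypothetical):

SparkConf conf = new SparkConf()
    .set("spark.shuffle.sort.io.plugin.class", "com.example.shuffle.RemoteStoreShuffleDataIO");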

core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort.io;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.storage.BlockManagerMaster;
+
+public class LocalDiskShuffleDriverComponents implements ShuffleDriverComponents {
+
+  private BlockManagerMaster blockManagerMaster;
+
+  @Override
+  public Map<String, String> initializeApplication() {
+    blockManagerMaster = SparkEnv.get().blockManager().master();
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public void cleanupApplication() {
+    // nothing to clean up
+  }
+
+  @Override
+  public void removeShuffle(int shuffleId, boolean blocking) {
+    if (blockManagerMaster == null) {
+      throw new IllegalStateException("Driver components must be initialized before using");
+    }
+    blockManagerMaster.removeShuffle(shuffleId, blocking);
+  }
+}

core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java

Lines changed: 4 additions & 3 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.sort.io;
 
+import java.util.Map;
 import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -50,7 +51,7 @@ public LocalDiskShuffleExecutorComponents(
   }
 
   @Override
-  public void initializeExecutor(String appId, String execId) {
+  public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
     blockManager = SparkEnv.get().blockManager();
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
@@ -61,14 +62,14 @@ public void initializeExecutor(String appId, String execId) {
   @Override
   public ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) {
     if (blockResolver == null) {
      throw new IllegalStateException(
           "Executor components must be initialized before getting writers.");
     }
     return new LocalDiskShuffleMapOutputWriter(
-        shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+        shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
   }
 
   @Override

core/src/main/resources/org/apache/spark/log4j-defaults.properties

Lines changed: 2 additions & 1 deletion
@@ -33,7 +33,8 @@ log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
 
-# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
+# in SparkSQL with Hive support
 log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
 log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 5 additions & 3 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
 
 /**
@@ -58,7 +59,9 @@ private class CleanupTaskWeakReference(
  * to be processed when the associated object goes out of scope of the application. Actual
  * cleanup is performed in a separate daemon thread.
  */
-private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
+private[spark] class ContextCleaner(
+    sc: SparkContext,
+    shuffleDriverComponents: ShuffleDriverComponents) extends Logging {
 
   /**
    * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
@@ -221,7 +224,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         logDebug("Cleaning shuffle " + shuffleId)
         mapOutputTrackerMaster.unregisterShuffle(shuffleId)
-        blockManagerMaster.removeShuffle(shuffleId, blocking)
+        shuffleDriverComponents.removeShuffle(shuffleId, blocking)
         listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
         logDebug("Cleaned shuffle " + shuffleId)
       } catch {
@@ -269,7 +272,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 }

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 1 addition & 0 deletions
@@ -96,6 +96,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     shuffleId, this)
 
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
 }
 

0 commit comments
