@@ -28,6 +28,10 @@ import org.apache.spark.{TaskContext, ShuffleDependency}
  * boolean isDriver as parameters.
  */
 private[spark] trait ShuffleManager {
+
+  /** Return short name for the ShuffleManager */
+  val shortShuffleMgrName: String
+
   /**
    * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
    */

Review comment (Contributor), on the new val shortShuffleMgrName: just call this shortName
@@ -34,6 +34,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager

   private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)

+  override val shortShuffleMgrName: String = "hash"
+
   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
   override def registerShuffle[K, V, C](
       shuffleId: Int,
@@ -79,6 +79,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   */
  private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

+  override val shortShuffleMgrName: String = "sort"
+
  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

  /**
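A side note on where these short names come from: "hash" and "sort" here, plus "tungsten-sort" for the UnsafeShuffleManager referenced further down in this diff, mirror the values accepted by the spark.shuffle.manager setting. The sketch below shows roughly how such a mapping resolves a configured name to a manager class; the object name and exact lookup are illustrative only (in Spark this resolution lives in SparkEnv), with the class strings taken from the fully qualified names this patch removes.

import org.apache.spark.SparkConf

object ShuffleManagerNamesSketch {
  // Short names mapped to the manager classes; the class strings are the ones
  // the external shuffle service stops matching on after this patch.
  val shortShuffleMgrNames: Map[String, String] = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")

  // spark.shuffle.manager may hold either a short name or a fully qualified class name.
  def resolveClassName(conf: SparkConf): String = {
    val configured = conf.get("spark.shuffle.manager", "sort")
    shortShuffleMgrNames.getOrElse(configured.toLowerCase, configured)
  }
}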
@@ -200,7 +200,7 @@ private[spark] class BlockManager(
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirs.map(_.toString),
       diskBlockManager.subDirsPerLocalDir,
-      shuffleManager.getClass.getName)
+      shuffleManager.shortShuffleMgrName)

     val MAX_ATTEMPTS = 3
     val SLEEP_TIME_SECS = 5
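The net effect of the BlockManager change above shows up in the registration payload: the external shuffle service now receives the short name rather than a class name. Below is a minimal sketch of building that payload, with illustrative directory values (the real ones come from DiskBlockManager as in the hunk, and the ExecutorShuffleInfo import path is assumed from the network-shuffle module).

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo

object RegistrationPayloadSketch {
  // Illustrative values; BlockManager takes them from DiskBlockManager as shown above.
  val localDirs: Array[String] = Array("/tmp/blockmgr-illustrative")
  val subDirsPerLocalDir: Int = 64

  // The third argument used to be shuffleManager.getClass.getName,
  // e.g. "org.apache.spark.shuffle.sort.SortShuffleManager"; it is now the short name.
  val shuffleConfig = new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, "sort")
}

The test hunks below construct the same payload by hand, which is why their hard-coded manager strings change in this patch as well.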
@@ -183,11 +183,10 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }

-    if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
-      return getHashBasedShuffleBlockData(executor, blockId);
-    } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)
-        || "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager".equals(executor.shuffleManager)) {
+    if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
       return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    } else if ("hash".equals(executor.shuffleManager)) {
+      return getHashBasedShuffleBlockData(executor, blockId);
     } else {
       throw new UnsupportedOperationException(
         "Unsupported shuffle manager: " + executor.shuffleManager);
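Two observations on the dispatch above. "tungsten-sort" is served by the same sort-based read path as "sort", just as UnsafeShuffleManager was grouped with SortShuffleManager in the code being removed. And only these three short names are recognized; any other value, including the old fully qualified class names, falls through to the UnsupportedOperationException. The hypothetical helper below merely restates that contract for clarity and is not part of the patch.

// Hypothetical helper, not part of the patch: which registered shuffle-manager
// names the external shuffle service can serve, and which share the sort-based path.
object SupportedShuffleManagers {
  private val sortBased = Set("sort", "tungsten-sort")

  def isSupported(name: String): Boolean =
    sortBased.contains(name) || name == "hash"

  def usesSortBasedReadPath(name: String): Boolean =
    sortBased.contains(name)
}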
@@ -221,8 +221,7 @@ public synchronized void onBlockFetchFailure(String blockId, Throwable t) {

     // Register an executor so that the next steps work.
     ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
-        new String[] { System.getProperty("java.io.tmpdir") }, 1,
-        "org.apache.spark.shuffle.sort.SortShuffleManager");
+        new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
     RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
     client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);

@@ -83,7 +83,7 @@ public void testBadRequests() throws IOException {

     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
-      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+      dataContext.createExecutorInfo("sort"));
     try {
       resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
       fail("Should have failed");
@@ -96,7 +96,7 @@ public void testBadRequests() throws IOException {
   public void testSortShuffleBlocks() throws IOException {
     ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
     resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+      dataContext.createExecutorInfo("sort"));

     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
@@ -115,7 +115,7 @@ public void testSortShuffleBlocks() throws IOException {
   public void testHashShuffleBlocks() throws IOException {
     ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
     resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
+      dataContext.createExecutorInfo("hash"));

     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
@@ -49,8 +49,8 @@
 public class ExternalShuffleIntegrationSuite {

   static String APP_ID = "app-id";
-  static String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
-  static String HASH_MANAGER = "org.apache.spark.shuffle.hash.HashShuffleManager";
+  static String SORT_MANAGER = "sort";
+  static String HASH_MANAGER = "hash";

   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;