diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index b886fce9be21..eb602ea10a6b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -150,6 +150,11 @@ protected void handleMessage(
       int numRemovedBlocks = blockManager.removeBlocks(msg.appId, msg.execId, msg.blockIds);
       callback.onSuccess(new BlocksRemoved(numRemovedBlocks).toByteBuffer());
 
+    } else if (msgObj instanceof AreExecutorsRegistered) {
+      AreExecutorsRegistered msg = (AreExecutorsRegistered) msgObj;
+      checkAuth(client, msg.appId);
+      callback.onSuccess(blockManager.areExecutorsRegistered(msg.appId, msg.execIds));
+
     } else {
       throw new UnsupportedOperationException("Unexpected message: " + msgObj);
     }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 85d278138c2b..49fe699868ad 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -128,6 +128,18 @@ public void fetchBlocks(
     }
   }
 
+  public ByteBuffer queryExecutorStatuses(
+      String host,
+      int port,
+      String[] execIds) throws IOException, InterruptedException {
+    checkInit();
+    logger.debug("Querying executor statuses from external shuffle service at {}:{}", host, port);
+    TransportClient client = clientFactory.createClient(host, port);
+    ByteBuffer queryExecutorStatusesMsg = new AreExecutorsRegistered(appId, execIds).toByteBuffer();
+    int timeout = conf.connectionTimeoutMs() + conf.ioRetryWaitTimeMs();
+    return client.sendRpcSync(queryExecutorStatusesMsg, timeout);
+  }
+
   @Override
   public MetricSet shuffleMetrics() {
     checkInit();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index beca5d6e5a78..12ca6f7ff3e5 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -18,6 +18,7 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
@@ -164,6 +165,19 @@ public void registerExecutor(
     executors.put(fullId, executorInfo);
   }
 
+  /**
+   * Returns one status byte per given executor: 1 if it is registered, 0 otherwise.
+   */
+  public ByteBuffer areExecutorsRegistered(String appId, String[] execIds) {
+    byte[] statuses = new byte[execIds.length];
+    for (int i = 0; i < execIds.length; i++) {
+      if (executors.containsKey(new AppExecId(appId, execIds[i]))) {
+        statuses[i] = 1;
+      }
+    }
+    return ByteBuffer.wrap(statuses);
+  }
+
   /**
    * Obtains a FileSegmentManagedBuffer from a single block (shuffleId, mapId, reduceId).
    */
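Note: the RPC reply carries no framing beyond the raw status bytes produced by
areExecutorsRegistered, one byte per queried executor, 1 meaning registered. A minimal Scala
sketch of how a caller might decode the buffer returned by queryExecutorStatuses; the helper
name and the sample values are illustrative, not part of the patch:

  import java.nio.ByteBuffer

  // Hypothetical helper: pair each queried executor ID with its status byte.
  def decodeStatuses(execIds: Array[String], reply: ByteBuffer): Map[String, Boolean] = {
    val bytes = new Array[Byte](reply.remaining())
    reply.get(bytes)
    execIds.zip(bytes).map { case (id, b) => id -> (b == 1) }.toMap
  }

  // decodeStatuses(Array("exec1", "exec2"), ByteBuffer.wrap(Array[Byte](1, 0)))
  // => Map("exec1" -> true, "exec2" -> false)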
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/AreExecutorsRegistered.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/AreExecutorsRegistered.java
new file mode 100644
index 000000000000..01919f6fc8e3
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/AreExecutorsRegistered.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.Arrays;
+
+// Needed by ScalaDoc. See SPARK-7726
+
+/** Request to query whether these executors are registered. */
+public class AreExecutorsRegistered extends BlockTransferMessage {
+  public final String appId;
+  public final String[] execIds;
+
+  public AreExecutorsRegistered(String appId, String[] execIds) {
+    this.appId = appId;
+    this.execIds = execIds;
+  }
+
+  @Override
+  protected Type type() { return Type.ARE_EXECUTORS_REGISTERED; }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId) * 41 + Arrays.hashCode(execIds);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("execIds", Arrays.toString(execIds))
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof AreExecutorsRegistered) {
+      AreExecutorsRegistered o = (AreExecutorsRegistered) other;
+      return Objects.equal(appId, o.appId)
+        && Arrays.equals(execIds, o.execIds);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId)
+      + Encoders.StringArrays.encodedLength(execIds);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    Encoders.StringArrays.encode(buf, execIds);
+  }
+
+  public static AreExecutorsRegistered decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    String[] execIds = Encoders.StringArrays.decode(buf);
+    return new AreExecutorsRegistered(appId, execIds);
+  }
+}
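As a quick wire-format sanity check, the message round-trips through its own encode/decode. A
Scala sketch (the sample IDs are arbitrary):

  import io.netty.buffer.Unpooled
  import org.apache.spark.network.shuffle.protocol.AreExecutorsRegistered

  // Encode into a buffer sized by encodedLength(), then decode and compare;
  // equals() compares appId and the execIds array element-wise.
  val msg = new AreExecutorsRegistered("app0", Array("exec1", "exec2"))
  val buf = Unpooled.buffer(msg.encodedLength)
  msg.encode(buf)
  assert(AreExecutorsRegistered.decode(buf) == msg)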
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 41dd55847ebd..81e6c3a2b74e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -47,7 +47,7 @@ public abstract class BlockTransferMessage implements Encodable {
   public enum Type {
     OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
     HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
-    FETCH_SHUFFLE_BLOCKS(9);
+    FETCH_SHUFFLE_BLOCKS(9), ARE_EXECUTORS_REGISTERED(10);
 
     private final byte id;
 
@@ -76,6 +76,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
       case 7: return RemoveBlocks.decode(buf);
       case 8: return BlocksRemoved.decode(buf);
       case 9: return FetchShuffleBlocks.decode(buf);
+      case 10: return AreExecutorsRegistered.decode(buf);
       default: throw new IllegalArgumentException("Unknown message type: " + type);
     }
   }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index 455351fcf767..4e06b8681f10 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -36,6 +36,7 @@
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.shuffle.protocol.AreExecutorsRegistered;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
@@ -222,4 +223,14 @@ public void testBadMessages() {
     verify(callback, never()).onSuccess(any(ByteBuffer.class));
     verify(callback, never()).onFailure(any(Throwable.class));
   }
+
+  @Test
+  public void testAreExecutorsRegistered() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+    String appId = "app0";
+    String[] execIds = new String[] {"exec1", "exec2"};
+    ByteBuffer areExecutorsRegistered = new AreExecutorsRegistered(appId, execIds).toByteBuffer();
+    handler.receive(client, areExecutorsRegistered, callback);
+    verify(blockResolver, times(1)).areExecutorsRegistered(appId, execIds);
+  }
 }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 09b31430b1eb..800412ca7612 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -162,4 +163,13 @@ private void assertPathsMatch(String p1, String p2, String p3, String expectedPa
     String returnedPath = file.getPath();
     assertTrue(normPathname == returnedPath);
   }
+
+  @Test
+  public void testAreExecutorsRegistered() throws IOException {
+    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
+    resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo(SORT_MANAGER));
+    String[] execIds = new String[] {"exec2", "exec3"};
+    ByteBuffer statuses = resolver.areExecutorsRegistered("app0", execIds);
+    assertEquals(ByteBuffer.wrap(new byte[] {1, 0}), statuses);
+  }
 }
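The new type id (10) also round-trips through the shared dispatcher, which is what the handler
relies on when it receives the message. A Scala sketch, again with arbitrary IDs:

  import org.apache.spark.network.shuffle.protocol.{AreExecutorsRegistered, BlockTransferMessage}

  // toByteBuffer() prepends the type id; fromByteBuffer() reads it back and
  // dispatches to AreExecutorsRegistered.decode (case 10 above).
  val msg = new AreExecutorsRegistered("app0", Array("exec1", "exec2"))
  val decoded = BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer)
  assert(decoded == msg)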
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6f4a6239a09e..8048e6f23cb3 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -152,6 +152,15 @@ private class ShuffleStatus(numPartitions: Int) {
     removeOutputsByFilter(x => x.executorId == execId)
   }
 
+  /**
+   * Removes all map outputs associated with the specified executors. Note that this will also
+   * remove outputs which are served by an external shuffle server (if one exists), as they are
+   * still registered with those execIds.
+   */
+  def removeOutputsOnExecutors(execIds: Seq[String]): Unit = withWriteLock {
+    removeOutputsByFilter(x => execIds.contains(x.executorId))
+  }
+
   /**
    * Removes all shuffle outputs which satisfies the filter. Note that this will also
    * remove outputs which are served by an external shuffle server (if one exists).
@@ -518,6 +527,14 @@ private[spark] class MapOutputTrackerMaster(
     incrementEpoch()
   }
 
+  /**
+   * Returns the IDs of all executors that have map outputs registered on the given host.
+   */
+  def getExecutorsRegisteredOnHost(host: String): Array[String] = {
+    shuffleStatuses.valuesIterator.flatMap(_.mapStatuses.filter(_ != null).map(_.location)
+      .filter(_.host == host).map(_.executorId)).toSet.toArray
+  }
+
   /**
    * Removes all shuffle outputs associated with this executor. Note that this will also remove
    * outputs which are served by an external shuffle server (if one exists), as they are still
@@ -528,6 +545,16 @@ private[spark] class MapOutputTrackerMaster(
     incrementEpoch()
   }
 
+  /**
+   * Removes all shuffle outputs associated with the given executors. Note that this will also
+   * remove outputs which are served by an external shuffle server (if one exists), as they are
+   * still registered with those execIds.
+   */
+  def removeOutputsOnExecutors(execIds: Seq[String]): Unit = {
+    shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnExecutors(execIds) }
+    incrementEpoch()
+  }
+
   /** Check if the given shuffle is being tracked */
   def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)
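Note: getExecutorsRegisteredOnHost must compare the host of each map output's BlockManagerId,
not the BlockManagerId itself (the filter above is adjusted accordingly; nulls are skipped since
mapStatuses holds null for unavailable outputs). A standalone Scala sketch of the lookup, with
made-up locations (7337 is the default external shuffle service port):

  import org.apache.spark.storage.BlockManagerId

  // Made-up map-output locations: BlockManagerId(execId, host, port).
  val locations = Seq(
    BlockManagerId("exec1", "host-a", 7337),
    BlockManagerId("exec2", "host-a", 7337),
    BlockManagerId("exec3", "host-b", 7337))
  val execsOnHostA = locations.filter(_.host == "host-a").map(_.executorId).distinct
  // => List("exec1", "exec2")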
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c3e1cd8b23f1..fdc28860bdae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,6 +35,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
+import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -1668,8 +1669,9 @@ private[spark] class DAGScheduler(
 
       // TODO: mark the executor as failed only if there were lots of fetch failures on it
       if (bmAddress != null) {
+        val executorNotRegistered = failureMessage.contains("Executor is not registered")
         val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-          unRegisterOutputOnHostOnFetchFailure) {
+          (unRegisterOutputOnHostOnFetchFailure || executorNotRegistered)) {
           // We had a fetch failure with the external shuffle service, so we
           // assume all shuffle data on the node is bad.
           Some(bmAddress.host)
@@ -1682,7 +1684,8 @@ private[spark] class DAGScheduler(
           execId = bmAddress.executorId,
           fileLost = true,
           hostToUnregisterOutputs = hostToUnregisterOutputs,
-          maybeEpoch = Some(task.epoch))
+          maybeEpoch = Some(task.epoch),
+          unRegisterOutputOnHost = unRegisterOutputOnHostOnFetchFailure)
       }
     }
 
@@ -1838,18 +1841,36 @@ private[spark] class DAGScheduler(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
-      maybeEpoch: Option[Long] = None): Unit = {
+      maybeEpoch: Option[Long] = None,
+      unRegisterOutputOnHost: Boolean = false): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
       if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
+        (hostToUnregisterOutputs, unRegisterOutputOnHost) match {
+          case (Some(host), false) =>
+            val execIdsOnHost = mapOutputTracker.getExecutorsRegisteredOnHost(host)
+            val port = SparkEnv.get.conf.get(config.SHUFFLE_SERVICE_PORT)
+            try {
+              val statuses = SparkEnv.get.blockManager.blockStoreClient
+                .asInstanceOf[ExternalBlockStoreClient]
+                .queryExecutorStatuses(host, port, execIdsOnHost).array()
+              if (statuses.size == execIdsOnHost.size) {
+                val execIdsNotRegistered = execIdsOnHost.zip(statuses).filter(_._2 == 0).map(_._1)
+                logInfo("Shuffle files lost for executors: %s (epoch %d)"
+                  .format(execIdsNotRegistered.mkString(", "), currentEpoch))
+                mapOutputTracker.removeOutputsOnExecutors(execIdsNotRegistered)
+              }
+            } catch {
+              case e: Exception =>
+                logError("Exception thrown when querying executor statuses", e)
+            }
+          case (Some(host), true) =>
            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
             mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
+          case (None, _) =>
             logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
             mapOutputTracker.removeOutputsOnExecutor(execId)
         }
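For reference, the fetch-failure path above pairs the queried executor IDs with the returned
status bytes and unregisters outputs only for the executors the shuffle service no longer knows.
The same zip/filter in isolation, with made-up data:

  // Status bytes as returned by the shuffle service: 1 = registered.
  val execIdsOnHost = Array("exec1", "exec2", "exec3")
  val statuses = Array[Byte](1, 0, 1)
  val execIdsNotRegistered = execIdsOnHost.zip(statuses).filter(_._2 == 0).map(_._1)
  // => Array("exec2"): only its map outputs are removed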