Skip to content

Commit edbc135

Browse files
committed
Add retries in the exectutor to connect to the driver
1 parent 8949bc7 commit edbc135

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,11 @@ public TransportClient createClient(String remoteHost, int remotePort)
172172
final long preResolveHost = System.nanoTime();
173173
final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
174174
final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
175+
final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" : "succeed";
175176
if (hostResolveTimeMs > 2000) {
176-
logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
177+
logger.warn("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs);
177178
} else {
178-
logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
179+
logger.trace("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs);
179180
}
180181

181182
synchronized (clientPool.locks[clientIndex]) {

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,19 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
283283
executorConf,
284284
new SecurityManager(executorConf),
285285
clientMode = true)
286-
val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
286+
287+
var driver: RpcEndpointRef = null
288+
val nTries = 3
289+
for (i <- 0 until nTries if driver == null) {
290+
try {
291+
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
292+
} catch {
293+
case e: Throwable => if (i == nTries - 1) {
294+
throw e
295+
}
296+
}
297+
}
298+
287299
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
288300
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
289301
fetcher.shutdown()

0 commit comments

Comments
 (0)