Skip to content

Commit acedb62

Browse files
committed
switch to writing out one record per executor
1 parent 79922b7 commit acedb62

File tree

2 files changed

+54
-51
lines changed

2 files changed

+54
-51
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import com.google.common.base.Objects;
2929
import com.google.common.collect.Maps;
3030
import org.fusesource.leveldbjni.JniDBFactory;
31+
import org.fusesource.leveldbjni.internal.NativeDB;
3132
import org.iq80.leveldb.DB;
33+
import org.iq80.leveldb.DBIterator;
3234
import org.iq80.leveldb.Options;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
@@ -66,7 +68,7 @@ public class ExternalShuffleBlockResolver {
6668
@VisibleForTesting
6769
final DB db;
6870

69-
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) {
71+
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException {
7072
this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
7173
// Add `spark` prefix because it will run in NM in Yarn mode.
7274
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
@@ -77,18 +79,31 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
7779
ExternalShuffleBlockResolver(
7880
TransportConf conf,
7981
File registeredExecutorFile,
80-
Executor directoryCleaner) {
82+
Executor directoryCleaner) throws IOException {
8183
this.conf = conf;
8284
this.registeredExecutorFile = registeredExecutorFile;
8385
if (registeredExecutorFile != null) {
8486
Options options = new Options();
85-
options.createIfMissing(true);
87+
options.createIfMissing(false);
8688
options.logger(new LevelDBLogger());
87-
JniDBFactory factory = new JniDBFactory();
8889
DB tmpDb;
8990
ConcurrentMap<AppExecId, ExecutorShuffleInfo> tmpExecutors;
9091
try {
91-
tmpDb = factory.open(registeredExecutorFile, options);
92+
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
93+
} catch (NativeDB.DBException e) {
94+
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
95+
logger.info("Creating state database at " + registeredExecutorFile);
96+
options.createIfMissing(true);
97+
try {
98+
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
99+
} catch (NativeDB.DBException dbExc) {
100+
throw new IOException("Unable to create state store", dbExc);
101+
}
102+
} else {
103+
throw e;
104+
}
105+
}
106+
try {
92107
tmpExecutors = reloadRegisteredExecutors(tmpDb);
93108
} catch (Exception e) {
94109
logger.info("Error opening leveldb file {}", registeredExecutorFile, e);
@@ -114,7 +129,13 @@ public void registerExecutor(
114129
synchronized (executors) {
115130
executors.put(fullId, executorInfo);
116131
try {
117-
saveRegisteredExecutors();
132+
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
133+
ObjectOutputStream out = new ObjectOutputStream(bytesOut);
134+
out.writeObject(executorInfo);
135+
out.close();
136+
if (db != null) {
137+
db.put(dbAppExecKey(new AppExecId(appId, execId)), bytesOut.toByteArray());
138+
}
118139
} catch (Exception e) {
119140
logger.error("Error saving registered executors", e);
120141
}
@@ -173,6 +194,9 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
173194
// Only touch executors associated with the appId that was removed.
174195
if (appId.equals(fullId.appId)) {
175196
it.remove();
197+
if (db != null) {
198+
db.delete(dbAppExecKey(fullId));
199+
}
176200

177201
if (cleanupLocalDirs) {
178202
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
@@ -187,14 +211,6 @@ public void run() {
187211
}
188212
}
189213
}
190-
synchronized (executors) {
191-
try {
192-
saveRegisteredExecutors();
193-
} catch (Exception e) {
194-
logger.error("Error saving registered executors", e);
195-
}
196-
}
197-
198214
}
199215

200216
/**
@@ -308,41 +324,35 @@ public String toString() {
308324
}
309325
}
310326

311-
/**
312-
* write out the set of registered executors to a file so we can reload them on restart.
313-
* You must have a lock on executors when calling this
314-
*/
315-
private void saveRegisteredExecutors() throws IOException {
316-
if (db != null) {
317-
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
318-
ObjectOutputStream out = new ObjectOutputStream(bytes);
319-
out.writeObject(executors);
320-
out.close();
321-
db.put("registeredExecutors".getBytes(Charsets.UTF_8), bytes.toByteArray());
322-
}
327+
private static byte[] dbAppExecKey(AppExecId appExecId) {
328+
return (appExecId.appId + ";" + appExecId.execId).getBytes(Charsets.UTF_8);
329+
}
330+
331+
private static AppExecId parseDbAppExecKey(byte[] bytes) {
332+
String s = new String(bytes, Charsets.UTF_8);
333+
int p = s.indexOf(';');
334+
return new AppExecId(s.substring(0, p), s.substring(p + 1));
323335
}
324336

325337
@VisibleForTesting
326338
static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
327339
throws IOException, ClassNotFoundException {
340+
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
328341
if (db != null) {
329-
ObjectInputStream in = null;
330-
byte[] bytes = db.get("registeredExecutors".getBytes(Charsets.UTF_8));
331-
if (bytes != null) {
332-
try {
333-
in = new ObjectInputStream(new ByteArrayInputStream(bytes));
334-
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors =
335-
(ConcurrentMap<AppExecId, ExecutorShuffleInfo>) in.readObject();
336-
in.close();
337-
return registeredExecutors;
338-
} finally {
339-
if (in != null) {
340-
in.close();
341-
}
342-
}
342+
DBIterator itr = db.iterator();
343+
itr.seekToFirst();
344+
while (itr.hasNext()) {
345+
Map.Entry<byte[], byte[]> e = itr.next();
346+
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(e.getValue()));
347+
AppExecId id = parseDbAppExecKey(e.getKey());
348+
registeredExecutors.put(
349+
id,
350+
(ExecutorShuffleInfo) in.readObject()
351+
);
352+
in.close();
343353
}
344354
}
345-
return Maps.newConcurrentMap();
355+
return registeredExecutors;
346356
}
347357

348358
private static class LevelDBLogger implements org.iq80.leveldb.Logger {

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,15 @@
1717

1818
package org.apache.spark.network.yarn;
1919

20-
import java.io.*;
20+
import java.io.File;
2121
import java.nio.ByteBuffer;
22-
import java.util.*;
23-
import java.util.Map.Entry;
22+
import java.util.List;
2423

2524
import com.google.common.annotations.VisibleForTesting;
2625
import com.google.common.collect.Lists;
2726
import org.apache.hadoop.conf.Configuration;
2827
import org.apache.hadoop.yarn.api.records.ContainerId;
29-
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
30-
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
31-
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
32-
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
33-
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
34-
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
35-
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
28+
import org.apache.hadoop.yarn.server.api.*;
3629
import org.slf4j.Logger;
3730
import org.slf4j.LoggerFactory;
3831

0 commit comments

Comments
 (0)