Skip to content

Commit 922707e

Browse files
committed
Receive InputStream/OutputStream instead of File to be compatible with any filesystems
1 parent d94dc9c commit 922707e

File tree

2 files changed

+35
-29
lines changed

2 files changed

+35
-29
lines changed

common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSnapshotter.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,73 +38,70 @@ public KVStoreSnapshotter(KVStoreSerializer serializer) {
3838
this.serializer = serializer;
3939
}
4040

41-
public void dump(KVStore store, File snapshotFile) throws Exception {
42-
DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile));
43-
41+
/** Dump current KVStore to the output stream - caller should close the output stream. */
42+
public void dump(KVStore store, DataOutputStream snapshotStream) throws Exception {
4443
// store metadata if it exists
4544
Class<?> metadataType = store.metadataType();
4645
if (metadataType != null) {
47-
writeClassName(metadataType, output);
46+
writeClassName(metadataType, snapshotStream);
4847
Object metadata = store.getMetadata(metadataType);
49-
writeObject(metadata, output);
50-
writeEndOfType(output);
48+
writeObject(metadata, snapshotStream);
49+
writeEndOfType(snapshotStream);
5150
} else {
52-
writeEndOfType(output);
51+
writeEndOfType(snapshotStream);
5352
}
5453

5554
Set<Class<?>> types = store.types();
5655
for (Class<?> clazz : types) {
57-
writeClassName(clazz, output);
56+
writeClassName(clazz, snapshotStream);
5857

5958
KVStoreView<?> view = store.view(clazz);
6059
for (Object obj : view) {
61-
writeObject(obj, output);
60+
writeObject(obj, snapshotStream);
6261
}
6362

64-
writeEndOfType(output);
63+
writeEndOfType(snapshotStream);
6564
}
6665

67-
writeEndOfFile(output);
68-
output.close();
66+
writeEndOfFile(snapshotStream);
6967
}
7068

71-
public void restore(File snapshotFile, KVStore store) throws Exception {
72-
DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile));
73-
69+
/** Restore current KVStore from the input stream - caller should close the input stream. */
70+
public void restore(DataInputStream snapshotStream, KVStore store) throws Exception {
7471
// first one would be metadata
75-
int metadataClazzLen = input.readInt();
72+
int metadataClazzLen = snapshotStream.readInt();
7673
if (metadataClazzLen > 0) {
77-
Class<?> metadataClazz = readClassName(input, metadataClazzLen);
74+
Class<?> metadataClazz = readClassName(snapshotStream, metadataClazzLen);
7875
// metadata presented
79-
int objLen = input.readInt();
80-
Object metadata = readObj(input, metadataClazz, objLen);
76+
int objLen = snapshotStream.readInt();
77+
Object metadata = readObj(snapshotStream, metadataClazz, objLen);
8178
store.setMetadata(metadata);
8279

8380
// additionally read -2 as end of type
84-
consumeEndOfType(input);
81+
consumeEndOfType(snapshotStream);
8582
}
8683

8784
boolean eof = false;
8885
while (!eof) {
89-
int typeClazzNameLen = input.readInt();
86+
int typeClazzNameLen = snapshotStream.readInt();
9087
if (typeClazzNameLen == MARKER_END_OF_FILE) {
9188
eof = true;
9289
} else {
93-
Class<?> typeClazz = readClassName(input, typeClazzNameLen);
90+
Class<?> typeClazz = readClassName(snapshotStream, typeClazzNameLen);
9491
boolean eot = false;
9592
while (!eot) {
96-
int objLen = input.readInt();
93+
int objLen = snapshotStream.readInt();
9794
if (objLen == MARKER_END_OF_TYPE) {
9895
eot = true;
9996
} else {
100-
Object obj = readObj(input, typeClazz, objLen);
97+
Object obj = readObj(snapshotStream, typeClazz, objLen);
10198
store.write(obj);
10299
}
103100
}
104101
}
105102
}
106103

107-
input.close();
104+
snapshotStream.close();
108105
}
109106

110107
private void writeClassName(Class<?> clazz, DataOutputStream output) throws IOException {

common/kvstore/src/test/java/org/apache/spark/util/kvstore/KVStoreSnapshotterSuite.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
import org.apache.commons.io.FileUtils;
2121
import org.junit.Test;
2222

23+
import java.io.DataInputStream;
24+
import java.io.DataOutputStream;
2325
import java.io.File;
26+
import java.io.FileInputStream;
27+
import java.io.FileOutputStream;
2428
import java.util.Arrays;
2529
import java.util.HashSet;
2630
import java.util.Set;
@@ -88,12 +92,17 @@ private void runTestSnapshotAndRestore(KVStore source, KVStore destination) thro
8892
snapshotFile.delete();
8993
assertFalse(snapshotFile.exists());
9094

95+
9196
try {
92-
snapshotter.dump(source, snapshotFile);
93-
assertTrue(snapshotFile.exists() && snapshotFile.isFile());
94-
assertTrue(snapshotFile.length() > 0);
97+
try (DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile))) {
98+
snapshotter.dump(source, output);
99+
assertTrue(snapshotFile.exists() && snapshotFile.isFile());
100+
assertTrue(snapshotFile.length() > 0);
101+
}
95102

96-
snapshotter.restore(snapshotFile, destination);
103+
try (DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile))) {
104+
snapshotter.restore(input, destination);
105+
}
97106

98107
Class<?> metadataType = source.metadataType();
99108
assertEquals(destination.metadataType(), metadataType);

0 commit comments

Comments
 (0)