
Commit 5fd0294

jerryshao authored and srowen committed
[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths
## What changes were proposed in this pull request?

Java's `FileInputStream` and `FileOutputStream` override `finalize()`, so even when such a stream is closed correctly and promptly, it still leaves a memory footprint that is only cleaned up during a full GC. This has two side effects:

1. A large number of `Finalizer` references are kept in memory, which increases memory overhead. In our use case of the external shuffle service, a busy shuffle service accumulates many of these objects and can eventually hit an OOM.
2. Finalizers only run during a full GC, which increases the cost of full GC and leads to long GC pauses.

https://bugs.openjdk.java.net/browse/JDK-8080225
https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful

To fix this potential issue, this patch proposes to use NIO's `Files#newInputStream`/`Files#newOutputStream` instead in some critical paths such as shuffle.

`FileInputStream` usages left unchanged in core, which I think are not so critical:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467: val file = new DataInputStream(new FileInputStream(filename))
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942: val in = new FileInputStream(new File(path))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76: val fileIn = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248: val fis = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910: input = new FileInputStream(new File(t))
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132: case Some(f) => new FileInputStream(f)
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77: val fis = new FileInputStream(f)
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94: new DataInputStream(new NioBufferedFileInputStream(index))
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111: val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219: val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:106: val in = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89: inputStream = new FileInputStream(activeFile)
./core/src/main/scala/org/apache/spark/util/Utils.scala:329: if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:332: val inChannel = in.asInstanceOf[FileInputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:1533: gzInputStream = new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1560: new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1562: new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/Utils.scala:2090: val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
```

`FileOutputStream` usages left unchanged in core:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957: val out = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131: val dos = new DataOutputStream(new FileOutputStream(f))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62: val fileOut = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160: val outStream = new FileOutputStream(outPath)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239: val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949: val out = new FileOutputStream(tempFile)
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106: val out = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109: * Therefore, for local files, use FileOutputStream instead. */
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112: new FileOutputStream(uri.getPath)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71: private var fos: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102: fos = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213: var truncateStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215: truncateStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153: val out = new FileOutputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:81: val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
./core/src/main/scala/org/apache/spark/TestUtils.scala:96: val jarFileStream = new FileOutputStream(jarFile)
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31: volatile private var outputStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97: outputStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90: gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
./core/src/main/scala/org/apache/spark/util/Utils.scala:329: if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:333: val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:527: val out = new FileOutputStream(tempFile)
```

In `DiskBlockObjectWriter`, the `FileDescriptor` is used, so it is not easy to switch to the NIO Files API there. The `FileInputStream` and `FileOutputStream` usages in common/shuffle* were all changed.

## How was this patch tested?

Existing tests and manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #18684 from jerryshao/SPARK-21475.
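For context, a minimal standalone sketch (not part of this patch) of the substitution being applied; the class name and temp file are illustrative only:

```java
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class NioStreamExample {
  public static void main(String[] args) throws Exception {
    File file = File.createTempFile("nio-example", ".bin");
    file.deleteOnExit();

    // Streams returned by java.nio.file.Files do not override finalize(), so closing them
    // releases their resources immediately instead of leaving work for a full GC.
    try (OutputStream out = Files.newOutputStream(file.toPath())) {  // instead of new FileOutputStream(file)
      out.write(new byte[] {1, 2, 3});
    }
    try (InputStream in = Files.newInputStream(file.toPath())) {     // instead of new FileInputStream(file)
      System.out.println("first byte: " + in.read());
    }
    // FileChannel.open replaces new FileInputStream(file).getChannel().
    try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
      System.out.println("size: " + channel.size());
    }
  }
}
```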
1 parent 110695d commit 5fd0294

8 files changed (+49, −37 lines)


common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 5 additions & 4 deletions
```diff
@@ -18,12 +18,13 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -93,9 +94,9 @@ public ByteBuffer nioByteBuffer() throws IOException {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    FileInputStream is = null;
+    InputStream is = null;
     try {
-      is = new FileInputStream(file);
+      is = Files.newInputStream(file.toPath());
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -132,7 +133,7 @@ public Object convertToNetty() throws IOException {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }
```

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback {
 
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempShuffleFileManager.createTempShuffleFile();
-      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
       this.chunkIndex = chunkIndex;
     }
 
```
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -19,10 +19,10 @@
 
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
+import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -38,7 +38,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(new FileInputStream(indexFile));
+      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {
```

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 9 additions & 7 deletions
```diff
@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.FileChannel;
+import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,7 +75,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
-  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -107,7 +106,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -188,17 +186,21 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       return lengths;
     }
 
-    final FileOutputStream out = new FileOutputStream(outputFile, true);
+    // This file needs to opened in append mode in order to work around a Linux kernel bug that
+    // affects transferTo; see SPARK-3948 for more details.
+    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileInputStream in = new FileInputStream(file);
+          final FileChannel in = FileChannel.open(file.toPath(), READ);
           boolean copyThrewException = true;
           try {
-            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+            long size = in.size();
+            Utils.copyFileStreamNIO(in, out, 0, size);
+            lengths[i] = size;
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);
```
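As a rough illustration of what a channel-to-channel copy such as `Utils.copyFileStreamNIO` can look like (this sketch is not the actual Spark implementation, which adds further checks and error handling):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;

// Illustrative helper: copy bytesToCopy bytes from one FileChannel to another.
final class ChannelCopy {
  static void copy(FileChannel input, FileChannel output, long startPosition, long bytesToCopy)
      throws IOException {
    long count = 0L;
    // transferTo may copy fewer bytes than requested, so loop until everything is written.
    while (count < bytesToCopy) {
      count += input.transferTo(startPosition + count, bytesToCopy - count, output);
    }
  }
}
```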

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 5 additions & 4 deletions
```diff
@@ -20,6 +20,7 @@
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
+import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -290,7 +291,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        new FileOutputStream(outputFile).close(); // Create an empty file
+        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -367,7 +368,7 @@ private long[] mergeSpillsWithFileStream(
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-      new FileOutputStream(outputFile),
+      java.nio.file.Files.newOutputStream(outputFile.toPath()),
       outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -442,11 +443,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
+      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
```

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle
 
 import java.io._
+import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
 
@@ -141,7 +142,8 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+      val out = new DataOutputStream(
+        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -196,7 +198,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(new FileInputStream(indexFile))
+    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
```

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 12 additions & 9 deletions
```diff
@@ -18,6 +18,8 @@
 package org.apache.spark.util.collection
 
 import java.io._
+import java.nio.channels.{Channels, FileChannel}
+import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -460,7 +462,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileStream: FileInputStream = null
+    private var fileChannel: FileChannel = null
 
     // An intermediate stream that reads from exactly one batch
    // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -477,22 +479,23 @@ class ExternalAppendOnlyMap[K, V, C](
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileStream.close()
+          fileChannel.close()
           deserializeStream = null
-          fileStream = null
+          fileChannel = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileStream = new FileInputStream(file)
-        fileStream.getChannel.position(start)
+        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
+        fileChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
 
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val bufferedStream = new BufferedInputStream(
+          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -552,9 +555,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileStream != null) {
-        fileStream.close()
-        fileStream = null
+      if (fileChannel != null) {
+        fileChannel.close()
+        fileChannel = null
       }
       if (file.exists()) {
         if (!file.delete()) {
```
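A sketch of the batch-reading pattern used in this change: open a `FileChannel`, seek to the batch's start offset, and expose it as an `InputStream` so the higher-level deserialization code stays unchanged. Names such as `BatchReader` and `openBatch` are illustrative, not Spark APIs; the real code additionally limits the stream to `end - start` bytes with Guava's `ByteStreams.limit` and wraps it via `serializerManager.wrapStream`.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative helper: position a channel at a batch offset and read it as a stream.
final class BatchReader {
  static InputStream openBatch(Path spillFile, long start) throws IOException {
    FileChannel channel = FileChannel.open(spillFile, StandardOpenOption.READ);
    channel.position(start);  // equivalent of the old fileStream.getChannel.position(start)
    // Closing the returned stream also closes the underlying channel.
    return new BufferedInputStream(Channels.newInputStream(channel));
  }
}
```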

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 10 additions & 7 deletions
```diff
@@ -18,6 +18,8 @@
 package org.apache.spark.util.collection
 
 import java.io._
+import java.nio.channels.{Channels, FileChannel}
+import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -492,7 +494,7 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileStream: FileInputStream = null
+    var fileChannel: FileChannel = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -505,22 +507,23 @@ private[spark] class ExternalSorter[K, V, C](
       if (batchId < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileStream.close()
+          fileChannel.close()
           deserializeStream = null
-          fileStream = null
+          fileChannel = null
         }
 
         val start = batchOffsets(batchId)
-        fileStream = new FileInputStream(spill.file)
-        fileStream.getChannel.position(start)
+        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
+        fileChannel.position(start)
         batchId += 1
 
         val end = batchOffsets(batchId)
 
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val bufferedStream = new BufferedInputStream(
+          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
 
         val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
         serInstance.deserializeStream(wrappedStream)
@@ -610,7 +613,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileStream = null
+      fileChannel = null
       if (ds != null) {
         ds.close()
       }
```
