From 24235553adc748ccedf3998b0c174711b214e970 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Wed, 17 Sep 2014 22:07:11 +0000
Subject: [PATCH 1/4] [SPARK-3570] Include time to open files in shuffle write time.

The time to open shuffle files can be very significant when the disk is
contended, especially when using ext3. While writing data to a file can
avoid hitting disk (and instead hit the buffer cache), opening a file
always involves writing some metadata about the file to disk, so the open
time can be a very significant portion of the shuffle write time. In one
recent job, the time to write shuffle data to the file was only 4ms for
each task, but the time to open the file was about 100x as long (~400ms).

When we add metrics about spilled data (#2504), we should make sure that
the file open time is also included there.
---
 .../org/apache/spark/shuffle/FileShuffleBlockManager.scala | 4 ++++
 .../org/apache/spark/util/collection/ExternalSorter.scala  | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 7de2f9cbb286..7aec9f90f1e0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eaec5a71e681..dcd32421af54 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -352,6 +352,7 @@ private[spark] class ExternalSorter[K, V, C](
     // Create our file writers if we haven't done so yet
    if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
+      val openStartTime = System.nanoTime()
       partitionWriters = Array.fill(numPartitions) {
         // Because these files may be read during shuffle, their compression must be controlled by
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -359,6 +360,9 @@ private[spark] class ExternalSorter[K, V, C](
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
     }
 
     // No need to sort stuff, just write each element out

From 42b7e43722817ff9540e3a92ee2621b997cf8d10 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Thu, 12 Feb 2015 11:28:04 -0800
Subject: [PATCH 2/4] Fixed parens for nanotime

---
 .../scala/org/apache/spark/util/collection/ExternalSorter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index dcd32421af54..bd825eca90a7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -352,7 +352,7 @@ private[spark] class ExternalSorter[K, V, C](
     // Create our file writers if we haven't done so yet
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
-      val openStartTime = System.nanoTime()
+      val openStartTime = System.nanoTime
       partitionWriters = Array.fill(numPartitions) {
         // Because these files may be read during shuffle, their compression must be controlled by
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use

From fdc518503e9ad492f0254960564fcd465c7e11e8 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Thu, 12 Feb 2015 12:56:32 -0800
Subject: [PATCH 3/4] Improved comment

---
 .../org/apache/spark/util/collection/ExternalSorter.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index bd825eca90a7..12e7439116cb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -361,7 +361,8 @@ private[spark] class ExternalSorter[K, V, C](
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
       // Creating the file to write to and creating a disk writer both involve interacting with
-      // the disk, so should be included in the shuffle write time.
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
       curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
     }
 

From ea3a4aed439c65e5ae3cedd8cddd0edf5d51b33b Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Sun, 15 Feb 2015 23:12:01 -0800
Subject: [PATCH 4/4] Added comment about excluded open time

---
 .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 27496c5a289c..621b264b5e48 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
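
All four patches rely on the same pattern: record System.nanoTime before the
file-open phase and charge the elapsed time to the shuffle write metrics. The
sketch below illustrates that pattern in isolation; WriteMetrics, openWriter,
and OpenTimingSketch are hypothetical stand-ins for illustration, not Spark's
actual ShuffleWriteMetrics or BlockObjectWriter.

    // Minimal sketch of the timing pattern these patches apply: bracket the
    // file-open phase with System.nanoTime and fold the elapsed time into the
    // write metrics. All names here are stand-ins, not Spark's real API.
    import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}

    class WriteMetrics {
      private var shuffleWriteTime = 0L  // nanoseconds
      def incShuffleWriteTime(ns: Long): Unit = { shuffleWriteTime += ns }
      def writeTimeNanos: Long = shuffleWriteTime
    }

    object OpenTimingSketch {
      // Opening the stream forces the filesystem to write file metadata, so it
      // touches the disk even though no shuffle data has been written yet.
      def openWriter(file: File): OutputStream =
        new BufferedOutputStream(new FileOutputStream(file))

      def main(args: Array[String]): Unit = {
        val metrics = new WriteMetrics
        val files = (0 until 4).map(i => File.createTempFile(s"shuffle-$i-", ".tmp"))

        val openStartTime = System.nanoTime
        val writers = files.map(openWriter)
        // Opening many files can dominate the actual writes when the disk is
        // contended, so the whole open phase is charged to the write time.
        metrics.incShuffleWriteTime(System.nanoTime - openStartTime)

        writers.foreach { w => w.write(Array.fill[Byte](1024)(0)); w.close() }
        println(s"time spent opening files: ${metrics.writeTimeNanos} ns")
        files.foreach(_.delete())
      }
    }

The per-open cost matters in ExternalSorter because partitionWriters opens one
file per partition, so it multiplies with the number of partitions; the single
merged output file in SortShuffleWriter is the case patch 4 deliberately
leaves untimed.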