From bf29d4a1d8e332941caba337286f26f2238095d4 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 16 Sep 2014 21:22:12 +0900
Subject: [PATCH 1/4] Modified FileSegmentManagedBuffer to close channel

Modified ShuffleBlockFetcherIterator to close the block data file
---
 .../apache/spark/network/ManagedBuffer.scala  | 10 ++++++--
 .../storage/ShuffleBlockFetcherIterator.scala | 25 +++++++++++++------
 2 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index dcecb6beeea9..81ab71976dce 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network
 
 import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
 import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
 import com.google.common.io.ByteStreams
@@ -66,8 +67,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
   override def size: Long = length
 
   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel
+      channel.map(MapMode.READ_ONLY, offset, length)
+    } finally {
+      channel.close()
+    }
   }
 
   override def inputStream(): InputStream = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c8e708aa6b1b..d8c8b07a5dd5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.storage
 
+import java.io.InputStream
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.Queue
 
-import org.apache.spark.{TaskContext, Logging, SparkException}
+import org.apache.spark.{TaskContext, Logging}
 import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
@@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
     blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
       new BlockFetchingListener {
         override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-            () => serializer.newInstance().deserializeStream(
-              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-          ))
-          shuffleMetrics.remoteBytesRead += data.size
-          shuffleMetrics.remoteBlocksFetched += 1
-          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+          var is: InputStream = null
+          try {
+            is = data.inputStream()
+            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+              () => serializer.newInstance().deserializeStream(
+                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+            ))
+            shuffleMetrics.remoteBytesRead += data.size
+            shuffleMetrics.remoteBlocksFetched += 1
+            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+          } finally {
+            if (is != null) {
+              is.close()
+            }
+          }
         }
 
         override def onBlockFetchFailure(e: Throwable): Unit = {
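The close-in-finally shape above leans on a JDK guarantee: the JavaDoc for FileChannel#map states that a mapping, once established, is not dependent on the channel that created it, so closing the channel does not invalidate the buffer. That is what lets nioByteBuffer() close eagerly and still return a usable buffer. For illustration only, a minimal sketch of the same pattern factored into a loan helper (MappedSegmentSketch, withFileChannel, and mapSegment are hypothetical names, not part of the patch; only the JDK is assumed):

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode

object MappedSegmentSketch {
  // Loan pattern: the channel is closed even when body throws.
  def withFileChannel[T](file: File)(body: FileChannel => T): T = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      body(channel)
    } finally {
      channel.close()
    }
  }

  // The MappedByteBuffer returned by map() stays readable after close().
  def mapSegment(file: File, offset: Long, length: Long): ByteBuffer =
    withFileChannel(file)(_.map(MapMode.READ_ONLY, offset, length))
}

One difference from the hunk above: this helper acquires the channel before entering the try block, so the finally never sees a null channel. The patch acquires inside the try, which is why the null check added in the next patch is needed.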
From b37231a7285836d540a21b263f812953e7c6d800 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 16 Sep 2014 22:22:12 +0900
Subject: [PATCH 2/4] Modified FileSegmentManagedBuffer#nioByteBuffer to check
 whether the channel is null before invoking channel.close

---
 .../main/scala/org/apache/spark/network/ManagedBuffer.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index 81ab71976dce..e990c1da6730 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -72,7 +72,9 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
       channel = new RandomAccessFile(file, "r").getChannel
       channel.map(MapMode.READ_ONLY, offset, length)
     } finally {
-      channel.close()
+      if (channel != null) {
+        channel.close()
+      }
     }
   }
 

From 5f63f67fb8e1dc85788436581f49adc2cb8b32bc Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 17 Sep 2014 00:11:28 +0900
Subject: [PATCH 3/4] Move metrics increment logic and debug logging outside
 try block

---
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index d8c8b07a5dd5..6a6427c080d6 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -119,14 +119,14 @@ final class ShuffleBlockFetcherIterator(
               () => serializer.newInstance().deserializeStream(
                 blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
             ))
-            shuffleMetrics.remoteBytesRead += data.size
-            shuffleMetrics.remoteBlocksFetched += 1
-            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
           } finally {
             if (is != null) {
               is.close()
             }
           }
+          shuffleMetrics.remoteBytesRead += data.size
+          shuffleMetrics.remoteBlocksFetched += 1
+          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
         }
 
         override def onBlockFetchFailure(e: Throwable): Unit = {
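Taken together, patches 2 and 3 settle on a common shape: acquire the resource as the first statement inside try, close it in finally only when it was actually opened, and keep unrelated work such as metrics and logging out of the try block. A generic sketch of that shape (WithResourceSketch, withResource, and readAll are hypothetical names; the Guava ByteStreams call only mirrors the import already present in ManagedBuffer.scala):

import java.io.{Closeable, File, FileInputStream}

import com.google.common.io.ByteStreams

object WithResourceSketch {
  def withResource[C <: Closeable, T](open: => C)(body: C => T): T = {
    var resource: C = null.asInstanceOf[C]
    try {
      resource = open          // may throw; resource then stays null
      body(resource)
    } finally {
      if (resource != null) {  // never close a resource that was not opened
        resource.close()
      }
    }
  }

  // Usage: the stream is closed whether toByteArray succeeds or throws.
  def readAll(file: File): Array[Byte] =
    withResource(new FileInputStream(file))(in => ByteStreams.toByteArray(in))
}

Keeping the try block down to the statements that actually touch the resource, as patch 3 does, makes it clearer which failures the finally is guarding against.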
From 074781d220f37fa3edaa22ecce7312d0ca22596a Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 17 Sep 2014 03:11:11 +0900
Subject: [PATCH 4/4] Modified ShuffleBlockFetcherIterator to open the block
 stream lazily

---
 .../storage/ShuffleBlockFetcherIterator.scala | 17 ++++------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 6a6427c080d6..d868758a7f54 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.storage
 
-import java.io.InputStream
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.ArrayBuffer
@@ -112,18 +111,10 @@ final class ShuffleBlockFetcherIterator(
     blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
       new BlockFetchingListener {
         override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-          var is: InputStream = null
-          try {
-            is = data.inputStream()
-            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-              () => serializer.newInstance().deserializeStream(
-                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
-            ))
-          } finally {
-            if (is != null) {
-              is.close()
-            }
-          }
+          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+            () => serializer.newInstance().deserializeStream(
+              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
+          ))
           shuffleMetrics.remoteBytesRead += data.size
           shuffleMetrics.remoteBlocksFetched += 1
           logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
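The revert in this final patch follows from laziness: FetchResult wraps deserialization in a () => ... thunk, so with this change data.inputStream() runs only when the reducer iterates the block, and the fetch callback has nothing it could safely close. The intermediate version opened the stream eagerly and closed it in the callback's finally, before any consumer had read a byte. A self-contained sketch of the difference (all names hypothetical; assumes a stream, such as a file or socket stream, that rejects reads after close):

import java.io.InputStream

object LazyStreamSketch {
  // Stand-in for FetchResult: deserialize runs later, on the consumer side.
  final case class FetchResultSketch(deserialize: () => Iterator[Int])

  // Eager shape (patches 1 through 3): open and close in the callback.
  def eager(open: () => InputStream): FetchResultSketch = {
    val is = open()
    try {
      FetchResultSketch(() => Iterator(is.read())) // thunk is built, not run
    } finally {
      is.close() // runs now, so the consumer later reads a closed stream
    }
  }

  // Lazy shape (restored here): open inside the thunk, at consumption time.
  def lazily(open: () => InputStream): FetchResultSketch =
    FetchResultSketch(() => Iterator(open().read()))
}

With the lazy shape, responsibility for closing the stream moves to whoever consumes the iterator.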