diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index c0133e19c7f7..38149fd37128 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -42,7 +42,7 @@ class FileServerHandler extends SimpleChannelInboundHandler {
   @Override
   public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
     BlockId blockId = BlockId.apply(blockIdString);
-    FileSegment fileSegment = pResolver.getBlockLocation(blockId);
+    FileSegment fileSegment = pResolver.getFileSegment(blockId);
     // if getBlockLocation returns null, close the channel
     if (fileSegment == null) {
       //ctx.close();
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 7ad8d03efbad..5a644800ab52 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -22,5 +22,5 @@ public interface PathResolver {
   /** Get the file segment in which the given block resides. */
-  FileSegment getBlockLocation(BlockId blockId);
+  FileSegment getFileSegment(BlockId blockId);
 }
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 7ef7aecc6a9f..8d9d5507597d 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -21,7 +21,7 @@ import java.io.File

 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
-import org.apache.spark.storage.{BlockId, FileSegment}
+import org.apache.spark.storage.{FileObjectId, BlockId, FileSegment}

 private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {

@@ -53,7 +53,7 @@ private[spark] object ShuffleSender {
     val localDirs = args.drop(2).map(new File(_))

     val pResovler = new PathResolver {
-      override def getBlockLocation(blockId: BlockId): FileSegment = {
+      override def getFileSegment(blockId: BlockId): FileSegment = {
         if (!blockId.isShuffle) {
           throw new Exception("Block " + blockId + " is not a shuffle block")
         }
@@ -63,7 +63,7 @@ private[spark] object ShuffleSender {
         val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
         val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
         val file = new File(subDir, blockId.name)
-        new FileSegment(file, 0, file.length())
+        new FileSegment(FileObjectId(file), 0, file.length())
       }
     }
     val sender = new ShuffleSender(port, pResovler)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 9b78228519da..1f1badb4cf0f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -98,7 +98,7 @@ class HashShuffleWriter[K, V](
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
       writer.commit()
       writer.close()
-      val size = writer.fileSegment().length
+      val size = writer.objectSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()
       MapOutputTracker.compressSize(size)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d2f7baf928b6..2f3b03d01d7f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -62,7 +62,7 @@ private[spark] class BlockManager(
   // Actual storage of where blocks are kept
   private var tachyonInitialized = false
   private[storage] val memoryStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
   private[storage] lazy val tachyonStore: TachyonStore = {
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
@@ -78,7 +78,7 @@ private[spark] class BlockManager(
   private val nettyPort: Int = {
     val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
     val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
-    if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
+    if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
   }

   val blockManagerId = BlockManagerId(
@@ -562,12 +562,10 @@ private[spark] class BlockManager(
    */
   def getDiskWriter(
       blockId: BlockId,
-      file: File,
+      objectId: ObjectId,
       serializer: Serializer,
       bufferSize: Int): BlockObjectWriter = {
-    val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
-    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
+    diskStore.getBlockObjectWriter(blockId, objectId, serializer, bufferSize)
   }

   /**
@@ -1035,6 +1033,7 @@ private[spark] class BlockManager(
     }
     connectionManager.stop()
     shuffleBlockManager.stop()
+    diskStore.stop()
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6be4e3..3672721b2cb0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -58,7 +58,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   /**
    * Returns the file segment of committed data that this Writer has written.
    */
-  def fileSegment(): FileSegment
+  def objectSegment(): ObjectSegment

   /**
    * Cumulative time spent performing blocking writes, in ns.
@@ -74,7 +74,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
 private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,
-    file: File,
+    objectId: FileObjectId,
     serializer: Serializer,
     bufferSize: Int,
     compressStream: OutputStream => OutputStream,
@@ -102,6 +102,7 @@ private[spark] class DiskBlockObjectWriter(
   }

   /** The file channel, used for repositioning / truncating the file. */
+  private val file = objectId.file
   private var channel: FileChannel = null
   private var bs: OutputStream = null
   private var fos: FileOutputStream = null
@@ -179,8 +180,8 @@ private[spark] class DiskBlockObjectWriter(
     objOut.writeObject(value)
   }

-  override def fileSegment(): FileSegment = {
-    new FileSegment(file, initialPosition, bytesWritten)
+  override def objectSegment(): ObjectSegment = {
+    new ObjectSegment(objectId, initialPosition, bytesWritten)
   }

   // Only valid if called after close()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index b9b53b1a2f11..4215056caf1d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer

 /**
  * Abstract class to store blocks.
@@ -68,4 +69,5 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
   def contains(blockId: BlockId): Boolean

   def clear() { }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2ec46d416f37..0d62dd283ce1 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -23,7 +23,6 @@ import java.util.{Date, Random, UUID}

 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
 import org.apache.spark.util.Utils

 /**
@@ -35,7 +34,7 @@ import org.apache.spark.util.Utils
 * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
 */
private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
-  extends PathResolver with Logging {
+  extends Logging {

   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
@@ -45,21 +44,25 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   * having really large inodes at the top level. */
   private val localDirs: Array[File] = createLocalDirs()
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
-  private var shuffleSender : ShuffleSender = null

   addShutdownHook()

-  /**
-   * Returns the physical file segment in which the given BlockId is located.
-   * If the BlockId has been mapped to a specific FileSegment, that will be returned.
-   * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
-   */
-  def getBlockLocation(blockId: BlockId): FileSegment = {
+  def getFileSegment(blockId: BlockId): FileSegment = {
     if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
-      shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+      val objectSegment = shuffleManager.getBlockLocation(
+        blockId.asInstanceOf[ShuffleBlockId])
+      val fileObjectId = FileObjectId.toFileObjectId(objectSegment.objectId)
+      fileObjectId match {
+        case Some(id: FileObjectId) =>
+          new FileSegment(id, objectSegment.offset, objectSegment.length)
+        case None =>
+          // Should not come here when we only have one diskStore
+          // Fix this when pluggable storage is implemented.
+          throw BlockException(blockId, "Block Not found")
+      }
     } else {
       val file = getFile(blockId.name)
-      new FileSegment(file, 0, file.length())
+      new FileSegment(FileObjectId(file), 0, file.length())
     }
   }
@@ -92,7 +95,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD

   /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
-    getBlockLocation(blockId).file.exists()
+    // Only used in tests.
+    getFile(blockId.name).exists()
   }

   /** List all the blocks currently stored on disk by the disk manager. */
@@ -167,14 +171,6 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
         }
       }

-      if (shuffleSender != null) {
-        shuffleSender.stop()
-      }
     }

-  private[storage] def startShuffleBlockSender(port: Int): Int = {
-    shuffleSender = new ShuffleSender(port, this)
-    logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
-    shuffleSender.port
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index ebff0cb5ba15..845e1fd91fc9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,26 +17,32 @@

 package org.apache.spark.storage

-import java.io.{FileOutputStream, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{HashMap, ArrayBuffer}

 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.Stage
+import java.util.concurrent.atomic.AtomicInteger
+import scala.Some
+import org.apache.spark.network.netty.{PathResolver, ShuffleSender}

 /**
  * Stores BlockManager blocks on disk.
  */
-private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
-  extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
+  extends BlockStore(blockManager) with PathResolver with Logging {

   val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+  private val nameToObjectId = new HashMap[String, ObjectId]
+  private var shuffleSender : ShuffleSender = null

   override def getSize(blockId: BlockId): Long = {
-    diskManager.getBlockLocation(blockId).length
+    getFileSegment(blockId).length
   }

   override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
@@ -92,7 +98,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
   }

   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val segment = diskManager.getBlockLocation(blockId)
+    val segment = getFileSegment(blockId)
     val channel = new RandomAccessFile(segment.file, "r").getChannel

     try {
@@ -123,7 +129,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
   }

   override def remove(blockId: BlockId): Boolean = {
-    val fileSegment = diskManager.getBlockLocation(blockId)
+    val fileSegment = getFileSegment(blockId)
     val file = fileSegment.file
     if (file.exists() && file.length() == fileSegment.length) {
       file.delete()
@@ -136,7 +142,113 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
   }

   override def contains(blockId: BlockId): Boolean = {
-    val file = diskManager.getBlockLocation(blockId).file
+    val file = getFileSegment(blockId).file
     file.exists()
   }
+
+  def getBlockObjectWriter(
+      blockId: BlockId,
+      objectId: ObjectId,
+      serializer: Serializer,
+      bufferSize: Int): BlockObjectWriter = {
+
+    val fileObjectId = FileObjectId.toFileObjectId(objectId)
+
+    fileObjectId match {
+      case Some(foid: FileObjectId) =>
+        val compressStream: OutputStream =>
+          OutputStream = blockManager.wrapForCompression(blockId, _)
+        val syncWrites = blockManager.conf.getBoolean("spark.shuffle.sync", false)
+        new DiskBlockObjectWriter(blockId, foid, serializer, bufferSize, compressStream, syncWrites)
+      case None =>
+        null
+    }
+
+  }
+
+  override def getFileSegment(blockId: BlockId): FileSegment = {
+    diskManager.getFileSegment(blockId)
+  }
+
+  def createTempBlock(): (TempBlockId, ObjectId) = {
+
+    val (blockId, file) = diskManager.createTempBlock()
+    val objectId = new FileObjectId(file)
+
+    (blockId, objectId)
+  }
+
+  def getInputStream(objectId: ObjectId): InputStream = {
+    val fileObjectId = FileObjectId.toFileObjectId(objectId)
+
+    fileObjectId match {
+      case Some(FileObjectId(file)) =>
+        new FileInputStream(file)
+      case None =>
+        null
+    }
+  }
+
+  def getOrNewObject(name: String): ObjectId = {
+
+    val objectId = nameToObjectId.get(name)
+
+    if (objectId.isEmpty) {
+      val file = diskManager.getFile(name)
+      val newObjectId = new FileObjectId(file)
+      nameToObjectId.put(name, newObjectId)
+      newObjectId
+    } else {
+      objectId.get
+    }
+  }
+
+  def getObjectSize(objectId: ObjectId): Long = {
+    val fileObjectId = FileObjectId.toFileObjectId(objectId)
+
+    fileObjectId match {
+      case Some(FileObjectId(file)) =>
+        file.length
+      case None =>
+        0
+    }
+  }
+
+  def isObjectExists(objectId: ObjectId): Boolean = {
+    val fileObjectId = FileObjectId.toFileObjectId(objectId)
+
+    fileObjectId match {
+      case Some(FileObjectId(file)) =>
+        file.exists
+      case None =>
+        false
+    }
+  }
+
+  def removeObject(id: ObjectId): Boolean = {
+    val fileObjectId = FileObjectId.toFileObjectId(id)
+
+    fileObjectId match {
+      case Some(FileObjectId(file)) =>
+        if (file != null && file.exists()) {
+          return file.delete()
+        }
+      case _ =>
+        false
+    }
+    false
+  }
+
+  private[storage] def startShuffleBlockSender(port: Int): Int = {
+    shuffleSender = new ShuffleSender(port, this)
+    logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
+    shuffleSender.port
+  }
+
+  /** stop shuffle sender. */
+  private[spark] def stop() {
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
index 132502b75f8c..18f8b94ba399 100644
--- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
@@ -23,6 +23,9 @@ import java.io.File
 * References a particular segment of a file (potentially the entire file),
 * based off an offset and a length.
 */
-private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) {
-  override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
+private[spark] class FileSegment(override val objectId: FileObjectId, offset: Long, length: Long)
+  extends ObjectSegment(objectId, offset, length) {
+  override def toString = "(name=%s, offset=%d, length=%d)"
+    .format(objectId.file.getName, offset, length)
+  def file = objectId.file
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/ObjectId.scala b/core/src/main/scala/org/apache/spark/storage/ObjectId.scala
new file mode 100644
index 000000000000..e03f02f51393
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ObjectId.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.File
+
+/**
+ * Represents an external object written by a specific BlockStore.
+ */
+
+abstract class ObjectId {
+  def id: String
+  override def toString = "(block object %s)".format(id)
+  override def hashCode = id.hashCode
+  override def equals(other: Any): Boolean = other match {
+    case o: ObjectId => getClass == o.getClass && id.equals(o.id)
+    case _ => false
+  }
+}
+
+case class FileObjectId(file: File) extends ObjectId {
+  def id = file.getName
+  override def toString = "(File block object %s)".format(id)
+}
+
+object FileObjectId {
+  def toFileObjectId(id: ObjectId): Option[FileObjectId] = {
+    if (id.isInstanceOf[FileObjectId]) {
+      Some(id.asInstanceOf[FileObjectId])
+    } else {
+      None
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/ObjectSegment.scala b/core/src/main/scala/org/apache/spark/storage/ObjectSegment.scala
new file mode 100644
index 000000000000..ac4d892a79d5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ObjectSegment.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.File
+
+/**
+ * References a particular segment of an object (potentially the entire object),
+ * based off an offset and a length.
+ */
+private[spark] class ObjectSegment(val objectId: ObjectId, val offset: Long, val length: Long) {
+  override def toString = "(name=%s, offset=%d, length=%d)".format(objectId, offset, length)
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 35910e552fe8..8819f618fac7 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -104,26 +104,27 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
       }
     } else {
+      val diskStore = blockManager.diskStore
       Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
         val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-        val blockFile = blockManager.diskBlockManager.getFile(blockId)
+        val objectId = diskStore.getOrNewObject(blockId.name)
         // Because of previous failures, the shuffle file may already exist on this machine.
         // If so, remove it.
-        if (blockFile.exists) {
-          if (blockFile.delete()) {
-            logInfo(s"Removed existing shuffle file $blockFile")
+        if (diskStore.isObjectExists(objectId)) {
+          if (diskStore.removeObject(objectId)) {
+            logInfo(s"Removed existing shuffle object $objectId")
           } else {
-            logWarning(s"Failed to remove existing shuffle file $blockFile")
+            logWarning(s"Failed to remove existing shuffle object $objectId")
           }
         }
-        blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
+        blockManager.getDiskWriter(blockId, objectId, serializer, bufferSize)
       }
     }

     override def releaseWriters(success: Boolean) {
       if (consolidateShuffleFiles) {
         if (success) {
-          val offsets = writers.map(_.fileSegment().offset)
+          val offsets = writers.map(_.objectSegment().offset)
           fileGroup.recordMapOutput(mapId, offsets)
         }
         recycleFileGroup(fileGroup)
@@ -139,11 +140,11 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {

     private def newFileGroup(): ShuffleFileGroup = {
       val fileId = shuffleState.nextFileId.getAndIncrement()
-      val files = Array.tabulate[File](numBuckets) { bucketId =>
+      val objects = Array.tabulate[ObjectId](numBuckets) { bucketId =>
         val filename = physicalFileName(shuffleId, bucketId, fileId)
-        blockManager.diskBlockManager.getFile(filename)
+        blockManager.diskStore.getOrNewObject(filename)
       }
-      val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
+      val fileGroup = new ShuffleFileGroup(blockManager, fileId, shuffleId, objects)
       shuffleState.allFileGroups.add(fileGroup)
       fileGroup
     }
@@ -159,11 +160,11 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   * This function should only be called if shuffle file consolidation is enabled, as it is
   * an error condition if we don't find the expected block.
   */
-  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
+  def getBlockLocation(id: ShuffleBlockId): ObjectSegment = {
     // Search all file groups associated with this shuffle.
     val shuffleState = shuffleStates(id.shuffleId)
     for (fileGroup <- shuffleState.allFileGroups) {
-      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
+      val segment = fileGroup.getObjectSegmentFor(id.mapId, id.reduceId)
       if (segment.isDefined) { return segment.get }
     }
     throw new IllegalStateException("Failed to find shuffle block: " + id)
@@ -183,8 +184,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     shuffleStates.get(shuffleId) match {
       case Some(state) =>
         if (consolidateShuffleFiles) {
-          for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
-            file.delete()
+          for (fileGroup <- state.allFileGroups; objectId <- fileGroup.objects) {
+            blockManager.diskStore.removeObject(objectId)
           }
         } else {
           for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
@@ -219,7 +220,11 @@ object ShuffleBlockManager {
   * A group of shuffle files, one per reducer.
   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
   */
-  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+  private class ShuffleFileGroup(
+      val blockManager: BlockManager,
+      val shuffleId: Int,
+      val fileId: Int,
+      val objects: Array[ObjectId]) {
    /**
     * Stores the absolute index of each mapId in the files of this group. For instance,
     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -232,13 +237,13 @@ object ShuffleBlockManager {
     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
     * reducer.
     */
-    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](objects.length) {
      new PrimitiveVector[Long]()
    }

    def numBlocks = mapIdToIndex.size

-    def apply(bucketId: Int) = files(bucketId)
+    def apply(bucketId: Int) = objects(bucketId)

    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
      mapIdToIndex(mapId) = numBlocks
@@ -247,9 +252,9 @@ object ShuffleBlockManager {
      }
    }

-    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
-    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
-      val file = files(reducerId)
+    /** Returns the ObjectSegment associated with the given map task, or None if no entry exists. */
+    def getObjectSegmentFor(mapId: Int, reducerId: Int): Option[ObjectSegment] = {
+      val blockObject = objects(reducerId)
      val blockOffsets = blockOffsetsByReducer(reducerId)
      val index = mapIdToIndex.getOrElse(mapId, -1)
      if (index >= 0) {
@@ -258,10 +263,10 @@ object ShuffleBlockManager {
        if (index + 1 < numBlocks) {
          blockOffsets(index + 1) - offset
        } else {
-          file.length() - offset
+          blockManager.diskStore.getObjectSize(blockObject) - offset
        }
        assert(length >= 0)
-        Some(new FileSegment(file, offset, length))
+        Some(new ObjectSegment(blockObject, offset, length))
      } else {
        None
      }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 288badd3160f..b2d978ab7811 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BlockId, BlockManager}
+import org.apache.spark.storage.{ObjectId, BlockId, BlockManager}

 /**
  * :: DeveloperApi ::
@@ -71,7 +71,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
-  private val diskBlockManager = blockManager.diskBlockManager
+  private val diskStore = blockManager.diskStore

   // Collective memory threshold shared across all running tasks
   private val maxMemoryThreshold = {
@@ -156,8 +156,8 @@ class ExternalAppendOnlyMap[K, V, C](
     spillCount += 1
     logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
-    val (blockId, file) = diskBlockManager.createTempBlock()
-    var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
+    val (blockId, objectId) = diskStore.createTempBlock()
+    var writer = blockManager.getDiskWriter(blockId, objectId, serializer, fileBufferSize)
     var objectsWritten = 0

     // List of batch sizes (bytes) in the order they are written to disk
@@ -182,7 +182,7 @@ class ExternalAppendOnlyMap[K, V, C](
         if (objectsWritten == serializerBatchSize) {
           flush()
           writer.close()
-          writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
+          writer = blockManager.getDiskWriter(blockId, objectId, serializer, fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
@@ -194,7 +194,7 @@ class ExternalAppendOnlyMap[K, V, C](
     }

     currentMap = new SizeTrackingAppendOnlyMap[K, C]
-    spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+    spilledMaps.append(new DiskMapIterator(objectId, blockId, batchSizes))

     // Reset the amount of shuffle memory used by this map in the global pool
     val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -348,10 +348,10 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
-  private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
+  private class DiskMapIterator(objectId: ObjectId, blockId: BlockId, batchSizes: ArrayBuffer[Long])
     extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    private val objectStream = blockManager.diskStore.getInputStream(objectId)
+    private val bufferedStream = new BufferedInputStream(objectStream, fileBufferSize)

     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -416,7 +416,7 @@ class ExternalAppendOnlyMap[K, V, C](
     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
       deserializeStream.close()
-      file.delete()
+      blockManager.diskStore.removeObject(objectId)
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa771404973..6b2d1bb84619 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -108,13 +108,13 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
     val blockId0 = new ShuffleBlockId(1, 2, 3)
     val newFile = diskBlockManager.getFile(filename)
     writeToFile(newFile, 15)
-    shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15)
+    shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(FileObjectId(newFile), 0, 15)
     assertSegmentEquals(blockId0, filename, 0, 15)

     val blockId1 = new ShuffleBlockId(1, 2, 4)
     val newFile2 = diskBlockManager.getFile(filename)
     writeToFile(newFile2, 12)
-    shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12)
+    shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(FileObjectId(newFile), 15, 12)
     assertSegmentEquals(blockId1, filename, 15, 12)

     assert(newFile === newFile2)
@@ -122,7 +122,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
   }

   def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
-    val segment = diskBlockManager.getBlockLocation(blockId)
+    val segment = diskBlockManager.getFileSegment(blockId)
     assert(segment.file.getName === filename)
     assert(segment.offset === offset)
     assert(segment.length === length)
diff --git a/core/src/test/scala/org/apache/spark/storage/ObjectIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/ObjectIdSuite.scala
new file mode 100644
index 000000000000..92f5523dc9c6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/ObjectIdSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import java.io.File
+
+class ObjectIdSuite extends FunSuite {
+  def assertSame(id1: ObjectId, id2: ObjectId) {
+    assert(id1.id === id2.id)
+    assert(id1.hashCode === id2.hashCode)
+    assert(id1 === id2)
+  }
+
+  def assertDifferent(id1: ObjectId, id2: ObjectId) {
+    assert(id1.id != id2.id)
+    assert(id1.hashCode != id2.hashCode)
+    assert(id1 != id2)
+  }
+
+  test("file object id") {
+    val file = new File("/tmpfile")
+    val id = FileObjectId(file)
+    assertSame(id, FileObjectId(new File("/tmpfile")))
+    assertDifferent(id, FileObjectId(new File("/tmpfile2")))
+    assert(id.id === file.getName)
+    assert(id.file === file)
+  }
+
+  test("as instance") {
+    val file = new File("/tmpfile")
+    val fid = FileObjectId(file)
+    val oid: ObjectId = fid
+    val fid1 = FileObjectId.toFileObjectId(fid).get
+    assertSame(fid, fid1)
+    val fid2 = FileObjectId.toFileObjectId(oid).get
+    assertSame(fid, fid2)
+  }
+}
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c35615a71..8c07e02a75fb 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -63,7 +63,7 @@ object StoragePerfTester {
     }
     writers.map {w =>
       w.commit()
-      total.addAndGet(w.fileSegment().length)
+      total.addAndGet(w.objectSegment().length)
       w.close()
     }