@@ -42,7 +42,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
FileSegment fileSegment = pResolver.getFileSegment(blockId);
// if getFileSegment returns null, close the channel
if (fileSegment == null) {
//ctx.close();
@@ -22,5 +22,5 @@

public interface PathResolver {
/** Get the file segment in which the given block resides. */
FileSegment getBlockLocation(BlockId blockId);
FileSegment getFileSegment(BlockId blockId);
}
@@ -21,7 +21,7 @@ import java.io.File

import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.storage.{BlockId, FileSegment}
import org.apache.spark.storage.{FileObjectId, BlockId, FileSegment}

private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {

@@ -53,7 +53,7 @@ private[spark] object ShuffleSender {
val localDirs = args.drop(2).map(new File(_))

val pResovler = new PathResolver {
override def getBlockLocation(blockId: BlockId): FileSegment = {
override def getFileSegment(blockId: BlockId): FileSegment = {
if (!blockId.isShuffle) {
throw new Exception("Block " + blockId + " is not a shuffle block")
}
@@ -63,7 +63,7 @@
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
val file = new File(subDir, blockId.name)
new FileSegment(file, 0, file.length())
new FileSegment(FileObjectId(file), 0, file.length())
}
}
val sender = new ShuffleSender(port, pResovler)
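Note: FileSegment is now constructed from a FileObjectId rather than a raw java.io.File. The ObjectId hierarchy itself is not visible in this diff; the following is only a minimal sketch of the shapes these types would need in order to satisfy the call sites shown here (names follow the diff; the extends relationship between FileSegment and ObjectSegment is an assumption).

import java.io.File

// Hedged sketch only -- the real definitions are added elsewhere in this PR.
trait ObjectId

case class FileObjectId(file: File) extends ObjectId

object FileObjectId {
  /** Narrow an arbitrary ObjectId to a FileObjectId, if it is one. */
  def toFileObjectId(id: ObjectId): Option[FileObjectId] = id match {
    case f: FileObjectId => Some(f)
    case _ => None
  }
}

/** A contiguous byte range within some storage object. */
class ObjectSegment(val objectId: ObjectId, val offset: Long, val length: Long)

/** File-backed specialization used on the disk path (assumed to extend ObjectSegment). */
class FileSegment(val fileId: FileObjectId, offset: Long, length: Long)
  extends ObjectSegment(fileId, offset, length) {
  def file: File = fileId.file
}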
@@ -98,7 +98,7 @@ class HashShuffleWriter[K, V](
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
val size = writer.objectSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -62,7 +62,7 @@ private[spark] class BlockManager(
// Actual storage of where blocks are kept
private var tachyonInitialized = false
private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[storage] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
@@ -78,7 +78,7 @@ private[spark] class BlockManager(
private val nettyPort: Int = {
val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
}

val blockManagerId = BlockManagerId(
@@ -562,12 +562,10 @@ private[spark] class BlockManager(
*/
def getDiskWriter(
blockId: BlockId,
file: File,
objectId: ObjectId,
serializer: Serializer,
bufferSize: Int): BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
diskStore.getBlockObjectWriter(blockId, objectId, serializer, bufferSize)
}

/**
@@ -1035,6 +1033,7 @@ private[spark] class BlockManager(
}
connectionManager.stop()
shuffleBlockManager.stop()
diskStore.stop()
diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()
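getDiskWriter now takes an ObjectId instead of a File and delegates writer construction to the DiskStore. A hedged usage sketch of the new call path (blockId, serializer, bufferSize and record are assumed to be in scope; method names follow this diff):

// Resolve (or create) the storage object backing this block name, then write through it.
val objectId = blockManager.diskStore.getOrNewObject(blockId.name)
val writer = blockManager.getDiskWriter(blockId, objectId, serializer, bufferSize)
writer.write(record)                    // BlockObjectWriter.write, as in the shuffle write path
writer.commit()
writer.close()
val segment = writer.objectSegment()    // ObjectSegment(objectId, initialPosition, bytesWritten)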
@@ -58,7 +58,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
/**
* Returns the object segment of committed data that this Writer has written.
*/
def fileSegment(): FileSegment
def objectSegment(): ObjectSegment

/**
* Cumulative time spent performing blocking writes, in ns.
@@ -74,7 +74,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
objectId: FileObjectId,
serializer: Serializer,
bufferSize: Int,
compressStream: OutputStream => OutputStream,
@@ -102,6 +102,7 @@ private[spark] class DiskBlockObjectWriter(
}

private val file = objectId.file
/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
private var bs: OutputStream = null
private var fos: FileOutputStream = null
@@ -179,8 +180,8 @@ private[spark] class DiskBlockObjectWriter(
objOut.writeObject(value)
}

override def fileSegment(): FileSegment = {
new FileSegment(file, initialPosition, bytesWritten)
override def objectSegment(): ObjectSegment = {
new ObjectSegment(objectId, initialPosition, bytesWritten)
}

// Only valid if called after close()
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer

/**
* Abstract class to store blocks.
@@ -68,4 +69,5 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
def contains(blockId: BlockId): Boolean

def clear() { }

}
36 changes: 16 additions & 20 deletions core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -23,7 +23,6 @@ import java.util.{Date, Random, UUID}

import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
import org.apache.spark.util.Utils

/**
@@ -35,7 +34,7 @@ import org.apache.spark.util.Utils
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
extends PathResolver with Logging {
extends Logging {

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
@@ -45,21 +44,25 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
* having really large inodes at the top level. */
private val localDirs: Array[File] = createLocalDirs()
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null

addShutdownHook()

/**
* Returns the physical file segment in which the given BlockId is located.
* If the BlockId has been mapped to a specific FileSegment, that will be returned.
* Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
*/
def getBlockLocation(blockId: BlockId): FileSegment = {
def getFileSegment(blockId: BlockId): FileSegment = {
if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
val objectSegment = shuffleManager.getBlockLocation(
blockId.asInstanceOf[ShuffleBlockId])
val fileObjectId = FileObjectId.toFileObjectId(objectSegment.objectId)
fileObjectId match {
case Some(id: FileObjectId) =>
new FileSegment(id, objectSegment.offset, objectSegment.length)
case None =>
// Should not happen while there is only one DiskStore;
// fix this when pluggable storage is implemented.
throw BlockException(blockId, "Block Not found")
}
} else {
val file = getFile(blockId.name)
new FileSegment(file, 0, file.length())
new FileSegment(FileObjectId(file), 0, file.length())
}
}

@@ -92,7 +95,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
getBlockLocation(blockId).file.exists()
// Only used by tests.
getFile(blockId.name).exists()
}

/** List all the blocks currently stored on disk by the disk manager. */
@@ -167,14 +171,6 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
}
}

if (shuffleSender != null) {
shuffleSender.stop()
}
}

private[storage] def startShuffleBlockSender(port: Int): Int = {
shuffleSender = new ShuffleSender(port, this)
logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
shuffleSender.port
}
}
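getFileSegment is now the single place where a BlockId is resolved to its on-disk byte range, whether the block lives in its own file or inside a consolidated shuffle file. A hedged sketch of consuming the resolved segment, mirroring DiskStore.getBytes below (diskBlockManager and blockId are assumed to be in scope):

import java.io.RandomAccessFile
import java.nio.channels.FileChannel.MapMode

val segment = diskBlockManager.getFileSegment(blockId)
val channel = new RandomAccessFile(segment.file, "r").getChannel
try {
  // Map only this segment's range; a consolidated shuffle file holds many segments.
  val buffer = channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
  // ... deserialize from buffer ...
} finally {
  channel.close()
}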
128 changes: 120 additions & 8 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,26 +17,32 @@

package org.apache.spark.storage

import java.io.{FileOutputStream, RandomAccessFile}
import java.io._
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{HashMap, ArrayBuffer}

import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.Stage
import java.util.concurrent.atomic.AtomicInteger
import scala.Some
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}

/**
* Stores BlockManager blocks on disk.
*/
private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {
private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with PathResolver with Logging {

val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
private val nameToObjectId = new HashMap[String, ObjectId]
private var shuffleSender : ShuffleSender = null

override def getSize(blockId: BlockId): Long = {
diskManager.getBlockLocation(blockId).length
getFileSegment(blockId).length
}

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
@@ -92,7 +98,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
}

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val segment = diskManager.getBlockLocation(blockId)
val segment = getFileSegment(blockId)
val channel = new RandomAccessFile(segment.file, "r").getChannel

try {
@@ -123,7 +129,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
}

override def remove(blockId: BlockId): Boolean = {
val fileSegment = diskManager.getBlockLocation(blockId)
val fileSegment = getFileSegment(blockId)
val file = fileSegment.file
if (file.exists() && file.length() == fileSegment.length) {
file.delete()
@@ -136,7 +142,113 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
}

override def contains(blockId: BlockId): Boolean = {
val file = diskManager.getBlockLocation(blockId).file
val file = getFileSegment(blockId).file
file.exists()
}

def getBlockObjectWriter(
blockId: BlockId,
objectId: ObjectId,
serializer: Serializer,
bufferSize: Int): BlockObjectWriter = {

val fileObjectId = FileObjectId.toFileObjectId(objectId)

fileObjectId match {
case Some(foid: FileObjectId) =>
val compressStream: OutputStream => OutputStream =
blockManager.wrapForCompression(blockId, _)
val syncWrites = blockManager.conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(blockId, foid, serializer, bufferSize, compressStream, syncWrites)
case None =>
null
}

}

override def getFileSegment(blockId: BlockId): FileSegment = {
diskManager.getFileSegment(blockId)
}

def createTempBlock(): (TempBlockId, ObjectId) = {

val (blockId, file) = diskManager.createTempBlock()
val objectId = new FileObjectId(file)

(blockId, objectId)
}

def getInputStream(objectId: ObjectId): InputStream = {
val fileObjectId = FileObjectId.toFileObjectId(objectId)

fileObjectId match {
case Some(FileObjectId(file)) =>
new FileInputStream(file)
case None =>
null
}
}

def getOrNewObject(name: String): ObjectId = {

val objectId = nameToObjectId.get(name)

if (objectId.isEmpty) {
val file = diskManager.getFile(name)
val newObjectId = new FileObjectId(file)
nameToObjectId.put(name, newObjectId)
newObjectId
} else {
objectId.get
}
}

def getObjectSize(objectId: ObjectId): Long = {
val fileObjectId = FileObjectId.toFileObjectId(objectId)

fileObjectId match {
case Some(FileObjectId(file)) =>
file.length
case None =>
0
}
}

def isObjectExists(objectId: ObjectId): Boolean = {
val fileObjectId = FileObjectId.toFileObjectId(objectId)

fileObjectId match {
case Some(FileObjectId(file)) =>
file.exists
case None =>
false
}
}

def removeObject(id: ObjectId): Boolean = {
val fileObjectId = FileObjectId.toFileObjectId(id)

fileObjectId match {
case Some(FileObjectId(file)) =>
if (file != null && file.exists()) {
return file.delete()
}
case _ =>
false
}
false
}

private[storage] def startShuffleBlockSender(port: Int): Int = {
shuffleSender = new ShuffleSender(port, this)
logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
shuffleSender.port
}

/** Stop the shuffle sender, if one was started. */
private[spark] def stop() {
if (shuffleSender != null) {
shuffleSender.stop()
}
}
}
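Taken together, DiskStore now acts as the PathResolver for the Netty ShuffleSender and also exposes an object-level surface (temp objects, input streams, size/existence checks, removal). A hedged end-to-end sketch of that surface (diskStore, serializer, bufferSize and record are assumed to be in scope; method names follow this diff):

// Create a temp object, write to it through a BlockObjectWriter, then read it back and clean up.
val (tempBlockId, tempObjectId) = diskStore.createTempBlock()
val writer = diskStore.getBlockObjectWriter(tempBlockId, tempObjectId, serializer, bufferSize)
writer.write(record)
writer.commit()
writer.close()

if (diskStore.isObjectExists(tempObjectId)) {
  val in = diskStore.getInputStream(tempObjectId)   // a FileInputStream for file-backed objects
  try {
    // ... consume the stream ...
  } finally {
    in.close()
  }
  diskStore.removeObject(tempObjectId)              // deletes the backing file
}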