diff --git a/core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java b/core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java new file mode 100644 index 000000000000..339174484036 --- /dev/null +++ b/core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + + +import com.google.common.base.Objects; +import com.google.common.io.ByteStreams; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.LimitedInputStream; +import org.apache.spark.network.util.TransportConf; +import org.apache.spark.storage.ExternalBlockManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.Math.toIntExact; + +/** + * A {@link ManagedBuffer} backed by a segment in a file. 
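+ * The file lives in an external block store (Alluxio). {@code nioByteBuffer()} reads the
+ * whole file into memory and exposes only the {@code [offset, offset + length)} window,
+ * while {@code createInputStream()} opens the file through the {@link ExternalBlockManager},
+ * skips ahead to {@code offset} and bounds the stream to {@code length} bytes.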
+ */
+public final class AlluxioManagedBuffer extends ManagedBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(AlluxioManagedBuffer.class);
+
+  private final TransportConf conf;
+  private final Path file;
+  private final long offset;
+  private final long length;
+  private final ExternalBlockManager blockManager;
+
+  public AlluxioManagedBuffer(
+      TransportConf conf,
+      Path file,
+      long offset,
+      long length,
+      ExternalBlockManager blockManager) {
+    this.conf = conf;
+    this.file = file;
+    this.offset = offset;
+    this.length = length;
+    this.blockManager = blockManager;
+  }
+
+  @Override
+  public long size() {
+    return length;
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() throws IOException {
+    FileSystem fs = blockManager.fs();
+    if (!fs.exists(file)) {
+      return ByteBuffer.wrap(new byte[0]);
+    } else {
+      long size = fs.getFileStatus(file).getLen();
+      if (size == 0) {
+        return ByteBuffer.wrap(new byte[0]);
+      } else {
+        InputStream input = fs.open(file);
+        try {
+          byte[] buffer = new byte[toIntExact(size)];
+          ByteStreams.readFully(input, buffer);
+          return ByteBuffer.wrap(buffer, toIntExact(offset), toIntExact(length));
+        } catch (IOException e) {
+          logger.warn("Failed to read bytes of " + file + " from Alluxio", e);
+          return ByteBuffer.wrap(new byte[0]);
+        } finally {
+          input.close();
+        }
+      }
+    }
+  }
+
+  @Override
+  public InputStream createInputStream() throws IOException {
+    logger.debug("Creating input stream for {} (offset={}, length={})", file, offset, length);
+    InputStream is = blockManager.createInputStream(file).get();
+    try {
+      ByteStreams.skipFully(is, offset);
+      return new LimitedInputStream(is, length);
+    } catch (IOException e) {
+      try {
+        if (is != null) {
+          long size = blockManager.fs().getFileStatus(file).getLen();
+          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
+            e);
+        }
+      } catch (IOException ignored) {
+        // ignore
+      } finally {
+        JavaUtils.closeQuietly(is);
+      }
+      throw new IOException("Error in opening " + this, e);
+    } catch (RuntimeException e) {
+      JavaUtils.closeQuietly(is);
+      throw e;
+    }
+  }
+
+  @Override
+  public ManagedBuffer retain() {
+    return this;
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() throws IOException {
+    // Zero-copy Netty transfer is not supported for Alluxio-backed buffers; callers are
+    // expected to use nioByteBuffer() or createInputStream() instead.
+    return new Object();
+  }
+
+  public Path getFile() { return file; }
+
+  public long getOffset() { return offset; }
+
+  public long getLength() { return length; }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("file", file)
+      .add("offset", offset)
+      .add("length", length)
+      .toString();
+  }
+}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 323a5d3c5283..8df8c16e26c9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -90,6 +90,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter {
   private FileSegment[] partitionWriterSegments;
   @Nullable private MapStatus mapStatus;
   private long[] partitionLengths;
+  private boolean useAlluxio;
 
   /**
    * Are we in the process of stopping?
Because map tasks can call stop() with success = true @@ -108,6 +109,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); + this.useAlluxio = conf.getBoolean("spark.alluxio.shuffle.enabled",false); this.blockManager = blockManager; final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; @@ -162,6 +164,11 @@ public void write(Iterator> records) throws IOException { try { partitionLengths = writePartitionedFile(tmp); shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + if(useAlluxio) { + logger.info("shuffle block id " + new ShuffleDataBlockId(shuffleId, mapId, 0).name() + "\nfile path is " + + output.getPath() + "\nfile name is " + output.getName() + "file size is " + output.length()); + blockManager.externalBlockStore().externalBlockManager().get().putFile(shuffleId, new ShuffleDataBlockId(shuffleId, mapId, 0), output); + } } finally { if (tmp.exists() && !tmp.delete()) { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03e91cdd310e..9037c279c078 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -503,6 +503,9 @@ class SparkContext(config: SparkConf) extends Logging { _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) + if (_conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + _env.blockManager.master.init() + } if (_conf.getBoolean("spark.ui.reverseProxy", false)) { System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 375aeb0c3466..891d6b5e72de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -368,8 +368,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - logInfo(s"Disabling executor $executorId.") - scheduler.executorLost(executorId, LossReasonPending) + if (SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + scheduler.executorLost(executorId, ExecutorKilled) + } else { + scheduler.executorLost(executorId, LossReasonPending) + } } shouldDisable diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index d3f1c7ec1bbe..d63f885fb5cd 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -181,6 +181,10 @@ private[spark] class IndexShuffleBlockResolver( if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) } + if 
(SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + blockManager.externalBlockStore.externalBlockManager.get.putFile(shuffleId, + ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), indexFile) + } } } } finally { diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 274399b9cc1f..fa76824d68c8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -21,7 +21,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter} -import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId} import org.apache.spark.util.Utils import org.apache.spark.util.collection.ExternalSorter @@ -70,6 +70,13 @@ private[spark] class SortShuffleWriter[K, V, C]( val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) + if (SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + if (output.length() != 0) { + blockManager.externalBlockStore.externalBlockManager.get.putFile( + dep.shuffleId, ShuffleDataBlockId(dep.shuffleId, mapId, + IndexShuffleBlockResolver.NOOP_REDUCE_ID), output) + } + } mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala new file mode 100644 index 000000000000..b1668e3fd0d2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileInputStream, InputStream}
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
+
+private[spark] class AlluxioBlockManager extends ExternalBlockManager with Logging {
+
+  private var chroot: Path = _
+  private var subDirs: Array[Array[Path]] = _
+  private var hdfsDirs: Array[Array[Path]] = _
+  private var hdfsroot: Path = _
+  private var usehdfs: Boolean = _
+
+  override def toString: String = "ExternalBlockStore-Alluxio"
+
+  override def init(blockManager: BlockManager): Unit = {
+    super.init(blockManager)
+
+    val conf = blockManager.conf
+
+    val masterUrl = conf.get(ExternalBlockStore.MASTER_URL, "alluxio://localhost:19998")
+    val hdfsUrl = conf.get("spark.alluxio.hdfs.nameservice", "viewfs://nsX")
+    val storeDir = conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_alluxio")
+    val folderName = conf.get(ExternalBlockStore.FOLD_NAME)
+    val subDirsPerAlluxio = conf.getInt(
+      "spark.externalBlockStore.subDirectories",
+      ExternalBlockStore.SUB_DIRS_PER_DIR.toInt)
+
+    val master = new Path(masterUrl)
+    chroot = new Path(master, s"$storeDir/$folderName/" + conf.getAppId)
+    fs = master.getFileSystem(new Configuration)
+
+    val hdfsMaster = new Path(hdfsUrl)
+    hdfsFs = hdfsMaster.getFileSystem(new Configuration)
+    hdfsroot = new Path(hdfsMaster, s"$storeDir/$folderName/" + conf.getAppId)
+
+    subDirs = Array.fill(subDirsPerAlluxio)(new Array[Path](subDirsPerAlluxio))
+    hdfsDirs = Array.fill(subDirsPerAlluxio)(new Array[Path](subDirsPerAlluxio))
+  }
+
+  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
+    val path = getFile(blockId)
+    val output = fs.create(path, true)
+    try {
+      output.write(bytes.array())
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to put bytes of block $blockId into Alluxio", e)
+    } finally {
+      try {
+        output.close()
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Failed to close output stream after putting bytes into Alluxio", e)
+      }
+    }
+  }
+
+  override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+    val output = fs.create(getFile(blockId), true)
+    try {
+      blockManager.serializerManager.dataSerializeStream(blockId, output, values)
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to put values of block $blockId into Alluxio", e)
+    } finally {
+      try {
+        output.close()
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Failed to close output stream after putting values into Alluxio", e)
+      }
+    }
+  }
+
+  override def putFile(shuffleId: Int, blockId: BlockId, file: File): Unit = {
+    val in = new FileInputStream(file)
+    val bytes = new Array[Byte](file.length().toInt)
+    try {
+      ByteStreams.readFully(in, bytes)
+    } finally {
+      in.close()
+    }
+    putBytes(blockId, ByteBuffer.wrap(bytes))
+  }
+
+  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+    val path = getFile(blockId)
+    if (!fs.exists(path)) {
+      None
+    } else {
+      val size = fs.getFileStatus(path).getLen
+      if (size == 0) {
+        None
+      } else {
+        val input = fs.open(path)
+        var flag = true
+        try {
+          val buffer = new Array[Byte](size.toInt)
+          ByteStreams.readFully(input, buffer)
+          Some(ByteBuffer.wrap(buffer))
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Failed to get bytes of block $blockId from Alluxio", e)
+            flag = false
+            getBytesFromHdfs(blockId)
+        } finally {
+          if (flag) {
+            try {
+              input.close()
+            } catch {
+              case NonFatal(e) =>
+                logWarning(s"Failed to close input stream after getting bytes from Alluxio", e)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  def getBytesFromHdfs(blockId: BlockId): Option[ByteBuffer] = {
+    usehdfs = true
+    val path = changePathToHDFS(getFile(blockId))
+    if (!hdfsFs.exists(path)) {
+      None
+    } else {
+      val size = hdfsFs.getFileStatus(path).getLen
+      if (size == 0) {
+        logInfo(s"HDFS fallback file $path for block $blockId is empty")
+        None
+      } else {
+        val input = hdfsFs.open(path)
+        try {
+          val buffer = new Array[Byte](size.toInt)
+          ByteStreams.readFully(input, buffer)
+          Some(ByteBuffer.wrap(buffer))
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Failed to get bytes of block $blockId from HDFS", e)
+            None
+        } finally {
+          try {
+            input.close()
+          } catch {
+            case NonFatal(e) =>
+              logWarning(s"Failed to close input stream after getting bytes from HDFS", e)
+          }
+        }
+      }
+    }
+  }
+
+  override def createInputStream(path: Path): Option[InputStream] = {
+    if (usehdfs) {
+      createInputStreamFromHDFS(path)
+    } else {
+      createInputStreamFromAlluxio(path)
+    }
+  }
+
+  def createInputStreamFromAlluxio(path: Path): Option[InputStream] = {
+    if (!fs.exists(path)) {
+      None
+    } else {
+      val size = fs.getFileStatus(path).getLen
+      if (size == 0) {
+        None
+      } else {
+        try {
+          val input = fs.open(path)
+          Some(input)
+        } catch {
+          case NonFatal(e) =>
+            logInfo(s"Failed to create input stream for $path from Alluxio", e)
+            None
+        }
+      }
+    }
+  }
+
+  def createInputStreamFromHDFS(path: Path): Option[InputStream] = {
+    val hdfsPath = changePathToHDFS(path)
+    if (!hdfsFs.exists(hdfsPath)) {
+      None
+    } else {
+      val size = hdfsFs.getFileStatus(hdfsPath).getLen
+      if (size == 0) {
+        None
+      } else {
+        try {
+          val input = hdfsFs.open(hdfsPath)
+          Some(input)
+        } catch {
+          case NonFatal(e) =>
+            logInfo(s"Failed to create input stream for $hdfsPath from HDFS", e)
+            None
+        }
+      }
+    }
+  }
+
+  def changePathToHDFS(path: Path): Path = {
+    val pathName = path.getName
+    getFileFromHDFS(BlockId(pathName))
+  }
+
+  override def getSize(path: Path): Long = {
+    if (fs.exists(path)) {
+      fs.getFileStatus(path).getLen
+    } else {
+      0L
+    }
+  }
+
+  override def getValues(blockId: BlockId): Option[Iterator[_]] = {
+    val ct = implicitly[ClassTag[Iterator[_]]]
+    val bytes: Option[ByteBuffer] = getBytes(blockId)
+    bytes.map(bs =>
+      // alluxio.hadoop.HdfsFileInputStream does not support available(), so deserialize from
+      // an in-memory copy of the block instead of the raw stream:
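+      // By this point the block bytes are fully buffered in memory (either read from Alluxio
+      // or, after a failed Alluxio read, from the HDFS fallback in getBytesFromHdfs), so
+      // wrapping them in a ByteBufferInputStream is safe.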
+ // blockManager.dataDeserialize(blockId, input) + blockManager.serializerManager.dataDeserializeStream(blockId, + inputStream = new ByteBufferInputStream(bs))(ct) + ) + } + + override def getSize(blockId: BlockId): Long = + fs.getFileStatus(getFile(blockId)).getLen + + override def blockExists(blockId: BlockId): Boolean = + fs.exists(getFile(blockId)) + + override def removeBlock(blockId: BlockId): Boolean = + fs.delete(getFile(blockId), false) + + override def shutdown(): Unit = { + hdfsFs.close() + } + + override def getFile(blockId: BlockId): Path = getFile(blockId.name) + + def getFile(filename: String): Path = { + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % subDirs.length + val subDirId = hash % subDirs.length + + val subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val path = new Path(chroot, s"$dirId/" + subDirId.toString) + subDirs(dirId)(subDirId) = path + hdfsDirs(dirId)(subDirId) = new Path(hdfsroot, s"$dirId/" + subDirId.toString) + path + } + } + new Path(subDir, filename) + } + + def getFileFromHDFS(blockId: BlockId): Path = { + val hash = Utils.nonNegativeHash(blockId.name) + val dirId = hash % subDirs.length + val subDirId = hash % subDirs.length + + val subDir = hdfsDirs(dirId)(subDirId) + + new Path(subDir, blockId.name) + } + +} diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala new file mode 100644 index 000000000000..7c2ac2e6824e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + +private[spark] class AlluxioBlockManagerMaster extends Logging { + + private var chroot: Path = _ + private var fs : FileSystem = _ + + override def toString: String = "ExternalBlockStore-Alluxio" + + def init(conf: SparkConf): Unit = { + + val masterUrl = conf.get(ExternalBlockStore.MASTER_URL, "alluxio://localhost:19998") + val storeDir = conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_alluxio") + val subDirsPerAlluxio = conf.getInt( + "spark.externalBlockStore.subDirectories", + ExternalBlockStore.SUB_DIRS_PER_DIR.toInt) + val folderName = conf.get(ExternalBlockStore.FOLD_NAME) + + val master = new Path(masterUrl) + chroot = new Path(master, s"$storeDir/$folderName/" + conf.getAppId ) + fs = master.getFileSystem(new Configuration) + if (!fs.exists(chroot)) { + fs.mkdirs(chroot) + } + mkdir(subDirsPerAlluxio) + } + + def delete(): Unit = { + try { + if (fs.exists(chroot)) { + fs.delete(chroot, true) + } + } catch { + case NonFatal(e) => + logWarning(s"$chroot has been deleted", e) + } finally + fs.close() + } + + def mkdir(n: Int): Unit = { + for (i <- 0 until n) { + val path = new Path(chroot, i.toString + s"/$i") + if (!fs.exists(path)) { + fs.mkdirs(path) + } + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5cd21e31c955..99b6249a16b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -149,11 +149,16 @@ private[spark] class BlockManager( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) // Actual storage of where blocks are kept + private var externalBlockStoreInitialized = false private[spark] val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this) private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager) memoryManager.setMemoryStore(memoryStore) + private[spark] lazy val externalBlockStore: ExternalBlockStore = { + externalBlockStoreInitialized = true + new ExternalBlockStore(this) + } // Note: depending on the memory manager, `maxMemory` may actually vary over time. // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxMemory` can ever possibly reach. We may need diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index d24421b96277..90aaa37877ff 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -35,6 +35,11 @@ class BlockManagerMaster( extends Logging { val timeout = RpcUtils.askRpcTimeout(conf) + lazy val alluxioBlockManagerMaster = new AlluxioBlockManagerMaster + + def init(): Unit = { + alluxioBlockManagerMaster.init(conf) + } /** Remove a dead executor from the driver endpoint. This is only called on the driver side. 
*/ def removeExecutor(execId: String) { @@ -235,6 +240,9 @@ class BlockManagerMaster( if (driverEndpoint != null && isDriver) { tell(StopBlockManagerMaster) driverEndpoint = null + if (conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + alluxioBlockManagerMaster.delete() + } logInfo("BlockManagerMaster stopped") } } diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala new file mode 100644 index 000000000000..7dee03e71d8c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.{File, InputStream} +import java.nio.ByteBuffer + +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.util.ByteBufferInputStream + +/* + * An abstract class that the concrete external block manager has to inherit. + * The class has to have a no-argument constructor, and will be initialized by init, + * which is invoked by ExternalBlockStore. The main input parameter is blockId for all + * the methods, which is the unique identifier for Block in one Spark application. + * + * The underlying external block manager should avoid any name space conflicts among multiple + * Spark applications. For example, creating different directory for different applications + * by randomUUID + * + */ +private[spark] abstract class ExternalBlockManager { + + protected var blockManager: BlockManager = _ + protected var serializerManager : SerializerManager = _ + + var fs: FileSystem = _ + var hdfsFs: FileSystem = _ + + + override def toString: String = {"External Block Store"} + + /* + * Initialize a concrete block manager implementation. Subclass should initialize its internal + * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore + * right after the class is constructed. The function should throw IOException on failure + * + * @throws java.io.IOException if there is any file system failure during the initialization. + */ + def init(blockManager: BlockManager): Unit = { + this.blockManager = blockManager + this.serializerManager = blockManager.serializerManager + } + + /* + * Drop the block from underlying external block store, if it exists.. + * @return true on successfully removing the block + * false if the block could not be removed as it was not found + * + * @throws java.io.IOException if there is any file system failure in removing the block. 
+ */ + def removeBlock(blockId: BlockId): Boolean + + /* + * Used by BlockManager to check the existence of the block in the underlying external + * block store. + * @return true if the block exists. + * false if the block does not exists. + * + * @throws java.io.IOException if there is any file system failure in checking + * the block existence. + */ + def blockExists(blockId: BlockId): Boolean + + /* + * Put the given block to the underlying external block store. Note that in normal case, + * putting a block should never fail unless something wrong happens to the underlying + * external block store, e.g., file system failure, etc. In this case, IOException + * should be thrown. + * + * @throws java.io.IOException if there is any file system failure in putting the block. + */ + def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit + + def putValues(blockId: BlockId, values: Iterator[_]): Unit = { + val bytes = serializerManager.dataSerialize(blockId, values) + putBytes(blockId, bytes.toByteBuffer) + } + + def putFile(blockId: BlockId, file : File): Unit = {} + def putFile(shuffleId: Int, blockId: BlockId, file : File): Unit = {} + /* + * Retrieve the block bytes. + * @return Some(ByteBuffer) if the block bytes is successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ + def getBytes(blockId: BlockId): Option[ByteBuffer] + + /* + * Retrieve the block data. + * @return Some(Iterator[Any]) if the block data is successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ + def getValues(blockId: BlockId): Option[Iterator[_]] = { + val ct = implicitly[ClassTag[Iterator[_]]] + getBytes(blockId).map(buffer => serializerManager.dataDeserializeStream(blockId, + new ByteBufferInputStream(buffer))(ct)) + } + + def createInputStream(path: Path): Option[InputStream] + + def getSize(path: Path): Long = 0L + + /* + * Get the size of the block saved in the underlying external block store, + * which is saved before by putBytes. + * @return size of the block + * 0 if the block does not exist + * + * @throws java.io.IOException if there is any file system failure in getting the block size. + */ + def getSize(blockId: BlockId): Long + + /* + * Clean up any information persisted in the underlying external block store, + * e.g., the directory, files, etc,which is invoked by the shutdown hook of ExternalBlockStore + * during system shutdown. + * + */ + def shutdown() + + def delete(): Unit = {} + + def getFile(blockId: BlockId): Path +} diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala new file mode 100644 index 000000000000..5379a48898ac --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ShutdownHookManager, Utils} + +/* + * Stores BlockManager blocks on ExternalBlockStore. + * We capture any potential exception from underlying implementation + * and return with the expected failure value + */ +private[spark] class ExternalBlockStore(blockManager: BlockManager) extends Logging { + + lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager() + + logInfo("ExternalBlockStore started") + + // Create concrete block manager and fall back to Tachyon by default for backward compatibility. + private def createBlkManager(): Option[ExternalBlockManager] = { + val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME) + .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME) + + try { + val instance = Utils.classForName(clsName) + .newInstance() + .asInstanceOf[ExternalBlockManager] + instance.init(blockManager) + ShutdownHookManager.addShutdownHook { () => + logDebug("Shutdown hook called") + externalBlockManager.map(_.shutdown()) + } + Some(instance) + } catch { + case NonFatal(t) => + logError("Cannot initialize external block store", t) + None + } + } +} + +private[spark] object ExternalBlockStore extends Logging { + val MAX_DIR_CREATION_ATTEMPTS = 10 + val SUB_DIRS_PER_DIR = "64" + val BASE_DIR = "spark.externalBlockStore.baseDir" + val FOLD_NAME = "spark.externalBlockStore.folderName" + val MASTER_URL = "spark.externalBlockStore.url" + val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager" + val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.AlluxioBlockManager" +} diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 00d01dd28afb..4e0432a0fec2 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{File, InputStream, IOException} +import java.io.{DataInputStream, File, InputStream, IOException} import java.nio.ByteBuffer import java.util.concurrent.LinkedBlockingQueue import javax.annotation.concurrent.GuardedBy @@ -25,12 +25,16 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} -import org.apache.spark.{SparkException, TaskContext} +import com.google.common.io.ByteStreams + +import org.apache.spark.{AlluxioManagedBuffer, SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} + +import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} -import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.util.Utils +import 
org.apache.spark.shuffle.{FetchFailedException, IndexShuffleBlockResolver} +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ChunkedByteBufferOutputStream /** @@ -240,8 +244,15 @@ final class ShuffleBlockFetcherIterator( } override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { - logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) - results.put(new FailureFetchResult(BlockId(blockId), address, e)) + + if (!SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) + results.put(new FailureFetchResult(BlockId(blockId), address, e)) + } else { + results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), + getAlluxioBlocks(BlockId(blockId).asInstanceOf[ShuffleBlockId]), + remainingBlocks.isEmpty)) + } } } @@ -257,6 +268,30 @@ final class ShuffleBlockFetcherIterator( } } + private[this] def getAlluxioBlocks(blockId: ShuffleBlockId): ManagedBuffer = { + + val indexFileByteBuffer = blockManager.externalBlockStore.externalBlockManager.get + .getBytes(ShuffleIndexBlockId(blockId.shuffleId, blockId.mapId, + IndexShuffleBlockResolver.NOOP_REDUCE_ID)) + val in = new DataInputStream(new ByteBufferInputStream(indexFileByteBuffer.get)) + try { + ByteStreams.skipFully(in, blockId.reduceId * 8) + val offset = in.readLong() + val nextOffset = in.readLong() + new AlluxioManagedBuffer( + SparkTransportConf.fromSparkConf(blockManager.conf, "shuffle"), + blockManager.externalBlockStore.externalBlockManager.get.getFile( + ShuffleDataBlockId(blockId.shuffleId, blockId.mapId, + IndexShuffleBlockResolver.NOOP_REDUCE_ID)), + offset, + nextOffset - offset, + blockManager.externalBlockStore.externalBlockManager.get + ) + } finally { + in.close() + } + } + private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
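The Alluxio fallback above rebuilds a map output segment from the shuffle index file that IndexShuffleBlockResolver uploads: the index is a sequence of (numPartitions + 1) big-endian longs, where entry r is the byte offset of reduce partition r in the data file, so the segment for reduceId is [longs(reduceId), longs(reduceId + 1)). A minimal sketch of that offset arithmetic, factored into a standalone helper for illustration only (the object and method names below are not part of the patch):

object ShuffleIndexMath {
  // Returns (offset into the data file, segment length) for the given reduce partition,
  // mirroring the skipFully(reduceId * 8) and two readLong() calls in getAlluxioBlocks.
  def segmentForReduce(indexBytes: java.nio.ByteBuffer, reduceId: Int): (Long, Long) = {
    val in = indexBytes.duplicate()   // work on a copy so the caller's position is untouched
    in.position(reduceId * 8)         // each index entry is one 8-byte long
    val offset = in.getLong()         // start of this reduce partition in the data file
    val nextOffset = in.getLong()     // start of the next partition
    (offset, nextOffset - offset)     // (segment offset, segment length)
  }
}

Because the index stores one more offset than there are partitions, reduceId * 8 always leaves at least two longs to read, so the length of the last partition is still well defined.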