diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 8ca731038e528..c5e7a73c53fe9 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Files} import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} -import com.google.common.io.Files /** * Utilities for tests. Included in main codebase since it's used by multiple @@ -63,12 +63,7 @@ private[spark] object TestUtils { jarStream.putNextEntry(jarEntry) val in = new FileInputStream(file) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead <= 0) { - nRead = in.read(buffer, 0, buffer.length) - jarStream.write(buffer, 0, nRead) - } + ByteStreams.copy(in, jarStream) in.close() } jarStream.close() diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 295c70670857b..247e2400158ea 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, RandomAccessFile} +import java.io.{IOException, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode @@ -111,7 +111,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc // For small files, directly read rather than memory map if (segment.length < minMemoryMapBytes) { val buf = ByteBuffer.allocate(segment.length.toInt) - channel.read(buf, segment.offset) + channel.position(segment.offset) + while (buf.remaining() != 0) { + if (channel.read(buf) == -1) { + throw new IOException("Reached EOF before filling buffer\n" + + s"offset=${segment.offset}\nblockId=$blockId\nbuf.remaining=${buf.remaining}") + } + } buf.flip() Some(buf) } else { diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala index 932b5616043b4..6dbad5ff0518e 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io.IOException import java.nio.ByteBuffer +import com.google.common.io.ByteStreams import tachyon.client.{ReadType, WriteType} import org.apache.spark.Logging @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) - var buffer: ByteBuffer = null + assert (is != null) try { - if (is != null) { - val size = file.length - val bs = new Array[Byte](size.asInstanceOf[Int]) - val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) - buffer = ByteBuffer.wrap(bs) - if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + - s"is not equal to fetched size $fetchSize") - return None - } - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { case ioe: IOException => logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe) - return None + None } - Some(buffer) } override def contains(blockId: BlockId): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index aefbb24f01217..6d3bef007e6e4 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -33,7 +33,7 @@ import scala.reflect.ClassTag import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} -import com.google.common.io.Files +import com.google.common.io.{ByteStreams, Files} import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} @@ -1005,8 +1005,8 @@ private[spark] object Utils extends Logging { val stream = new FileInputStream(file) try { - stream.skip(effectiveStart) - stream.read(buff) + ByteStreams.skipFully(stream, effectiveStart) + ByteStreams.readFully(stream, buff) } finally { stream.close() } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 7e18f45de7b5b..5997e014c5ca8 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.io._ import java.util.jar.{JarEntry, JarOutputStream} +import com.google.common.io.ByteStreams import com.google.common.io.Files import org.scalatest.FunSuite @@ -60,12 +61,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { jar.putNextEntry(jarEntry) val in = new FileInputStream(textFile) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead <= 0) { - nRead = in.read(buffer, 0, buffer.length) - jar.write(buffer, 0, nRead) - } + ByteStreams.copy(in, jar) in.close() jar.close()