core/src/main/scala/org/apache/spark/TestUtils.scala (2 additions, 7 deletions)
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}

 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
@@ -63,12 +63,7 @@ private[spark] object TestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      val buffer = new Array[Byte](10240)
-      var nRead = 0
-      while (nRead <= 0) {
-        nRead = in.read(buffer, 0, buffer.length)
-        jarStream.write(buffer, 0, nRead)
-      }
+      ByteStreams.copy(in, jarStream)
       in.close()
     }
     jarStream.close()
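Note on the change above: the removed loop's condition is inverted. `while (nRead <= 0)` exits as soon as a single read succeeds, so at most one 10240-byte chunk of each file ever reached the jar, and an empty file would drive `nRead` to -1 and pass a negative length to `write`. `ByteStreams.copy` keeps copying until EOF. A minimal sketch of the corrected manual loop next to the Guava call, assuming Guava on the classpath (object and method names here are mine, not Spark's):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

import com.google.common.io.ByteStreams

object CopyLoopSketch {
  // What the hand-rolled loop should have been: read until -1 (EOF) and
  // write exactly the number of bytes each read returned.
  def manualCopy(in: InputStream, out: OutputStream): Unit = {
    val buffer = new Array[Byte](10240)
    var nRead = in.read(buffer)
    while (nRead != -1) {
      out.write(buffer, 0, nRead)
      nRead = in.read(buffer)
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Array.fill[Byte](64 * 1024)(1)  // larger than one 10240-byte chunk
    val manual = new ByteArrayOutputStream()
    manualCopy(new ByteArrayInputStream(data), manual)

    val viaGuava = new ByteArrayOutputStream()
    // ByteStreams.copy runs the same loop internally and returns the byte count.
    val copied = ByteStreams.copy(new ByteArrayInputStream(data), viaGuava)

    assert(manual.size == data.length)
    assert(copied == data.length && viaGuava.size == data.length)
  }
}
```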
core/src/main/scala/org/apache/spark/storage/DiskStore.scala (8 additions, 2 deletions)
@@ -17,7 +17,7 @@

 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, RandomAccessFile}
+import java.io.{IOException, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
@@ -111,7 +111,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       // For small files, directly read rather than memory map
       if (segment.length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(segment.length.toInt)
-        channel.read(buf, segment.offset)
+        channel.position(segment.offset)
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException("Reached EOF before filling buffer\n" +
+              s"offset=${segment.offset}\nblockId=$blockId\nbuf.remaining=${buf.remaining}")
+          }
+        }
         buf.flip()
         Some(buf)
       } else {
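Note on the change above: a positional `channel.read(buf, segment.offset)` may return after filling only part of the buffer, so the old single call could silently yield a partially populated `ByteBuffer`. The new loop retries until `buf` is full and turns a premature EOF into an `IOException`. A self-contained sketch of the same read-fully pattern, assuming a scratch file (names are mine):

```scala
import java.io.{FileInputStream, IOException}
import java.nio.ByteBuffer
import java.nio.file.Files

object ReadSegmentSketch {
  // Read exactly `length` bytes starting at `offset`, or throw: FileChannel.read
  // may fill the buffer across several short reads, so loop until remaining() is 0.
  def readSegment(path: String, offset: Long, length: Int): ByteBuffer = {
    val channel = new FileInputStream(path).getChannel
    try {
      val buf = ByteBuffer.allocate(length)
      channel.position(offset)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException(s"EOF with ${buf.remaining} bytes still unread")
        }
      }
      buf.flip()
      buf
    } finally {
      channel.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("segment", ".bin")
    Files.write(tmp, Array.tabulate[Byte](100)(_.toByte))
    val buf = readSegment(tmp.toString, offset = 10, length = 20)
    assert(buf.limit == 20 && buf.get(0) == 10)
  }
}
```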
core/src/main/scala/org/apache/spark/storage/TachyonStore.scala (7 additions, 14 deletions)
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
       case ioe: IOException =>
         logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
-        return None
+        None
     }
-    Some(buffer)
   }
 
   override def contains(blockId: BlockId): Boolean = {
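Note on the change above: the old code issued one `read`, wrapped the array regardless, and compared sizes afterwards, so a legitimate short read (`InputStream.read` may return fewer bytes than requested) was treated as a failure. `ByteStreams.readFully(is, bs)` instead loops until `bs` is completely filled and throws `EOFException` (an `IOException`) if the stream ends early, which the existing catch clause converts to `None`. A sketch of that contract, with an in-memory stream standing in for a Tachyon file:

```scala
import java.io.{ByteArrayInputStream, IOException}
import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

object ReadFullySketch {
  // Mirror the store's contract: Some(buffer) on success, None on a failed
  // or truncated read. readFully loops internally and throws EOFException
  // (an IOException) if the stream ends before the array is full.
  def readBlock(data: Array[Byte], expectedSize: Int): Option[ByteBuffer] = {
    val is = new ByteArrayInputStream(data)
    val bs = new Array[Byte](expectedSize)
    try {
      ByteStreams.readFully(is, bs)
      Some(ByteBuffer.wrap(bs))
    } catch {
      case _: IOException => None
    }
  }

  def main(args: Array[String]): Unit = {
    assert(readBlock(new Array[Byte](8), expectedSize = 8).isDefined)
    assert(readBlock(new Array[Byte](4), expectedSize = 8).isEmpty) // truncated stream
  }
}
```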
core/src/main/scala/org/apache/spark/util/Utils.scala (3 additions, 3 deletions)
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
-import com.google.common.io.Files
+import com.google.common.io.{ByteStreams, Files}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -1005,8 +1005,8 @@ private[spark] object Utils extends Logging {
     val stream = new FileInputStream(file)
 
     try {
-      stream.skip(effectiveStart)
-      stream.read(buff)
+      ByteStreams.skipFully(stream, effectiveStart)
+      ByteStreams.readFully(stream, buff)
     } finally {
       stream.close()
     }
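Note on the change above: the surrounding code reads a byte range starting at `effectiveStart` into `buff`. `InputStream.skip(n)` may skip fewer than `n` bytes and `read(buff)` may fill only part of the array, both without any error, so the old pair could silently return the wrong slice of the file. The Guava variants loop until the request is satisfied or throw `EOFException`. A small sketch (temp-file setup and names are mine):

```scala
import java.io.FileInputStream
import java.nio.file.Files

import com.google.common.io.ByteStreams

object SkipReadFullySketch {
  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("offset-bytes", ".txt")
    Files.write(tmp, "0123456789ABCDEF".getBytes("UTF-8"))

    val buff = new Array[Byte](4)
    val stream = new FileInputStream(tmp.toFile)
    try {
      // skip(10) could legally skip fewer bytes; skipFully loops or throws.
      ByteStreams.skipFully(stream, 10)
      // read(buff) could legally fill part of buff; readFully fills it or throws.
      ByteStreams.readFully(stream, buff)
    } finally {
      stream.close()
    }
    assert(new String(buff, "UTF-8") == "ABCD")
  }
}
```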
core/src/test/scala/org/apache/spark/FileServerSuite.scala (2 additions, 6 deletions)
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
+import com.google.common.io.ByteStreams
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
@@ -60,12 +61,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     jar.putNextEntry(jarEntry)
 
     val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
+    ByteStreams.copy(in, jar)
 
     in.close()
     jar.close()
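Note on the change above: this is the same fix as in `TestUtils.scala`. The test helper's copy loop had the identical inverted `while (nRead <= 0)` condition, so the jar built for the suite only ever contained the first chunk of `textFile`; `ByteStreams.copy(in, jar)` copies the whole stream.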