@@ -253,7 +253,7 @@ private[spark] class Executor(

val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
- val resultSize = serializedDirectResult.limit
+ val resultSize = serializedDirectResult.remaining()
Member
You're right that there's an implicit assumption in some of this code that the buffer's position is 0 on returning, and that the entire buffer is filled with valid data. Do we have a situation where the position is not 0, though, but is correctly at the start of the data? At least, this looks like it handles the situation, but it sounds unusual. Equally, if that's an issue, are we sure the entire buffer has valid data through to the end? That assumption is still present here: that the end of the data is the end of the buffer.

Member Author
> Do we have a situation where the position is not 0, though, but is correctly at the start of the data?

If a ByteBuffer is from Netty, the position could be a non-zero value.

> Equally, if that's an issue, are we sure the entire buffer has valid data through to the end? That assumption is still present here: that the end of the data is the end of the buffer.

The ByteBuffer may contain more data internally, but the user should only read the part between position and limit. I think that's defined in the ByteBuffer/Buffer javadoc.
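
A minimal sketch of that contract with a plain java.nio ByteBuffer (the buffer and values below are illustrative, not taken from this PR):

```scala
import java.nio.ByteBuffer

// A 16-byte buffer whose readable window is [4, 12).
val buf = ByteBuffer.allocate(16)
buf.position(4)
buf.limit(12)

// limit() is the end index of the readable region, not its length.
assert(buf.limit() == 12)
// remaining() == limit - position, i.e. the number of readable bytes.
assert(buf.remaining() == 8)

// Sizing an array with limit() would over-allocate here;
// remaining() matches the readable payload exactly.
val payload = new Array[Byte](buf.remaining())
buf.get(payload)
```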

Member Author
I found I was wrong about the position of a ByteBuffer from Netty. Netty calls ByteBuffer.slice to reset the position to 0 before returning it: https://github.com/netty/netty/blob/0f9492c9affc528c766f9677952412564d4a3f6d/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java#L269

I think we don't need this patch.
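
For the slice() point, a small sketch with plain java.nio (independent of Netty's internals; values are illustrative):

```scala
import java.nio.ByteBuffer

val backing = ByteBuffer.allocate(16)
backing.position(4)
backing.limit(12)

// slice() creates a view over [position, limit) of the source buffer,
// with its own position reset to 0 and its limit set to the view's length.
val sliced = backing.slice()
assert(sliced.position() == 0)
assert(sliced.limit() == 8)
// For a freshly sliced buffer, limit() and remaining() therefore agree.
assert(sliced.remaining() == 8)
```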


// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
@@ -166,7 +166,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
- val zeroArray = new Array[Byte](zeroBuffer.limit)
+ val zeroArray = new Array[Byte](zeroBuffer.remaining())
zeroBuffer.get(zeroArray)

lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
@@ -216,7 +216,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
- val zeroArray = new Array[Byte](zeroBuffer.limit)
+ val zeroArray = new Array[Byte](zeroBuffer.remaining())
zeroBuffer.get(zeroArray)

// When deserializing, use a lazy val to create just one instance of the serializer per task
@@ -49,16 +49,17 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
+ val serializedSize = serializedData.remaining()
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] =>
- if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+ if (!taskSetManager.canFetchMoreResults(serializedSize)) {
return
}
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
directResult.value()
- (directResult, serializedData.limit())
+ (directResult, serializedSize)
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
// dropped by executor if size is larger than maxResultSize
@@ -105,7 +106,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
override def run(): Unit = Utils.logUncaughtExceptions {
val loader = Utils.getContextOrSparkClassLoader
try {
- if (serializedData != null && serializedData.limit() > 0) {
+ if (serializedData != null && serializedData.remaining() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
serializedData, loader)
}
@@ -469,11 +469,11 @@ private[spark] class TaskSetManager(
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
- if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+ if (serializedTask.remaining() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"(${serializedTask.remaining() / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
addRunningTask(taskId)
@@ -483,7 +483,7 @@ private[spark] class TaskSetManager(
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s"$taskLocality, ${serializedTask.limit} bytes)")
s"$taskLocality, ${serializedTask.remaining()} bytes)")

sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
@@ -224,13 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
- if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ if (serializedTask.remaining() >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values."
- msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
+ msg = msg.format(task.taskId, task.index, serializedTask.remaining(), akkaFrameSize,
AkkaUtils.reservedSizeBytes)
taskSetMgr.abort(msg)
} catch {
@@ -31,7 +31,7 @@ private[spark] case class MesosTaskLaunchData(
attemptNumber: Int) extends Logging {

def toByteString: ByteString = {
- val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
+ val dataBuffer = ByteBuffer.allocate(4 + serializedTask.remaining())
dataBuffer.putInt(attemptNumber)
dataBuffer.put(serializedTask)
dataBuffer.rewind
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -522,12 +522,12 @@ private[spark] class BlockManager(
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
- memoryStore.putBytes(blockId, bytes.limit, () => {
+ memoryStore.putBytes(blockId, bytes.remaining(), () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
// put it into MemoryStore, copyForMemory should not be created. That's why this
// action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ val copyForMemory = ByteBuffer.allocate(bytes.remaining())
copyForMemory.put(bytes)
})
bytes.rewind()
@@ -604,10 +604,11 @@ private[spark] class BlockManager(

if (data != null) {
if (asBlockResult) {
+ val dataSize = data.remaining()
return Some(new BlockResult(
dataDeserialize(blockId, data),
DataReadMethod.Network,
- data.limit()))
+ dataSize))
} else {
return Some(data)
}
@@ -951,10 +952,10 @@ private[spark] class BlockManager(
try {
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
logTrace(s"Trying to replicate $blockId of ${data.remaining()} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
logTrace(s"Replicated $blockId of ${data.remaining()} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
@@ -977,7 +978,7 @@ private[spark] class BlockManager(
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
logDebug(s"Replicating $blockId of ${data.remaining()} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -54,8 +54,8 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ file.getName, Utils.bytesToString(bytes.remaining()), finishTime - startTime))
+ PutResult(bytes.remaining(), Right(bytes.duplicate()))
}

override def putArray(
@@ -109,8 +109,8 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
if (externalBlockManager.isDefined) {
val byteBuffer = bytes.duplicate()
byteBuffer.rewind()
+ val size = bytes.remaining()
Member
This is an interesting one. The original bytes is reset to position 0 and then put, so the size that was actually put is really bytes.limit, or equivalently byteBuffer.remaining measured before the put. I think this might have to be adjusted, but I'm also not sure why bytes is returned here instead of what was put into the block manager. Maybe it really is assumed that the position is 0, but then why rewind?

externalBlockManager.get.putBytes(blockId, byteBuffer)
- val size = bytes.limit()
val data = if (returnValues) {
Right(bytes)
} else {
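
A minimal sketch of the mismatch described in the review comment above, assuming a source buffer whose position is not 0 (illustrative only, not part of this PR):

```scala
import java.nio.ByteBuffer

// A buffer whose readable window is [4, 12): remaining() == 8, limit() == 12.
val bytes = ByteBuffer.allocate(12)
bytes.position(4)

// Mirrors the pattern in the hunk above: duplicate + rewind before putBytes.
val byteBuffer = bytes.duplicate()
byteBuffer.rewind()

// After the rewind the duplicate exposes the whole buffer up to its limit, so
// what actually gets put is byteBuffer.remaining() == bytes.limit() == 12 bytes,
// while bytes.remaining() reports only 8.
assert(bytes.remaining() == 8)
assert(byteBuffer.remaining() == 12)
assert(byteBuffer.remaining() == bytes.limit())
```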
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -96,8 +96,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
putIterator(blockId, values, level, returnValues = true)
} else {
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
- PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+ val size = bytes.remaining()
+ tryToPut(blockId, bytes, size, deserialized = false, droppedBlocks)
+ PutResult(size, Right(bytes.duplicate()), droppedBlocks)
}
}

@@ -114,7 +115,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks)
val data =
if (putSuccess) {
- assert(bytes.limit == size)
+ assert(bytes.remaining() == size)
Right(bytes.duplicate())
} else {
null
@@ -134,8 +135,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
- tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
- PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+ val size = bytes.remaining()
+ tryToPut(blockId, bytes, size, deserialized = false, droppedBlocks)
+ PutResult(size, Right(bytes.duplicate()), droppedBlocks)
}
}

@@ -45,8 +45,9 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable
}

private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
- out.writeInt(buffer.limit())
- if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+ val readableBytes = buffer.remaining()
+ out.writeInt(readableBytes)
+ if (Channels.newChannel(out).write(buffer) != readableBytes) {
throw new IOException("Could not fully write buffer to output stream")
}
buffer.rewind() // Allow us to write it again later
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -155,8 +155,8 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
val ser = SparkEnv.get.closureSerializer.newInstance()
val union = rdd1.union(rdd2)
// The UnionRDD itself should be large, but each individual partition should be small.
- assert(ser.serialize(union).limit() > 2000)
- assert(ser.serialize(union.partitions.head).limit() < 2000)
+ assert(ser.serialize(union).remaining() > 2000)
+ assert(ser.serialize(union.partitions.head).remaining() < 2000)
}

test("aggregate") {
@@ -185,7 +185,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
- assert(ser.serialize(t).limit < 100)
+ assert(ser.serialize(t).remaining() < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
@@ -88,7 +88,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
}

// Header = null count + null positions
- val headerSize = 4 + nulls.limit()
+ val headerSize = 4 + nulls.remaining()
val compressedSize = if (encoder.compressedSize == 0) {
nonNullBuffer.remaining()
} else {