diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7b68dfe5ad06..73eb9baa2bde 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -253,7 +253,7 @@ private[spark] class Executor(
         val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
         val serializedDirectResult = ser.serialize(directResult)
-        val resultSize = serializedDirectResult.limit
+        val resultSize = serializedDirectResult.remaining()
 
         // directSend = sending directly back to the driver
         val serializedResult: ByteBuffer = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 44d195587a08..f0b68b2d5f68 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -166,7 +166,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
     val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
-    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    val zeroArray = new Array[Byte](zeroBuffer.remaining())
     zeroBuffer.get(zeroArray)
 
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
@@ -216,7 +216,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
     val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
-    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    val zeroArray = new Array[Byte](zeroBuffer.remaining())
     zeroBuffer.get(zeroArray)
 
     // When deserializing, use a lazy val to create just one instance of the serializer per task
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index f4965994d827..2aae57006abe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -49,16 +49,17 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     getTaskResultExecutor.execute(new Runnable {
       override def run(): Unit = Utils.logUncaughtExceptions {
         try {
+          val serializedSize = serializedData.remaining()
           val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] =>
-              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+              if (!taskSetManager.canFetchMoreResults(serializedSize)) {
                 return
               }
               // deserialize "value" without holding any lock so that it won't block other threads.
               // We should call it here, so that when it's called again in
               // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
               directResult.value()
-              (directResult, serializedData.limit())
+              (directResult, serializedSize)
             case IndirectTaskResult(blockId, size) =>
               if (!taskSetManager.canFetchMoreResults(size)) {
                 // dropped by executor if size is larger than maxResultSize
@@ -105,7 +106,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
       override def run(): Unit = Utils.logUncaughtExceptions {
         val loader = Utils.getContextOrSparkClassLoader
         try {
-          if (serializedData != null && serializedData.limit() > 0) {
+          if (serializedData != null && serializedData.remaining() > 0) {
             reason = serializer.get().deserialize[TaskEndReason](
               serializedData, loader)
           }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a02f3017cb6e..e770beca81ab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -469,11 +469,11 @@ private[spark] class TaskSetManager(
             abort(s"$msg Exception during serialization: $e")
             throw new TaskNotSerializableException(e)
         }
-        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+        if (serializedTask.remaining() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
             !emittedTaskSizeWarning) {
           emittedTaskSizeWarning = true
           logWarning(s"Stage ${task.stageId} contains a task of very large size " +
-            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+            s"(${serializedTask.remaining() / 1024} KB). The maximum recommended task size is " +
             s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
         }
         addRunningTask(taskId)
@@ -483,7 +483,7 @@ private[spark] class TaskSetManager(
         // val timeTaken = clock.getTime() - startTime
         val taskName = s"task ${info.id} in stage ${taskSet.id}"
         logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
-          s"$taskLocality, ${serializedTask.limit} bytes)")
+          s"$taskLocality, ${serializedTask.remaining()} bytes)")
         sched.dagScheduler.taskStarted(task, info)
 
         return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 505c161141c8..0a13ace4b71e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -224,13 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
         val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+        if (serializedTask.remaining() >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                 "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                 "spark.akka.frameSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
+              msg = msg.format(task.taskId, task.index, serializedTask.remaining(), akkaFrameSize,
                 AkkaUtils.reservedSizeBytes)
               taskSetMgr.abort(msg)
             } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
index 5e7e6567a3e0..329195b6073f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
@@ -31,7 +31,7 @@ private[spark] case class MesosTaskLaunchData(
   attemptNumber: Int) extends Logging {
 
   def toByteString: ByteString = {
-    val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
+    val dataBuffer = ByteBuffer.allocate(4 + serializedTask.remaining())
     dataBuffer.putInt(attemptNumber)
     dataBuffer.put(serializedTask)
     dataBuffer.rewind
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007fb7899..570a138a36b8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -522,12 +522,12 @@ private[spark] class BlockManager(
           /* We'll store the bytes in memory if the block's storage level includes
            * "memory serialized", or if it should be cached as objects in memory
            * but we only requested its serialized bytes. */
-          memoryStore.putBytes(blockId, bytes.limit, () => {
+          memoryStore.putBytes(blockId, bytes.remaining(), () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will happen. So if we cannot
             // put it into MemoryStore, copyForMemory should not be created. That's why this
             // action is put into a `() => ByteBuffer` and created lazily.
-            val copyForMemory = ByteBuffer.allocate(bytes.limit)
+            val copyForMemory = ByteBuffer.allocate(bytes.remaining())
             copyForMemory.put(bytes)
           })
           bytes.rewind()
@@ -604,10 +604,11 @@ private[spark] class BlockManager(
       if (data != null) {
         if (asBlockResult) {
+          val dataSize = data.remaining()
           return Some(new BlockResult(
             dataDeserialize(blockId, data),
             DataReadMethod.Network,
-            data.limit()))
+            dataSize))
         } else {
           return Some(data)
         }
@@ -951,10 +952,10 @@ private[spark] class BlockManager(
       try {
         val onePeerStartTime = System.currentTimeMillis
         data.rewind()
-        logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+        logTrace(s"Trying to replicate $blockId of ${data.remaining()} bytes to $peer")
         blockTransferService.uploadBlockSync(
           peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
-        logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
+        logTrace(s"Replicated $blockId of ${data.remaining()} bytes to $peer in %s ms"
          .format(System.currentTimeMillis - onePeerStartTime))
         peersReplicatedTo += peer
         peersForReplication -= peer
@@ -977,7 +978,7 @@ private[spark] class BlockManager(
       }
     }
     val timeTakeMs = (System.currentTimeMillis - startTime)
-    logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+    logDebug(s"Replicating $blockId of ${data.remaining()} bytes to " +
       s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9dc1632..f07a51e8f32d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -54,8 +54,8 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     }
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
-    PutResult(bytes.limit(), Right(bytes.duplicate()))
+      file.getName, Utils.bytesToString(bytes.remaining()), finishTime - startTime))
+    PutResult(bytes.remaining(), Right(bytes.duplicate()))
   }
 
   override def putArray(
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index db965d54bafd..e4e273702b36 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -109,8 +109,8 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       if (externalBlockManager.isDefined) {
         val byteBuffer = bytes.duplicate()
         byteBuffer.rewind()
+        val size = bytes.remaining()
         externalBlockManager.get.putBytes(blockId, byteBuffer)
-        val size = bytes.limit()
         val data = if (returnValues) {
           Right(bytes)
         } else {
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 4dbac388e098..e5862725e16b 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -96,8 +96,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       putIterator(blockId, values, level, returnValues = true)
     } else {
       val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
-      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+      val size = bytes.remaining()
+      tryToPut(blockId, bytes, size, deserialized = false, droppedBlocks)
+      PutResult(size, Right(bytes.duplicate()), droppedBlocks)
     }
   }
 
@@ -114,7 +115,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks)
     val data = if (putSuccess) {
-      assert(bytes.limit == size)
+      assert(bytes.remaining() == size)
       Right(bytes.duplicate())
     } else {
       null
@@ -134,8 +135,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
-      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+      val size = bytes.remaining()
+      tryToPut(blockId, bytes, size, deserialized = false, droppedBlocks)
+      PutResult(size, Right(bytes.duplicate()), droppedBlocks)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
index a06b6f84ef11..d9ce1425e729 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
@@ -45,8 +45,9 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable
   }
 
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
-    out.writeInt(buffer.limit())
-    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+    val readableBytes = buffer.remaining()
+    out.writeInt(readableBytes)
+    if (Channels.newChannel(out).write(buffer) != readableBytes) {
       throw new IOException("Could not fully write buffer to output stream")
     }
     buffer.rewind() // Allow us to write it again later
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 007a71f87cf1..587b8a9c06ee 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -155,8 +155,8 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val union = rdd1.union(rdd2)
     // The UnionRDD itself should be large, but each individual partition should be small.
-    assert(ser.serialize(union).limit() > 2000)
-    assert(ser.serialize(union.partitions.head).limit() < 2000)
+    assert(ser.serialize(union).remaining() > 2000)
+    assert(ser.serialize(union.partitions.head).remaining() < 2000)
   }
 
   test("aggregate") {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index f81fe3113106..da7eed8400bc 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -185,7 +185,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     def check[T: ClassTag](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
       // Check that very long ranges don't get written one element at a time
-      assert(ser.serialize(t).limit < 100)
+      assert(ser.serialize(t).remaining() < 100)
     }
     check(1 to 1000000)
     check(1 to 1000000 by 2)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
index b0e216feb559..642169ad90ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
@@ -88,7 +88,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
     }
 
     // Header = null count + null positions
-    val headerSize = 4 + nulls.limit()
+    val headerSize = 4 + nulls.remaining()
    val compressedSize = if (encoder.compressedSize == 0) {
      nonNullBuffer.remaining()
    } else {
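
Why remaining() instead of limit(): for a java.nio.ByteBuffer, remaining() is defined as limit() - position(). The two agree only while the buffer has not been read (position == 0); once any bytes have been consumed, limit() still points at the end of the data while remaining() reports the bytes actually left, which is the size these call sites care about. A minimal standalone sketch of the difference (the object name below is illustrative, not part of the patch):

import java.nio.ByteBuffer

object RemainingVsLimit {
  def main(args: Array[String]): Unit = {
    // Wrap an 8-byte array: position = 0, limit = capacity = 8.
    val buf = ByteBuffer.wrap(new Array[Byte](8))
    println(s"fresh buffer: limit=${buf.limit()}, remaining=${buf.remaining()}")   // 8, 8

    // Reading 4 bytes advances position; limit() is unchanged, remaining() shrinks.
    buf.getInt()
    println(s"after getInt: limit=${buf.limit()}, remaining=${buf.remaining()}")   // 8, 4
  }
}

At most of the call sites touched here the buffer has not been consumed when its size is taken, so both calls currently return the same value; remaining() is simply the defensive choice if a buffer is ever read first.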