@@ -33,7 +33,7 @@ import org.apache.spark.internal.config._
3333import org .apache .spark .launcher .SparkLauncher
3434import org .apache .spark .network .buffer .{FileSegmentManagedBuffer , ManagedBuffer , NioManagedBuffer }
3535import org .apache .spark .network .netty .SparkTransportConf
36- import org .apache .spark .network .shuffle .{BlockFetchingListener , BlockStoreClient }
36+ import org .apache .spark .network .shuffle .{BlockFetchingListener }
3737import org .apache .spark .network .shuffle .ErrorHandler .BlockPushErrorHandler
3838import org .apache .spark .network .util .TransportConf
3939import org .apache .spark .scheduler .MapStatus
@@ -118,9 +118,8 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
118118 // time won't be pushing the same ranges of shuffle partitions.
119119 pushRequests ++= Utils .randomize(requests)
120120
121- val shuffleClient = SparkEnv .get.blockManager.blockStoreClient
122121 submitTask(() => {
123- pushUpToMax(shuffleClient )
122+ pushUpToMax()
124123 })
125124 }
126125
@@ -144,7 +143,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
144143 * This code is similar to ShuffleBlockFetcherIterator#fetchUpToMaxBytes in how it throttles
145144 * the data transfer between shuffle client/server.
146145 */
147- private def pushUpToMax (shuffleClient : BlockStoreClient ): Unit = synchronized {
146+ private def pushUpToMax (): Unit = synchronized {
148147 // Process any outstanding deferred push requests if possible.
149148 if (deferredPushRequests.nonEmpty) {
150149 for ((remoteAddress, defReqQueue) <- deferredPushRequests) {
@@ -153,7 +152,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
153152 val request = defReqQueue.dequeue()
154153 logDebug(s " Processing deferred push request for $remoteAddress with "
155154 + s " ${request.blocks.length} blocks " )
156- sendRequest(request, shuffleClient )
155+ sendRequest(request)
157156 if (defReqQueue.isEmpty) {
158157 deferredPushRequests -= remoteAddress
159158 }
@@ -171,7 +170,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
171170 defReqQueue.enqueue(request)
172171 deferredPushRequests(remoteAddress) = defReqQueue
173172 } else {
174- sendRequest(request, shuffleClient )
173+ sendRequest(request)
175174 }
176175 }
177176
@@ -197,7 +196,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
197196 * client thread instead of task execution thread which takes care of majority of the block
198197 * pushes.
199198 */
200- private def sendRequest (request : PushRequest , shuffleClient : BlockStoreClient ): Unit = {
199+ private def sendRequest (request : PushRequest ): Unit = {
201200 bytesInFlight = bytesInFlight + request.size
202201 reqsInFlight = reqsInFlight + 1
203202 numBlocksInFlightPerAddress(request.address) = numBlocksInFlightPerAddress.getOrElseUpdate(
@@ -222,7 +221,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
222221 submitTask(() => {
223222 if (updateStateAndCheckIfPushMore(
224223 sizeMap(result.blockId), address, remainingBlocks, result)) {
225- pushUpToMax(SparkEnv .get.blockManager.blockStoreClient )
224+ pushUpToMax()
226225 }
227226 })
228227 }
@@ -242,7 +241,8 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
242241 handleResult(PushResult (blockId, exception))
243242 }
244243 }
245- shuffleClient.pushBlocks(address.host, address.port, blockIds.toArray,
244+ SparkEnv .get.blockManager.blockStoreClient.pushBlocks(
245+ address.host, address.port, blockIds.toArray,
246246 sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)),
247247 blockPushListener)
248248 }
@@ -263,7 +263,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
263263 reqBuffer : ManagedBuffer ,
264264 blockSizes : Seq [Long ]): Array [ManagedBuffer ] = {
265265 if (blockSizes.size == 1 ) {
266- Seq (reqBuffer).toArray
266+ Array (reqBuffer)
267267 } else {
268268 val inMemoryBuffer = reqBuffer.nioByteBuffer()
269269 val blockOffsets = new Array [Long ](blockSizes.size)
@@ -378,7 +378,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
378378 // Start a new PushRequest if the current request goes beyond the max batch size,
379379 // or the number of blocks in the current request goes beyond the limit per destination,
380380 // or the next block push location is for a different shuffle service, or the next block
381- // exceeds the max block size to push limit. This guarantees that each PushReqeust
381+ // exceeds the max block size to push limit. This guarantees that each PushRequest
382382 // represents continuous blocks in the shuffle file to be pushed to the same shuffle
383383 // service, and does not go beyond existing limitations.
384384 if (currentReqSize + blockSize <= maxBlockBatchSize
@@ -394,13 +394,13 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
394394 }
395395 // Start a new batch
396396 currentReqSize = 0
397- // Set currentReqffset to -1 so we are able to distinguish between the initial value
397+ // Set currentReqOffset to -1 so we are able to distinguish between the initial value
398398 // of currentReqOffset and when we are about to start a new batch
399399 currentReqOffset = - 1
400400 currentMergerId = mergerId
401401 blocks = new ArrayBuffer [(BlockId , Long )]
402402 }
403- // Skip blocks exceeding the size limit for push
403+ // Only push blocks under the size limit
404404 if (blockSize <= maxBlockSizeToPush) {
405405 blocks += ((ShufflePushBlockId (shuffleId, partitionId, reduceId), blockSize))
406406 // Only update currentReqOffset if the current block is the first in the request
@@ -455,10 +455,7 @@ private[spark] object ShuffleWriter {
455455 * @param failure exception if the push was unsuccessful; null otherwise;
456456 */
457457 @ Since (" 3.1.0" )
458- private case class PushResult (
459- blockId : String ,
460- failure : Throwable
461- )
458+ private case class PushResult (blockId : String , failure : Throwable )
462459
463460 private val BLOCK_PUSHER_POOL : ExecutorService = {
464461 val conf = SparkEnv .get.conf
0 commit comments