@@ -146,6 +146,12 @@ object BlockFetcherIterator {
146146 }
147147
148148 protected def splitLocalRemoteBlocks (): ArrayBuffer [FetchRequest ] = {
149+ // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
150+ // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
151+ // nodes, rather than blocking on reading output from one node.
152+ val maxRequestSize = math.max(maxBytesInFlight / 5 , 1L )
153+ logInfo(" maxBytesInFlight: " + maxBytesInFlight + " , maxRequestSize: " + maxRequestSize)
154+
149155 // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
150156 // at most maxBytesInFlight in order to limit the amount of data in flight.
151157 val remoteRequests = new ArrayBuffer [FetchRequest ]
@@ -157,11 +163,6 @@ object BlockFetcherIterator {
157163 _numBlocksToFetch += localBlocksToFetch.size
158164 } else {
159165 numRemote += blockInfos.size
160- // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
161- // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
162- // nodes, rather than blocking on reading output from one node.
163- val minRequestSize = math.max(maxBytesInFlight / 5 , 1L )
164- logInfo(" maxBytesInFlight: " + maxBytesInFlight + " , minRequest: " + minRequestSize)
165166 val iterator = blockInfos.iterator
166167 var curRequestSize = 0L
167168 var curBlocks = new ArrayBuffer [(BlockId , Long )]
@@ -176,11 +177,12 @@ object BlockFetcherIterator {
176177 } else if (size < 0 ) {
177178 throw new BlockException (blockId, " Negative block size " + size)
178179 }
179- if (curRequestSize >= minRequestSize ) {
180+ if (curRequestSize >= maxRequestSize ) {
180181 // Add this FetchRequest
181182 remoteRequests += new FetchRequest (address, curBlocks)
182183 curRequestSize = 0
183184 curBlocks = new ArrayBuffer [(BlockId , Long )]
185+ logDebug(s " Creating fetch request of $curRequestSize at $address" )
184186 }
185187 }
186188 // Add in the final request
@@ -189,7 +191,7 @@ object BlockFetcherIterator {
189191 }
190192 }
191193 }
192- logInfo(" Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
194+ logInfo(" Getting " + _numBlocksToFetch + " non-empty blocks out of " +
193195 totalBlocks + " blocks" )
194196 remoteRequests
195197 }
@@ -224,8 +226,8 @@ object BlockFetcherIterator {
224226 sendRequest(fetchRequests.dequeue())
225227 }
226228
227- val numGets = remoteRequests.size - fetchRequests.size
228- logInfo(" Started " + numGets + " remote gets in " + Utils .getUsedTimeMs(startTime))
229+ val numFetches = remoteRequests.size - fetchRequests.size
230+ logInfo(" Started " + numFetches + " remote fetches in" + Utils .getUsedTimeMs(startTime))
229231
230232 // Get Local Blocks
231233 startTime = System .currentTimeMillis
@@ -325,7 +327,7 @@ object BlockFetcherIterator {
325327 }
326328
327329 copiers = startCopiers(conf.getInt(" spark.shuffle.copier.threads" , 6 ))
328- logInfo(" Started " + fetchRequestsSync.size + " remote gets in " +
330+ logInfo(" Started " + fetchRequestsSync.size + " remote fetches in " +
329331 Utils .getUsedTimeMs(startTime))
330332
331333 // Get Local Blocks
0 commit comments