
Commit 99d2e4e

squito authored and Marcelo Vanzin committed
[SPARK-24296][CORE] Replicate large blocks as a stream.
When replicating large cached RDD blocks, it can be helpful to replicate them as a stream, to avoid using large amounts of memory during the transfer. This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite. Also ran tests on a cluster for blocks > 2GB.

Closes #21451 from squito/clean_replication.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 35f7f5c commit 99d2e4e
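In outline, the sender now compares the block size against the existing fetch-to-memory threshold: above it, only a small UploadBlockStream header goes out as an RPC and the block body follows as a stream of chunks, which the receiver spools to a temp file before putting it into its local BlockManager. A condensed, illustrative sketch of that decision (not code from the commit; the real logic is in NettyBlockTransferService.uploadBlock, shown in the diff below):

    // Illustrative sketch only: `blockSizeBytes` and `threshold` stand in for
    // blockData.size() and conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).
    def chooseUploadPath(blockSizeBytes: Long, threshold: Long): String =
      if (blockSizeBytes > threshold) {
        "uploadStream"  // header RPC + streamed body; block never fully in memory
      } else {
        "sendRpc"       // legacy path: whole block serialized into one RPC message
      }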

File tree

16 files changed, +270 -37 lines changed

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ public void onComplete(String streamId) throws IOException {
           callback.onSuccess(ByteBuffer.allocate(0));
         } catch (Exception ex) {
           IOException ioExc = new IOException("Failure post-processing complete stream;" +
-            " failing this rpc and leaving channel active");
+            " failing this rpc and leaving channel active", ex);
           callback.onFailure(ioExc);
           streamHandler.onFailure(streamId, ioExc);
         }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java

Lines changed: 2 additions & 1 deletion
@@ -42,7 +42,7 @@ public abstract class BlockTransferMessage implements Encodable {
   /** Preceding every serialized message is its type, which allows us to deserialize it. */
   public enum Type {
     OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
-    HEARTBEAT(5);
+    HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6);
 
     private final byte id;
 
@@ -67,6 +67,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
       case 3: return StreamHandle.decode(buf);
       case 4: return RegisterDriver.decode(buf);
       case 5: return ShuffleServiceHeartbeat.decode(buf);
+      case 6: return UploadBlockStream.decode(buf);
       default: throw new IllegalArgumentException("Unknown message type: " + type);
     }
   }
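For context, every BlockTransferMessage is serialized with a one-byte type tag in front of its body; that tag is what fromByteBuffer switches on, and this commit claims tag 6 for UploadBlockStream. A minimal, self-contained sketch of the convention (illustrative, not Spark code):

    import java.nio.ByteBuffer

    // Prefix a payload with its one-byte type tag, mirroring how
    // BlockTransferMessage.toByteBuffer frames messages on the wire.
    def frame(typeId: Byte, payload: Array[Byte]): ByteBuffer = {
      val buf = ByteBuffer.allocate(1 + payload.length)
      buf.put(typeId)
      buf.put(payload)
      buf.flip()
      buf
    }

    // Reading it back: the first byte selects the decoder (6 -> UploadBlockStream),
    // and the remainder of the buffer is handed to that decoder.
    def typeTag(msg: ByteBuffer): Byte = msg.get()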
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * A request to Upload a block, which the destination should receive as a stream.
+ *
+ * The actual block data is not contained here. It will be passed to the StreamCallbackWithID
+ * that is returned from RpcHandler.receiveStream()
+ */
+public class UploadBlockStream extends BlockTransferMessage {
+  public final String blockId;
+  public final byte[] metadata;
+
+  public UploadBlockStream(String blockId, byte[] metadata) {
+    this.blockId = blockId;
+    this.metadata = metadata;
+  }
+
+  @Override
+  protected Type type() { return Type.UPLOAD_BLOCK_STREAM; }
+
+  @Override
+  public int hashCode() {
+    int objectsHashCode = Objects.hashCode(blockId);
+    return objectsHashCode * 41 + Arrays.hashCode(metadata);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("blockId", blockId)
+      .add("metadata size", metadata.length)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof UploadBlockStream) {
+      UploadBlockStream o = (UploadBlockStream) other;
+      return Objects.equal(blockId, o.blockId)
+        && Arrays.equals(metadata, o.metadata);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(blockId)
+      + Encoders.ByteArrays.encodedLength(metadata);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, blockId);
+    Encoders.ByteArrays.encode(buf, metadata);
+  }
+
+  public static UploadBlockStream decode(ByteBuf buf) {
+    String blockId = Encoders.Strings.decode(buf);
+    byte[] metadata = Encoders.ByteArrays.decode(buf);
+    return new UploadBlockStream(blockId, metadata);
+  }
+}
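A hedged usage sketch for the new message type (assuming spark-network-shuffle and Netty are on the classpath; not taken from the commit): encode into a ByteBuf and decode it back.

    import io.netty.buffer.Unpooled
    import org.apache.spark.network.shuffle.protocol.UploadBlockStream

    val original = new UploadBlockStream("rdd_0_0", Array[Byte](1, 2, 3))
    val buf = Unpooled.buffer(original.encodedLength())
    original.encode(buf)                   // writes blockId, then metadata
    val roundTripped = UploadBlockStream.decode(buf)
    assert(roundTripped == original)       // equals() compares blockId and metadata bytes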

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 2 deletions
@@ -363,14 +363,14 @@ private[spark] class Executor(
       threadMXBean.getCurrentThreadCpuTime
     } else 0L
     var threwException = true
-    val value = try {
+    val value = Utils.tryWithSafeFinally {
       val res = task.run(
         taskAttemptId = taskId,
         attemptNumber = taskDescription.attemptNumber,
         metricsSystem = env.metricsSystem)
       threwException = false
       res
-    } finally {
+    } {
       val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
       val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
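The switch from try/finally to Utils.tryWithSafeFinally matters when both the task body and the cleanup throw: a plain finally would let the cleanup's exception mask the task's. A simplified, self-contained sketch of the pattern (the real helper lives in org.apache.spark.util.Utils):

    // If the body throws and the cleanup also throws, attach the cleanup's
    // exception as suppressed rather than letting it replace the original.
    def tryWithSafeFinally[T](body: => T)(finallyBlock: => Unit): T = {
      var original: Throwable = null
      try {
        body
      } catch {
        case t: Throwable =>
          original = t
          throw t
      } finally {
        try {
          finallyBlock
        } catch {
          case t: Throwable if original != null => original.addSuppressed(t)
          // with no original exception, a cleanup failure propagates as usual
        }
      }
    }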

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
@@ -568,6 +568,13 @@ package object config {
       .checkValue(v => v > 0, "The value should be a positive integer.")
       .createWithDefault(2000)
 
+  private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS =
+    ConfigBuilder("spark.storage.memoryMapLimitForTests")
+      .internal()
+      .doc("For testing only, controls the size of chunks when memory mapping a file")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Int.MaxValue)
+
   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")
       .doc("The timeout in seconds for each barrier() call from a barrier task. If the " +
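A brief usage note (illustrative, not part of the diff): because of bytesConf(ByteUnit.BYTE) the entry reads back as a Long count of bytes, and BlockManager.putBlockDataAsStream (later in this commit) narrows it to an Int chunk size for memory mapping.

    // Inside Spark, typed config entries are read through SparkConf:
    val mapChunkSize: Int = conf.get(MEMORY_MAP_LIMIT_FOR_TESTS).toInt
    // Tests can shrink the default (Int.MaxValue, ~2GB) with a size string, e.g.
    //   sparkConf.set("spark.storage.memoryMapLimitForTests", "10k")
    // to exercise the multi-chunk memory-mapping path with small blocks.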

core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

Lines changed: 12 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.network
 import scala.reflect.ClassTag
 
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
 private[spark]
@@ -43,6 +44,17 @@ trait BlockDataManager {
       level: StorageLevel,
       classTag: ClassTag[_]): Boolean
 
+  /**
+   * Put the given block that will be received as a stream.
+   *
+   * When this method is called, the block data itself is not available -- it will be passed to
+   * the returned StreamCallbackWithID.
+   */
+  def putBlockDataAsStream(
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[_]): StreamCallbackWithID
+
   /**
    * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
    */
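To make the contract concrete, here is a hedged, self-contained sketch of an implementation (BlockManager's real one, later in this commit, spools to a temp file rather than memory): onData is called once per arriving chunk, and onComplete runs only after the whole block has been received.

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer

    import org.apache.spark.network.client.StreamCallbackWithID

    class CollectingStreamCallback(id: String) extends StreamCallbackWithID {
      private val out = new ByteArrayOutputStream()
      override def getID: String = id
      override def onData(streamId: String, buf: ByteBuffer): Unit = {
        val chunk = new Array[Byte](buf.remaining())
        buf.get(chunk)    // drain this chunk into the local buffer
        out.write(chunk)
      }
      override def onComplete(streamId: String): Unit = {
        // all bytes received; hand out.toByteArray to storage here
      }
      override def onFailure(streamId: String, cause: Throwable): Unit = out.reset()
    }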

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 24 additions & 2 deletions
@@ -26,9 +26,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.buffer.NioManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithID, TransportClient}
 import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
-import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock}
+import org.apache.spark.network.shuffle.protocol._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
@@ -73,10 +73,32 @@ class NettyBlockRpcServer(
         }
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
+        logDebug(s"Receiving replicated block $blockId with level ${level} " +
+          s"from ${client.getSocketAddress}")
         blockManager.putBlockData(blockId, data, level, classTag)
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }
 
+  override def receiveStream(
+      client: TransportClient,
+      messageHeader: ByteBuffer,
+      responseContext: RpcResponseCallback): StreamCallbackWithID = {
+    val message =
+      BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
+    val (level: StorageLevel, classTag: ClassTag[_]) = {
+      serializer
+        .newInstance()
+        .deserialize(ByteBuffer.wrap(message.metadata))
+        .asInstanceOf[(StorageLevel, ClassTag[_])]
+    }
+    val blockId = BlockId(message.blockId)
+    logDebug(s"Receiving replicated block $blockId with level ${level} as stream " +
+      s"from ${client.getSocketAddress}")
+    // This will return immediately, but will setup a callback on streamData which will still
+    // do all the processing in the netty thread.
+    blockManager.putBlockDataAsStream(blockId, level, classTag)
+  }
+
   override def getStreamManager(): StreamManager = streamManager
 }
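The metadata bytes carried by UploadBlockStream are just a JavaSerializer-encoded (StorageLevel, ClassTag) pair, produced on the sending side and decoded here before any block bytes arrive. A hedged round-trip illustration (assumes spark-core on the classpath):

    import java.nio.ByteBuffer
    import scala.reflect.ClassTag

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer
    import org.apache.spark.storage.StorageLevel

    val serializer = new JavaSerializer(new SparkConf())
    // Sender: pack the storage level and class tag into the message header.
    val metadata: ByteBuffer =
      serializer.newInstance().serialize((StorageLevel.MEMORY_AND_DISK, ClassTag.Any))
    // Receiver: recover them before the stream starts.
    val (level, classTag) = serializer.newInstance()
      .deserialize[(StorageLevel, ClassTag[_])](metadata)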

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 24 additions & 15 deletions
@@ -27,13 +27,14 @@ import scala.reflect.ClassTag
 import com.codahale.metrics.{Metric, MetricSet}
 
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config
 import org.apache.spark.network._
-import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
 import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager}
-import org.apache.spark.network.shuffle.protocol.UploadBlock
+import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
@@ -148,20 +149,28 @@ private[spark] class NettyBlockTransferService(
     // Everything else is encoded using our binary protocol.
     val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
 
-    // Convert or copy nio buffer into array in order to serialize it.
-    val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
+    val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+    val callback = new RpcResponseCallback {
+      override def onSuccess(response: ByteBuffer): Unit = {
+        logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
+        result.success((): Unit)
+      }
 
-    client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,
-      new RpcResponseCallback {
-        override def onSuccess(response: ByteBuffer): Unit = {
-          logTrace(s"Successfully uploaded block $blockId")
-          result.success((): Unit)
-        }
-        override def onFailure(e: Throwable): Unit = {
-          logError(s"Error while uploading block $blockId", e)
-          result.failure(e)
-        }
-      })
+      override def onFailure(e: Throwable): Unit = {
+        logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)
+        result.failure(e)
+      }
+    }
+    if (asStream) {
+      val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBuffer
+      client.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback)
+    } else {
+      // Convert or copy nio buffer into array in order to serialize it.
+      val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
+
+      client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,
+        callback)
+    }
 
     result.future
   }
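The refactor's key move is hoisting one RpcResponseCallback so both paths share the same success/failure handling, bridged into the Future the caller awaits. A self-contained sketch of that Promise bridge (illustrative; the trait below stands in for Spark's RpcResponseCallback):

    import java.nio.ByteBuffer
    import scala.concurrent.{Future, Promise}

    trait ResponseCallback {
      def onSuccess(response: ByteBuffer): Unit
      def onFailure(e: Throwable): Unit
    }

    // Turn a fire-and-forget callback into a Future the caller can block on.
    def callbackAndFuture(): (ResponseCallback, Future[Unit]) = {
      val result = Promise[Unit]()
      val callback = new ResponseCallback {
        override def onSuccess(response: ByteBuffer): Unit = result.success(())
        override def onFailure(e: Throwable): Unit = result.failure(e)
      }
      (callback, result.future)
    }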

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 64 additions & 2 deletions
@@ -41,6 +41,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
@@ -406,6 +407,63 @@ private[spark] class BlockManager(
     putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
   }
 
+  override def putBlockDataAsStream(
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[_]): StreamCallbackWithID = {
+    // TODO if we're going to only put the data in the disk store, we should just write it directly
+    // to the final location, but that would require a deeper refactor of this code. So instead
+    // we just write to a temp file, and call putBytes on the data in that file.
+    val tmpFile = diskBlockManager.createTempLocalBlock()._2
+    val channel = new CountingWritableChannel(
+      Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
+    logTrace(s"Streaming block $blockId to tmp file $tmpFile")
+    new StreamCallbackWithID {
+
+      override def getID: String = blockId.name
+
+      override def onData(streamId: String, buf: ByteBuffer): Unit = {
+        while (buf.hasRemaining) {
+          channel.write(buf)
+        }
+      }
+
+      override def onComplete(streamId: String): Unit = {
+        logTrace(s"Done receiving block $blockId, now putting into local blockManager")
+        // Read the contents of the downloaded file as a buffer to put into the blockManager.
+        // Note this is all happening inside the netty thread as soon as it reads the end of the
+        // stream.
+        channel.close()
+        // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
+        // using a lot of memory here. With encryption, we'll read the whole file into a regular
+        // byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm
+        // OOM, but might get killed by the OS / cluster manager. We could at least read the tmp
+        // file as a stream in both cases.
+        val buffer = securityManager.getIOEncryptionKey() match {
+          case Some(key) =>
+            // we need to pass in the size of the unencrypted block
+            val blockSize = channel.getCount
+            val allocator = level.memoryMode match {
+              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+            }
+            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
+
+          case None =>
+            ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+        }
+        putBytes(blockId, buffer, level)(classTag)
+        tmpFile.delete()
+      }
+
+      override def onFailure(streamId: String, cause: Throwable): Unit = {
+        // the framework handles the connection itself, we just need to do local cleanup
+        channel.close()
+        tmpFile.delete()
+      }
+    }
+  }
+
   /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
    * NOTE: This is mainly for testing.
@@ -667,7 +725,7 @@ private[spark] class BlockManager(
     // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than memory-mapping the file.
     // Until then, replication can cause the process to use too much memory and get killed
-    // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
+    // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
    // we've read the data to disk.
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
@@ -1358,12 +1416,16 @@ private[spark] class BlockManager(
         try {
           val onePeerStartTime = System.nanoTime
           logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
+          // This thread keeps a lock on the block, so we do not want the netty thread to unlock
+          // block when it finishes sending the message.
+          val buffer = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false,
+            unlockOnDeallocate = false)
           blockTransferService.uploadBlockSync(
             peer.host,
             peer.port,
             peer.executorId,
             blockId,
-            new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false),
+            buffer,
             tLevel,
             classTag)
           logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +

core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala

Lines changed: 5 additions & 2 deletions
@@ -38,7 +38,8 @@ private[storage] class BlockManagerManagedBuffer(
     blockInfoManager: BlockInfoManager,
     blockId: BlockId,
     data: BlockData,
-    dispose: Boolean) extends ManagedBuffer {
+    dispose: Boolean,
+    unlockOnDeallocate: Boolean = true) extends ManagedBuffer {
 
   private val refCount = new AtomicInteger(1)
 
@@ -58,7 +59,9 @@ private[storage] class BlockManagerManagedBuffer(
   }
 
   override def release(): ManagedBuffer = {
-    blockInfoManager.unlock(blockId)
+    if (unlockOnDeallocate) {
+      blockInfoManager.unlock(blockId)
+    }
     if (refCount.decrementAndGet() == 0 && dispose) {
       data.dispose()
     }