diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java index 36ca73f6ac0f0..b507f911fe11a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java @@ -109,13 +109,15 @@ public void addToChannel(Channel ch) throws IOException { @VisibleForTesting static class EncryptionHandler extends ChannelOutboundHandlerAdapter { - private final ByteArrayWritableChannel byteChannel; + private final ByteArrayWritableChannel byteEncChannel; private final CryptoOutputStream cos; + private final ByteArrayWritableChannel byteRawChannel; private boolean isCipherValid; EncryptionHandler(TransportCipher cipher) throws IOException { - byteChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); - cos = cipher.createOutputStream(byteChannel); + byteEncChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); + cos = cipher.createOutputStream(byteEncChannel); + byteRawChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); isCipherValid = true; } @@ -127,7 +129,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @VisibleForTesting EncryptedMessage createEncryptedMessage(Object msg) { - return new EncryptedMessage(this, cos, msg, byteChannel); + return new EncryptedMessage(this, cos, msg, byteEncChannel, byteRawChannel); } @Override @@ -223,8 +225,8 @@ static class EncryptedMessage extends AbstractFileRegion { // Due to streaming issue CRYPTO-125: https://issues.apache.org/jira/browse/CRYPTO-125, it has // to utilize two helper ByteArrayWritableChannel for streaming. One is used to receive raw data // from upper handler, another is used to store encrypted data. - private ByteArrayWritableChannel byteEncChannel; - private ByteArrayWritableChannel byteRawChannel; + private final ByteArrayWritableChannel byteEncChannel; + private final ByteArrayWritableChannel byteRawChannel; private ByteBuffer currentEncrypted; @@ -232,7 +234,8 @@ static class EncryptedMessage extends AbstractFileRegion { EncryptionHandler handler, CryptoOutputStream cos, Object msg, - ByteArrayWritableChannel ch) { + ByteArrayWritableChannel byteEncChannel, + ByteArrayWritableChannel byteRawChannel) { Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion, "Unrecognized message type: %s", msg.getClass().getName()); this.handler = handler; @@ -240,9 +243,9 @@ static class EncryptedMessage extends AbstractFileRegion { this.buf = isByteBuf ? (ByteBuf) msg : null; this.region = isByteBuf ? null : (FileRegion) msg; this.transferred = 0; - this.byteRawChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); this.cos = cos; - this.byteEncChannel = ch; + this.byteEncChannel = byteEncChannel; + this.byteRawChannel = byteRawChannel; this.count = isByteBuf ? buf.readableBytes() : region.count(); }