From 0072d06e34d4310027120223083af2196edb4203 Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Mon, 21 Jan 2019 19:34:11 +0800
Subject: [PATCH 01/14] Consolidate CompositeByteBuf when reading large frame

---
 .../network/util/TransportFrameDecoder.java   | 10 ++++++-
 .../util/TransportFrameDecoderSuite.java      | 30 +++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 8e73ab077a5c..47cd58b426c1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -146,8 +146,16 @@ private ByteBuf decodeNext() {
       remaining -= next.readableBytes();
       frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
     }
+    // Because the ByteBuf created is far smaller than its capacity in most cases,
+    // we can reduce memory consumption by consolidation.
+    ByteBuf retained = null;
+    if (frameSize >= 1024 * 1024) {
+      retained = frame.consolidate();
+    } else {
+      retained = frame;
+    }
     assert remaining == 0;
-    return frame;
+    return retained;
   }
 
   /**
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index 7d40387c5f1a..df21f1fe39c2 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -22,6 +22,7 @@
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
@@ -47,6 +48,35 @@ public void testFrameDecoding() throws Exception {
     verifyAndCloseDecoder(decoder, ctx, data);
   }
 
+  @Test
+  public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
+    TransportFrameDecoder decoder = new TransportFrameDecoder();
+    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+    ArrayList<ByteBuf> retained = Lists.newArrayList();
+    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+      ByteBuf buf = (ByteBuf) in.getArguments()[0];
+      retained.add(buf);
+      return null;
+    });
+    ByteBuf data1 = Unpooled.buffer(1024 * 1024);
+    data1.writeLong(1024 * 1024 + 8);
+    data1.writeByte(127);
+    ByteBuf data2 = Unpooled.buffer(1024 * 1024);
+    for (int i = 0; i < 1024 * 1024 - 1; i++) {
+      data2.writeByte(128);
+    }
+    int originalCapacity = data1.capacity() + data2.capacity();
+    try {
+      decoder.channelRead(ctx, data1);
+      decoder.channelRead(ctx, data2);
+      assertEquals(1, retained.size());
+      assert(retained.get(0).capacity() < originalCapacity);
+    } catch (Exception e) {
+      release(data1);
+      release(data2);
+    }
+  }
+
   @Test
   public void testInterception() throws Exception {
     int interceptedReads = 3;
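The comment in the hunk above carries the key observation: each ByteBuf handed to the decoder typically has far more capacity than readable bytes, and a CompositeByteBuf keeps every component's backing array alive. A minimal standalone sketch of the effect (class name illustrative, not part of the series; assumes Netty 4.1 on the classpath):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class ConsolidationSketch {
  public static void main(String[] args) {
    CompositeByteBuf frame = Unpooled.compositeBuffer(Integer.MAX_VALUE);
    long backingBytes = 0;
    for (int i = 0; i < 64; i++) {
      // Mimic a socket read: a 64 KiB buffer holding only 2 KiB of data.
      ByteBuf chunk = Unpooled.buffer(64 * 1024);
      chunk.writeBytes(new byte[2 * 1024]);
      backingBytes += chunk.capacity();
      frame.addComponent(true, chunk); // 'true' also advances the writer index
    }
    // 128 KiB readable, but 4 MiB of backing arrays pinned by the composite.
    System.out.println(frame.readableBytes() + " readable / " + backingBytes + " backing");

    // consolidate() copies the readable bytes into one right-sized buffer and
    // releases the components, letting the oversized arrays be collected.
    frame.consolidate();
    System.out.println(frame.numComponents() + " component after consolidation");
    frame.release();
  }
}
```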
From a2536dabf216dbf526cf07deab1a53b52c290972 Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Fri, 1 Feb 2019 14:19:40 +0800
Subject: [PATCH 02/14] Update as commented

---
 .../apache/spark/network/util/TransportFrameDecoderSuite.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index df21f1fe39c2..ded62be55da3 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -22,7 +22,6 @@
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
@@ -52,7 +51,7 @@ public void testFrameDecoding() throws Exception {
   public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
     TransportFrameDecoder decoder = new TransportFrameDecoder();
     ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-    ArrayList<ByteBuf> retained = Lists.newArrayList();
+    List<ByteBuf> retained = new ArrayList<>();
     when(ctx.fireChannelRead(any())).thenAnswer(in -> {
       ByteBuf buf = (ByteBuf) in.getArguments()[0];
       retained.add(buf);

From 9fd8ecde49d2c241d8b0ce2ac54ee6aac6bf9030 Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Sat, 2 Feb 2019 17:30:43 +0800
Subject: [PATCH 03/14] Consolidate when exceeds threshold and add tests

---
 .../spark/network/TransportContext.java       |  3 +-
 .../apache/spark/network/util/NettyUtils.java |  6 +++-
 .../spark/network/util/TransportConf.java     |  8 +++++
 .../network/util/TransportFrameDecoder.java   | 32 +++++++++++++------
 .../util/TransportFrameDecoderSuite.java      | 32 +++++++++++++++++++
 5 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 1a3f3f2a6f24..8f85872d4187 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -191,9 +191,10 @@ public TransportChannelHandler initializePipeline(
     TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
     ChunkFetchRequestHandler chunkFetchHandler =
       createChunkFetchHandler(channelHandler, channelRpcHandler);
+
     ChannelPipeline pipeline = channel.pipeline()
       .addLast("encoder", ENCODER)
-      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
+      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder(conf.consolidateBufsThreshold()))
       .addLast("decoder", DECODER)
       .addLast("idleStateHandler",
         new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 423cc0c70ea0..92875e14fb4c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -100,7 +100,11 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
    * This is used before all decoders.
    */
   public static TransportFrameDecoder createFrameDecoder() {
-    return new TransportFrameDecoder();
+    return new TransportFrameDecoder(-1L);
+  }
+
+  public static TransportFrameDecoder createFrameDecoder(long consolidateBufsThreshold) {
+    return new TransportFrameDecoder(consolidateBufsThreshold);
   }
 
   /** Returns the remote address on the channel or "<unknown remote>" if none exists. */
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 3628da68f1c6..b4483f0f2c03 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -43,6 +43,7 @@ public class TransportConf {
   private final String SPARK_NETWORK_IO_LAZYFD_KEY;
   private final String SPARK_NETWORK_VERBOSE_METRICS;
   private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;
+  private final String SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY;
 
   private final ConfigProvider conf;
 
@@ -66,6 +67,7 @@ public TransportConf(String module, ConfigProvider conf) {
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
     SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
     SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
+    SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY = getConfKey("io.consolidateBufsThreshold");
   }
 
   public int getInt(String name, int defaultValue) {
@@ -94,6 +96,12 @@ public boolean preferDirectBufs() {
     return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
   }
 
+  public long consolidateBufsThreshold() {
+    long defaultConsolidateBufsThreshold = (long)(JavaUtils.byteStringAsBytes(
+      conf.get("spark.executor.memory", "1g")) * 0.1);
+    return conf.getLong(SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY, defaultConsolidateBufsThreshold);
+  }
+
   /** Connect timeout in milliseconds. Default 120 secs. */
   public int connectionTimeoutMs() {
     long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 47cd58b426c1..6d157356b87b 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -56,6 +56,18 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   private long nextFrameSize = UNKNOWN_FRAME_SIZE;
   private volatile Interceptor interceptor;
 
+  private long consolidateBufsThreshold = Long.MAX_VALUE;
+  long consolidatedCount = 0L;
+  long consolidatedTotalTime = 0L;
+
+  public TransportFrameDecoder() {}
+
+  public TransportFrameDecoder(long consolidateBufsThreshold) {
+    if (consolidateBufsThreshold > 0) {
+      this.consolidateBufsThreshold = consolidateBufsThreshold;
+    }
+  }
+
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
     ByteBuf in = (ByteBuf) data;
@@ -141,21 +153,23 @@ private ByteBuf decodeNext() {
     // Otherwise, create a composite buffer.
     CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
+    long lastConsolidatedCapacity = 0L;
     while (remaining > 0) {
       ByteBuf next = nextBufferForFrame(remaining);
       remaining -= next.readableBytes();
       frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
-    }
-    // Because the ByteBuf created is far smaller than its capacity in most cases,
-    // we can reduce memory consumption by consolidation.
-    ByteBuf retained = null;
-    if (frameSize >= 1024 * 1024) {
-      retained = frame.consolidate();
-    } else {
-      retained = frame;
+      if (frame.capacity() - lastConsolidatedCapacity >= consolidateBufsThreshold) {
+        // Because the ByteBuf created is far smaller than its capacity in most cases,
+        // we can reduce memory consumption by consolidation.
+        long start = System.currentTimeMillis();
+        frame.consolidate();
+        consolidatedCount += 1;
+        consolidatedTotalTime += System.currentTimeMillis() - start;
+        lastConsolidatedCapacity = frame.capacity();
+      }
     }
     assert remaining == 0;
-    return retained;
+    return frame;
   }
 
   /**
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index ded62be55da3..42ef334e640f 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -76,6 +76,38 @@ public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
     }
   }
 
+  @Test
+  public void testConsolidationPerf() {
+    TransportFrameDecoder decoder = new TransportFrameDecoder(300 * 1024 * 1024);
+    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+    List<ByteBuf> retained = new ArrayList<>();
+    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+      ByteBuf buf = (ByteBuf) in.getArguments()[0];
+      retained.add(buf);
+      return null;
+    });
+
+    ByteBuf buf = Unpooled.buffer(8);
+    try {
+      buf.writeLong(8 + 1024 * 1024 * 1000);
+      decoder.channelRead(ctx, buf);
+      for (int i = 0; i < 1000; i++) {
+        buf = Unpooled.buffer(1024 * 1024 * 2);
+        ByteBuf writtenBuf = Unpooled.buffer(1024 * 1024).writerIndex(1024 * 1024);
+        buf.writeBytes(writtenBuf);
+        writtenBuf.release();
+        decoder.channelRead(ctx, buf);
+      }
+      assertEquals(1, retained.size());
+      assertEquals(1024 * 1024 * 1000, retained.get(0).capacity());
+      System.out.println("consolidated " + decoder.consolidatedCount + " times cost " + decoder.consolidatedTotalTime + " millis");
+    } catch (Exception e) {
+      if (buf != null) {
+        release(buf);
+      }
+    }
+  }
+
   @Test
   public void testInterception() throws Exception {
     int interceptedReads = 3;
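[PATCH 03] stops consolidating once per frame and instead consolidates whenever the composite has grown by a configurable delta since the last pass. The trade-off can be reproduced outside the decoder (illustrative sketch, not from the patch; Netty 4.1 API only):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class ThresholdConsolidateSketch {
  public static void main(String[] args) {
    final long threshold = 4 * 1024 * 1024; // consolidate every ~4 MiB of growth
    CompositeByteBuf frame = Unpooled.compositeBuffer(Integer.MAX_VALUE);
    long lastConsolidatedCapacity = 0;
    int consolidations = 0;
    for (int i = 0; i < 512; i++) {
      // Each 64 KiB chunk carries only 32 KiB of readable data.
      ByteBuf next = Unpooled.buffer(64 * 1024).writerIndex(32 * 1024);
      frame.addComponent(true, next);
      // Mirror of the patch: only pay the copy cost once the composite has
      // grown by at least `threshold` bytes since the last consolidation.
      if (frame.capacity() - lastConsolidatedCapacity >= threshold) {
        frame.consolidate();
        lastConsolidatedCapacity = frame.capacity();
        consolidations++;
      }
    }
    System.out.println(consolidations + " consolidations for "
        + frame.readableBytes() + " readable bytes");
    frame.release();
  }
}
```

Note that each full `consolidate()` re-copies every byte accumulated so far, so a small threshold degrades toward quadratic copying; the range-based `consolidate(cIndex, numComponents)` bookkeeping introduced in [PATCH 05] is what removes that re-copying.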
From 9515621403185b1de0c91b79ed1310ebfdba0fcd Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Sun, 3 Feb 2019 10:29:53 +0800
Subject: [PATCH 04/14] fix consolidateBufsThreshold

---
 .../apache/spark/network/util/TransportConf.java | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index b4483f0f2c03..e9feb5dcb39f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -96,9 +96,20 @@ public boolean preferDirectBufs() {
     return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
   }
 
+  /** The threshold for consolidation; it is derived from the memory overhead in YARN mode. */
   public long consolidateBufsThreshold() {
-    long defaultConsolidateBufsThreshold = (long)(JavaUtils.byteStringAsBytes(
-      conf.get("spark.executor.memory", "1g")) * 0.1);
+    boolean isDriver = conf.get("spark.executor.id").equals("driver");
+    final long MEMORY_OVERHEAD_MIN = 384L;
+    final double MEMORY_OVERHEAD_FACTOR = 0.1;
+    final double SHUFFLE_MEMORY_OVERHEAD_FACTOR = MEMORY_OVERHEAD_FACTOR * 0.6;
+    final double SHUFFLE_MEMORY_OVERHEAD_SAFE_FACTOR = SHUFFLE_MEMORY_OVERHEAD_FACTOR * 0.5;
+    long memory;
+    if (isDriver) {
+      memory = Math.max(JavaUtils.byteStringAsBytes(conf.get("spark.driver.memory")), MEMORY_OVERHEAD_MIN);
+    } else {
+      memory = Math.max(JavaUtils.byteStringAsBytes(conf.get("spark.executor.memory")), MEMORY_OVERHEAD_MIN);
+    }
+    long defaultConsolidateBufsThreshold = (long)(memory * SHUFFLE_MEMORY_OVERHEAD_SAFE_FACTOR);
     return conf.getLong(SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY, defaultConsolidateBufsThreshold);
   }
 
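A worked example of the default being computed above (numbers mine, not from the patch): the stacked factors come to 0.1 × 0.6 × 0.5 = 0.03, so with spark.executor.memory=4g the threshold is roughly 4 GiB × 0.03 ≈ 123 MiB, and with 1g roughly 31 MiB. Note that MEMORY_OVERHEAD_MIN = 384L is compared against a byte count, so as written the 384 floor is effectively never taken for realistic memory settings; [PATCH 05] replaces this heuristic with a fixed in-decoder delta threshold anyway.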
From 1e3e9cfcb818f72b222bba5fd00835e4417d5e87 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Fri, 8 Feb 2019 14:30:13 +0800
Subject: [PATCH 05/14] Refine as commented

---
 .../spark/network/TransportContext.java       |  3 +-
 .../apache/spark/network/util/NettyUtils.java |  6 +-
 .../spark/network/util/TransportConf.java     | 19 ----
 .../network/util/TransportFrameDecoder.java   | 93 +++++++++++--------
 .../util/TransportFrameDecoderSuite.java      | 57 +++++++-----
 5 files changed, 89 insertions(+), 89 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 8f85872d4187..1a3f3f2a6f24 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -191,10 +191,9 @@ public TransportChannelHandler initializePipeline(
     TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
     ChunkFetchRequestHandler chunkFetchHandler =
       createChunkFetchHandler(channelHandler, channelRpcHandler);
-
     ChannelPipeline pipeline = channel.pipeline()
       .addLast("encoder", ENCODER)
-      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder(conf.consolidateBufsThreshold()))
+      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
       .addLast("decoder", DECODER)
       .addLast("idleStateHandler",
         new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 92875e14fb4c..423cc0c70ea0 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -100,11 +100,7 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
    * This is used before all decoders.
    */
   public static TransportFrameDecoder createFrameDecoder() {
-    return new TransportFrameDecoder(-1L);
-  }
-
-  public static TransportFrameDecoder createFrameDecoder(long consolidateBufsThreshold) {
-    return new TransportFrameDecoder(consolidateBufsThreshold);
+    return new TransportFrameDecoder();
   }
 
   /** Returns the remote address on the channel or "<unknown remote>" if none exists. */
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index e9feb5dcb39f..3628da68f1c6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -43,7 +43,6 @@ public class TransportConf {
   private final String SPARK_NETWORK_IO_LAZYFD_KEY;
   private final String SPARK_NETWORK_VERBOSE_METRICS;
   private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;
-  private final String SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY;
 
   private final ConfigProvider conf;
 
@@ -66,7 +66,6 @@ public TransportConf(String module, ConfigProvider conf) {
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
     SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
     SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
-    SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY = getConfKey("io.consolidateBufsThreshold");
   }
 
   public int getInt(String name, int defaultValue) {
@@ -94,23 +94,6 @@ public boolean preferDirectBufs() {
     return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
   }
 
-  /** The threshold for consolidation; it is derived from the memory overhead in YARN mode. */
-  public long consolidateBufsThreshold() {
-    boolean isDriver = conf.get("spark.executor.id").equals("driver");
-    final long MEMORY_OVERHEAD_MIN = 384L;
-    final double MEMORY_OVERHEAD_FACTOR = 0.1;
-    final double SHUFFLE_MEMORY_OVERHEAD_FACTOR = MEMORY_OVERHEAD_FACTOR * 0.6;
-    final double SHUFFLE_MEMORY_OVERHEAD_SAFE_FACTOR = SHUFFLE_MEMORY_OVERHEAD_FACTOR * 0.5;
-    long memory;
-    if (isDriver) {
-      memory = Math.max(JavaUtils.byteStringAsBytes(conf.get("spark.driver.memory")), MEMORY_OVERHEAD_MIN);
-    } else {
-      memory = Math.max(JavaUtils.byteStringAsBytes(conf.get("spark.executor.memory")), MEMORY_OVERHEAD_MIN);
-    }
-    long defaultConsolidateBufsThreshold = (long)(memory * SHUFFLE_MEMORY_OVERHEAD_SAFE_FACTOR);
-    return conf.getLong(SPARK_NETWORK_IO_CONSOLIDATEBUFS_THRESHOLD_KEY, defaultConsolidateBufsThreshold);
-  }
-
   /** Connect timeout in milliseconds. Default 120 secs. */
   public int connectionTimeoutMs() {
     long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 6d157356b87b..0087cdb799e3 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -19,6 +19,7 @@
 
 import java.util.LinkedList;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -48,24 +49,27 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   private static final int LENGTH_SIZE = 8;
   private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
   private static final int UNKNOWN_FRAME_SIZE = -1;
+  private static final long DEFAULT_CONSOLIDATE_FRAME_BUFS_DELTA_THRESHOLD = 20 * 1024 * 1024;
 
   private final LinkedList<ByteBuf> buffers = new LinkedList<>();
   private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
+  private CompositeByteBuf frameBuf = null;
+  private long consolidateFrameBufsDeltaThreshold;
+  private long consolidatedFrameBufSize = 0;
+  private int consolidatedNumComponents = 0;
 
   private long totalSize = 0;
   private long nextFrameSize = UNKNOWN_FRAME_SIZE;
+  private int frameRemainingBytes = UNKNOWN_FRAME_SIZE;
   private volatile Interceptor interceptor;
 
-  private long consolidateBufsThreshold = Long.MAX_VALUE;
-  long consolidatedCount = 0L;
-  long consolidatedTotalTime = 0L;
-
-  public TransportFrameDecoder() {}
-
-  public TransportFrameDecoder(long consolidateBufsThreshold) {
-    if (consolidateBufsThreshold > 0) {
-      this.consolidateBufsThreshold = consolidateBufsThreshold;
-    }
+  public TransportFrameDecoder() {
+    this(DEFAULT_CONSOLIDATE_FRAME_BUFS_DELTA_THRESHOLD);
+  }
+
+  @VisibleForTesting
+  TransportFrameDecoder(long consolidateFrameBufsDeltaThreshold) {
+    this.consolidateFrameBufsDeltaThreshold = consolidateFrameBufsDeltaThreshold;
   }
 
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
     ByteBuf in = (ByteBuf) data;
@@ -135,41 +139,54 @@ private long decodeFrameSize() {
 
   private ByteBuf decodeNext() {
     long frameSize = decodeFrameSize();
-    if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
+    if (frameSize == UNKNOWN_FRAME_SIZE) {
       return null;
     }
 
-    // Reset size for next frame.
-    nextFrameSize = UNKNOWN_FRAME_SIZE;
-
-    Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
-    Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
-
-    // If the first buffer holds the entire frame, return it.
-    int remaining = (int) frameSize;
-    if (buffers.getFirst().readableBytes() >= remaining) {
-      return nextBufferForFrame(remaining);
+    if (frameBuf == null) {
+      Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE,
+        "Too large frame: %s", frameSize);
+      Preconditions.checkArgument(frameSize > 0,
+        "Frame length should be positive: %s", frameSize);
+      frameRemainingBytes = (int) frameSize;
+
+      // If buffers is empty, then return immediately for more input data. Otherwise, if the
+      // first buffer holds the entire frame, we attempt to build frame with it and return.
+      // In other cases, create a composite buffer to manage all the buffers.
+      if (buffers.isEmpty()) {
+        return null;
+      } else if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
+        // Reset buf and size for next frame.
+        frameBuf = null;
+        nextFrameSize = UNKNOWN_FRAME_SIZE;
+        return nextBufferForFrame(frameRemainingBytes);
+      } else {
+        frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
+      }
     }
 
-    // Otherwise, create a composite buffer.
-    CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
-    long lastConsolidatedCapacity = 0L;
-    while (remaining > 0) {
-      ByteBuf next = nextBufferForFrame(remaining);
-      remaining -= next.readableBytes();
-      frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
-      if (frame.capacity() - lastConsolidatedCapacity >= consolidateBufsThreshold) {
-        // Because the ByteBuf created is far smaller than its capacity in most cases,
-        // we can reduce memory consumption by consolidation.
-        long start = System.currentTimeMillis();
-        frame.consolidate();
-        consolidatedCount += 1;
-        consolidatedTotalTime += System.currentTimeMillis() - start;
-        lastConsolidatedCapacity = frame.capacity();
-      }
+    while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
+      ByteBuf next = nextBufferForFrame(frameRemainingBytes);
+      frameRemainingBytes -= next.readableBytes();
+      frameBuf.addComponent(next).writerIndex(frameBuf.writerIndex() + next.readableBytes());
     }
-    assert remaining == 0;
-    return frame;
+    // If the delta size of frameBuf exceeds the threshold, then we do consolidation
+    // to reduce memory consumption.
+    if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateFrameBufsDeltaThreshold) {
+      int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
+      frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
+      consolidatedFrameBufSize = frameBuf.capacity();
+      consolidatedNumComponents = frameBuf.numComponents();
+    }
+    if (frameRemainingBytes > 0) {
+      return null;
+    }
+
+    // Reset buf and size for next frame.
+    ByteBuf frameBufCopy = frameBuf.duplicate();
+    frameBuf = null;
+    nextFrameSize = UNKNOWN_FRAME_SIZE;
+    return frameBufCopy;
   }
 
   /**
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index 42ef334e640f..bb6c843dfca9 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -77,33 +77,40 @@ public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
   }
 
   @Test
-  public void testConsolidationPerf() {
-    TransportFrameDecoder decoder = new TransportFrameDecoder(300 * 1024 * 1024);
-    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-    List<ByteBuf> retained = new ArrayList<>();
-    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
-      ByteBuf buf = (ByteBuf) in.getArguments()[0];
-      retained.add(buf);
-      return null;
-    });
+  public void testConsolidationPerf() throws Exception {
+    long[] testingConsolidateThresholds = new long[] { 1024 * 1024, 5 * 1024 * 1024, 10 * 1024 * 1024, 20 * 1024 * 1024,
+      30 * 1024 * 1024, 50 * 1024 * 1024, 64 * 1024 * 1024, 100 * 1024 * 1024, 300 * 1024 * 1024, 500 * 1024 * 1024, Long.MAX_VALUE };
+    for (long threshold : testingConsolidateThresholds) {
+      TransportFrameDecoder decoder = new TransportFrameDecoder(threshold);
+      ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+      List<ByteBuf> retained = new ArrayList<>();
+      when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+        ByteBuf buf = (ByteBuf) in.getArguments()[0];
+        retained.add(buf);
+        return null;
+      });
 
-    ByteBuf buf = Unpooled.buffer(8);
-    try {
-      buf.writeLong(8 + 1024 * 1024 * 1000);
-      decoder.channelRead(ctx, buf);
-      for (int i = 0; i < 1000; i++) {
-        buf = Unpooled.buffer(1024 * 1024 * 2);
-        ByteBuf writtenBuf = Unpooled.buffer(1024 * 1024).writerIndex(1024 * 1024);
-        buf.writeBytes(writtenBuf);
-        writtenBuf.release();
+      try {
+        long start = System.currentTimeMillis();
+        ByteBuf buf = Unpooled.buffer(8);
+        buf.writeLong(8 + 1024 * 1024 * 1000);
         decoder.channelRead(ctx, buf);
-      }
-      assertEquals(1, retained.size());
-      assertEquals(1024 * 1024 * 1000, retained.get(0).capacity());
-      System.out.println("consolidated " + decoder.consolidatedCount + " times cost " + decoder.consolidatedTotalTime + " millis");
-    } catch (Exception e) {
-      if (buf != null) {
-        release(buf);
+        for (int i = 0; i < 1000; i++) {
+          buf = Unpooled.buffer(1024 * 1024 * 2);
+          ByteBuf writtenBuf = Unpooled.buffer(1024 * 1024).writerIndex(1024 * 1024);
+          buf.writeBytes(writtenBuf);
+          writtenBuf.release();
+          decoder.channelRead(ctx, buf);
+        }
+        assertEquals(1, retained.size());
+        assertEquals(1024 * 1024 * 1000, retained.get(0).capacity());
+        long costTime = System.currentTimeMillis() - start;
+        System.out.println("Build frame buf with consolidation threshold " + threshold
+          + " cost " + costTime + " millis");
+      } finally {
+        for (ByteBuf buf : retained) {
+          release(buf);
+        }
       }
     }
   }
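The rewrite keeps per-frame state (frameBuf, frameRemainingBytes) across channelRead() calls and, via consolidate(cIndex, numComponents), squashes only the components added since the previous pass. Extracted into a standalone loop, the delta bookkeeping looks like this (sketch, assumes Netty 4.1; variable names chosen to mirror the patch):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class RangeConsolidateSketch {
  public static void main(String[] args) {
    final long threshold = 1024 * 1024;
    CompositeByteBuf frameBuf = Unpooled.compositeBuffer(Integer.MAX_VALUE);
    long consolidatedFrameBufSize = 0;
    int consolidatedNumComponents = 0;

    for (int i = 0; i < 256; i++) {
      ByteBuf next = Unpooled.buffer(32 * 1024).writerIndex(32 * 1024);
      frameBuf.addComponent(true, next);
      if (frameBuf.capacity() - consolidatedFrameBufSize > threshold) {
        // Squash only the components added since the previous pass, so each
        // byte is copied once rather than once per consolidation.
        int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
        frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
        consolidatedFrameBufSize = frameBuf.capacity();
        consolidatedNumComponents = frameBuf.numComponents();
      }
    }
    System.out.println("components left: " + frameBuf.numComponents());
    frameBuf.release();
  }
}
```

This is what makes the per-chunk check affordable: the cost of building a frame stays linear in the frame size regardless of how small the threshold is.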
From 96a71ed1c8a3087620dc01ac456065406311a61c Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Fri, 8 Feb 2019 14:34:50 +0800
Subject: [PATCH 06/14] Fix style

---
 .../spark/network/util/TransportFrameDecoderSuite.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index bb6c843dfca9..3210828c4abd 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -78,8 +78,10 @@ public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
 
   @Test
   public void testConsolidationPerf() throws Exception {
-    long[] testingConsolidateThresholds = new long[] { 1024 * 1024, 5 * 1024 * 1024, 10 * 1024 * 1024, 20 * 1024 * 1024,
-      30 * 1024 * 1024, 50 * 1024 * 1024, 64 * 1024 * 1024, 100 * 1024 * 1024, 300 * 1024 * 1024, 500 * 1024 * 1024, Long.MAX_VALUE };
+    long[] testingConsolidateThresholds = new long[] {
+      1024 * 1024, 5 * 1024 * 1024, 10 * 1024 * 1024, 20 * 1024 * 1024,
+      30 * 1024 * 1024, 50 * 1024 * 1024, 80 * 1024 * 1024, 100 * 1024 * 1024,
+      300 * 1024 * 1024, 500 * 1024 * 1024, Long.MAX_VALUE };
     for (long threshold : testingConsolidateThresholds) {
       TransportFrameDecoder decoder = new TransportFrameDecoder(threshold);
       ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);

From f872e24e67fd353d4574976751e0dc33c48ae7e2 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Sat, 9 Feb 2019 12:09:00 +0800
Subject: [PATCH 07/14] Update as commented

---
 .../network/util/TransportFrameDecoder.java   | 20 +++++-----
 .../util/TransportFrameDecoderSuite.java      | 38 +++++++++++++------
 2 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 0087cdb799e3..3d2316c98bbd 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -53,8 +53,9 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
 
   private final LinkedList<ByteBuf> buffers = new LinkedList<>();
   private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
+  private final long consolidateFrameBufsDeltaThreshold;
+
   private CompositeByteBuf frameBuf = null;
-  private long consolidateFrameBufsDeltaThreshold;
   private long consolidatedFrameBufSize = 0;
   private int consolidatedNumComponents = 0;
 
@@ -150,19 +151,20 @@ private ByteBuf decodeNext() {
         "Frame length should be positive: %s", frameSize);
       frameRemainingBytes = (int) frameSize;
 
-      // If buffers is empty, then return immediately for more input data. Otherwise, if the
-      // first buffer holds the entire frame, we attempt to build frame with it and return.
-      // In other cases, create a composite buffer to manage all the buffers.
+      // If buffers is empty, then return immediately for more input data.
       if (buffers.isEmpty()) {
         return null;
-      } else if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
+      }
+      // Otherwise, if the first buffer holds the entire frame, we attempt to
+      // build frame with it and return.
+      if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
         // Reset buf and size for next frame.
         frameBuf = null;
         nextFrameSize = UNKNOWN_FRAME_SIZE;
         return nextBufferForFrame(frameRemainingBytes);
-      } else {
-        frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
       }
+      // In other cases, create a composite buffer to manage all the buffers.
+      frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
     }
 
     while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
@@ -183,10 +185,10 @@ private ByteBuf decodeNext() {
     }
 
     // Reset buf and size for next frame.
-    ByteBuf frameBufCopy = frameBuf.duplicate();
+    ByteBuf frame = frameBuf;
     frameBuf = null;
     nextFrameSize = UNKNOWN_FRAME_SIZE;
-    return frameBufCopy;
+    return frame;
   }
 
   /**
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index 3210828c4abd..6f07a7c0b7fa 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -27,11 +27,15 @@
 import io.netty.channel.ChannelHandlerContext;
 import org.junit.AfterClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 public class TransportFrameDecoderSuite {
 
+  private static final Logger logger = LoggerFactory.getLogger(TransportFrameDecoderSuite.class);
   private static Random RND = new Random();
 
   @AfterClass
@@ -79,9 +83,17 @@ public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
   @Test
   public void testConsolidationPerf() throws Exception {
     long[] testingConsolidateThresholds = new long[] {
-      1024 * 1024, 5 * 1024 * 1024, 10 * 1024 * 1024, 20 * 1024 * 1024,
-      30 * 1024 * 1024, 50 * 1024 * 1024, 80 * 1024 * 1024, 100 * 1024 * 1024,
-      300 * 1024 * 1024, 500 * 1024 * 1024, Long.MAX_VALUE };
+      ByteUnit.MiB.toBytes(1),
+      ByteUnit.MiB.toBytes(5),
+      ByteUnit.MiB.toBytes(10),
+      ByteUnit.MiB.toBytes(20),
+      ByteUnit.MiB.toBytes(30),
+      ByteUnit.MiB.toBytes(50),
+      ByteUnit.MiB.toBytes(80),
+      ByteUnit.MiB.toBytes(100),
+      ByteUnit.MiB.toBytes(300),
+      ByteUnit.MiB.toBytes(500),
+      Long.MAX_VALUE };
     for (long threshold : testingConsolidateThresholds) {
       TransportFrameDecoder decoder = new TransportFrameDecoder(threshold);
       ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
@@ -95,20 +107,24 @@ public void testConsolidationPerf() throws Exception {
       try {
         long start = System.currentTimeMillis();
         ByteBuf buf = Unpooled.buffer(8);
-        buf.writeLong(8 + 1024 * 1024 * 1000);
+        long targetBytes = ByteUnit.GiB.toBytes(1);
+        int pieceBytes = (int)ByteUnit.KiB.toBytes(32);
+        long writtenBytes = 0;
+        buf.writeLong(8 + ByteUnit.GiB.toBytes(1));
         decoder.channelRead(ctx, buf);
-        for (int i = 0; i < 1000; i++) {
-          buf = Unpooled.buffer(1024 * 1024 * 2);
-          ByteBuf writtenBuf = Unpooled.buffer(1024 * 1024).writerIndex(1024 * 1024);
+        while (writtenBytes < targetBytes) {
+          buf = Unpooled.buffer(pieceBytes * 2);
+          ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
           buf.writeBytes(writtenBuf);
           writtenBuf.release();
           decoder.channelRead(ctx, buf);
+          writtenBytes += pieceBytes;
         }
+        long elapsedTime = System.currentTimeMillis() - start;
+        logger.info("Writing 1GiB frame buf with consolidation of threshold " + threshold
+          + " took " + elapsedTime + " millis");
        assertEquals(1, retained.size());
-        assertEquals(1024 * 1024 * 1000, retained.get(0).capacity());
-        long costTime = System.currentTimeMillis() - start;
-        System.out.println("Build frame buf with consolidation threshold " + threshold
-          + " cost " + costTime + " millis");
+        assertEquals(targetBytes, retained.get(0).capacity());
       } finally {
         for (ByteBuf buf : retained) {
          release(buf);
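With the decoder stateful end to end, an integration-style check no longer needs a mocked ChannelHandlerContext: Netty's EmbeddedChannel can drive the same handler directly (usage sketch, mine, assuming the post-[PATCH 05] decoder; the suite in this series deliberately keeps the mock-based harness):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.spark.network.util.TransportFrameDecoder;

public class EmbeddedDecoderSketch {
  public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel(new TransportFrameDecoder());

    int bodyBytes = 8 * 1024 * 1024;
    channel.writeInbound(Unpooled.buffer(8).writeLong(8 + bodyBytes));
    // Feed the frame body in 32 KiB slices; the decoder accumulates them in
    // its long-lived frameBuf and fires exactly one message when complete.
    for (int fed = 0; fed < bodyBytes; fed += 32 * 1024) {
      channel.writeInbound(Unpooled.buffer(32 * 1024).writerIndex(32 * 1024));
    }

    ByteBuf frame = channel.readInbound();
    System.out.println(frame.readableBytes() + " bytes decoded"); // 8388608
    frame.release();
    channel.finishAndReleaseAll();
  }
}
```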
From 3fb7484b92995c73df4ae041a5b5b19a7826e787 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Sun, 10 Feb 2019 20:18:31 +0800
Subject: [PATCH 08/14] Reword consolidation threshold variable name

---
 .../spark/network/util/TransportFrameDecoder.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 3d2316c98bbd..261814b1f935 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -49,11 +49,11 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   private static final int LENGTH_SIZE = 8;
   private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
   private static final int UNKNOWN_FRAME_SIZE = -1;
-  private static final long DEFAULT_CONSOLIDATE_FRAME_BUFS_DELTA_THRESHOLD = 20 * 1024 * 1024;
+  private static final long CONSOLIDATE_THRESHOLD = 20 * 1024 * 1024;
 
   private final LinkedList<ByteBuf> buffers = new LinkedList<>();
   private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
-  private final long consolidateFrameBufsDeltaThreshold;
+  private final long consolidateThreshold;
 
   private CompositeByteBuf frameBuf = null;
   private long consolidatedFrameBufSize = 0;
@@ -65,12 +65,12 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   private volatile Interceptor interceptor;
 
   public TransportFrameDecoder() {
-    this(DEFAULT_CONSOLIDATE_FRAME_BUFS_DELTA_THRESHOLD);
+    this(CONSOLIDATE_THRESHOLD);
   }
 
   @VisibleForTesting
-  TransportFrameDecoder(long consolidateFrameBufsDeltaThreshold) {
-    this.consolidateFrameBufsDeltaThreshold = consolidateFrameBufsDeltaThreshold;
+  TransportFrameDecoder(long consolidateThreshold) {
+    this.consolidateThreshold = consolidateThreshold;
   }
 
   @Override
@@ -174,7 +174,7 @@ private ByteBuf decodeNext() {
     }
     // If the delta size of frameBuf exceeds the threshold, then we do consolidation
     // to reduce memory consumption.
-    if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateFrameBufsDeltaThreshold) {
+    if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateThreshold) {
       int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
       frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
       consolidatedFrameBufSize = frameBuf.capacity();

From bc44188fcc4069f5d68af8685b6ff49ad6490be3 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Thu, 14 Feb 2019 22:19:41 +0800
Subject: [PATCH 09/14] Fix tests and minor updates

---
 .../network/util/TransportFrameDecoder.java   |  3 +-
 .../util/TransportFrameDecoderSuite.java      | 34 ++++++++++---------
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 261814b1f935..aa70eec4c7e9 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -170,7 +170,8 @@ private ByteBuf decodeNext() {
     while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
       ByteBuf next = nextBufferForFrame(frameRemainingBytes);
       frameRemainingBytes -= next.readableBytes();
-      frameBuf.addComponent(next).writerIndex(frameBuf.writerIndex() + next.readableBytes());
+//      frameBuf.addComponent(next).writerIndex(frameBuf.writerIndex() + next.readableBytes());
+      frameBuf.addComponent(true, next);
     }
     // If the delta size of frameBuf exceeds the threshold, then we do consolidation
     // to reduce memory consumption.
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index 6f07a7c0b7fa..f3da13b69a90 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -52,7 +52,7 @@ public void testFrameDecoding() throws Exception {
   }
 
   @Test
-  public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
+  public void testConsolidationForDecodingNonFullyWrittenByteBuf() throws Exception {
     TransportFrameDecoder decoder = new TransportFrameDecoder();
     ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
     List<ByteBuf> retained = new ArrayList<>();
@@ -61,22 +61,24 @@ public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
       retained.add(buf);
       return null;
     });
-    ByteBuf data1 = Unpooled.buffer(1024 * 1024);
-    data1.writeLong(1024 * 1024 + 8);
-    data1.writeByte(127);
-    ByteBuf data2 = Unpooled.buffer(1024 * 1024);
-    for (int i = 0; i < 1024 * 1024 - 1; i++) {
-      data2.writeByte(128);
-    }
-    int originalCapacity = data1.capacity() + data2.capacity();
+    int targetBytes = (int) ByteUnit.MiB.toBytes(50);
+    int piece1Bytes = RND.nextInt(50);
+    int piece2Bytes = targetBytes - piece1Bytes;
+    ByteBuf piece1 = Unpooled.buffer(piece1Bytes * 2);
+    piece1.writeLong(targetBytes + 8);
+    byte[] piece1Data = new byte[piece1Bytes];
+    piece1.writeBytes(piece1Data);
+    ByteBuf piece2 = Unpooled.buffer(piece2Bytes * 2);
+    byte[] piece2Data = new byte[piece2Bytes];
+    piece2.writeBytes(piece2Data);
     try {
-      decoder.channelRead(ctx, data1);
-      decoder.channelRead(ctx, data2);
+      decoder.channelRead(ctx, piece1);
+      decoder.channelRead(ctx, piece2);
       assertEquals(1, retained.size());
-      assert(retained.get(0).capacity() < originalCapacity);
-    } catch (Exception e) {
-      release(data1);
-      release(data2);
+      assertEquals(targetBytes, retained.get(0).capacity());
+    } finally {
+      release(piece1);
+      release(piece2);
     }
   }
 
@@ -108,7 +110,7 @@ public void testConsolidationPerf() throws Exception {
         long start = System.currentTimeMillis();
         ByteBuf buf = Unpooled.buffer(8);
         long targetBytes = ByteUnit.GiB.toBytes(1);
-        int pieceBytes = (int)ByteUnit.KiB.toBytes(32);
+        int pieceBytes = (int) ByteUnit.KiB.toBytes(32);
         long writtenBytes = 0;

From ef63cdbe85430b35029d925529ed459926842b76 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Thu, 14 Feb 2019 22:21:10 +0800
Subject: [PATCH 10/14] remove useless code

---
 .../org/apache/spark/network/util/TransportFrameDecoder.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index aa70eec4c7e9..3dc71730e01e 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -170,7 +170,6 @@ private ByteBuf decodeNext() {
     while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
       ByteBuf next = nextBufferForFrame(frameRemainingBytes);
       frameRemainingBytes -= next.readableBytes();
-//      frameBuf.addComponent(next).writerIndex(frameBuf.writerIndex() + next.readableBytes());
       frameBuf.addComponent(true, next);
     }
     // If the delta size of frameBuf exceeds the threshold, then we do consolidation
     // to reduce memory consumption.
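[PATCH 09] also swaps the manual writer-index bump for `addComponent(true, next)`. The two forms are equivalent; a small sketch showing both (illustrative, Netty 4.1):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class AddComponentSketch {
  public static void main(String[] args) {
    ByteBuf chunk = Unpooled.buffer(16).writerIndex(16);

    // Old form: addComponent() leaves the composite's writerIndex untouched,
    // so the caller must advance it by hand or the bytes stay unreadable.
    CompositeByteBuf manual = Unpooled.compositeBuffer();
    manual.addComponent(chunk.retainedDuplicate());
    manual.writerIndex(manual.writerIndex() + chunk.readableBytes());

    // New form: the boolean flag advances the writerIndex automatically.
    CompositeByteBuf auto = Unpooled.compositeBuffer();
    auto.addComponent(true, chunk.retainedDuplicate());

    System.out.println(manual.readableBytes() + " == " + auto.readableBytes()); // 16 == 16
    manual.release();
    auto.release();
    chunk.release();
  }
}
```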
From 449efedf101fa4e004ec48c55e50f7891a6c4d39 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Fri, 15 Feb 2019 11:09:09 +0800
Subject: [PATCH 11/14] Remove testConsolidationForDecodingNonFullyWrittenByteBuf

---
 .../util/TransportFrameDecoderSuite.java | 31 -------------------
 1 file changed, 31 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index f3da13b69a90..f81188e074bf 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -51,37 +51,6 @@ public void testFrameDecoding() throws Exception {
     verifyAndCloseDecoder(decoder, ctx, data);
   }
 
-  @Test
-  public void testConsolidationForDecodingNonFullyWrittenByteBuf() throws Exception {
-    TransportFrameDecoder decoder = new TransportFrameDecoder();
-    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-    List<ByteBuf> retained = new ArrayList<>();
-    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
-      ByteBuf buf = (ByteBuf) in.getArguments()[0];
-      retained.add(buf);
-      return null;
-    });
-    int targetBytes = (int) ByteUnit.MiB.toBytes(50);
-    int piece1Bytes = RND.nextInt(50);
-    int piece2Bytes = targetBytes - piece1Bytes;
-    ByteBuf piece1 = Unpooled.buffer(piece1Bytes * 2);
-    piece1.writeLong(targetBytes + 8);
-    byte[] piece1Data = new byte[piece1Bytes];
-    piece1.writeBytes(piece1Data);
-    ByteBuf piece2 = Unpooled.buffer(piece2Bytes * 2);
-    byte[] piece2Data = new byte[piece2Bytes];
-    piece2.writeBytes(piece2Data);
-    try {
-      decoder.channelRead(ctx, piece1);
-      decoder.channelRead(ctx, piece2);
-      assertEquals(1, retained.size());
-      assertEquals(targetBytes, retained.get(0).capacity());
-    } finally {
-      release(piece1);
-      release(piece2);
-    }
-  }
-
   @Test
   public void testConsolidationPerf() throws Exception {
     long[] testingConsolidateThresholds = new long[] {

From 5f8c4eb0823148ed57d5d7d790c3d77c2fae5abe Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Fri, 15 Feb 2019 22:16:02 +0800
Subject: [PATCH 12/14] Fix consolidatedFrameBufSize and consolidatedNumComponents not reset

---
 .../org/apache/spark/network/util/TransportFrameDecoder.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 3dc71730e01e..1980361a1552 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -187,6 +187,8 @@ private ByteBuf decodeNext() {
     // Reset buf and size for next frame.
     ByteBuf frame = frameBuf;
     frameBuf = null;
+    consolidatedFrameBufSize = 0;
+    consolidatedNumComponents = 0;
     nextFrameSize = UNKNOWN_FRAME_SIZE;
     return frame;
   }

From 3aad18a4ba96b5717c16ebc8a0d23b0a3986c634 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Sat, 16 Feb 2019 11:05:03 +0800
Subject: [PATCH 13/14] Testing multiple messages

---
 .../util/TransportFrameDecoderSuite.java | 54 +++++++++++--------
 1 file changed, 31 insertions(+), 23 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index f81188e074bf..06eeb757fd90 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -75,32 +75,40 @@ public void testConsolidationPerf() throws Exception {
         return null;
       });
 
-      try {
-        long start = System.currentTimeMillis();
-        ByteBuf buf = Unpooled.buffer(8);
-        long targetBytes = ByteUnit.GiB.toBytes(1);
-        int pieceBytes = (int) ByteUnit.KiB.toBytes(32);
-        long writtenBytes = 0;
-        buf.writeLong(8 + ByteUnit.GiB.toBytes(1));
-        decoder.channelRead(ctx, buf);
-        while (writtenBytes < targetBytes) {
-          buf = Unpooled.buffer(pieceBytes * 2);
-          ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
-          buf.writeBytes(writtenBuf);
-          writtenBuf.release();
-          decoder.channelRead(ctx, buf);
-          writtenBytes += pieceBytes;
-        }
-        long elapsedTime = System.currentTimeMillis() - start;
-        logger.info("Writing 1GiB frame buf with consolidation of threshold " + threshold
-          + " took " + elapsedTime + " millis");
-        assertEquals(1, retained.size());
-        assertEquals(targetBytes, retained.get(0).capacity());
-      } finally {
-        for (ByteBuf buf : retained) {
-          release(buf);
+      // Testing multiple messages
+      int numMessages = 3;
+      long targetBytes = ByteUnit.GiB.toBytes(1);
+      int pieceBytes = (int) ByteUnit.KiB.toBytes(32);
+      for (int i = 0; i < numMessages; i++) {
+        try {
+          long start = System.currentTimeMillis();
+          long writtenBytes = 0;
+          ByteBuf buf = Unpooled.buffer(8);
+          buf.writeLong(8 + ByteUnit.GiB.toBytes(1));
+          decoder.channelRead(ctx, buf);
+          while (writtenBytes < targetBytes) {
+            buf = Unpooled.buffer(pieceBytes * 2);
+            ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
+            buf.writeBytes(writtenBuf);
+            writtenBuf.release();
+            decoder.channelRead(ctx, buf);
+            writtenBytes += pieceBytes;
+          }
+          long elapsedTime = System.currentTimeMillis() - start;
+          logger.info("Writing 1GiB frame buf with consolidation of threshold " + threshold
+            + " took " + elapsedTime + " millis");
+        } finally {
+          for (ByteBuf buf : retained) {
+            release(buf);
+          }
         }
       }
+      long totalBytesGot = 0;
+      for (ByteBuf buf : retained) {
+        totalBytesGot += buf.capacity();
+      }
+      assertEquals(numMessages, retained.size());
+      assertEquals(targetBytes * numMessages, totalBytesGot);
     }
   }

From 6ca6f714d689a136679fda32484eaf92f30089e9 Mon Sep 17 00:00:00 2001
From: liupengcheng
Date: Wed, 20 Feb 2019 21:09:03 +0800
Subject: [PATCH 14/14] Lower the testing message size and not counting allocation time

---
 .../network/util/TransportFrameDecoderSuite.java | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index 06eeb757fd90..4b67aa80351d 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -77,26 +77,28 @@ public void testConsolidationPerf() throws Exception {
 
       // Testing multiple messages
       int numMessages = 3;
-      long targetBytes = ByteUnit.GiB.toBytes(1);
+      long targetBytes = ByteUnit.MiB.toBytes(300);
       int pieceBytes = (int) ByteUnit.KiB.toBytes(32);
       for (int i = 0; i < numMessages; i++) {
         try {
-          long start = System.currentTimeMillis();
           long writtenBytes = 0;
+          long totalTime = 0;
           ByteBuf buf = Unpooled.buffer(8);
-          buf.writeLong(8 + ByteUnit.GiB.toBytes(1));
+          buf.writeLong(8 + targetBytes);
           decoder.channelRead(ctx, buf);
           while (writtenBytes < targetBytes) {
            buf = Unpooled.buffer(pieceBytes * 2);
             ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
             buf.writeBytes(writtenBuf);
             writtenBuf.release();
+            long start = System.currentTimeMillis();
             decoder.channelRead(ctx, buf);
+            long elapsedTime = System.currentTimeMillis() - start;
+            totalTime += elapsedTime;
             writtenBytes += pieceBytes;
           }
-          long elapsedTime = System.currentTimeMillis() - start;
-          logger.info("Writing 1GiB frame buf with consolidation of threshold " + threshold
-            + " took " + elapsedTime + " millis");
+          logger.info("Writing 300MiB frame buf with consolidation of threshold " + threshold
+            + " took " + totalTime + " millis");
         } finally {
           for (ByteBuf buf : retained) {
             release(buf);