diff --git a/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyServer.java b/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyServer.java index e2f00637f..8c7aaa397 100644 --- a/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyServer.java +++ b/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyServer.java @@ -94,7 +94,7 @@ public NettyServer(InetSocketAddress addr, Configuration conf) { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("encoder", NettyFrameEncoder.INSTANCE) - .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) + .addLast("frameDecoder", NettyUtils.createFrameDecoder()) .addLast("decoder", NettyFrameDecoder.INSTANCE) .addLast("handler", new NettyServerMLHandler()); } diff --git a/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyTransceiver.java b/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyTransceiver.java index 403b3162f..ebcc6b560 100644 --- a/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyTransceiver.java +++ b/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyTransceiver.java @@ -110,7 +110,7 @@ public NettyTransceiver( protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("encoder", NettyFrameEncoder.INSTANCE) - .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) + .addLast("frameDecoder", NettyUtils.createFrameDecoder()) .addLast("decoder", NettyFrameDecoder.INSTANCE) .addLast( "readTimeout", diff --git a/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyUtils.java b/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyUtils.java index 316f3d154..ec2e07bfe 100644 --- a/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyUtils.java +++ b/angel-ps/core/src/main/java/com/tencent/angel/ipc/NettyUtils.java @@ -30,6 +30,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; @@ -84,8 +86,13 @@ public static Class getServerChannelClass(IOMode mode) * Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame. * This is used before all decoders. */ - public static TransportFrameDecoder createFrameDecoder() { - return new TransportFrameDecoder(); + public static ByteToMessageDecoder createFrameDecoder() { + // maxFrameLength = 2G + // lengthFieldOffset = 0 + // lengthFieldLength = 8 + // lengthAdjustment = -8, i.e. exclude the 8 byte length itself + // initialBytesToStrip = 8, i.e. strip out the length field itself + return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, -8, 8); } /** Returns the remote address on the channel or "<unknown remote>" if none exists. */ diff --git a/angel-ps/core/src/main/java/com/tencent/angel/ipc/TransportFrameDecoder.java b/angel-ps/core/src/main/java/com/tencent/angel/ipc/TransportFrameDecoder.java deleted file mode 100644 index 32d3452c8..000000000 --- a/angel-ps/core/src/main/java/com/tencent/angel/ipc/TransportFrameDecoder.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.tencent.angel.ipc; - -import java.util.LinkedList; - -import com.google.common.base.Preconditions; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; - -/** - * A customized frame decoder that allows intercepting raw data. - *

- * This behaves like Netty's frame decoder (with harcoded parameters that match this library's - * needs), except it allows an interceptor to be installed to read data directly before it's - * framed. - *

- * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's - * decoded, instead of building as many frames as the current buffer allows and dispatching - * all of them. This allows a child handler to install an interceptor if needed. - *

- * If an interceptor is installed, framing stops, and data is instead fed directly to the - * interceptor. When the interceptor indicates that it doesn't need to read any more data, - * framing resumes. Interceptors should not hold references to the data buffers provided - * to their handle() method. - */ -public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { - - public static final String HANDLER_NAME = "frameDecoder"; - private static final int LENGTH_SIZE = 8; - private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE; - private static final int UNKNOWN_FRAME_SIZE = -1; - - private final LinkedList buffers = new LinkedList<>(); - private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE); - - private long totalSize = 0; - private long nextFrameSize = UNKNOWN_FRAME_SIZE; - private volatile Interceptor interceptor; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception { - ByteBuf in = (ByteBuf) data; - buffers.add(in); - totalSize += in.readableBytes(); - - while (!buffers.isEmpty()) { - // First, feed the interceptor, and if it's still, active, try again. - if (interceptor != null) { - ByteBuf first = buffers.getFirst(); - int available = first.readableBytes(); - if (feedInterceptor(first)) { - assert !first.isReadable() : "Interceptor still active but buffer has data."; - } - - int read = available - first.readableBytes(); - if (read == available) { - buffers.removeFirst().release(); - } - totalSize -= read; - } else { - // Interceptor is not active, so try to decode one frame. - ByteBuf frame = decodeNext(); - if (frame == null) { - break; - } - ctx.fireChannelRead(frame); - } - } - } - - private long decodeFrameSize() { - if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < LENGTH_SIZE) { - return nextFrameSize; - } - - // We know there's enough data. If the first buffer contains all the data, great. Otherwise, - // hold the bytes for the frame length in a composite buffer until we have enough data to read - // the frame size. Normally, it should be rare to need more than one buffer to read the frame - // size. - ByteBuf first = buffers.getFirst(); - if (first.readableBytes() >= LENGTH_SIZE) { - nextFrameSize = first.readLong() - LENGTH_SIZE; - totalSize -= LENGTH_SIZE; - if (!first.isReadable()) { - buffers.removeFirst().release(); - } - return nextFrameSize; - } - - while (frameLenBuf.readableBytes() < LENGTH_SIZE) { - ByteBuf next = buffers.getFirst(); - int toRead = Math.min(next.readableBytes(), LENGTH_SIZE - frameLenBuf.readableBytes()); - frameLenBuf.writeBytes(next, toRead); - if (!next.isReadable()) { - buffers.removeFirst().release(); - } - } - - nextFrameSize = frameLenBuf.readLong() - LENGTH_SIZE; - totalSize -= LENGTH_SIZE; - frameLenBuf.clear(); - return nextFrameSize; - } - - private ByteBuf decodeNext() throws Exception { - long frameSize = decodeFrameSize(); - if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) { - return null; - } - - // Reset size for next frame. - nextFrameSize = UNKNOWN_FRAME_SIZE; - - Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize); - Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize); - - // If the first buffer holds the entire frame, return it. - int remaining = (int) frameSize; - if (buffers.getFirst().readableBytes() >= remaining) { - return nextBufferForFrame(remaining); - } - - // Otherwise, create a composite buffer. - CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE); - while (remaining > 0) { - ByteBuf next = nextBufferForFrame(remaining); - remaining -= next.readableBytes(); - frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes()); - } - assert remaining == 0; - return frame; - } - - /** - * Takes the first buffer in the internal list, and either adjust it to fit in the frame - * (by taking a slice out of it) or remove it from the internal list. - */ - private ByteBuf nextBufferForFrame(int bytesToRead) { - ByteBuf buf = buffers.getFirst(); - ByteBuf frame; - - if (buf.readableBytes() > bytesToRead) { - frame = buf.retain().readSlice(bytesToRead); - totalSize -= bytesToRead; - } else { - frame = buf; - buffers.removeFirst(); - totalSize -= frame.readableBytes(); - } - - return frame; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - for (ByteBuf b : buffers) { - b.release(); - } - if (interceptor != null) { - interceptor.channelInactive(); - } - frameLenBuf.release(); - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (interceptor != null) { - interceptor.exceptionCaught(cause); - } - super.exceptionCaught(ctx, cause); - } - - public void setInterceptor(Interceptor interceptor) { - Preconditions.checkState(this.interceptor == null, "Already have an interceptor."); - this.interceptor = interceptor; - } - - /** - * @return Whether the interceptor is still active after processing the data. - */ - private boolean feedInterceptor(ByteBuf buf) throws Exception { - if (interceptor != null && !interceptor.handle(buf)) { - interceptor = null; - } - return interceptor != null; - } - - public interface Interceptor { - - /** - * Handles data received from the remote end. - * - * @param data Buffer containing data. - * @return "true" if the interceptor expects more data, "false" to uninstall the interceptor. - */ - boolean handle(ByteBuf data) throws Exception; - - /** Called if an exception is thrown in the channel pipeline. */ - void exceptionCaught(Throwable cause) throws Exception; - - /** Called if the channel is closed and the interceptor is still installed. */ - void channelInactive() throws Exception; - - } - -}