diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index a0de9df1986f5..6948e595b546e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -44,6 +44,7 @@ import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.util.IOMode; import org.apache.spark.network.util.NettyUtils; +import org.apache.spark.network.util.NettyLogger; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.util.TransportFrameDecoder; @@ -64,6 +65,7 @@ public class TransportContext implements Closeable { private static final Logger logger = LoggerFactory.getLogger(TransportContext.class); + private static final NettyLogger nettyLogger = new NettyLogger(); private final TransportConf conf; private final RpcHandler rpcHandler; private final boolean closeIdleConnections; @@ -187,7 +189,11 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); - ChannelPipeline pipeline = channel.pipeline() + ChannelPipeline pipeline = channel.pipeline(); + if (nettyLogger.getLoggingHandler() != null) { + pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler()); + } + pipeline .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index b81c25afc737f..19eeddb842c09 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -194,4 +194,10 @@ public boolean release(int decrement) { } return super.release(decrement); } + + @Override + public String toString() { + return "MessageWithHeader [headerLength: " + headerLength + ", bodyLength: " + bodyLength + "]"; + } + } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java new file mode 100644 index 0000000000000..914c9704c79af --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.logging.LogLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NettyLogger { + private static final Logger logger = LoggerFactory.getLogger(NettyLogger.class); + + /** A Netty LoggingHandler which does not dump the message contents. */ + private static class NoContentLoggingHandler extends LoggingHandler { + + NoContentLoggingHandler(Class clazz, LogLevel level) { + super(clazz, level); + } + + protected String format(ChannelHandlerContext ctx, String eventName, Object arg) { + if (arg instanceof ByteBuf) { + return format(ctx, eventName) + " " + ((ByteBuf) arg).readableBytes() + "B"; + } else if (arg instanceof ByteBufHolder) { + return format(ctx, eventName) + " " + + ((ByteBufHolder) arg).content().readableBytes() + "B"; + } else { + return super.format(ctx, eventName, arg); + } + } + } + + private final LoggingHandler loggingHandler; + + public NettyLogger() { + if (logger.isTraceEnabled()) { + loggingHandler = new LoggingHandler(NettyLogger.class, LogLevel.TRACE); + } else if (logger.isDebugEnabled()) { + loggingHandler = new NoContentLoggingHandler(NettyLogger.class, LogLevel.DEBUG); + } else { + loggingHandler = null; + } + } + + public LoggingHandler getLoggingHandler() { + return loggingHandler; + } +}