diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 041a628e19c..ba07fbf6f2a 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -197,7 +197,6 @@ zmq.threads: 1 zmq.linger.millis: 5000 zmq.hwm: 0 - storm.messaging.netty.server_worker_threads: 1 storm.messaging.netty.client_worker_threads: 1 storm.messaging.netty.buffer_size: 5242880 #5MB buffer @@ -205,6 +204,8 @@ storm.messaging.netty.buffer_size: 5242880 #5MB buffer storm.messaging.netty.max_retries: 300 storm.messaging.netty.max_wait_ms: 1000 storm.messaging.netty.min_wait_ms: 100 +io.netty.noPreferDirect: false +io.netty.allocator.type: "pooled" # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency. storm.messaging.netty.transfer.batch.size: 262144 @@ -280,6 +281,7 @@ pacemaker.host: "localhost" pacemaker.port: 6699 pacemaker.base.threads: 10 pacemaker.max.threads: 50 +pacemaker.client.max.threads: 2 pacemaker.thread.timeout: 10 pacemaker.childopts: "-Xmx1024m" pacemaker.auth.method: "NONE" diff --git a/pom.xml b/pom.xml index ce5437c5ff3..b9e05db23ef 100644 --- a/pom.xml +++ b/pom.xml @@ -225,7 +225,8 @@ 3.3.2 0.9.0 16.0.1 - 3.9.0.Final + 4.1.5.Final + 1.0.2 1.6.6 2.1 1.7.7 @@ -648,10 +649,6 @@ log4j log4j - - org.jboss.netty - netty - @@ -819,7 +816,7 @@ io.netty - netty + netty-all ${netty.version} diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 0b5ea0222da..cb087e57f48 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -270,7 +270,7 @@ io.netty - netty + netty-all io.dropwizard.metrics @@ -490,7 +490,7 @@ compojure:compojure clj-time:clj-time org.apache.thrift:* - io.netty:netty + io.netty:netty-all com.google.guava:guava org.apache.httpcomponents:http* org.apache.zookeeper:zookeeper @@ -538,6 +538,10 @@ clojure.core.incubator org.apache.storm.shade.clojure.core.incubator + + io.netty + org.apache.storm.netty + clojure.tools.namespace org.apache.storm.shade.clojure.tools.namespace @@ -583,10 +587,6 @@ org.apache.storm.thrift - - org.jboss.netty - org.apache.storm.shade.org.jboss.netty - com.google.common org.apache.storm.shade.com.google.common @@ -760,7 +760,7 @@ - io.netty:netty + io.netty:netty-all META-INF/LICENSE.txt META-INF/NOTICE.txt diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 5a97f4aff8e..6cc3f107bc5 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -72,6 +72,20 @@ public class Config extends HashMap { @isPositiveNumber public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size"; + /** + * Netty based messaging: The write buffer high water mark for write buffer + */ + @isInteger + @isPositiveNumber + public static final String STORM_MESSAGING_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "storm.messaging.netty.write.buffer.high.water.mark"; + + /** + * Netty based messaging: The write buffer low water mark for write buffer + */ + @isInteger + @isPositiveNumber + public static final String STORM_MESSAGING_NETTY_WRITE_BUFFER_LOW_WATER_MARK = "storm.messaging.netty.write.buffer.low.water.mark"; + /** * Netty based messaging: Sets the backlog value to specify when the channel binds to a local address */ @@ -120,6 +134,12 @@ public class Config extends HashMap { @isInteger public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size"; + /** + * The Netty message decoder will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_DECODE_BATCH_SIZE + */ + @isInteger + public static final String STORM_NETTY_MESSAGE_DECODE_BATCH_SIZE = "storm.messaging.netty.decode.batch.size"; + /** * We check with this interval that whether the Netty channel is writable and try to write pending messages */ @@ -954,6 +974,15 @@ public class Config extends HashMap { @isPositiveNumber public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads"; + /** + * The maximum number of threads that should be used by the Pacemaker client. + * When Pacemaker gets loaded it will spawn new threads, up to + * this many total, to handle the load. + */ + @isNumber + @isPositiveNumber + public static final String PACEMAKER_CLIENT_MAX_THREADS = "pacemaker.client.max.threads"; + /** * This parameter is used by the storm-deploy project to configure the * jvm options for the pacemaker daemon. diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java index 7300847a134..b043c97549d 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java @@ -17,6 +17,11 @@ */ package org.apache.storm.messaging.local; +import org.apache.storm.grouping.Load; +import org.apache.storm.messaging.IConnection; +import org.apache.storm.messaging.IConnectionCallback; +import org.apache.storm.messaging.IContext; +import org.apache.storm.messaging.TaskMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,16 +33,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; - import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.apache.storm.grouping.Load; -import org.apache.storm.messaging.IConnection; -import org.apache.storm.messaging.TaskMessage; -import org.apache.storm.messaging.IConnectionCallback; -import org.apache.storm.messaging.IContext; public class Context implements IContext { private static final Logger LOG = LoggerFactory.getLogger(Context.class); diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java index 035eb1bcdf9..da078171e28 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java @@ -17,42 +17,42 @@ */ package org.apache.storm.messaging.netty; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Iterator; -import java.util.Collection; -import java.util.Map; -import java.util.HashMap; -import java.util.Timer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.lang.InterruptedException; - +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import org.apache.storm.Config; import org.apache.storm.grouping.Load; import org.apache.storm.messaging.ConnectionWithStatus; -import org.apache.storm.messaging.TaskMessage; import org.apache.storm.messaging.IConnectionCallback; +import org.apache.storm.messaging.TaskMessage; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.utils.StormBoundedExponentialBackoffRetry; import org.apache.storm.utils.Utils; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Timer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkState; @@ -69,6 +69,7 @@ * the remote destination is currently unavailable. */ public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient { + private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L; @@ -79,7 +80,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa private final Map stormConf; private final StormBoundedExponentialBackoffRetry retryPolicy; - private final ClientBootstrap bootstrap; + private final EventLoopGroup eventLoopGroup; + private final Bootstrap bootstrap; private final InetSocketAddress dstAddress; protected final String dstAddressPrefixedName; private volatile Map serverLoad = null; @@ -136,15 +138,19 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa private final MessageBuffer batcher; + private final AtomicBoolean inFlush = new AtomicBoolean(false); + private final Object writeLock = new Object(); @SuppressWarnings("rawtypes") - Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) { + Client(Map stormConf, EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host, int port, Context context) { this.stormConf = stormConf; closing = false; this.scheduler = scheduler; this.context = context; int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); + int write_buffer_high_water_mark = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_WRITE_BUFFER_HIGH_WATER_MARK), 5242880); + int write_buffer_low_water_mark = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_WRITE_BUFFER_LOW_WATER_MARK), 2097152); // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false saslChannelReady.set(!Utils.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false)); LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize); @@ -155,8 +161,18 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts); + this.eventLoopGroup = eventLoopGroup; // Initiate connection to remote destination - bootstrap = createClientBootstrap(factory, bufferSize, stormConf); + bootstrap = new Bootstrap() + .group(this.eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, bufferSize) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, write_buffer_high_water_mark) + .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, write_buffer_low_water_mark) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .handler(new StormClientPipelineFactory(this, stormConf)); dstAddress = new InetSocketAddress(host, port); dstAddressPrefixedName = prefixedName(dstAddress); launchChannelAliveThread(); @@ -189,15 +205,6 @@ public void run() { }, 0, CHANNEL_ALIVE_INTERVAL_MS); } - private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map stormConf) { - ClientBootstrap bootstrap = new ClientBootstrap(factory); - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("sendBufferSize", bufferSize); - bootstrap.setOption("keepAlive", true); - bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf)); - return bootstrap; - } - private String prefixedName(InetSocketAddress dstAddress) { if (null != dstAddress) { return PREFIX + dstAddress.toString(); @@ -217,14 +224,10 @@ private boolean reconnectingAllowed() { } private boolean connectionEstablished(Channel channel) { - // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully - // established iff the channel is connected. That is, a TCP-based channel must be in the CONNECTED state before - // anything can be read or written to the channel. - // + // since netty 4.x, the state model has been simplified // See: - // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html - // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions - return channel != null && channel.isConnected(); + // - http://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h4-19 + return channel != null && channel.isActive(); } /** @@ -308,7 +311,8 @@ public void send(Iterator msgs) { } } - if(channel.isWritable()){ + // as channel.writeAndFlush is expensive, we don't want to call it every time + if(channel.isWritable() && !inFlush.get()){ synchronized (writeLock) { // Netty's internal buffer is not full and we still have message left in the buffer. // We should write the unfilled MessageBatch immediately to reduce latency @@ -320,7 +324,7 @@ public void send(Iterator msgs) { } else { // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger // batch. This yields better throughput. - // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer + // We can rely on `notifyChannelFlushabilityChanged` to push these messages as soon as there is space in Netty's buffer // because we know `Channel.isWritable` was false after the messages were already in the buffer. } } @@ -370,7 +374,7 @@ private int iteratorSize(Iterator msgs) { * * If the write operation fails, then we will close the channel and trigger a reconnect. */ - private void flushMessages(Channel channel, final MessageBatch batch) { + private void flushMessages(final Channel channel, final MessageBatch batch) { if (null == batch || batch.isEmpty()) { return; } @@ -378,23 +382,35 @@ private void flushMessages(Channel channel, final MessageBatch batch) { final int numMessages = batch.size(); LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString()); pendingMessages.addAndGet(numMessages); + inFlush.set(true); + // call channel.writeAndFlush in the same eventloop suppose to be more efficient + eventLoopGroup.execute(new Runnable() { + @Override + public void run() { - ChannelFuture future = channel.write(batch); - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - pendingMessages.addAndGet(0 - numMessages); - if (future.isSuccess()) { - LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName); - messagesSent.getAndAdd(batch.size()); - } else { - LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName, - future.getCause()); - closeChannelAndReconnect(future.getChannel()); - messagesLost.getAndAdd(numMessages); - } - } + ChannelFuture future = channel.writeAndFlush(batch); + inFlush.set(false); + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { + pendingMessages.addAndGet(0 - numMessages); + if (future.isSuccess()) { + LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName); + messagesSent.getAndAdd(batch.size()); + } else { + LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName, + future.cause()); + closeChannelAndReconnect(future.channel()); + messagesLost.getAndAdd(numMessages); + } + } + }); + + // need to call it so we can flush + notifyChannelFlushabilityChanged(channel); + } }); + } /** @@ -499,10 +515,6 @@ public Map getConfig() { } /** ISaslClient interface **/ - public void channelConnected(Channel channel) { -// setChannel(channel); - } - public void channelReady() { saslChannelReady.set(true); } @@ -520,7 +532,7 @@ private String srcAddressName() { String name = null; Channel channel = channelRef.get(); if (channel != null) { - SocketAddress address = channel.getLocalAddress(); + SocketAddress address = channel.localAddress(); if (address != null) { name = address.toString(); } @@ -534,13 +546,13 @@ public String toString() { } /** - * Called by Netty thread on change in channel interest - * @param channel + * Called by Netty thread on change in channel writability or after channel.writeAndFlush call is done + * @param channel channel */ - public void notifyInterestChanged(Channel channel) { + public void notifyChannelFlushabilityChanged(Channel channel) { if(channel.isWritable()){ synchronized (writeLock) { - // Channel is writable again, write if there are any messages pending + // Channel is flushable again, write if there are any messages pending MessageBatch pending = batcher.drain(); flushMessages(channel, pending); } @@ -569,7 +581,6 @@ private void reschedule(Throwable t) { scheduleConnect(nextDelayMs); } - @Override public void run(Timeout timeout) throws Exception { if (reconnectingAllowed()) { @@ -582,7 +593,7 @@ public void run(Timeout timeout) throws Exception { @Override public void operationComplete(ChannelFuture future) throws Exception { // This call returns immediately - Channel newChannel = future.getChannel(); + Channel newChannel = future.channel(); if (future.isSuccess() && connectionEstablished(newChannel)) { boolean setChannel = channelRef.compareAndSet(null, newChannel); @@ -593,7 +604,7 @@ public void operationComplete(ChannelFuture future) throws Exception { LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get()); } } else { - Throwable cause = future.getCause(); + Throwable cause = future.cause(); reschedule(cause); if (newChannel != null) { newChannel.close(); diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java index 3b65eb50a2f..8c664ca9371 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java @@ -17,24 +17,25 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.HashMap; -import java.util.Map; - +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; import org.apache.storm.Config; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.IContext; import org.apache.storm.utils.Utils; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadFactory; + public class Context implements IContext { @SuppressWarnings("rawtypes") private Map storm_conf; private Map connections; - private NioClientSocketChannelFactory clientChannelFactory; - + + private EventLoopGroup workerEventLoopGroup; + private HashedWheelTimer clientScheduleService; /** @@ -45,18 +46,17 @@ public void prepare(Map storm_conf) { this.storm_conf = storm_conf; connections = new HashMap<>(); - //each context will have a single client channel factory + //each context will have a single client channel workerEventLoopGroup int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS)); - ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss"); ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker"); + if (maxWorkers > 0) { - clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), maxWorkers); + workerEventLoopGroup = new NioEventLoopGroup(maxWorkers, workerFactory); } else { - clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + // 0 means DEFAULT_EVENT_LOOP_THREADS + workerEventLoopGroup = new NioEventLoopGroup(0, workerFactory); } - + clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service")); } @@ -78,7 +78,7 @@ public synchronized IConnection connect(String storm_id, String host, int port) { return connection; } - IConnection client = new Client(storm_conf, clientChannelFactory, + IConnection client = new Client(storm_conf, workerEventLoopGroup, clientScheduleService, host, port, this); connections.put(key(host, port), client); return client; @@ -102,9 +102,8 @@ public synchronized void term() { connections = null; - //we need to release resources associated with client channel factory - clientChannelFactory.releaseExternalResources(); - + //we need to release resources associated + workerEventLoopGroup.shutdownGracefully(); } private String key(String host, int port) { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java index 3c7aaba613a..fcc4c979bda 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java @@ -17,11 +17,11 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; +import java.io.IOException; public enum ControlMessage implements INettySerializable { CLOSE_MESSAGE((short)-100), @@ -55,21 +55,29 @@ public int encodeLength() { /** * encode the current Control Message into a channel buffer - * @throws IOException + * @throws Exception if failed to write to buffer */ - public ChannelBuffer buffer() throws IOException { - ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength())); + public ByteBuf buffer() throws IOException { + ByteBufOutputStream bout = new ByteBufOutputStream(ByteBufAllocator.DEFAULT.ioBuffer(encodeLength())); write(bout); bout.close(); return bout.buffer(); } + void write(ByteBufOutputStream bout) throws IOException { + bout.writeShort(code); + } + public static ControlMessage read(byte[] serial) { - ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial); - return mkMessage(cm_buffer.getShort(0)); + ByteBuf cm_buffer = ByteBufAllocator.DEFAULT.buffer(serial.length); + try { + cm_buffer.writeBytes(serial); + short messageCode = cm_buffer.getShort(0); + return mkMessage(messageCode); + } finally { + if (cm_buffer != null) { + cm_buffer.release(); + } + } } - - public void write(ChannelBufferOutputStream bout) throws IOException { - bout.writeShort(code); - } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java index 0a0236facad..c891779b932 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java @@ -17,10 +17,11 @@ */ package org.apache.storm.messaging.netty; +import io.netty.buffer.ByteBuf; + import java.io.IOException; -import org.jboss.netty.buffer.ChannelBuffer; public interface INettySerializable { - ChannelBuffer buffer() throws IOException; + ByteBuf buffer() throws IOException; int encodeLength(); } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java index 681c1990324..263ccbfeb7b 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java @@ -17,12 +17,9 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.apache.storm.Config; - public interface ISaslClient { - void channelConnected(Channel channel); + void channelReady(); String name(); String secretKey(); -} +} \ No newline at end of file diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java index 997dbebaac7..b72b393574c 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java @@ -17,7 +17,7 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; +import io.netty.channel.Channel; public interface ISaslServer extends IServer { String name(); diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/IServer.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/IServer.java index b04d7154ec9..6a184d30be7 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/IServer.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/IServer.java @@ -17,7 +17,7 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; +import io.netty.channel.Channel; public interface IServer { void channelConnected(Channel c); diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java index 9f4632978df..9119185a48b 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java @@ -17,18 +17,16 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; -import java.util.Map; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler { +import java.io.IOException; +import java.util.Map; + +public class KerberosSaslClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory .getLogger(KerberosSaslClientHandler.class); @@ -46,56 +44,48 @@ public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas } @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent event) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { // register the newly established channel - Channel channel = ctx.getChannel(); - client.channelConnected(channel); + Channel channel = ctx.channel(); + client.channelReady(); LOG.info("Connection established from {} to {}", - channel.getLocalAddress(), channel.getRemoteAddress()); + channel.localAddress(), channel.remoteAddress()); try { - KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient - .get(channel); + KerberosSaslNettyClient saslNettyClient = channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).get(); if (saslNettyClient == null) { LOG.debug("Creating saslNettyClient now for channel: {}", channel); saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section); - KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel, - saslNettyClient); + channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).set(saslNettyClient); } LOG.debug("Going to initiate Kerberos negotiations."); byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0])); LOG.debug("Sending initial challenge: {}", initialChallenge); - channel.write(new SaslMessageToken(initialChallenge)); + channel.writeAndFlush(new SaslMessageToken(initialChallenge)); } catch (Exception e) { LOG.error("Failed to authenticate with server due to error: ", e); } - return; - } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) - throws Exception { - LOG.debug("send/recv time (ms): {}", - (System.currentTimeMillis() - start_time)); + public void channelRead(ChannelHandlerContext ctx, Object rawMsg) throws Exception { + LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time)); - Channel channel = ctx.getChannel(); + Channel channel = ctx.channel(); // Generate SASL response to server using Channel-local SASL client. - KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient - .get(channel); + KerberosSaslNettyClient saslNettyClient = channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).get(); if (saslNettyClient == null) { throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel); } // examine the response message from server - if (event.getMessage() instanceof ControlMessage) { - ControlMessage msg = (ControlMessage) event.getMessage(); + if (rawMsg instanceof ControlMessage) { + ControlMessage msg = (ControlMessage) rawMsg; if (msg == ControlMessage.SASL_COMPLETE_REQUEST) { LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed."); @@ -104,21 +94,19 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) LOG.error(message); throw new Exception(message); } - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); this.client.channelReady(); - // We call fireMessageReceived since the client is allowed to + // We call writeAndFlush since the client is allowed to // perform this request. The client's request will now proceed // to the next pipeline component namely StormClientHandler. - Channels.fireMessageReceived(ctx, msg); + channel.writeAndFlush(msg); } else { LOG.warn("Unexpected control message: {}", msg); } - return; } - else if (event.getMessage() instanceof SaslMessageToken) { - SaslMessageToken saslTokenMessage = (SaslMessageToken) event - .getMessage(); + else if (rawMsg instanceof SaslMessageToken) { + SaslMessageToken saslTokenMessage = (SaslMessageToken) rawMsg; LOG.debug("Responding to server's token of length: {}", saslTokenMessage.getSaslToken().length); @@ -144,9 +132,9 @@ else if (event.getMessage() instanceof SaslMessageToken) { // Construct a message containing the SASL response and send it to the // server. SaslMessageToken saslResponse = new SaslMessageToken(responseToServer); - channel.write(saslResponse); + channel.writeAndFlush(saslResponse); } else { - LOG.error("Unexpected message from server: {}", event.getMessage()); + LOG.error("Unexpected message from server: {}", rawMsg); } } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java index d53bb7c1c45..fb68feb8dd8 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java @@ -19,12 +19,9 @@ import org.apache.storm.Config; import org.apache.storm.security.auth.AuthUtils; -import java.io.IOException; -import java.security.Principal; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.Map; -import java.util.TreeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.security.auth.Subject; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -35,9 +32,12 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import org.apache.zookeeper.server.auth.KerberosName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.security.Principal; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.TreeMap; /** * Implements SASL logic for storm worker client processes. diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java index dc76b0d9424..26ad4e6125a 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java @@ -17,15 +17,9 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelLocal; +import io.netty.util.AttributeKey; final class KerberosSaslNettyClientState { - public static final ChannelLocal getKerberosSaslNettyClient = new ChannelLocal() { - protected KerberosSaslNettyClient initialValue(Channel channel) { - return null; - } - }; - + public static final AttributeKey KERBEROS_SASL_NETTY_CLIENT = AttributeKey.valueOf("kerberos.sasl.netty.client"); } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java index 72486ef597e..dcc90df79e1 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java @@ -19,14 +19,10 @@ import org.apache.storm.security.auth.AuthUtils; import org.apache.storm.security.auth.KerberosPrincipalToLocal; -import java.io.IOException; -import java.security.Principal; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import org.apache.zookeeper.server.auth.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.security.auth.Subject; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -39,9 +35,13 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.zookeeper.server.auth.KerberosName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.security.Principal; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; class KerberosSaslNettyServer { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java index 2ee2bf4204a..ec1cd8ccc68 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java @@ -17,14 +17,9 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelLocal; +import io.netty.util.AttributeKey; final class KerberosSaslNettyServerState { - public static final ChannelLocal getKerberosSaslNettyServer = new ChannelLocal() { - protected KerberosSaslNettyServer initialValue(Channel channel) { - return null; - } - }; + public static final AttributeKey KERBOROS_SASL_NETTY_SERVER = AttributeKey.valueOf("kerboros.sasl.netty.server"); } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java index 14ac1724f96..5fc388d7fab 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java @@ -17,19 +17,17 @@ */ package org.apache.storm.messaging.netty; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.List; import java.util.Map; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler { +public class KerberosSaslServerHandler extends ChannelInboundHandlerAdapter { ISaslServer server; /** Used for client or server's token to send or receive from each other. */ @@ -48,14 +46,12 @@ public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - Object msg = e.getMessage(); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg == null) { return; } - Channel channel = ctx.getChannel(); + Channel channel = ctx.channel(); if (msg instanceof SaslMessageToken) { @@ -65,22 +61,20 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) try { LOG.debug("Got SaslMessageToken!"); - KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer - .get(channel); + KerberosSaslNettyServer saslNettyServer = channel.attr(KerberosSaslNettyServerState.KERBOROS_SASL_NETTY_SERVER).get(); if (saslNettyServer == null) { LOG.debug("No saslNettyServer for {} yet; creating now, with topology token: ", channel); try { saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers); - KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel, - saslNettyServer); + channel.attr(KerberosSaslNettyServerState.KERBOROS_SASL_NETTY_SERVER).set(saslNettyServer); } catch (RuntimeException ioe) { LOG.error("Error occurred while creating saslNettyServer on server {} for client {}", - channel.getLocalAddress(), channel.getRemoteAddress()); + channel.localAddress(), channel.remoteAddress()); throw ioe; } } else { LOG.debug("Found existing saslNettyServer on server: {} for client {}", - channel.getLocalAddress(), channel.getRemoteAddress()); + channel.localAddress(), channel.remoteAddress()); } byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg) @@ -89,10 +83,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes); if(saslTokenMessageRequest.getSaslToken() == null) { - channel.write(ControlMessage.SASL_COMPLETE_REQUEST); + channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST); } else { // Send response to client. - channel.write(saslTokenMessageRequest); + channel.writeAndFlush(saslTokenMessageRequest); } if (saslNettyServer.isComplete()) { @@ -100,12 +94,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) // SASL-Complete message to the client. LOG.info("SASL authentication is complete for client with username: {}", saslNettyServer.getUserName()); - channel.write(ControlMessage.SASL_COMPLETE_REQUEST); + channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST); LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete."); - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); server.authenticated(channel); } - return; } catch (Exception ex) { LOG.error("Failed to handle SaslMessageToken: ", ex); @@ -117,16 +110,15 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) // non-SASL requests will be denied by the Authorize channel handler // (the next handler upstream in the server pipeline) if SASL // authentication has not completed. - LOG.warn("Sending upstream an unexpected non-SASL message : {}", - msg); - Channels.fireMessageReceived(ctx, msg); + LOG.warn("Sending upstream an unexpected non-SASL message : {}", msg); + channel.writeAndFlush(msg); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if(server != null) { - server.closeChannel(e.getChannel()); + server.closeChannel(ctx.channel()); } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java index 1c5dd9d2e64..521e7d2d5d2 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java @@ -17,10 +17,10 @@ */ package org.apache.storm.messaging.netty; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; import org.apache.storm.messaging.TaskMessage; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; import java.util.ArrayList; @@ -77,8 +77,8 @@ int size() { /** * create a buffer containing the encoding of this batch */ - ChannelBuffer buffer() throws Exception { - ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length)); + ByteBuf buffer() throws Exception { + ByteBufOutputStream bout = new ByteBufOutputStream(ByteBufAllocator.DEFAULT.ioBuffer(encoded_length)); for (TaskMessage msg : msgs) { writeTaskMessage(bout, msg); @@ -98,9 +98,9 @@ ChannelBuffer buffer() throws Exception { * Each TaskMessage is encoded as: * task ... short(2) * len ... int(4) - * payload ... byte[] * + * payload ... byte[] */ - private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception { + private void writeTaskMessage(ByteBufOutputStream bout, TaskMessage message) throws Exception { int payload_len = 0; if (message.message() != null) payload_len = message.message().length; diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java index 9030424fe6a..34acdfa6122 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java @@ -17,16 +17,29 @@ */ package org.apache.storm.messaging.netty; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.storm.Config; +import org.apache.storm.messaging.TaskMessage; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; +import java.util.Map; -import org.apache.storm.messaging.TaskMessage; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.frame.FrameDecoder; +public class MessageDecoder extends ByteToMessageDecoder { + + private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class); + + private final int maxBatchSize; + + public MessageDecoder(Map storm_conf) { + maxBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_DECODE_BATCH_SIZE), -1); + } -public class MessageDecoder extends FrameDecoder { /* * Each ControlMessage is encoded as: * code (<0) ... short(2) @@ -35,16 +48,16 @@ public class MessageDecoder extends FrameDecoder { * len ... int(4) * payload ... byte[] * */ - protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception { - // Make sure that we have received at least a short + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) throws Exception { + // Make sure that we have received at least a short long available = buf.readableBytes(); if (available < 2) { //need more data - return null; + return; } List ret = new ArrayList<>(); - // Use while loop, try to decode as more messages as possible in single call while (available >= 2) { @@ -65,38 +78,42 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe if (ctrl_msg == ControlMessage.EOB_MESSAGE) { continue; } else { - return ctrl_msg; + out.add(ctrl_msg); + return; } } - + //case 2: SaslTokenMessageRequest - if(code == SaslMessageToken.IDENTIFIER) { - // Make sure that we have received at least an integer (length) + if (code == SaslMessageToken.IDENTIFIER) { + // Make sure that we have received at least an integer (length) if (buf.readableBytes() < 4) { //need more data buf.resetReaderIndex(); - return null; + return; } - + // Read the length field. int length = buf.readInt(); - if (length<=0) { - return new SaslMessageToken(null); + if (length <= 0) { + out.add(new SaslMessageToken(null)); + return; } - + // Make sure if there's enough bytes in the buffer. if (buf.readableBytes() < length) { // The whole bytes were not received yet - return null. buf.resetReaderIndex(); - return null; + return; } - - // There's enough bytes in the buffer. Read it. - ChannelBuffer payload = buf.readBytes(length); - + + // There's enough bytes in the buffer. Read it. + byte[] payload = new byte[length]; + buf.readBytes(payload); + // Successfully decoded a frame. // Return a SaslTokenMessageRequest object - return new SaslMessageToken(payload.array()); + out.add(new SaslMessageToken(payload)); + return; } // case 3: task Message @@ -114,7 +131,7 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe available -= 4; if (length <= 0) { - ret.add(new TaskMessage(code, null)); + out.add(new TaskMessage(code, null)); break; } @@ -127,18 +144,21 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe available -= length; // There's enough bytes in the buffer. Read it. - ChannelBuffer payload = buf.readBytes(length); - + byte[] payload = new byte[length]; + buf.readBytes(payload); // Successfully decoded a frame. // Return a TaskMessage object - ret.add(new TaskMessage(code, payload.array())); + ret.add(new TaskMessage(code, payload)); + + if (maxBatchSize > 0 && ret.size() >= maxBatchSize) { + break; + } } - if (ret.size() == 0) { - return null; - } else { - return ret; + LOG.debug("Decoded {} messages", ret.size()); + if (ret.size() > 0) { + out.add(ret); } } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java index 0e9fc983412..bd4c5069a22 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java @@ -17,27 +17,27 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.util.List; + +public class MessageEncoder extends MessageToMessageEncoder { -public class MessageEncoder extends OneToOneEncoder { @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception { + protected void encode(ChannelHandlerContext ctx, Object obj, List out) throws Exception { + if (obj instanceof ControlMessage) { - return ((ControlMessage)obj).buffer(); - } + out.add(((ControlMessage) obj).buffer()); - if (obj instanceof MessageBatch) { - return ((MessageBatch)obj).buffer(); - } - - if (obj instanceof SaslMessageToken) { - return ((SaslMessageToken)obj).buffer(); - } - - throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName()); - } + } else if (obj instanceof MessageBatch) { + out.add(((MessageBatch)obj).buffer()); + } else if (obj instanceof SaslMessageToken) { + out.add(((SaslMessageToken)obj).buffer()); + } else { + throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName()); + } + } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java index e60c711102a..43d2dbe125e 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java @@ -20,16 +20,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; -import org.jboss.netty.util.ThreadNameDeterminer; -import org.jboss.netty.util.ThreadRenamingRunnable; - public class NettyRenameThreadFactory implements ThreadFactory { - static { - //Rename Netty threads - ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT); - } - final ThreadGroup group; final AtomicInteger index = new AtomicInteger(1); final String name; diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java index ccac70f756c..de4eb5f6db0 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java @@ -17,13 +17,14 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** * Send and receive SASL tokens. */ @@ -83,9 +84,9 @@ public int encodeLength() { * * @throws IOException */ - public ChannelBuffer buffer() throws IOException { - ChannelBufferOutputStream bout = new ChannelBufferOutputStream( - ChannelBuffers.directBuffer(encodeLength())); + public ByteBuf buffer() throws IOException { + ByteBufOutputStream bout = new ByteBufOutputStream(ByteBufAllocator.DEFAULT.ioBuffer(encodeLength())); + short identifier = -500; int payload_len = 0; if (token != null) payload_len = token.length; @@ -101,14 +102,21 @@ public ChannelBuffer buffer() throws IOException { } public static SaslMessageToken read(byte[] serial) { - ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial); - short identifier = sm_buffer.readShort(); - int payload_len = sm_buffer.readInt(); - if(identifier != IDENTIFIER) { - return null; + ByteBuf sm_buffer = ByteBufAllocator.DEFAULT.ioBuffer(serial.length); + try { + sm_buffer.writeBytes(serial); + short identifier = sm_buffer.readShort(); + int payload_len = sm_buffer.readInt(); + if (identifier != IDENTIFIER) { + return null; + } + byte token[] = new byte[payload_len]; + sm_buffer.readBytes(token, 0, payload_len); + return new SaslMessageToken(token); + } finally { + if (sm_buffer != null) { + sm_buffer.release(); + } } - byte token[] = new byte[payload_len]; - sm_buffer.readBytes(token, 0, payload_len); - return new SaslMessageToken(token); } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java index b24e7d69b96..3e09bdbc30b 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java @@ -17,7 +17,8 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -29,9 +30,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; /** * Implements SASL logic for storm worker client processes. diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java index 3a7d6a27786..c1f6f94534b 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java @@ -17,15 +17,10 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelLocal; +import io.netty.util.AttributeKey; final class SaslNettyClientState { - public static final ChannelLocal getSaslNettyClient = new ChannelLocal() { - protected SaslNettyClient initialValue(Channel channel) { - return null; - } - }; + public static final AttributeKey SASL_NETTY_CLIENT = AttributeKey.valueOf("sasl.netty.client"); } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServer.java index 9f48c397776..51b9a96708d 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServer.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServer.java @@ -17,7 +17,8 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -29,9 +30,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; class SaslNettyServer { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java index fb4dd702050..0f5f736d427 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java @@ -17,14 +17,9 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelLocal; +import io.netty.util.AttributeKey; final class SaslNettyServerState { - public static final ChannelLocal getSaslNettyServer = new ChannelLocal() { - protected SaslNettyServer initialValue(Channel channel) { - return null; - } - }; -} + public static final AttributeKey SASL_NETTY_SERVER = AttributeKey.valueOf("sasl.netty.server"); +} \ No newline at end of file diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java index e6634f7e88b..37d9e59db25 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java @@ -17,21 +17,17 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SaslStormClientHandler extends SimpleChannelUpstreamHandler { +import java.io.IOException; + +public class SaslStormClientHandler extends ChannelInboundHandlerAdapter { - private static final Logger LOG = LoggerFactory - .getLogger(SaslStormClientHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(SaslStormClientHandler.class); private ISaslClient client; long start_time; @@ -46,25 +42,23 @@ public SaslStormClientHandler(ISaslClient client) throws IOException { } @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent event) { + public void channelActive(ChannelHandlerContext ctx) { // register the newly established channel - Channel channel = ctx.getChannel(); - client.channelConnected(channel); + Channel channel = ctx.channel(); + + LOG.info("Connection established from " + channel.localAddress() + + " to " + channel.remoteAddress()); try { - SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient - .get(channel); + SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get(); if (saslNettyClient == null) { LOG.debug("Creating saslNettyClient now " + "for channel: " + channel); saslNettyClient = new SaslNettyClient(name, token); - SaslNettyClientState.getSaslNettyClient.set(channel, - saslNettyClient); + channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient); } - LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST"); - channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST); + channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST); } catch (Exception e) { LOG.error("Failed to authenticate with server " + "due to error: ", e); @@ -72,25 +66,23 @@ public void channelConnected(ChannelHandlerContext ctx, } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) - throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time)); - Channel channel = ctx.getChannel(); + Channel channel = ctx.channel(); // Generate SASL response to server using Channel-local SASL client. - SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient - .get(channel); + SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get(); if (saslNettyClient == null) { throw new Exception("saslNettyClient was unexpectedly " + "null for channel: " + channel); } // examine the response message from server - if (event.getMessage() instanceof ControlMessage) { - ControlMessage msg = (ControlMessage) event.getMessage(); - if (msg == ControlMessage.SASL_COMPLETE_REQUEST) { + if (msg instanceof ControlMessage) { + ControlMessage controlMessage = (ControlMessage) msg; + if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) { LOG.debug("Server has sent us the SaslComplete " + "message. Allowing normal work to proceed."); @@ -101,18 +93,18 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) + "Sasl-complete message, but as far as " + "we can tell, we are not authenticated yet."); } - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); this.client.channelReady(); // We call fireMessageReceived since the client is allowed to // perform this request. The client's request will now proceed // to the next pipeline component namely StormClientHandler. - Channels.fireMessageReceived(ctx, msg); + ctx.fireChannelRead(msg); return; } } - SaslMessageToken saslTokenMessage = (SaslMessageToken) event - .getMessage(); + + SaslMessageToken saslTokenMessage = (SaslMessageToken) msg; LOG.debug("Responding to server's token of length: " + saslTokenMessage.getSaslToken().length); @@ -141,7 +133,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) // Construct a message containing the SASL response and send it to the // server. SaslMessageToken saslResponse = new SaslMessageToken(responseToServer); - channel.write(saslResponse); + channel.writeAndFlush(saslResponse); } private void getSASLCredentials() throws IOException { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java index e32144f86e1..898ab056894 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java @@ -17,11 +17,8 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +26,9 @@ * Authorize or deny client requests based on existence and completeness of * client's SASL authentication. */ -public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandler { +public class SaslStormServerAuthorizeHandler extends ChannelInboundHandlerAdapter { - private static final Logger LOG = LoggerFactory - .getLogger(SaslStormServerHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(SaslStormServerAuthorizeHandler.class); /** * Constructor. @@ -41,18 +37,15 @@ public SaslStormServerAuthorizeHandler() { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - Object msg = e.getMessage(); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg == null) return; - Channel channel = ctx.getChannel(); LOG.debug("messageReceived: Checking whether the client is authorized to send messages to the server "); // Authorize: client is allowed to doRequest() if and only if the client // has successfully authenticated with this server. - SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer - .get(channel); + SaslNettyServer saslNettyServer = ctx.channel().attr(SaslNettyServerState.SASL_NETTY_SERVER).get(); if (saslNettyServer == null) { LOG.warn("messageReceived: This client is *NOT* authorized to perform " @@ -78,6 +71,6 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { // We call fireMessageReceived since the client is allowed to perform // this request. The client's request will now proceed to the next // pipeline component. - Channels.fireMessageReceived(ctx, msg); + ctx.fireChannelRead(msg); } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java index b4cce236eaf..f42238aae8c 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java @@ -17,18 +17,15 @@ */ package org.apache.storm.messaging.netty; -import java.io.IOException; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SaslStormServerHandler extends SimpleChannelUpstreamHandler { +import java.io.IOException; + +public class SaslStormServerHandler extends ChannelInboundHandlerAdapter { ISaslServer server; /** Used for client or server's token to send or receive from each other. */ @@ -44,50 +41,51 @@ public SaslStormServerHandler(ISaslServer server) throws IOException { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - Object msg = e.getMessage(); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg == null) return; - Channel channel = ctx.getChannel(); + Channel channel = ctx.channel(); - if (msg instanceof ControlMessage - && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) { + if (msg instanceof ControlMessage && msg == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) { // initialize server-side SASL functionality, if we haven't yet // (in which case we are looking at the first SASL message from the // client). - SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer - .get(channel); + SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get(); if (saslNettyServer == null) { LOG.debug("No saslNettyServer for " + channel - + " yet; creating now, with topology token: "); + + " yet; creating now, with topology token: " + topologyName); try { saslNettyServer = new SaslNettyServer(topologyName, token); } catch (IOException ioe) { LOG.error("Error occurred while creating saslNettyServer on server " - + channel.getLocalAddress() + + channel.localAddress() + " for client " - + channel.getRemoteAddress()); + + channel.remoteAddress()); saslNettyServer = null; } - SaslNettyServerState.getSaslNettyServer.set(channel, - saslNettyServer); + channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).set(saslNettyServer); + if (channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get() == null) { + throw new IllegalStateException("Failed to set SaslNettyServerState.SASL_NETTY_SERVER"); + } else { + LOG.debug("SaslNettyServer for " + channel + + "created with topology token: " + topologyName); + } + } else { LOG.debug("Found existing saslNettyServer on server:" - + channel.getLocalAddress() + " for client " - + channel.getRemoteAddress()); + + channel.localAddress() + " for client " + + channel.remoteAddress()); } LOG.debug("processToken: With nettyServer: " + saslNettyServer + " and token length: " + token.length); SaslMessageToken saslTokenMessageRequest; - saslTokenMessageRequest = new SaslMessageToken( - saslNettyServer.response(new byte[0])); + saslTokenMessageRequest = new SaslMessageToken(saslNettyServer.response(new byte[0])); // Send response to client. - channel.write(saslTokenMessageRequest); + channel.writeAndFlush(saslTokenMessageRequest); // do not send upstream to other handlers: no further action needs // to be done for SASL_TOKEN_MESSAGE_REQUEST requests. return; @@ -97,8 +95,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) // initialize server-side SASL functionality, if we haven't yet // (in which case we are looking at the first SASL message from the // client). - SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer - .get(channel); + SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get(); if (saslNettyServer == null) { throw new Exception("saslNettyServer was unexpectedly " + "null for channel: " + channel); @@ -108,17 +105,17 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) .getSaslToken())); // Send response to client. - channel.write(saslTokenMessageRequest); + channel.writeAndFlush(saslTokenMessageRequest); if (saslNettyServer.isComplete()) { // If authentication of client is complete, we will also send a // SASL-Complete message to the client. LOG.debug("SASL authentication is complete for client with " + "username: " + saslNettyServer.getUserName()); - channel.write(ControlMessage.SASL_COMPLETE_REQUEST); + channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST); LOG.debug("Removing SaslServerHandler from pipeline since SASL " + "authentication is complete."); - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); server.authenticated(channel); } } else { @@ -129,13 +126,13 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) // authentication has not completed. LOG.warn("Sending upstream an unexpected non-SASL message : " + msg); - Channels.fireMessageReceived(ctx, msg); + ctx.fireChannelRead(msg); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - if(server != null) server.closeChannel(e.getChannel()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + server.closeChannel(ctx.channel()); } private void getSASLCredentials() throws IOException { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslUtils.java index b5c6b6be966..3794622ce87 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslUtils.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/SaslUtils.java @@ -17,14 +17,12 @@ */ package org.apache.storm.messaging.netty; -import java.util.HashMap; -import java.util.Map; - -import javax.security.sasl.Sasl; - import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.Charsets; +import javax.security.sasl.Sasl; +import java.util.HashMap; +import java.util.Map; import org.apache.storm.Config; class SaslUtils { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Server.java index 91becf808de..0d8b8c53470 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Server.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Server.java @@ -17,6 +17,20 @@ */ package org.apache.storm.messaging.netty; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.storm.Config; import org.apache.storm.grouping.Load; import org.apache.storm.messaging.ConnectionWithStatus; @@ -34,17 +48,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer { @@ -55,8 +60,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe private final ConcurrentHashMap messagesEnqueued = new ConcurrentHashMap<>(); private final AtomicInteger messagesDequeued = new AtomicInteger(0); - volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server"); - final ChannelFactory factory; + volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE); + final EventLoopGroup bossEventLoopGroup; + final EventLoopGroup workerEventLoopGroup; final ServerBootstrap bootstrap; private volatile boolean closing = false; @@ -72,34 +78,44 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe // Configure the server. int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); + int write_buffer_high_water_mark = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_WRITE_BUFFER_HIGH_WATER_MARK), 5242880); + int write_buffer_low_water_mark = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_WRITE_BUFFER_LOW_WATER_MARK), 2097152); int backlog = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500); int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)); - ThreadFactory bossFactory = new NettyRenameThreadFactory(netty_name() + "-boss"); - ThreadFactory workerFactory = new NettyRenameThreadFactory(netty_name() + "-worker"); + ThreadFactory bossFactory = new NettyRenameThreadFactory(name() + "-boss"); + ThreadFactory workerFactory = new NettyRenameThreadFactory(name() + "-worker"); + bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory); + // 0 means DEFAULT_EVENT_LOOP_THREADS if (maxWorkers > 0) { - factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), maxWorkers); + // add extra one for boss + workerEventLoopGroup = new NioEventLoopGroup(maxWorkers, workerFactory); } else { - factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + workerEventLoopGroup = new NioEventLoopGroup(0, workerFactory); } - - LOG.info("Create Netty Server " + netty_name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers); - - bootstrap = new ServerBootstrap(factory); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.receiveBufferSize", buffer_size); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("backlog", backlog); - - // Set up the pipeline factory. - bootstrap.setPipelineFactory(new StormServerPipelineFactory(this)); + + LOG.info("Create Netty Server " + name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers); + + bootstrap = new ServerBootstrap() + .group(bossEventLoopGroup, workerEventLoopGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, backlog) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_RCVBUF, buffer_size) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, write_buffer_high_water_mark) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, write_buffer_low_water_mark) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childHandler(new StormServerPipelineFactory(this, storm_conf)); // Bind and start to accept incoming connections. - Channel channel = bootstrap.bind(new InetSocketAddress(port)); - allChannels.add(channel); + try { + ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync(); + allChannels.add(f.channel()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } private void addReceiveCount(String from, int amount) { @@ -161,7 +177,7 @@ public void closeChannel(Channel channel) { public synchronized void close() { if (allChannels != null) { allChannels.close().awaitUninterruptibly(); - factory.releaseExternalResources(); + workerEventLoopGroup.shutdownGracefully(); allChannels = null; } } @@ -171,7 +187,7 @@ public void sendLoadMetrics(Map taskToLoad) { try { MessageBatch mb = new MessageBatch(1); mb.add(new TaskMessage(-1, _ser.serialize(Arrays.asList((Object)taskToLoad)))); - allChannels.write(mb); + allChannels.writeAndFlush(mb); } catch (IOException e) { throw new RuntimeException(e); } @@ -210,7 +226,7 @@ else if (!connectionEstablished(allChannels)) { } private boolean connectionEstablished(Channel channel) { - return channel != null && channel.isBound(); + return channel != null && channel.isActive(); } private boolean connectionEstablished(ChannelGroup allChannels) { diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java index e763d5d7929..ebf72d7a270 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java @@ -17,26 +17,19 @@ */ package org.apache.storm.messaging.netty; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.storm.messaging.TaskMessage; import org.apache.storm.serialization.KryoValuesDeserializer; +import java.io.IOException; import java.net.ConnectException; -import java.util.Map; import java.util.List; -import java.io.IOException; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; - -import org.jboss.netty.channel.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Map; -public class StormClientHandler extends SimpleChannelUpstreamHandler { +public class StormClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class); private Client client; private KryoValuesDeserializer _des; @@ -47,9 +40,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) { + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { //examine the response message from server - Object message = event.getMessage(); if (message instanceof ControlMessage) { ControlMessage msg = (ControlMessage)message; if (msg==ControlMessage.FAILURE_RESPONSE) { @@ -71,19 +63,17 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) { throw new RuntimeException(e); } } else { - throw new RuntimeException("Don't know how to handle a message of type " - + message + " (" + client.getDstAddress() + ")"); + throw new RuntimeException("Don't know how to handle a message of type " + message + " (" + client.getDstAddress() + ")"); } } @Override - public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - client.notifyInterestChanged(e.getChannel()); + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + client.notifyChannelFlushabilityChanged(ctx.channel()); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) { - Throwable cause = event.getCause(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (!(cause instanceof ConnectException)) { LOG.info("Connection to "+client.getDstAddress()+" failed:", cause); } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java index e36eb65975a..0bfe7e3213b 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java @@ -17,14 +17,13 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; - +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import org.apache.storm.Config; import java.util.Map; -class StormClientPipelineFactory implements ChannelPipelineFactory { +class StormClientPipelineFactory extends ChannelInitializer { private Client client; private Map conf; @@ -33,12 +32,13 @@ class StormClientPipelineFactory implements ChannelPipelineFactory { this.conf = conf; } - public ChannelPipeline getPipeline() throws Exception { + @Override + protected void initChannel(Channel ch) throws Exception { // Create a default pipeline implementation. - ChannelPipeline pipeline = Channels.pipeline(); + ChannelPipeline pipeline = ch.pipeline(); // Decoder - pipeline.addLast("decoder", new MessageDecoder()); + pipeline.addLast("decoder", new MessageDecoder(conf)); // Encoder pipeline.addLast("encoder", new MessageEncoder()); @@ -46,11 +46,9 @@ public ChannelPipeline getPipeline() throws Exception { .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION); if (isNettyAuth) { // Authenticate: Removed after authentication completes - pipeline.addLast("saslClientHandler", new SaslStormClientHandler( - client)); + pipeline.addLast("saslClientHandler", new SaslStormClientHandler(client)); } // business logic. pipeline.addLast("handler", new StormClientHandler(client, conf)); - return pipeline; } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java index fbec9659fe3..0318a321abb 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java @@ -17,58 +17,42 @@ */ package org.apache.storm.messaging.netty; -import org.apache.storm.utils.Utils; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.concurrent.atomic.AtomicInteger; -public class StormServerHandler extends SimpleChannelUpstreamHandler { +public class StormServerHandler extends ChannelInboundHandlerAdapter { + private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class); IServer server; private AtomicInteger failure_count; - private Channel channel; - + public StormServerHandler(IServer server) { this.server = server; failure_count = new AtomicInteger(0); } - + @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { - server.channelConnected(e.getChannel()); - if(channel != null) { - LOG.debug("Replacing channel with new channel: {} -> ", - channel, e.getChannel()); - } - channel = e.getChannel(); - server.channelConnected(channel); + public void channelActive(ChannelHandlerContext ctx) { + server.channelConnected(ctx.channel()); } - + @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - Object msgs = e.getMessage(); - if (msgs == null) { - return; - } - + public void channelRead(ChannelHandlerContext ctx, Object msg) { try { - server.received(msgs, e.getRemoteAddress().toString(), channel); + server.received(msg, ctx.channel().remoteAddress().toString(), ctx.channel()); } catch (InterruptedException e1) { - LOG.info("failed to enqueue a request message", e); + LOG.info("failed to enqueue a request message", e1); failure_count.incrementAndGet(); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.error("server errors in handling the request", e.getCause()); - Utils.handleUncaughtException(e.getCause()); - server.closeChannel(e.getChannel()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.error("server errors in handling the request", cause); + server.closeChannel(ctx.channel()); } } diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java index d28790b9f5b..66fe1145751 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java @@ -17,25 +17,29 @@ */ package org.apache.storm.messaging.netty; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; - +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import org.apache.storm.Config; -class StormServerPipelineFactory implements ChannelPipelineFactory { +import java.util.Map; + +class StormServerPipelineFactory extends ChannelInitializer { private Server server; + private Map conf; - StormServerPipelineFactory(Server server) { + StormServerPipelineFactory(Server server, Map conf) { this.server = server; + this.conf = conf; } - public ChannelPipeline getPipeline() throws Exception { + @Override + protected void initChannel(Channel ch) throws Exception { // Create a default pipeline implementation. - ChannelPipeline pipeline = Channels.pipeline(); + ChannelPipeline pipeline = ch.pipeline(); // Decoder - pipeline.addLast("decoder", new MessageDecoder()); + pipeline.addLast("decoder", new MessageDecoder(conf)); // Encoder pipeline.addLast("encoder", new MessageEncoder()); @@ -51,7 +55,5 @@ public ChannelPipeline getPipeline() throws Exception { } // business logic. pipeline.addLast("handler", new StormServerHandler(server)); - - return pipeline; } } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/IServerMessageHandler.java b/storm-core/src/jvm/org/apache/storm/pacemaker/IServerMessageHandler.java index 7dca2371c35..7ec897d2ef1 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/IServerMessageHandler.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/IServerMessageHandler.java @@ -21,5 +21,5 @@ public interface IServerMessageHandler { - public HBMessage handleMessage(HBMessage m, boolean authenticated); + HBMessage handleMessage(HBMessage m, boolean authenticated); } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java index 34f36653341..1d48ca8205f 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java @@ -17,29 +17,32 @@ */ package org.apache.storm.pacemaker; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.storm.Config; import org.apache.storm.generated.HBMessage; import org.apache.storm.messaging.netty.ISaslClient; import org.apache.storm.messaging.netty.NettyRenameThreadFactory; +import org.apache.storm.pacemaker.codec.ThriftNettyClientCodec; import org.apache.storm.security.auth.AuthUtils; import org.apache.storm.utils.StormBoundedExponentialBackoffRetry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.Configuration; import java.net.InetSocketAddress; import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import javax.security.auth.login.Configuration; -import org.apache.storm.pacemaker.codec.ThriftNettyClientCodec; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PacemakerClient implements ISaslClient { @@ -48,7 +51,7 @@ public class PacemakerClient implements ISaslClient { private String topo_name; private String secret; private boolean ready = false; - private final ClientBootstrap bootstrap; + private final Bootstrap bootstrap; private AtomicReference channelRef; private AtomicBoolean closing; private InetSocketAddress remote_addr; @@ -60,10 +63,17 @@ public class PacemakerClient implements ISaslClient { private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20); private int retryTimes = 0; + //the constructor is invoked by pacemaker-state-factory-test + public PacemakerClient() { + bootstrap = new Bootstrap(); + } + public PacemakerClient(Map config) { String host = (String)config.get(Config.PACEMAKER_HOST); int port = (int)config.get(Config.PACEMAKER_PORT); + int maxWorkers = (int)config.get(Config.PACEMAKER_CLIENT_MAX_THREADS); + topo_name = (String)config.get(Config.TOPOLOGY_NAME); if(topo_name == null) { topo_name = "pacemaker-client"; @@ -102,19 +112,28 @@ public PacemakerClient(Map config) { channelRef = new AtomicReference(null); setupMessaging(); - ThreadFactory bossFactory = new NettyRenameThreadFactory("client-boss"); ThreadFactory workerFactory = new NettyRenameThreadFactory("client-worker"); - NioClientSocketChannelFactory factory = - new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); - bootstrap = new ClientBootstrap(factory); - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("sendBufferSize", 5242880); - bootstrap.setOption("keepAlive", true); + + EventLoopGroup workerEventLoopGroup; + if (maxWorkers > 0) { + workerEventLoopGroup = new NioEventLoopGroup(maxWorkers, workerFactory); + } else { + // 0 means DEFAULT_EVENT_LOOP_THREADS + workerEventLoopGroup = new NioEventLoopGroup(0, workerFactory); + } + + bootstrap = new Bootstrap() + .group(workerEventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, 5242880) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) + .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .handler(new ThriftNettyClientCodec(this, config, authMethod)); remote_addr = new InetSocketAddress(host, port); - ChannelPipelineFactory pipelineFactory = new ThriftNettyClientCodec(this, config, authMethod).pipelineFactory(); - bootstrap.setPipelineFactory(pipelineFactory); bootstrap.connect(remote_addr); } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java index 322f5a009c6..637eca2548d 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java @@ -17,47 +17,38 @@ */ package org.apache.storm.pacemaker; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.Channel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.storm.generated.HBMessage; +import org.apache.storm.messaging.netty.ControlMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.messaging.netty.ControlMessage; -public class PacemakerClientHandler extends SimpleChannelUpstreamHandler { +public class PacemakerClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(PacemakerClientHandler.class); private PacemakerClient client; - - public PacemakerClientHandler(PacemakerClient client) { this.client = client; } @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent event) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { // register the newly established channel - Channel channel = ctx.getChannel(); + Channel channel = ctx.channel(); client.channelConnected(channel); - LOG.info("Connection established from {} to {}", - channel.getLocalAddress(), channel.getRemoteAddress()); + LOG.info("Connection established from {} to {}", channel.localAddress(), channel.remoteAddress()); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) { - LOG.debug("Got Message: {}", event.getMessage().toString()); - Object evm = event.getMessage(); + public void channelRead(ChannelHandlerContext ctx, Object evm) throws Exception { + LOG.debug("Got Message: {}", evm.toString()); if(evm instanceof ControlMessage) { LOG.debug("Got control message: {}", evm.toString()); - return; } else if(evm instanceof HBMessage) { client.gotMessage((HBMessage)evm); @@ -68,8 +59,8 @@ else if(evm instanceof HBMessage) { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) { - LOG.error("Connection to pacemaker failed", event.getCause()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.error("Connection to pacemaker failed", cause); client.reconnect(); } } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java index fa73d966ba7..ed45ef6be02 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java @@ -17,48 +17,51 @@ */ package org.apache.storm.pacemaker; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.storm.Config; import org.apache.storm.generated.HBMessage; import org.apache.storm.messaging.netty.ISaslServer; import org.apache.storm.messaging.netty.NettyRenameThreadFactory; +import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec; import org.apache.storm.security.auth.AuthUtils; -import java.lang.InterruptedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.Configuration; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import javax.security.auth.login.Configuration; -import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class PacemakerServer implements ISaslServer { - private static final long FIVE_MB_IN_BYTES = 5 * 1024 * 1024; + private static final int FIVE_MB_IN_BYTES = 5 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(PacemakerServer.class); - private final ServerBootstrap bootstrap; - private int port; private IServerMessageHandler handler; private String secret; - private String topo_name; - private volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server"); - private ConcurrentSkipListSet authenticated_channels = new ConcurrentSkipListSet(); + private String topologyName; + private volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE); + private ConcurrentSkipListSet authenticated_channels = new ConcurrentSkipListSet<>(); private ThriftNettyServerCodec.AuthMethod authMethod; + private EventLoopGroup workerEventLoopGroup; public PacemakerServer(IServerMessageHandler handler, Map config){ int maxWorkers = (int)config.get(Config.PACEMAKER_MAX_THREADS); - this.port = (int)config.get(Config.PACEMAKER_PORT); + int port = (int) config.get(Config.PACEMAKER_PORT); this.handler = handler; - this.topo_name = "pacemaker_server"; + this.topologyName = "pacemaker_server"; String auth = (String)config.get(Config.PACEMAKER_AUTH_METHOD); switch(auth) { @@ -86,31 +89,37 @@ public PacemakerServer(IServerMessageHandler handler, Map config){ throw new RuntimeException("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD."); } - ThreadFactory bossFactory = new NettyRenameThreadFactory("server-boss"); ThreadFactory workerFactory = new NettyRenameThreadFactory("server-worker"); - NioServerSocketChannelFactory factory; - if(maxWorkers > 0) { - factory = - new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), - maxWorkers); - } - else { - factory = - new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); - } - bootstrap = new ServerBootstrap(factory); - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES); - bootstrap.setOption("keepAlive", true); - ChannelPipelineFactory pipelineFactory = new ThriftNettyServerCodec(this, config, authMethod).pipelineFactory(); - bootstrap.setPipelineFactory(pipelineFactory); - Channel channel = bootstrap.bind(new InetSocketAddress(port)); - allChannels.add(channel); - LOG.info("Bound server to port: {}", Integer.toString(port)); + if (maxWorkers > 0) { + // add extra one for boss + workerEventLoopGroup = new NioEventLoopGroup(maxWorkers + 1, workerFactory); + } else { + // 0 means DEFAULT_EVENT_LOOP_THREADS + workerEventLoopGroup = new NioEventLoopGroup(0, workerFactory); + } + + LOG.info("Create Netty Server " + name() + ", buffer_size: " + FIVE_MB_IN_BYTES + ", maxWorkers: " + maxWorkers); + + ServerBootstrap bootstrap = new ServerBootstrap() + .group(workerEventLoopGroup, workerEventLoopGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_RCVBUF, FIVE_MB_IN_BYTES) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childHandler(new ThriftNettyServerCodec(this, config, authMethod)); + + // Bind and start to accept incoming connections. + try { + ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync(); + allChannels.add(f.channel()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** Implementing IServer. **/ @@ -121,11 +130,11 @@ public void channelConnected(Channel c) { public void cleanPipeline(Channel channel) { boolean authenticated = authenticated_channels.contains(channel); if(!authenticated) { - if(channel.getPipeline().get(ThriftNettyServerCodec.SASL_HANDLER) != null) { - channel.getPipeline().remove(ThriftNettyServerCodec.SASL_HANDLER); + if(channel.pipeline().get(ThriftNettyServerCodec.SASL_HANDLER) != null) { + channel.pipeline().remove(ThriftNettyServerCodec.SASL_HANDLER); } - else if(channel.getPipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) { - channel.getPipeline().remove(ThriftNettyServerCodec.KERBEROS_HANDLER); + else if(channel.pipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) { + channel.pipeline().remove(ThriftNettyServerCodec.KERBEROS_HANDLER); } } } @@ -151,10 +160,11 @@ public void closeChannel(Channel c) { c.close().awaitUninterruptibly(); allChannels.remove(c); authenticated_channels.remove(c); + workerEventLoopGroup.shutdownGracefully(); } public String name() { - return topo_name; + return topologyName; } public String secretKey() { diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java index ce9c8990125..b8e9d133812 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java @@ -17,26 +17,26 @@ */ package org.apache.storm.pacemaker.codec; -import org.jboss.netty.handler.codec.frame.FrameDecoder; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channel; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.storm.generated.HBMessage; import org.apache.storm.generated.HBServerMessageType; -import org.jboss.netty.buffer.ChannelBuffer; -import org.apache.storm.utils.Utils; import org.apache.storm.messaging.netty.ControlMessage; import org.apache.storm.messaging.netty.SaslMessageToken; +import org.apache.storm.utils.Utils; + +import java.util.List; -public class ThriftDecoder extends FrameDecoder { +public class ThriftDecoder extends ByteToMessageDecoder { private static final int INTEGER_SIZE = 4; @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception { - + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List list) throws Exception { long available = buf.readableBytes(); if(available < INTEGER_SIZE) { - return null; + return; } buf.markReaderIndex(); @@ -47,7 +47,7 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe if(available < thriftLen) { // We haven't received the entire object yet, return and wait for more bytes. buf.resetReaderIndex(); - return null; + return; } byte serialized[] = new byte[thriftLen]; @@ -56,14 +56,15 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe if(m.get_type() == HBServerMessageType.CONTROL_MESSAGE) { ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob()); - return cm; + list.add(cm); } else if(m.get_type() == HBServerMessageType.SASL_MESSAGE_TOKEN) { SaslMessageToken sm = SaslMessageToken.read(m.get_data().get_message_blob()); - return sm; + list.add(sm); } else { - return m; + list.add(m); } } + } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java index 85408aad4e6..9b4704bf29b 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java @@ -17,27 +17,26 @@ */ package org.apache.storm.pacemaker.codec; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channel; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; import org.apache.storm.generated.HBMessage; import org.apache.storm.generated.HBMessageData; import org.apache.storm.generated.HBServerMessageType; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.ChannelBuffer; -import org.apache.storm.utils.Utils; import org.apache.storm.messaging.netty.ControlMessage; -import org.apache.storm.messaging.netty.SaslMessageToken; import org.apache.storm.messaging.netty.INettySerializable; -import java.io.IOException; +import org.apache.storm.messaging.netty.SaslMessageToken; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.thrift.TBase; -public class ThriftEncoder extends OneToOneEncoder { +import java.io.IOException; +import java.util.List; + +public class ThriftEncoder extends MessageToMessageEncoder { - private static final Logger LOG = LoggerFactory - .getLogger(ThriftEncoder.class); + private static final Logger LOG = LoggerFactory.getLogger(ThriftEncoder.class); private HBMessage encodeNettySerializable(INettySerializable netty_message, HBServerMessageType mType) { @@ -45,18 +44,21 @@ private HBMessage encodeNettySerializable(INettySerializable netty_message, HBMessageData message_data = new HBMessageData(); HBMessage m = new HBMessage(); try { - ChannelBuffer cbuffer = netty_message.buffer(); - if(cbuffer.hasArray()) { - message_data.set_message_blob(cbuffer.array()); + ByteBuf cbuffer = netty_message.buffer(); + try { + if (cbuffer.hasArray()) { + message_data.set_message_blob(cbuffer.array()); + } else { + byte buff[] = new byte[netty_message.encodeLength()]; + cbuffer.readBytes(buff, 0, netty_message.encodeLength()); + message_data.set_message_blob(buff); + } + m.set_type(mType); + m.set_data(message_data); + return m; + } finally { + cbuffer.release(); } - else { - byte buff[] = new byte[netty_message.encodeLength()]; - cbuffer.readBytes(buff, 0, netty_message.encodeLength()); - message_data.set_message_blob(buff); - } - m.set_type(mType); - m.set_data(message_data); - return m; } catch( IOException e) { LOG.error("Failed to encode NettySerializable: ", e); @@ -65,9 +67,9 @@ private HBMessage encodeNettySerializable(INettySerializable netty_message, } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) { + protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception { if(msg == null) { - return null; + return; } LOG.debug("Trying to encode: " + msg.getClass().toString() + " : " + msg.toString()); @@ -95,16 +97,17 @@ else if(msg instanceof SaslMessageToken) { try { byte serialized[] = Utils.thriftSerialize(m); - ChannelBuffer ret = ChannelBuffers.directBuffer(serialized.length + 4); + ByteBuf ret = ByteBufAllocator.DEFAULT.ioBuffer(serialized.length + 4); ret.writeInt(serialized.length); ret.writeBytes(serialized); - return ret; + list.add(ret); } catch (RuntimeException e) { LOG.error("Failed to serialize.", e); throw e; } } + } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java index 015ef30b06b..f0300eb2eaf 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java @@ -17,24 +17,25 @@ */ package org.apache.storm.pacemaker.codec; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import org.apache.storm.messaging.netty.KerberosSaslClientHandler; import org.apache.storm.messaging.netty.SaslStormClientHandler; -import org.apache.storm.security.auth.AuthUtils; -import java.io.IOException; -import java.util.Map; import org.apache.storm.pacemaker.PacemakerClient; import org.apache.storm.pacemaker.PacemakerClientHandler; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; +import org.apache.storm.security.auth.AuthUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ThriftNettyClientCodec { +import java.io.IOException; +import java.util.Map; + +public class ThriftNettyClientCodec extends ChannelInitializer { public static final String SASL_HANDLER = "sasl-handler"; public static final String KERBEROS_HANDLER = "kerberos-handler"; - + public enum AuthMethod { DIGEST, KERBEROS, @@ -54,41 +55,38 @@ public ThriftNettyClientCodec(PacemakerClient pacemaker_client, Map storm_conf, this.storm_conf = storm_conf; } - public ChannelPipelineFactory pipelineFactory() { - return new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("encoder", new ThriftEncoder()); - pipeline.addLast("decoder", new ThriftDecoder()); - - if (authMethod == AuthMethod.KERBEROS) { - try { - LOG.debug("Adding KerberosSaslClientHandler to pacemaker client pipeline."); - pipeline.addLast(KERBEROS_HANDLER, - new KerberosSaslClientHandler(client, - storm_conf, - AuthUtils.LOGIN_CONTEXT_PACEMAKER_CLIENT)); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - else if(authMethod == AuthMethod.DIGEST) { - try { - LOG.debug("Adding SaslStormClientHandler to pacemaker client pipeline."); - pipeline.addLast(SASL_HANDLER, new SaslStormClientHandler(client)); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - else { - client.channelReady(); - } + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("encoder", new ThriftEncoder()); + pipeline.addLast("decoder", new ThriftDecoder()); - pipeline.addLast("PacemakerClientHandler", new PacemakerClientHandler(client)); - return pipeline; + if (authMethod == AuthMethod.KERBEROS) { + try { + LOG.debug("Adding KerberosSaslClientHandler to pacemaker client pipeline."); + pipeline.addLast(KERBEROS_HANDLER, + new KerberosSaslClientHandler(client, + storm_conf, + AuthUtils.LOGIN_CONTEXT_PACEMAKER_CLIENT)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + else if(authMethod == AuthMethod.DIGEST) { + try { + LOG.debug("Adding SaslStormClientHandler to pacemaker client pipeline."); + pipeline.addLast(SASL_HANDLER, new SaslStormClientHandler(client)); } - }; + catch (IOException e) { + throw new RuntimeException(e); + } + } + else { + client.channelReady(); + } + + pipeline.addLast("PacemakerClientHandler", new PacemakerClientHandler(client)); } + } diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java index 37c38a454fb..88ea3322a8b 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java @@ -17,6 +17,12 @@ */ package org.apache.storm.pacemaker.codec; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.storm.Config; import org.apache.storm.messaging.netty.ISaslServer; import org.apache.storm.messaging.netty.IServer; @@ -27,17 +33,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Map; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ThriftNettyServerCodec { +public class ThriftNettyServerCodec extends ChannelInitializer { public static final String SASL_HANDLER = "sasl-handler"; public static final String KERBEROS_HANDLER = "kerberos-handler"; - + public enum AuthMethod { DIGEST, KERBEROS, @@ -57,43 +58,38 @@ public ThriftNettyServerCodec(IServer server, Map storm_conf, AuthMethod authMet this.storm_conf = storm_conf; } - public ChannelPipelineFactory pipelineFactory() { - return new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() { - - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("encoder", new ThriftEncoder()); - pipeline.addLast("decoder", new ThriftDecoder()); - if(authMethod == AuthMethod.DIGEST) { - try { - LOG.debug("Adding SaslStormServerHandler to pacemaker server pipeline."); - pipeline.addLast(SASL_HANDLER, new SaslStormServerHandler((ISaslServer)server)); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - else if(authMethod == AuthMethod.KERBEROS) { - try { - LOG.debug("Adding KerberosSaslServerHandler to pacemaker server pipeline."); - ArrayList authorizedUsers = new ArrayList(1); - authorizedUsers.add((String)storm_conf.get(Config.NIMBUS_DAEMON_USER)); - pipeline.addLast(KERBEROS_HANDLER, new KerberosSaslServerHandler((ISaslServer)server, - storm_conf, - AuthUtils.LOGIN_CONTEXT_PACEMAKER_SERVER, - authorizedUsers)); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - else if(authMethod == AuthMethod.NONE) { - LOG.debug("Not authenticating any clients. AuthMethod is NONE"); - } - - pipeline.addLast("handler", new StormServerHandler(server)); - return pipeline; + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("encoder", new ThriftEncoder()); + pipeline.addLast("decoder", new ThriftDecoder()); + if(authMethod == AuthMethod.DIGEST) { + try { + LOG.debug("Adding SaslStormServerHandler to pacemaker server pipeline."); + pipeline.addLast(SASL_HANDLER, new SaslStormServerHandler((ISaslServer)server)); + } + catch (IOException e) { + throw new RuntimeException(e); } - }; + } + else if(authMethod == AuthMethod.KERBEROS) { + try { + LOG.debug("Adding KerberosSaslServerHandler to pacemaker server pipeline."); + ArrayList authorizedUsers = new ArrayList(1); + authorizedUsers.add((String)storm_conf.get(Config.NIMBUS_DAEMON_USER)); + pipeline.addLast(KERBEROS_HANDLER, new KerberosSaslServerHandler((ISaslServer)server, + storm_conf, + AuthUtils.LOGIN_CONTEXT_PACEMAKER_SERVER, + authorizedUsers)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + else if(authMethod == AuthMethod.NONE) { + LOG.debug("Not authenticating any clients. AuthMethod is NONE"); + } + + pipeline.addLast("handler", new StormServerHandler(server)); } }