From 48ccd40aad8997a9d6717957dcf698ce59b72715 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Wed, 3 Apr 2019 18:02:54 +0900 Subject: [PATCH] Add BrokerConstants.NETTY_WORKER_THREADS --- .../main/java/io/moquette/BrokerConstants.java | 1 + .../java/io/moquette/broker/NewNettyAcceptor.java | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 086d032c7..8e7c48cac 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -71,6 +71,7 @@ public final class BrokerConstants { public static final String NETTY_EPOLL_PROPERTY_NAME = "netty.epoll"; public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size"; public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092; + public static final String NETTY_WORKER_THREADS = "netty.worker_threads"; public static final String IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME = "immediate_buffer_flush"; public static final String METRICS_ENABLE_PROPERTY_NAME = "use_metrics"; public static final String METRICS_LIBRATO_EMAIL_PROPERTY_NAME = "metrics.librato.email"; diff --git a/broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java b/broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java index 30c3fa03d..573adcbe5 100644 --- a/broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java +++ b/broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java @@ -146,17 +146,28 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte nettyChannelTimeoutSeconds = props.intProp(BrokerConstants.NETTY_CHANNEL_TIMEOUT_SECONDS_PROPERTY_NAME, 10); maxBytesInMessage = props.intProp(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, BrokerConstants.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE); + int nettyWorkerThreads = props.intProp(BrokerConstants.NETTY_WORKER_THREADS, 0); boolean epoll = props.boolProp(BrokerConstants.NETTY_EPOLL_PROPERTY_NAME, false); if (epoll) { LOG.info("Netty is using Epoll"); bossGroup = new EpollEventLoopGroup(); - workerGroup = new EpollEventLoopGroup(); + if (nettyWorkerThreads == 0) { + workerGroup = new EpollEventLoopGroup(); + } + else { + workerGroup = new EpollEventLoopGroup(nettyWorkerThreads); + } channelClass = EpollServerSocketChannel.class; } else { LOG.info("Netty is using NIO"); bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); + if (nettyWorkerThreads == 0) { + workerGroup = new NioEventLoopGroup(); + } + else { + workerGroup = new NioEventLoopGroup(nettyWorkerThreads); + } channelClass = NioServerSocketChannel.class; }