Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-3163 Experimental support for Netty IO_URING incubator #5296

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions artemis-core-client-osgi/pom.xml
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@
<Import-Package>
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.incubator.*;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
*
</Import-Package>
9 changes: 9 additions & 0 deletions artemis-core-client/pom.xml
Original file line number Diff line number Diff line change
@@ -89,6 +89,15 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-kqueue</artifactId>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
Original file line number Diff line number Diff line change
@@ -353,4 +353,11 @@ public interface ActiveMQClientLogger {

@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);

@LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailability(Throwable e);

@LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailabilitynoClass();

}
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@

import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.incubator.channel.uring.IOUring;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.Env;
import org.slf4j.Logger;
@@ -45,6 +46,18 @@ public static final boolean isEpollAvailable() {
}
}

public static final boolean isIoUringAvailable() {
try {
return Env.isLinuxOs() && IOUring.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
return false;
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
return false;
}
}

public static final boolean isKQueueAvailable() {
try {
return Env.isMacOs() && KQueue.isAvailable();
Original file line number Diff line number Diff line change
@@ -98,6 +98,8 @@
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.ResourceLeakDetector;
@@ -137,6 +139,7 @@ public class NettyConnector extends AbstractConnector {
public static String NIO_CONNECTOR_TYPE = "NIO";
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
public static String IOURING_CONNECTOR_TYPE = "IO_URING";

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector {

private boolean useKQueue;

private boolean useIoUring;

private int remotingThreads;

private boolean useGlobalWorkerPool;
@@ -404,6 +409,7 @@ public NettyConnector(final Map<String, Object> configuration,

useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);

useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@@ -528,41 +534,61 @@ public synchronized void start() {
return;
}

boolean defaultRemotingThreads = remotingThreads == -1;

if (remotingThreads == -1) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}

String connectorType;

if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;

if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new IOUringEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new IOUringEventLoopGroup(remotingThreads);
}

connectorType = IOURING_CONNECTOR_TYPE;
channelClazz = IOUringSocketChannel.class;

logger.debug("Connector {} using native io_uring", this);
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new EpollEventLoopGroup(remotingThreads);
}

connectorType = EPOLL_CONNECTOR_TYPE;
channelClazz = EpollSocketChannel.class;

logger.debug("Connector {} using native epoll", this);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new KQueueEventLoopGroup(remotingThreads);
}

connectorType = KQUEUE_CONNECTOR_TYPE;
channelClazz = KQueueSocketChannel.class;

logger.debug("Connector {} using native kqueue", this);
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
} else {
channelClazz = NioSocketChannel.class;
group = new NioEventLoopGroup(remotingThreads);
}

connectorType = NIO_CONNECTOR_TYPE;
channelClazz = NioSocketChannel.class;

logger.debug("Connector {} using nio", this);
}
// if we are a servlet wrap the socketChannelFactory
Original file line number Diff line number Diff line change
@@ -66,6 +66,8 @@ public class TransportConstants {

public static final String USE_EPOLL_PROP_NAME = "useEpoll";

public static final String USE_IOURING_PROP_NAME = "useIoUring";

public static final String USE_KQUEUE_PROP_NAME = "useKQueue";

@Deprecated
@@ -213,6 +215,8 @@ public class TransportConstants {

public static final boolean DEFAULT_USE_KQUEUE = true;

public static final boolean DEFAULT_USE_IOURING = false;

public static final boolean DEFAULT_USE_INVM = false;

public static final boolean DEFAULT_USE_SERVLET = false;
@@ -409,6 +413,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@@ -484,6 +489,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
1 change: 1 addition & 0 deletions artemis-jms-client-osgi/pom.xml
Original file line number Diff line number Diff line change
@@ -78,6 +78,7 @@
<Import-Package>
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.incubator.*;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
*
</Import-Package>
13 changes: 13 additions & 0 deletions artemis-pom/pom.xml
Original file line number Diff line number Diff line change
@@ -440,6 +440,19 @@
<classifier>${netty-transport-native-kqueue-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
<version>${netty.incubator.io_uring.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty.incubator.io_uring.version}</version>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
1 change: 1 addition & 0 deletions artemis-server-osgi/pom.xml
Original file line number Diff line number Diff line change
@@ -128,6 +128,7 @@
org.glassfish.json*;resolution:=optional,
org.postgresql*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.incubator.*;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
java.net.http*;resolution:=optional,
com.sun.net.httpserver*;resolution:=optional,
Original file line number Diff line number Diff line change
@@ -64,6 +64,8 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -112,6 +114,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static final String NIO_ACCEPTOR_TYPE = "NIO";
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
public static final String IOURING_ACCEPTOR_TYPE = "EXPERIMENTAL_IO_URING";

static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@@ -148,6 +151,8 @@ public class NettyAcceptor extends AbstractAcceptor {

private final boolean useKQueue;

private final boolean useIoUring;

private final ProtocolHandler protocolHandler;

private final String host;
@@ -276,6 +281,7 @@ public NettyAcceptor(final String name,

useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);

backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@@ -425,12 +431,23 @@ public synchronized void start() throws Exception {
eventLoopGroup = new DefaultEventLoopGroup();
} else {

boolean defaultRemotingThreads = remotingThreads == -1;

if (remotingThreads == -1) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}

if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;

channelClazz = IOUringServerSocketChannel.class;
eventLoopGroup = new IOUringEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = IOURING_ACCEPTOR_TYPE;

logger.debug("Acceptor using native io_uring");
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
channelClazz = EpollServerSocketChannel.class;
eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = EPOLL_ACCEPTOR_TYPE;
@@ -446,6 +463,7 @@ public synchronized void start() throws Exception {
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = NIO_ACCEPTOR_TYPE;

logger.debug("Acceptor using nio");
}
}
23 changes: 20 additions & 3 deletions docs/user-manual/configuring-transports.adoc
Original file line number Diff line number Diff line change
@@ -243,14 +243,14 @@ These Native transports add features specific to a particular platform, generate

Both Clients and Server can benefit from this.

Current Supported Platforms.
Currently supported platforms:

* Linux running 64bit JVM
* MacOS running 64bit JVM

Apache ActiveMQ Artemis will by default enable the corresponding native transport if a supported platform is detected.
Apache ActiveMQ Artemis will enable the corresponding native transport by default if a supported platform is detected.

If running on an unsupported platform or any issues loading native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.
If running on an unsupported platform, or if any issues occur while loading the native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.

==== Linux Native Transport

@@ -263,6 +263,23 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is
Setting this to `false` will force the use of Java NIO instead of epoll.
Default is `true`


Additionally, Apache ActiveMQ Artemis offers `experimental` support for using IO_URING, @see https://en.wikipedia.org/wiki/Io_uring.

The following properties are specific to this native transport:

useIoUring::
enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected.
Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO.
Default is `false`

[WARNING]
====
[#io_uring-warning]
IO_URING support is `experimental` at this point. Using it _could_ introduce unwanted side effects or unpredicted behavior.
It's currently not recommended for production or any otherwise critical use.
====

==== MacOS Native Transport

On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -121,6 +121,7 @@
<mockito.version>5.14.1</mockito.version>
<jctools.version>4.0.5</jctools.version>
<netty.version>4.1.114.Final</netty.version>
<netty.incubator.io_uring.version>0.0.25.Final</netty.incubator.io_uring.version>
<hdrhistogram.version>2.2.2</hdrhistogram.version>
<curator.version>5.7.0</curator.version>
<zookeeper.version>3.9.2</zookeeper.version>
@@ -261,6 +262,7 @@

<netty-transport-native-epoll-classifier>linux-x86_64</netty-transport-native-epoll-classifier>
<netty-transport-native-kqueue-classifier>osx-x86_64</netty-transport-native-kqueue-classifier>
<netty-transport-native-io_uring-classifier>linux-x86_64</netty-transport-native-io_uring-classifier>

<fast-tests>false</fast-tests>