diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3f8212f997637..80035d8bdf60b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -102,6 +102,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkContext.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkContext.java
new file mode 100644
index 0000000000000..845f619d5b228
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ConcurrentModificationException;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+/**
+ * Store and distribute static factories for {@link java.net} sockets and {@link java.nio.channels} channel instances.
+ * By default, this class is a no-op, and distributes the default factories provided by the system.
+ *
In testing environments, {@link #install(SelectorProvider, SocketFactory, ServerSocketFactory)} can be used to
+ * replace the default factories with custom factories for performing assertions.
+ */
+@SuppressWarnings("checkstyle:useNetworkContext")
+public class NetworkContext {
+
+ private static final Semaphore LOCK = new Semaphore(1);
+ private static volatile SelectorProvider selectorProvider = SelectorProvider.provider();
+ private static volatile SocketFactory socketFactory = SocketFactory.getDefault();
+ private static volatile ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
+
+ /**
+ * Get the current provider for channels, which may change at any time.
+ * Use the result as soon as possible to avoid using a stale provider.
+ * @return The {@link SelectorProvider} factory for {@link java.nio.channels} implementations, non-null
+ */
+ public static SelectorProvider provider() {
+ return selectorProvider;
+ }
+
+ /**
+ * Get the current factory for client sockets, which may change at any time.
+ * Use the result as soon as possible to avoid using a stale factory.
+ * @return The {@link SocketFactory} factory for {@link java.net.Socket} implementations, non-null
+ */
+ public static SocketFactory factory() {
+ return socketFactory;
+ }
+
+ /**
+ * Get the current factory for server sockets, which may change at any time.
+ * Use the result as soon as possible to avoid using a stale factory.
+ * @return The {@link ServerSocketFactory} factory for {@link java.net.ServerSocket} implementations, non-null
+ */
+ public static ServerSocketFactory serverFactory() {
+ return serverSocketFactory;
+ }
+
+ /**
+ * Permanently install alternative factories for network resources. Can only be called once, and subsequent calls
+ * will fail with {@link ConcurrentModificationException}
+ *
This is meant for use only in tests. Installation is non-atomic, so network resources may be created with the
+ * default factories after installation.
+ * @param newSelectorProvider A provider for NIO selectors and sockets, non-null
+ * @param newSocketFactory A provider for client-side TCP sockets, non-null
+ * @param newServerSocketFactory A provider for server-side TCP sockets, non-null
+ * @throws ConcurrentModificationException if called more than once.
+ * @throws NullPointerException if any argument is null.
+ */
+ protected static void install(
+ SelectorProvider newSelectorProvider,
+ SocketFactory newSocketFactory,
+ ServerSocketFactory newServerSocketFactory
+ ) {
+ Objects.requireNonNull(newSelectorProvider, "SelectorProvider must be non-null");
+ Objects.requireNonNull(newSocketFactory, "SocketFactory must be non-null");
+ Objects.requireNonNull(newServerSocketFactory, "ServerSocketFactory must be non-null");
+ if (!LOCK.tryAcquire()) {
+ throw new ConcurrentModificationException("The network context is already in-use");
+ }
+ selectorProvider = newSelectorProvider;
+ socketFactory = newSocketFactory;
+ serverSocketFactory = newServerSocketFactory;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 68698ab7b8d21..d61c3bcfe3986 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -157,7 +157,7 @@ public Selector(int maxReceiveSize,
MemoryPool memoryPool,
LogContext logContext) {
try {
- this.nioSelector = java.nio.channels.Selector.open();
+ this.nioSelector = NetworkContext.provider().openSelector();
} catch (IOException e) {
throw new KafkaException(e);
}
@@ -264,7 +264,7 @@ public static String generateConnectionId(Socket socket, int connectionIndex) {
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
- SocketChannel socketChannel = SocketChannel.open();
+ SocketChannel socketChannel = NetworkContext.provider().openSocketChannel();
SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index 2375f95f6190c..1eafbbf45788e 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -64,7 +64,7 @@ public EchoServer(SecurityProtocol securityProtocol, Map configs) thr
this.serverSocket.setSoTimeout(SO_TIMEOUT_MS);
break;
case PLAINTEXT:
- this.serverSocket = new ServerSocket(0);
+ this.serverSocket = NetworkContext.serverFactory().createServerSocket(0);
this.sslFactory = null;
break;
default:
diff --git a/clients/src/test/java/org/apache/kafka/common/network/LeakTestingExtension.java b/clients/src/test/java/org/apache/kafka/common/network/LeakTestingExtension.java
new file mode 100644
index 0000000000000..5bffbe6223d8e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/LeakTestingExtension.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.utils.LeakTester;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.opentest4j.AssertionFailedError;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+public abstract class LeakTestingExtension {
+
+ /**
+ * Ignore leaks from a suite entirely.
+ * Use when there is a real resource leak that should be addressed as soon as possible.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.TYPE)
+ public @interface IgnoreAll {
+ String jira();
+ }
+
+ private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(LeakTestingExtension.class);
+ private static final String ROOT_TESTER_INSTANCE = "leak-tester";
+ private static final String PER_TEST_INSTANCE = "leak-test";
+
+ protected boolean ignored(ExtensionContext extensionContext) {
+ return extensionContext.getTestClass()
+ .map(clazz -> clazz.getAnnotationsByType(IgnoreAll.class))
+ .map(arr -> arr.length > 0)
+ .orElse(false);
+ }
+
+ protected abstract String message();
+
+ private NetworkContextLeakTester tester(ExtensionContext extensionContext) {
+ // Use the root namespace which lives across multiple test suites.
+ ExtensionContext.Store store = extensionContext.getRoot().getStore(NAMESPACE);
+ return store.getOrComputeIfAbsent(ROOT_TESTER_INSTANCE, ignored -> new NetworkContextLeakTester(), NetworkContextLeakTester.class);
+ }
+
+ protected void before(ExtensionContext extensionContext) {
+ if (ignored(extensionContext)) {
+ return;
+ }
+ extensionContext.getStore(NAMESPACE).put(PER_TEST_INSTANCE, tester(extensionContext).start());
+ }
+
+ protected void after(ExtensionContext extensionContext) throws AssertionFailedError {
+ if (ignored(extensionContext)) {
+ return;
+ }
+ try {
+ ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
+ store.getOrDefault(PER_TEST_INSTANCE, LeakTester.LeakTest.class, () -> {}).close();
+ } catch (AssertionFailedError e) {
+ throw new AssertionFailedError(message(), e);
+ }
+ }
+
+ /**
+ * This class applies a coarse leak test for a whole class at a time.
+ * This is automatically loaded for all classes, but can be disabled with {@link IgnoreAll}
+ * See {@link org.junit.jupiter.api.extension.Extension} ServiceLoader manifest.
+ */
+ public static class All extends LeakTestingExtension implements BeforeAllCallback, AfterAllCallback {
+
+ protected String message() {
+ return String.format(
+ "This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @%s.%s(\"KAFKA-XYZ\")",
+ LeakTestingExtension.class.getSimpleName(), IgnoreAll.class.getSimpleName());
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) throws AssertionFailedError {
+ before(extensionContext);
+ }
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) throws AssertionFailedError {
+ after(extensionContext);
+ }
+ }
+
+ /**
+ * This class applies a fine leak test for individual tests.
+ * This can be opted-in by including it in an {@link org.junit.jupiter.api.extension.ExtendWith} suite annotation.
+ */
+ public static class Each extends LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {
+
+ protected String message() {
+ return String.format(
+ "This test contains a resource leak. Close the resources, or if the class-level asserts passed, remove @%s(%s.%s.class)",
+ ExtendWith.class.getSimpleName(), LeakTestingExtension.class.getSimpleName(), Each.class.getSimpleName());
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext extensionContext) throws AssertionFailedError {
+ before(extensionContext);
+ }
+
+ @Override
+ public void afterEach(ExtensionContext extensionContext) throws AssertionFailedError {
+ after(extensionContext);
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkContextLeakTester.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkContextLeakTester.java
new file mode 100644
index 0000000000000..9faa82d9a1787
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkContextLeakTester.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.utils.LeakTester;
+import org.apache.kafka.common.utils.PredicateLeakTester;
+
+import java.io.IOException;
+import java.net.ProtocolFamily;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.Pipe;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.function.Supplier;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A {@link LeakTester} which attaches to the static {@link TestNetworkContext} to automatically register sockets
+ * created by Kafka clients and servers for leak testing.
+ */
+public class NetworkContextLeakTester implements LeakTester {
+
+ public NetworkContextLeakTester() {
+ }
+
+ public LeakTest start() {
+ return RecordingNetworkDecorator.INSTANCE.start();
+ }
+
+ private static class RecordingNetworkDecorator implements TestNetworkContext.Decorator, LeakTester {
+
+ private static final RecordingNetworkDecorator INSTANCE = TestNetworkContext.install(RecordingNetworkDecorator::new);
+
+ private final LeakTester tester;
+ private final RecordingSelectorProvider selectorProvider;
+ private final RecordingSocketFactory socketFactory;
+ private final RecordingServerSocketFactory serverSocketFactory;
+
+ private RecordingNetworkDecorator(Supplier inner) {
+ selectorProvider = new RecordingSelectorProvider(() -> inner.get().provider());
+ socketFactory = new RecordingSocketFactory(() -> inner.get().factory());
+ serverSocketFactory = new RecordingServerSocketFactory(() -> inner.get().serverFactory());
+ tester = LeakTester.combine(
+ selectorProvider.datagramTester,
+ selectorProvider.selectorTester,
+ selectorProvider.pipeTester,
+ selectorProvider.socketChannelTester,
+ selectorProvider.serverSocketTester,
+ socketFactory.socketTester,
+ serverSocketFactory.serverSocketTester,
+ serverSocketFactory.socketTester
+ );
+ }
+
+ @Override
+ public SelectorProvider provider() {
+ return selectorProvider;
+ }
+
+ @Override
+ public SocketFactory factory() {
+ return socketFactory;
+ }
+
+ @Override
+ public ServerSocketFactory serverFactory() {
+ return serverSocketFactory;
+ }
+
+ @Override
+ public LeakTest start() {
+ return tester.start();
+ }
+ }
+
+ private static class RecordingSelectorProvider extends TestNetworkContext.SelectorProviderDecorator {
+
+ private final PredicateLeakTester datagramTester;
+ private final PredicateLeakTester pipeTester;
+ private final PredicateLeakTester selectorTester;
+ private final PredicateLeakTester serverSocketTester;
+ private final PredicateLeakTester socketChannelTester;
+
+ private RecordingSelectorProvider(Supplier inner) {
+ super(inner);
+ datagramTester = new PredicateLeakTester<>(AbstractInterruptibleChannel::isOpen, DatagramChannel.class);
+ pipeTester = new PredicateLeakTester<>(pipe -> pipe.source().isOpen() || pipe.sink().isOpen(), Pipe.class);
+ selectorTester = new PredicateLeakTester<>(Selector::isOpen, AbstractSelector.class);
+ serverSocketTester = new PredicateLeakTester<>(AbstractInterruptibleChannel::isOpen, ServerSocketChannel.class);
+ socketChannelTester = new PredicateLeakTester<>(AbstractInterruptibleChannel::isOpen, SocketChannel.class);
+ }
+
+ @Override
+ public DatagramChannel openDatagramChannel() throws IOException {
+ return datagramTester.open(super.openDatagramChannel());
+ }
+
+ @Override
+ public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
+ return datagramTester.open(super.openDatagramChannel(family));
+ }
+
+ @Override
+ public Pipe openPipe() throws IOException {
+ return pipeTester.open(super.openPipe());
+ }
+
+ @Override
+ public AbstractSelector openSelector() throws IOException {
+ return selectorTester.open(super.openSelector());
+ }
+
+ @Override
+ public ServerSocketChannel openServerSocketChannel() throws IOException {
+ ServerSocketChannel original = super.openServerSocketChannel();
+ ServerSocketChannel spy = spy(original);
+ try {
+ doAnswer(invocation -> socketChannelTester.open((SocketChannel) invocation.callRealMethod())).when(spy).accept();
+ } catch (IOException e) {
+ return serverSocketTester.open(original);
+ }
+ return serverSocketTester.open(spy);
+ }
+
+ @Override
+ public SocketChannel openSocketChannel() throws IOException {
+ return socketChannelTester.open(super.openSocketChannel());
+ }
+ }
+
+ private static class RecordingSocketFactory extends TestNetworkContext.SocketFactoryDecorator {
+
+ private final PredicateLeakTester socketTester;
+
+ private RecordingSocketFactory(Supplier inner) {
+ super(inner);
+ socketTester = new PredicateLeakTester<>(socket -> !socket.isClosed(), Socket.class);
+ }
+
+ @Override
+ public Socket intercept(Socket socket) {
+ return socketTester.open(socket);
+ }
+ }
+
+ private static class RecordingServerSocketFactory extends TestNetworkContext.ServerSocketFactoryDecorator {
+
+ private final PredicateLeakTester serverSocketTester;
+ private final PredicateLeakTester socketTester;
+
+ private RecordingServerSocketFactory(Supplier inner) {
+ super(inner);
+ serverSocketTester = new PredicateLeakTester<>(serverSocket -> !serverSocket.isClosed(), ServerSocket.class);
+ socketTester = new PredicateLeakTester<>(socket -> !socket.isClosed(), Socket.class);
+ }
+
+ @Override
+ protected ServerSocket intercept(ServerSocket serverSocket) {
+ ServerSocket spy = spy(serverSocket);
+ try {
+ doAnswer(invocation -> socketTester.open((Socket) invocation.callRealMethod())).when(spy).accept();
+ } catch (IOException e) {
+ return serverSocketTester.open(serverSocket);
+ }
+ return serverSocketTester.open(spy);
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 2509c0a1f8b27..907d08ccd58c9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -111,7 +111,7 @@ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtoco
setDaemon(true);
ServerSocketChannel serverSocketChannel = null;
try {
- serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel = NetworkContext.provider().openServerSocketChannel();
this.serverSocketChannel = serverSocketChannel;
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
@@ -389,7 +389,7 @@ public void run() {
java.nio.channels.Selector acceptSelector = null;
try {
- acceptSelector = java.nio.channels.Selector.open();
+ acceptSelector = NetworkContext.provider().openSelector();
serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
while (serverSocketChannel.isOpen()) {
if (acceptSelector.select(1000) > 0) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
index 8fa28e9446e63..8e752ebf74c72 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -28,7 +28,7 @@ public class PlaintextSender extends Thread {
@SuppressWarnings("this-escape")
public PlaintextSender(final InetSocketAddress serverAddress, final byte[] payload) {
super(() -> {
- try (Socket connection = new Socket(serverAddress.getAddress(), serverAddress.getPort());
+ try (Socket connection = NetworkContext.factory().createSocket(serverAddress.getAddress(), serverAddress.getPort());
OutputStream os = connection.getOutputStream()) {
os.write(payload);
os.flush();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 15a38b172bdc1..5e763eb81863c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -194,7 +194,7 @@ public void testNoRouteToHost() {
@Test
public void testConnectionRefused() throws Exception {
String node = "0";
- ServerSocket nonListeningSocket = new ServerSocket(0);
+ ServerSocket nonListeningSocket = NetworkContext.serverFactory().createServerSocket(0);
int nonListeningPort = nonListeningSocket.getLocalPort();
selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
while (selector.disconnected().containsKey(node)) {
@@ -441,7 +441,7 @@ public void registerFailure() throws Exception {
try (MockedConstruction mockedMetadataRegistry =
mockConstruction(Selector.SelectorChannelMetadataRegistry.class)) {
Selector selector = new Selector(CONNECTION_MAX_IDLE_MS, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
- final SocketChannel socketChannel = SocketChannel.open();
+ final SocketChannel socketChannel = NetworkContext.provider().openSocketChannel();
socketChannel.configureBlocking(false);
IOException e = assertThrows(IOException.class, () -> selector.register(channelId, socketChannel));
assertTrue(e.getCause().getMessage().contains("Test exception"), "Unexpected exception: " + e);
@@ -690,7 +690,7 @@ public void testMuteOnOOM() throws Exception {
selector = new Selector(NetworkReceive.UNLIMITED, CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup",
new HashMap<>(), true, false, channelBuilder, pool, new LogContext());
- try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+ try (ServerSocketChannel ss = NetworkContext.provider().openServerSocketChannel()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
@@ -781,7 +781,7 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception {
SelectionKey selectionKey = mock(SelectionKey.class);
when(kafkaChannel.selectionKey()).thenReturn(selectionKey);
- SocketChannel socket = SocketChannel.open();
+ SocketChannel socket = NetworkContext.provider().openSocketChannel();
when(selectionKey.channel()).thenReturn(socket);
when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT);
when(selectionKey.attachment()).thenReturn(kafkaChannel);
@@ -824,7 +824,7 @@ public void testOutboundConnectionsCountInConnectionCreationMetric() throws Exce
public void testInboundConnectionsCountInConnectionCreationMetric() throws Exception {
int conns = 5;
- try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+ try (ServerSocketChannel ss = NetworkContext.provider().openServerSocketChannel()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
@@ -851,7 +851,7 @@ public void testConnectionsByClientMetric() throws Exception {
ClientInformation.UNKNOWN_NAME_OR_VERSION, ClientInformation.UNKNOWN_NAME_OR_VERSION);
Map knownNameAndVersion = softwareNameAndVersionTags("A", "B");
- try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+ try (ServerSocketChannel ss = NetworkContext.provider().openServerSocketChannel()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 8a9704c16216e..20adb5beb1e8c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -260,7 +260,7 @@ public void testMuteOnOOM() throws Exception {
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
new HashMap<>(), true, false, channelBuilder, pool, new LogContext());
- try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+ try (ServerSocketChannel ss = NetworkContext.provider().openServerSocketChannel()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/TestNetworkContext.java b/clients/src/test/java/org/apache/kafka/common/network/TestNetworkContext.java
new file mode 100644
index 0000000000000..06dbd65fd9d75
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/TestNetworkContext.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ProtocolFamily;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.Channel;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.Pipe;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+/**
+ * A utility for manipulating the {@link NetworkContext} during tests.
+ * The entry-point into this class is {@link #install(Function)} which installs a new {@link Decorator} into the
+ * network context. Decorators are responsible for processing arguments, calling inner decorators, and processing the
+ * return value from inner decorators. The root decorator is the default behavior of the network context, providing
+ * real network resources.
+ *
Multiple decorators can be installed concurrently. Installation is non-atomic, meaning that sockets may be
+ * created without a Decorator after installation.
+ *
This class is thread-safe.
+ */
+public class TestNetworkContext {
+
+ /**
+ * A single layer of the network context. Multiple Decorators form a singly-linked-list to compose behavior.
+ */
+ public interface Decorator {
+ /**
+ * It is recommended to construct a single subclass of {@link SelectorProviderDecorator} with overridden
+ * behavior, and then return that instance in this method.
+ * @return the {@link SelectorProvider} to use for {@link java.nio.channels} instances.
+ */
+ SelectorProvider provider();
+
+ /**
+ * It is recommended to construct a single subclass of {@link SocketFactoryDecorator} with overridden
+ * behavior, and then return that instance in this method.
+ * @return the {@link SocketFactory} to use for {@link java.net.Socket} instances.
+ */
+ SocketFactory factory();
+
+ /**
+ * It is recommended to construct a single subclass of {@link ServerSocketFactoryDecorator} with overridden
+ * behavior, and then return that instance in this method.
+ * @return the {@link ServerSocketFactory} to use for {@link java.net.ServerSocket} instances.
+ */
+ ServerSocketFactory serverFactory();
+ }
+
+ /**
+ * Install a new decorator into the network context
+ * @param newDecorator Function for constructing a new {@link Decorator} object. The argument to this function is a
+ * supplier of the inner Decorator that should be called by the returned Decorator. The return
+ * value of this supplier is non-null, but can change at any time, and so should not be cached.
+ * @param The subtype of the new decorator being constructed.
+ * @return The decorator that was constructed by calling {@code newDecorator}
+ */
+ public static synchronized T install(Function, T> newDecorator) {
+ return LazyNetworkContext.install(newDecorator);
+ }
+
+ /**
+ * Lazy holder for the static state of the TestNetworkContext.
+ * Used to delay locking the {@link NetworkContext} until the first time a decorator is installed.
+ */
+ private static final class LazyNetworkContext {
+ private static final AtomicReference ACTIVE;
+ private static AtomicReference lastInstalled;
+
+ static {
+ ACTIVE = new AtomicReference<>(RootDecorator.INSTANCE);
+ lastInstalled = new AtomicReference<>(RootDecorator.INSTANCE);
+ // These instances are "fixed" as the NetworkContext can only be installed once.
+ // Their behavior can be changed by updating ACTIVE
+ SelectorProviderDecorator provider = new SelectorProviderDecorator(() -> ACTIVE.get().provider());
+ SocketFactoryDecorator factory = new SocketFactoryDecorator(() -> ACTIVE.get().factory());
+ ServerSocketFactoryDecorator serverFactory = new ServerSocketFactoryDecorator(() -> ACTIVE.get().serverFactory());
+ NetworkContext.install(provider, factory, serverFactory);
+ }
+
+ public static synchronized T install(Function, T> newDecorator) {
+ AtomicReference last = lastInstalled;
+ T decorator = newDecorator.apply(last::get);
+ ACTIVE.set(decorator);
+ lastInstalled = new AtomicReference<>(decorator);
+ return decorator;
+ }
+ }
+
+ @SuppressWarnings("checkstyle:useNetworkContext")
+ private static class RootDecorator implements Decorator {
+
+ private static final RootDecorator INSTANCE = new RootDecorator();
+
+ private RootDecorator() {
+ }
+
+ @Override
+ public SelectorProvider provider() {
+ return SelectorProvider.provider();
+ }
+
+ @Override
+ public SocketFactory factory() {
+ return SocketFactory.getDefault();
+ }
+
+ @Override
+ public ServerSocketFactory serverFactory() {
+ return ServerSocketFactory.getDefault();
+ }
+ }
+
+ public static class SelectorProviderDecorator extends SelectorProvider {
+
+ private final Supplier inner;
+
+ public SelectorProviderDecorator(Supplier inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public DatagramChannel openDatagramChannel() throws IOException {
+ return inner.get().openDatagramChannel();
+ }
+
+ @Override
+ public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
+ return inner.get().openDatagramChannel(family);
+ }
+
+ @Override
+ public Pipe openPipe() throws IOException {
+ return inner.get().openPipe();
+ }
+
+ @Override
+ public AbstractSelector openSelector() throws IOException {
+ return inner.get().openSelector();
+ }
+
+ @Override
+ public ServerSocketChannel openServerSocketChannel() throws IOException {
+ return inner.get().openServerSocketChannel();
+ }
+
+ @Override
+ public SocketChannel openSocketChannel() throws IOException {
+ return inner.get().openSocketChannel();
+ }
+
+ @Override
+ public Channel inheritedChannel() throws IOException {
+ return inner.get().inheritedChannel();
+ }
+ }
+
+ public static class SocketFactoryDecorator extends SocketFactory {
+
+ private final Supplier inner;
+
+ public SocketFactoryDecorator(Supplier inner) {
+ this.inner = inner;
+ }
+
+ protected Socket intercept(Socket socket) {
+ return socket;
+ }
+
+ @Override
+ public Socket createSocket() throws IOException {
+ return intercept(inner.get().createSocket());
+ }
+
+ @Override
+ public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+ return intercept(inner.get().createSocket(host, port));
+ }
+
+ @Override
+ public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException, UnknownHostException {
+ return intercept(inner.get().createSocket(host, port, localHost, localPort));
+ }
+
+ @Override
+ public Socket createSocket(InetAddress host, int port) throws IOException {
+ return intercept(inner.get().createSocket(host, port));
+ }
+
+ @Override
+ public Socket createSocket(InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
+ return intercept(inner.get().createSocket(address, port, localAddress, localPort));
+ }
+ }
+
+ public static class ServerSocketFactoryDecorator extends ServerSocketFactory {
+
+ private final Supplier inner;
+
+ public ServerSocketFactoryDecorator(Supplier inner) {
+ this.inner = inner;
+ }
+
+ protected ServerSocket intercept(ServerSocket serverSocket) {
+ return serverSocket;
+ }
+
+ @Override
+ public ServerSocket createServerSocket() throws IOException {
+ return intercept(inner.get().createServerSocket());
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int port) throws IOException {
+ return intercept(inner.get().createServerSocket(port));
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+ return intercept(inner.get().createServerSocket(port, backlog));
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
+ return intercept(inner.get().createServerSocket(port, backlog, ifAddress));
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/LeakTester.java b/clients/src/test/java/org/apache/kafka/common/utils/LeakTester.java
new file mode 100644
index 0000000000000..2d43b0c6789d0
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/LeakTester.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.opentest4j.AssertionFailedError;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Interface for leak-checkers which evaluates an execution interval and throws an exception if resources were leaked.
+ * A test interval is the time between {@link LeakTester#start()} and {@link LeakTest#close()}. A leak-checker
+ * should consider a resource leaked if it is opened during the interval, but not closed during that interval.
+ * Implementations of this interface should be thread-safe.
+ *
+ *
For example, the following situation would :
+ *
+ * {@code
+ * LeakTester tester = ; // some implementation
+ * AutoCloseable resource;
+ * try (LeakTest test : tester.start()) {
+ * resource = () -> {};
+ * tester.register(resource); // pseudocode
+ * resource.close(); // If this is missing, a leak will be detected.
+ * } catch (AssertionFailedError e) {
+ * // We leaked a resource!
+ * }
+ * }
+ *
+ * Implementations of LeakTester will have different ways of registering resources for tracking. For example,
+ * A LeakTester that is also a Factory could automatically register all resources created by the factory method(s).
+ * A LeakTester could also maintain static methods which allows registration from deeper in the call stack.
+ * Implementations of LeakTester will also have different ways of testing if resources are closed, depending on the
+ * specifics of the resource they are tracking.
+ */
+@FunctionalInterface
+public interface LeakTester {
+
+ /**
+ * Start a leak testing interval
+ * @return A {@link LeakTest} object which when closed, defines the end of the interval and checks for leaks.
+ */
+ LeakTest start();
+
+ /**
+ * A leak test that has been started
+ */
+ @FunctionalInterface
+ interface LeakTest extends AutoCloseable {
+ /**
+ * Stop the leak test
+ * @throws AssertionFailedError if a resource was opened during the interval, but not closed.
+ */
+ void close() throws AssertionFailedError;
+ }
+
+ /**
+ * Combine two or more LeakTester objects into a single test. This has the effect of running multiple LeakTesters
+ * concurrently. If one or more of the testers discovers a leak, their exceptions are suppressed and a new exception
+ * is thrown to summarize all failures.
+ * @param testers A group of LeakTester instances which should be run concurrently
+ * @return A combined leak test which runs tests for the passed-in testers concurrently, non-null.
+ */
+ static LeakTester combine(LeakTester... testers) {
+ return () -> {
+ List tests = Arrays.stream(testers).map(LeakTester::start).collect(Collectors.toList());
+ return () -> {
+ AssertionFailedError summary = null;
+ for (LeakTest test : tests) {
+ try {
+ test.close();
+ } catch (AssertionFailedError e) {
+ if (summary == null) {
+ summary = new AssertionFailedError("Leak check failed");
+ }
+ summary.addSuppressed(e);
+ }
+ }
+ if (summary != null) {
+ throw summary;
+ }
+ };
+ };
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/PredicateLeakTester.java b/clients/src/test/java/org/apache/kafka/common/utils/PredicateLeakTester.java
new file mode 100644
index 0000000000000..0d6d8aa9316ed
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/PredicateLeakTester.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.opentest4j.AssertionFailedError;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+
+/**
+ * LeakTester for objects which have some state (open vs closed) that can be examined by a {@link Predicate}.
+ * An object is considered leaked if it is passed to {@link #open(Object)}, but at end of the interval the predicate
+ * returns true. This tester is only compatible with resources which are never re-opened, and resources which are not
+ * closed remain strongly-referenced by this class and may cause memory leaks.
+ * @param The type of objects being tracked
+ */
+public class PredicateLeakTester implements LeakTester {
+
+ private final ConcurrentMap refs = new ConcurrentHashMap<>();
+ private final Predicate isOpen;
+ private final Class clazz;
+
+ /**
+ * Create a new leak tester based on predicate evaluation.
+ * @param isOpen A predicate that returns true if a resource is open, and false otherwise.
+ * @param clazz The superclass of objects tracked by this tester
+ */
+ public PredicateLeakTester(Predicate isOpen, Class clazz) {
+ this.isOpen = Objects.requireNonNull(isOpen, "predicate must be non-null");
+ this.clazz = Objects.requireNonNull(clazz, "class must be non-null");
+ }
+
+ /**
+ * Register a resource to be tracked
+ * This method captures a stacktrace when a resource is registered, so this method should be called near to
+ * where the resource is opened or created. This is included in the stack trace thrown from failed leak assertions.
+ * @param obj The resource being tracked, non-null
+ * @return The passed-in object, for method-chaining
+ */
+ public T open(T obj) {
+ Objects.requireNonNull(obj, "resource must be non-null");
+ try {
+ throw new Exception("Opened " + obj.getClass().getName());
+ } catch (Exception e) {
+ refs.put(obj, e);
+ }
+ return obj;
+ }
+
+ private Set live() {
+ Set ret = new HashSet<>();
+ Iterator> iterator = refs.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry entry = iterator.next();
+ if (isOpen.test(entry.getKey())) {
+ ret.add(entry.getValue());
+ } else {
+ iterator.remove();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Start a leak test
+ * @return An ongoing leak test
+ */
+ public LeakTest start() {
+ Set before = live();
+ return () -> {
+ Set after = live();
+ after.removeAll(before);
+ if (!after.isEmpty()) {
+ AssertionFailedError e = new AssertionFailedError(clazz.getSimpleName() + " instances left open");
+ for (Exception leakedSocket : after) {
+ e.addSuppressed(leakedSocket);
+ }
+ throw e;
+ }
+ };
+ }
+}
diff --git a/clients/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/clients/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000000000..cadc2a857d384
--- /dev/null
+++ b/clients/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.common.network.LeakTestingExtension$All
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 845cffb07b0df..132d3a28da936 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.FileConfigProvider;
+import org.apache.kafka.common.network.NetworkContext;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Struct;
@@ -51,6 +52,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1469,7 +1471,7 @@ private void useFixedBrokerPort() throws IOException {
// on a different random free port the second time it is started. Note that we can only use the static port
// because we have a single broker setup in this test.
int listenerPort;
- try (ServerSocket s = new ServerSocket(0)) {
+ try (ServerSocket s = NetworkContext.serverFactory().createServerSocket(0)) {
listenerPort = s.getLocalPort();
}
brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort));
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c9d6a86c54987..041552da2732a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -20,7 +20,7 @@ package kafka.network
import java.io.IOException
import java.net._
import java.nio.ByteBuffer
-import java.nio.channels.{Selector => NSelector, _}
+import java.nio.channels._
import java.util
import java.util.Optional
import java.util.concurrent._
@@ -38,7 +38,7 @@ import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
-import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientInformation, KafkaChannel, ListenerName, ListenerReconfigurable, NetworkSend, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientInformation, KafkaChannel, ListenerName, ListenerReconfigurable, NetworkContext, NetworkSend, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -590,7 +590,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val recvBufferSize = config.socketReceiveBufferBytes
private val listenBacklogSize = config.socketListenBacklogSize
- private val nioSelector = NSelector.open()
+ private val nioSelector = NetworkContext.provider().openSelector()
// If the port is configured as 0, we are using a wildcard port, so we need to open the socket
// before we can find out what port we have. If it is set to a nonzero value, defer opening
@@ -723,7 +723,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
new InetSocketAddress(port)
else
new InetSocketAddress(host, port)
- val serverChannel = ServerSocketChannel.open()
+ val serverChannel = NetworkContext.provider().openServerSocketChannel()
try {
serverChannel.configureBlocking(false)
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index bf5f8c1e6a3ec..32907a88b5163 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -14,10 +14,10 @@
package kafka.api
import java.io.IOException
-import java.net.ServerSocket
import java.util.Properties
import kafka.utils.TestUtils
+import org.apache.kafka.common.network.NetworkContext
/**
* DO NOT USE THESE UTILITIES UNLESS YOU ABSOLUTELY MUST
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
object FixedPortTestUtils {
def choosePorts(count: Int): Seq[Int] = {
try {
- val sockets = (0 until count).map(_ => new ServerSocket(0))
+ val sockets = (0 until count).map(_ => NetworkContext.serverFactory().createServerSocket(0))
val ports = sockets.map(_.getLocalPort())
sockets.foreach(_.close())
ports
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 8efa1cd15ab5e..032b076fe68df 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasResult}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.message.ProduceRequestData
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, NetworkContext}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
@@ -321,7 +321,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
def connect(listener: String): Socket = {
val listenerName = ListenerName.normalised(listener)
- new Socket("localhost", brokers.head.socketServer.boundPort(listenerName))
+ NetworkContext.factory.createSocket("localhost", brokers.head.socketServer.boundPort(listenerName))
}
private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 0623a16b38829..a245c53298cc9 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -26,7 +26,7 @@ import kafka.network.SocketServer
import kafka.utils.Implicits._
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic}
-import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
+import org.apache.kafka.common.network.{ListenerName, ConnectionMode, NetworkContext}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -135,7 +135,7 @@ object IntegrationTestUtils {
def connect(socketServer: SocketServer,
listenerName: ListenerName): Socket = {
- new Socket("localhost", socketServer.boundPort(listenerName))
+ NetworkContext.factory.createSocket("localhost", socketServer.boundPort(listenerName))
}
def clientSecurityProps(certAlias: String): Properties = {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d853a4e212d73..5826e69a69cd3 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -176,7 +176,7 @@ class SocketServerTest {
s"$listenerName", e)
}
val socket = try {
- new Socket("localhost", boundPort, localAddr, port)
+ NetworkContext.factory.createSocket("localhost", boundPort, localAddr, port)
} catch {
case e: Throwable => throw new RuntimeException(s"Unable to connect to remote port $boundPort " +
s"with local port $port on listener $listenerName", e)
@@ -2486,9 +2486,9 @@ class SocketServerTest {
* channel's `netReadBuffer` to simulate scenarios with SSL buffered data.
*/
private class ProxyServer(socketServer: SocketServer) {
- private val serverSocket = new ServerSocket(0)
+ private val serverSocket = NetworkContext.serverFactory.createServerSocket(0)
val localPort = serverSocket.getLocalPort
- val serverConnSocket = new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL)))
+ val serverConnSocket = NetworkContext.factory.createSocket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL)))
private val executor = Executors.newFixedThreadPool(2)
@volatile var clientConnSocket: Socket = _
@volatile private var buffer: Option[ByteBuffer] = None
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 4c1494380ea60..db5e10ebc6254 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, NetworkContext}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
import org.apache.kafka.common.utils.Utils
@@ -98,7 +98,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
def connect(socketServer: SocketServer = anySocketServer,
listenerName: ListenerName = listenerName): Socket = {
- new Socket("localhost", socketServer.boundPort(listenerName))
+ NetworkContext.factory.createSocket("localhost", socketServer.boundPort(listenerName))
}
private def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index c619acde664f4..39bd53ebab791 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -26,7 +26,7 @@ import kafka.network.SocketServer
import kafka.utils._
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.message.ProduceRequestData
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, NetworkContext}
import org.apache.kafka.common.protocol.types.Type
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
@@ -52,7 +52,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
private def socketServer = brokers.head.socketServer
private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
- new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
+ NetworkContext.factory.createSocket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
}
private def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
index 7f0c4ab503e52..99f171980ef18 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -17,7 +17,7 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.message.ListGroupsRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, NetworkContext}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ListGroupsRequest, ListGroupsResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import java.net.Socket
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Properties}
@@ -67,7 +66,7 @@ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
@ValueSource(strings = Array("zk", "kraft"))
def testBothReportersAreInvoked(quorum: String): Unit = {
val port = anySocketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
- val socket = new Socket("localhost", port)
+ val socket = NetworkContext.factory.createSocket("localhost", port)
socket.setSoTimeout(10000)
try {
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 802b206e14d8e..72847e0850611 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, NetworkContext}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -268,7 +268,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
try {
// Set up a server to accept a connection and receive one byte from the first request. No response is sent.
- serverSocket = new ServerSocket(0)
+ serverSocket = NetworkContext.serverFactory.createServerSocket(0)
val receiveFuture = executor.submit(new Runnable {
override def run(): Unit = {
val socket = serverSocket.accept()
diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
index 4a61c17287706..cf06618f00a04 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
@@ -17,8 +17,10 @@
package kafka.zk
+import org.apache.kafka.common.network.NetworkContext
+
import java.io.IOException
-import java.net.{SocketTimeoutException, Socket, InetAddress, InetSocketAddress}
+import java.net.{InetAddress, InetSocketAddress, SocketTimeoutException}
/**
* ZooKeeper responds to a small set of commands. Each command is composed of four letters. You issue the commands to
@@ -32,7 +34,7 @@ object ZkFourLetterWords {
val hostAddress =
if (host != null) new InetSocketAddress(host, port)
else new InetSocketAddress(InetAddress.getByName(null), port)
- val sock = new Socket()
+ val sock = NetworkContext.factory.createSocket()
try {
sock.connect(hostAddress, timeout)
val outStream = sock.getOutputStream
diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java
index 114df5329ba56..0632cadf6ffe2 100644
--- a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.common.network.NetworkContext;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
@@ -353,7 +354,7 @@ public void unknownObjectName() {
}
private static int findRandomOpenPortOnAllLocalInterfaces() throws Exception {
- try (ServerSocket socket = new ServerSocket(0)) {
+ try (ServerSocket socket = NetworkContext.serverFactory().createServerSocket(0)) {
return socket.getLocalPort();
}
}