Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ subprojects {
displayGranularity = 0
}
logTestStdout.rehydrate(delegate, owner, this)()
systemProperty("junit.jupiter.extensions.autodetection.enabled", true)

exclude testsToExclude

Expand Down Expand Up @@ -507,6 +508,7 @@ subprojects {
displayGranularity = 0
}
logTestStdout.rehydrate(delegate, owner, this)()
systemProperty("junit.jupiter.extensions.autodetection.enabled", true)

exclude testsToExclude

Expand Down Expand Up @@ -535,6 +537,7 @@ subprojects {
displayGranularity = 0
}
logTestStdout.rehydrate(delegate, owner, this)()
systemProperty("junit.jupiter.extensions.autodetection.enabled", true)

exclude testsToExclude

Expand Down
8 changes: 8 additions & 0 deletions checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@
<property name="message" value="'System.exit': Should not directly call System.exit, but Exit.exit instead."/>
</module>

<module name="Regexp">
<property name="id" value="useNetworkContext"/>
<property name="format" value="(SelectorProvider\.provider\(|SocketFactory\.getDefault\(|DatagramChannel\.open\(|Pipe\.open\(|Selector\.open\(|SocketChannel\.open\(|System\.inheritedChannel\(|new ServerSocket\(|new Socket\()"/>
<property name="illegalPattern" value="true"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="Use NetworkContext static methods to access network resources"/>
</module>

<!-- code quality -->
<module name="MethodLength">
<property name="max" value="170" />
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="javax.net" />
<allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

import java.nio.channels.spi.SelectorProvider;
import java.util.ConcurrentModificationException;
import java.util.Objects;
import java.util.concurrent.Semaphore;

import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;

/**
* Store and distribute static factories for {@link java.net} sockets and {@link java.nio.channels} channel instances.
* <p>By default, this class is a no-op, and distributes the default factories provided by the system.
* <p>In testing environments, {@link #install(SelectorProvider, SocketFactory, ServerSocketFactory)} can be used to
* replace the default factories with custom factories for performing assertions.
*/
@SuppressWarnings("checkstyle:useNetworkContext")
public class NetworkContext {

private static final Semaphore LOCK = new Semaphore(1);
private static volatile SelectorProvider selectorProvider = SelectorProvider.provider();
private static volatile SocketFactory socketFactory = SocketFactory.getDefault();
private static volatile ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();

/**
* Get the current provider for channels, which may change at any time.
* Use the result as soon as possible to avoid using a stale provider.
* @return The {@link SelectorProvider} factory for {@link java.nio.channels} implementations, non-null
*/
public static SelectorProvider provider() {
return selectorProvider;
}

/**
* Get the current factory for client sockets, which may change at any time.
* Use the result as soon as possible to avoid using a stale factory.
* @return The {@link SocketFactory} factory for {@link java.net.Socket} implementations, non-null
*/
public static SocketFactory factory() {
return socketFactory;
}

/**
* Get the current factory for server sockets, which may change at any time.
* Use the result as soon as possible to avoid using a stale factory.
* @return The {@link ServerSocketFactory} factory for {@link java.net.ServerSocket} implementations, non-null
*/
public static ServerSocketFactory serverFactory() {
return serverSocketFactory;
}

/**
* Permanently install alternative factories for network resources. Can only be called once, and subsequent calls
* will fail with {@link ConcurrentModificationException}
* <p>This is meant for use only in tests. Installation is non-atomic, so network resources may be created with the
* default factories after installation.
* @param newSelectorProvider A provider for NIO selectors and sockets, non-null
* @param newSocketFactory A provider for client-side TCP sockets, non-null
* @param newServerSocketFactory A provider for server-side TCP sockets, non-null
* @throws ConcurrentModificationException if called more than once.
* @throws NullPointerException if any argument is null.
*/
protected static void install(
SelectorProvider newSelectorProvider,
SocketFactory newSocketFactory,
ServerSocketFactory newServerSocketFactory
) {
Objects.requireNonNull(newSelectorProvider, "SelectorProvider must be non-null");
Objects.requireNonNull(newSocketFactory, "SocketFactory must be non-null");
Objects.requireNonNull(newServerSocketFactory, "ServerSocketFactory must be non-null");
if (!LOCK.tryAcquire()) {
throw new ConcurrentModificationException("The network context is already in-use");
}
selectorProvider = newSelectorProvider;
socketFactory = newSocketFactory;
serverSocketFactory = newServerSocketFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public Selector(int maxReceiveSize,
MemoryPool memoryPool,
LogContext logContext) {
try {
this.nioSelector = java.nio.channels.Selector.open();
this.nioSelector = NetworkContext.provider().openSelector();
} catch (IOException e) {
throw new KafkaException(e);
}
Expand Down Expand Up @@ -264,7 +264,7 @@ public static String generateConnectionId(Socket socket, int connectionIndex) {
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
SocketChannel socketChannel = NetworkContext.provider().openSocketChannel();
SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public EchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs) thr
this.serverSocket.setSoTimeout(SO_TIMEOUT_MS);
break;
case PLAINTEXT:
this.serverSocket = new ServerSocket(0);
this.serverSocket = NetworkContext.serverFactory().createServerSocket(0);
this.sslFactory = null;
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

import org.apache.kafka.common.utils.LeakTester;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.opentest4j.AssertionFailedError;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public abstract class LeakTestingExtension {

/**
* Ignore leaks from a suite entirely.
* <p>Use when there is a real resource leak that should be addressed as soon as possible.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface IgnoreAll {
String jira();
}

private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(LeakTestingExtension.class);
private static final String ROOT_TESTER_INSTANCE = "leak-tester";
private static final String PER_TEST_INSTANCE = "leak-test";

protected boolean ignored(ExtensionContext extensionContext) {
return extensionContext.getTestClass()
.map(clazz -> clazz.getAnnotationsByType(IgnoreAll.class))
.map(arr -> arr.length > 0)
.orElse(false);
}

protected abstract String message();

private NetworkContextLeakTester tester(ExtensionContext extensionContext) {
// Use the root namespace which lives across multiple test suites.
ExtensionContext.Store store = extensionContext.getRoot().getStore(NAMESPACE);
return store.getOrComputeIfAbsent(ROOT_TESTER_INSTANCE, ignored -> new NetworkContextLeakTester(), NetworkContextLeakTester.class);
}

protected void before(ExtensionContext extensionContext) {
if (ignored(extensionContext)) {
return;
}
extensionContext.getStore(NAMESPACE).put(PER_TEST_INSTANCE, tester(extensionContext).start());
}

protected void after(ExtensionContext extensionContext) throws AssertionFailedError {
if (ignored(extensionContext)) {
return;
}
try {
ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
store.getOrDefault(PER_TEST_INSTANCE, LeakTester.LeakTest.class, () -> {}).close();
} catch (AssertionFailedError e) {
throw new AssertionFailedError(message(), e);
}
}

/**
* This class applies a coarse leak test for a whole class at a time.
* This is automatically loaded for all classes, but can be disabled with {@link IgnoreAll}
* See {@link org.junit.jupiter.api.extension.Extension} ServiceLoader manifest.
*/
public static class All extends LeakTestingExtension implements BeforeAllCallback, AfterAllCallback {

protected String message() {
return String.format(
"This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @%s.%s(\"KAFKA-XYZ\")",
LeakTestingExtension.class.getSimpleName(), IgnoreAll.class.getSimpleName());
}

@Override
public void beforeAll(ExtensionContext extensionContext) throws AssertionFailedError {
before(extensionContext);
}

@Override
public void afterAll(ExtensionContext extensionContext) throws AssertionFailedError {
after(extensionContext);
}
}

/**
* This class applies a fine leak test for individual tests.
* This can be opted-in by including it in an {@link org.junit.jupiter.api.extension.ExtendWith} suite annotation.
*/
public static class Each extends LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {

protected String message() {
return String.format(
"This test contains a resource leak. Close the resources, or if the class-level asserts passed, remove @%s(%s.%s.class)",
ExtendWith.class.getSimpleName(), LeakTestingExtension.class.getSimpleName(), Each.class.getSimpleName());
}

@Override
public void beforeEach(ExtensionContext extensionContext) throws AssertionFailedError {
before(extensionContext);
}

@Override
public void afterEach(ExtensionContext extensionContext) throws AssertionFailedError {
after(extensionContext);
}
}
}
Loading