diff --git a/docs/configuration.md b/docs/configuration.md
index c16bec9d7d..b9729ad924 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -53,6 +53,7 @@ This section lists configurations that may affect the performance.
| ----------------- | ------------------------------------------------------------ | ------- |
| maxQueuedRequests | Limit the queue size for request, like `queued.max.requests` in Kafka server. | 500 |
| requestTimeoutMs | Limit the timeout in milliseconds for request, like `request.timeout.ms` in Kafka client.
If a request was not processed in the timeout, KoP would return an error response to client. | 30000 |
+| failedAuthenticationDelayMs | Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure, like `connection.failed.authentication.delay.ms` in Kafka server. | 300 |
> **NOTE**
>
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java
index ab86d9ea2c..4d43d3fb45 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java
@@ -92,9 +92,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
- new KafkaRequestHandler(pulsarService, kafkaConfig,
- groupCoordinator, transactionCoordinator, adminManager, localBrokerDataCache,
- enableTls, advertisedEndPoint, statsLogger));
+ new KafkaRequestHandler(pulsarService, kafkaConfig,
+ groupCoordinator, transactionCoordinator, adminManager, localBrokerDataCache,
+ enableTls, advertisedEndPoint, statsLogger));
}
}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
index ffe1449557..592e7aadc7 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
@@ -184,12 +184,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// execute channelPrepare to complete authentication
if (isActive.get() && !channelReady()) {
try {
-
channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency);
return;
} catch (AuthenticationException e) {
- log.error("unexpected error in authenticate:", e);
- close();
+ log.error("Failed authentication with [{}] ({})", this.remoteAddress, e.getMessage());
+ maybeDelayCloseOnAuthenticationFailure();
return;
} finally {
buffer.release();
@@ -445,6 +444,10 @@ protected abstract void channelPrepare(ChannelHandlerContext ctx,
BiConsumer registerRequestLatency)
throws AuthenticationException;
+ protected abstract void maybeDelayCloseOnAuthenticationFailure();
+
+ protected abstract void completeCloseOnAuthenticationFailure();
+
protected abstract void
handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response);
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
index 2fe8eeffc3..6c27b4a048 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
@@ -376,17 +376,16 @@ public Map> newChannelIniti
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, false,
- advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
+ advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, true,
- advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
+ advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
break;
}
});
-
return builder.build();
} catch (Exception e){
log.error("KafkaProtocolHandler newChannelInitializers failed with ", e);
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index abec8db920..999eda14b2 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -206,6 +206,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final String advertisedListeners;
private final int defaultNumPartitions;
public final int maxReadEntriesNum;
+ private final int failedAuthenticationDelayMs;
private final String offsetsTopicName;
private final String txnTopicName;
private final Set allowedNamespaces;
@@ -286,6 +287,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
this.resumeThresholdPendingBytes = this.maxPendingBytes / 2;
+ this.failedAuthenticationDelayMs = kafkaConfig.getFailedAuthenticationDelayMs();
// update alive channel count stats
RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
@@ -310,7 +312,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// update active channel count stats
RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
- log.info("channel inactive {}", ctx.channel());
close();
}
@@ -349,6 +350,33 @@ protected void channelPrepare(ChannelHandlerContext ctx,
}
}
+ @Override
+ protected void maybeDelayCloseOnAuthenticationFailure() {
+ if (this.failedAuthenticationDelayMs > 0) {
+ this.ctx.executor().schedule(
+ this::handleCloseOnAuthenticationFailure,
+ this.failedAuthenticationDelayMs,
+ TimeUnit.MILLISECONDS);
+ } else {
+ handleCloseOnAuthenticationFailure();
+ }
+ }
+
+ private void handleCloseOnAuthenticationFailure() {
+ try {
+ this.completeCloseOnAuthenticationFailure();
+ } finally {
+ this.close();
+ }
+ }
+
+ @Override
+ protected void completeCloseOnAuthenticationFailure() {
+ if (isActive.get() && authenticator != null) {
+ authenticator.sendAuthenticationFailureResponse();
+ }
+ }
+
protected void handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest,
CompletableFuture resultFuture) {
if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersionRequest.getHeader().apiVersion())) {
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 8f63bca196..932d709d1c 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -200,6 +200,14 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private int requestTimeoutMs = 30000;
+ @FieldContext(
+ category = CATEGORY_KOP,
+ doc = "Connection close delay on failed authentication: "
+ + "this is the time (in milliseconds) by which connection close "
+ + "will be delayed on authentication failure. "
+ )
+ private int failedAuthenticationDelayMs = 300;
+
// Kafka SSL configs
@FieldContext(
category = CATEGORY_KOP_SSL,
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java
index 31a6991ca7..93f174dc7a 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java
@@ -79,7 +79,8 @@ public class SaslAuthenticator {
private SaslServer saslServer;
private Session session;
private boolean enableKafkaSaslAuthenticateHeaders;
-
+ private ByteBuf authenticationFailureResponse = null;
+ private ChannelHandlerContext ctx = null;
private enum State {
HANDSHAKE_OR_VERSIONS_REQUEST,
@@ -108,6 +109,28 @@ public UnsupportedSaslMechanismException(String mechanism) {
}
}
+ /**
+ * Build a {@link ByteBuf} response on authenticate failure. The actual response is sent out when
+ * {@link #sendAuthenticationFailureResponse()} is called.
+ */
+ private void buildResponseOnAuthenticateFailure(RequestHeader header,
+ AbstractRequest request,
+ AbstractResponse abstractResponse,
+ Exception e) {
+ this.authenticationFailureResponse = buildKafkaResponse(header, request, abstractResponse, e);
+ }
+
+ /**
+ * Send any authentication failure response that may have been previously built.
+ */
+ public void sendAuthenticationFailureResponse() {
+ if (authenticationFailureResponse == null) {
+ return;
+ }
+ this.sendKafkaResponse(authenticationFailureResponse);
+ authenticationFailureResponse = null;
+ }
+
private static void setCurrentAuthenticationService(AuthenticationService authenticationService) {
if (SaslAuthenticator.authenticationService == null) {
SaslAuthenticator.authenticationService = authenticationService;
@@ -154,6 +177,8 @@ public void authenticate(ChannelHandlerContext ctx,
throws AuthenticationException {
checkArgument(requestBuf.readableBytes() > 0);
log.info("Authenticate {} {} {}", ctx, saslServer, state);
+
+ this.ctx = ctx;
if (saslServer != null && saslServer.isComplete()) {
setState(State.COMPLETE);
return;
@@ -307,11 +332,7 @@ private void handleKafkaRequest(ChannelHandlerContext ctx,
try {
createSaslServer(clientMechanism);
} catch (AuthenticationException e) {
- sendKafkaResponse(ctx,
- header,
- body,
- null,
- e);
+ this.authenticationFailureResponse = buildKafkaResponse(header, body, null, e);
throw e;
}
@@ -325,6 +346,22 @@ private static void sendKafkaResponse(ChannelHandlerContext ctx,
AbstractRequest request,
AbstractResponse abstractResponse,
Exception e) {
+ ByteBuf response = buildKafkaResponse(header, request, abstractResponse, e);
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.channel().writeAndFlush(response);
+ });
+ }
+
+ private void sendKafkaResponse(ByteBuf response) {
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.channel().writeAndFlush(response);
+ });
+ }
+
+ private static ByteBuf buildKafkaResponse(RequestHeader header,
+ AbstractRequest request,
+ AbstractResponse abstractResponse,
+ Exception e) {
short version = header.apiVersion();
ApiKeys apiKey = header.apiKey();
AbstractResponse backResponse;
@@ -339,14 +376,11 @@ private static void sendKafkaResponse(ChannelHandlerContext ctx,
&& !ApiKeys.API_VERSIONS.isVersionSupported(version)){
version = ApiKeys.API_VERSIONS.oldestVersion();
}
- ByteBuf response = ResponseUtils.serializeResponse(
+ return ResponseUtils.serializeResponse(
version,
header.toResponseHeader(),
backResponse
);
- ctx.channel().eventLoop().execute(() -> {
- ctx.channel().writeAndFlush(response);
- });
}
@VisibleForTesting
@@ -415,11 +449,7 @@ private void handleSaslToken(ChannelHandlerContext ctx,
AuthenticationException e = new AuthenticationException(
"Unexpected Kafka request of type " + apiKey + " during SASL authentication");
registerRequestLatency.accept(apiKey.name, startProcessTime);
- sendKafkaResponse(ctx,
- header,
- request,
- null,
- e);
+ buildResponseOnAuthenticateFailure(header, request, null, e);
throw e;
}
if (!apiKey.isVersionSupported(version)) {
@@ -448,11 +478,9 @@ private void handleSaslToken(ChannelHandlerContext ctx,
}
} catch (SaslException e) {
registerRequestLatency.accept(apiKey.name, startProcessTime);
- sendKafkaResponse(ctx,
- header,
- request,
- new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()),
- null);
+ buildResponseOnAuthenticateFailure(header, request,
+ new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()), null);
+ sendAuthenticationFailureResponse();
if (log.isDebugEnabled()) {
log.debug("Authenticate failed for client, header {}, request {}, reason {}",
header, saslAuthenticateRequest, e.getMessage());
@@ -475,9 +503,7 @@ private void handleApiVersionsRequest(ChannelHandlerContext ctx,
}
if (request.hasUnsupportedRequestVersion()) {
registerRequestLatency.accept(header.apiKey().name, startProcessTime);
- sendKafkaResponse(ctx,
- header,
- request,
+ sendKafkaResponse(ctx, header, request,
request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception()),
null);
} else {
@@ -532,9 +558,7 @@ private void handleApiVersionsRequest(ChannelHandlerContext ctx,
log.debug("SASL mechanism '{}' requested by client is not supported", mechanism);
}
registerRequestLatency.accept(header.apiKey().name, startProcessTime);
- sendKafkaResponse(ctx,
- header,
- request,
+ buildResponseOnAuthenticateFailure(header, request,
new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, allowedMechanisms),
null);
throw new UnsupportedSaslMechanismException(mechanism);
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java
index 3c02bff57c..23f0424178 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java
@@ -56,8 +56,8 @@ public boolean isShutdownComplete() {
}
public synchronized boolean initiateShutdown() {
- if (isRunning()) {
- log.info("{} Shutting down", logIdent);
+ if (isRunning() && log.isDebugEnabled()) {
+ log.debug("{} Shutting down", logIdent);
}
shutdownInitiated.countDown();
if (isInterruptible) {
@@ -73,7 +73,9 @@ public synchronized boolean initiateShutdown() {
*/
public void awaitShutdown() throws InterruptedException {
shutdownComplete.await();
- log.info("{} Shutdown completed", logIdent);
+ if (log.isDebugEnabled()) {
+ log.debug("{} Shutdown completed", logIdent);
+ }
}
/**
@@ -98,7 +100,9 @@ public void pause(long timeout, TimeUnit unit) throws InterruptedException {
@Override
public void run() {
- log.info("{} Starting", logIdent);
+ if (log.isDebugEnabled()) {
+ log.debug("{} Starting", logIdent);
+ }
try {
while (isRunning()) {
doWork();
@@ -106,7 +110,9 @@ public void run() {
} catch (FatalExitError e) {
shutdownInitiated.countDown();
shutdownComplete.countDown();
- log.info("{} Stopped", logIdent);
+ if (log.isDebugEnabled()) {
+ log.debug("{} Stopped", logIdent);
+ }
Exit.exit(e.statusCode());
} catch (Throwable cause) {
if (isRunning()) {
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java
new file mode 100644
index 0000000000..7759d9ddad
--- /dev/null
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.streamnative.kafka.client.api.KafkaVersion;
+import io.streamnative.kafka.client.api.ProducerConfiguration;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Properties;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.KafkaChannel;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test delay close handler when authorization failed.
+ */
+@Slf4j
+public class DelayAuthorizationFailedCloseTest extends KopProtocolHandlerTestBase {
+
+ private static final String TENANT = "DelayAuthorizationFailedCloseTest";
+ private static final String NAMESPACE = "ns1";
+
+ private static final int FAILED_AUTHENTICATION_DELAY_MS = 300;
+ private static final String ADMIN_USER = "admin_user";
+ protected static final int BUFFER_SIZE = 4 * 1024;
+ private static final long DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 1000;
+
+ private Selector selector;
+ private Time time;
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+ Properties properties = new Properties();
+ properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+ ServiceConfiguration authConf = new ServiceConfiguration();
+ authConf.setProperties(properties);
+ provider.initialize(authConf);
+
+ String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty());
+
+ super.resetConfig();
+ conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN"));
+ conf.setKafkaMetadataTenant("internal");
+ conf.setKafkaMetadataNamespace("__kafka");
+ conf.setKafkaTenant(TENANT);
+ conf.setKafkaNamespace(NAMESPACE);
+
+ conf.setClusterName(super.configClusterName);
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationAllowWildcardsMatching(true);
+ conf.setSuperUserRoles(Sets.newHashSet(ADMIN_USER));
+ conf.setAuthenticationProviders(
+ Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + adminToken);
+ conf.setProperties(properties);
+ conf.setFailedAuthenticationDelayMs(FAILED_AUTHENTICATION_DELAY_MS);
+
+ super.internalSetup();
+ log.info("success internal setup");
+
+ if (!admin.namespaces().getNamespaces(TENANT).contains(TENANT + "/__kafka")) {
+ admin.namespaces().createNamespace(TENANT + "/__kafka");
+ admin.namespaces().setNamespaceReplicationClusters(TENANT + "/__kafka", Sets.newHashSet("test"));
+ admin.namespaces().setRetention(TENANT + "/__kafka",
+ new RetentionPolicies(-1, -1));
+ }
+ log.info("created namespaces, init handler");
+
+ time = Time.SYSTEM;
+ Metrics metrics = new Metrics(time);
+ ProducerConfiguration producerConfiguration = producerConfiguration();
+ ChannelBuilder channelBuilder =
+ ClientUtils.createChannelBuilder(new ProducerConfig(producerConfiguration.toProperties()));
+ String clientId = "clientId";
+ selector = new Selector(
+ DEFAULT_CONNECTION_MAX_IDLE_MS,
+ metrics,
+ time,
+ "test-selector",
+ channelBuilder,
+ new LogContext(String.format("[Test Selector clientId=%s] ", clientId)));
+ }
+
+ @Override
+ protected void createAdmin() throws Exception {
+ super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(this.conf.getBrokerClientAuthenticationPlugin(),
+ this.conf.getBrokerClientAuthenticationParameters()).build());
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 20000)
+ void testClientConnectionClose() throws IOException {
+ String id = "0";
+ blockingConnect(id);
+ KafkaChannel channel = selector.channel(id);
+ assertTrue(channel.isConnected());
+
+ log.info("Channel is connected: [{}]", channel.isConnected());
+
+ long startTimeMs = time.milliseconds();
+
+ // Send Metadata request
+ MetadataRequest.Builder builder = MetadataRequest.Builder.allTopics();
+ AbstractRequest request = builder.build();
+ selector.send(request.toSend(id,
+ new RequestHeader(builder.apiKey(), request.version(), "fake_client_id", 0)));
+
+ Awaitility.await().until(() -> {
+ poll(selector);
+ return !selector.channels().isEmpty();
+ });
+
+ // Wait until handler close.
+ Awaitility.await().until(() -> {
+ poll(selector);
+ return selector.channels().isEmpty();
+ });
+
+ assertTrue(time.milliseconds() >= startTimeMs + FAILED_AUTHENTICATION_DELAY_MS);
+ }
+
+
+ protected ProducerConfiguration producerConfiguration() {
+ return ProducerConfiguration.builder()
+ .bootstrapServers("localhost:" + getKafkaBrokerPort())
+ .keySerializer(KafkaVersion.DEFAULT.getStringSerializer())
+ .valueSerializer(KafkaVersion.DEFAULT.getStringSerializer())
+ .build();
+ }
+
+ // connect and wait for the connection to complete
+ private void blockingConnect(String node) throws IOException {
+ blockingConnect(node, new InetSocketAddress("localhost", getKafkaBrokerPort()));
+ }
+
+ protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
+ selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
+ while (!selector.connected().contains(node)) {
+ selector.poll(10000L);
+ }
+ while (!selector.isChannelReady(node)) {
+ selector.poll(10000L);
+ }
+ }
+
+ private void poll(Selector selector) {
+ try {
+ selector.poll(50);
+ } catch (IOException e) {
+ Assert.fail("Caught unexpected exception " + e);
+ }
+ }
+}