From 8fb10fdf9614c53861162ebb7662f8c8d980b2a0 Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Tue, 7 Sep 2021 22:34:11 +0800 Subject: [PATCH] [FEATURE] Support delay close on authentication failed (#709) Fix #668 ## Motivation Performing authentication is one of the most expensive operation performed in the network thread, so much so that it could end up saturating them, preventing from getting any useful work from being done. ## Modifications * Support delay close on authentication failed. * Change logs level. --- docs/configuration.md | 1 + .../handlers/kop/KafkaChannelInitializer.java | 6 +- .../handlers/kop/KafkaCommandDecoder.java | 9 +- .../handlers/kop/KafkaProtocolHandler.java | 5 +- .../handlers/kop/KafkaRequestHandler.java | 30 ++- .../kop/KafkaServiceConfiguration.java | 8 + .../kop/security/SaslAuthenticator.java | 76 ++++--- .../kop/utils/ShutdownableThread.java | 16 +- .../DelayAuthorizationFailedCloseTest.java | 204 ++++++++++++++++++ 9 files changed, 314 insertions(+), 41 deletions(-) create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java diff --git a/docs/configuration.md b/docs/configuration.md index c16bec9d7d..b9729ad924 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -53,6 +53,7 @@ This section lists configurations that may affect the performance. | ----------------- | ------------------------------------------------------------ | ------- | | maxQueuedRequests | Limit the queue size for request, like `queued.max.requests` in Kafka server. | 500 | | requestTimeoutMs | Limit the timeout in milliseconds for request, like `request.timeout.ms` in Kafka client.
If a request was not processed in the timeout, KoP would return an error response to client. | 30000 | +| failedAuthenticationDelayMs | Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure, like `connection.failed.authentication.delay.ms` in Kafka server. | 300 | > **NOTE** > diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java index ab86d9ea2c..4d43d3fb45 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java @@ -92,9 +92,9 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4)); ch.pipeline().addLast("handler", - new KafkaRequestHandler(pulsarService, kafkaConfig, - groupCoordinator, transactionCoordinator, adminManager, localBrokerDataCache, - enableTls, advertisedEndPoint, statsLogger)); + new KafkaRequestHandler(pulsarService, kafkaConfig, + groupCoordinator, transactionCoordinator, adminManager, localBrokerDataCache, + enableTls, advertisedEndPoint, statsLogger)); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index ffe1449557..592e7aadc7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -184,12 +184,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // execute channelPrepare to complete authentication if (isActive.get() && !channelReady()) { try { - channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency); return; } catch (AuthenticationException e) { - log.error("unexpected error in authenticate:", e); - close(); + log.error("Failed authentication with [{}] ({})", this.remoteAddress, e.getMessage()); + maybeDelayCloseOnAuthenticationFailure(); return; } finally { buffer.release(); @@ -445,6 +444,10 @@ protected abstract void channelPrepare(ChannelHandlerContext ctx, BiConsumer registerRequestLatency) throws AuthenticationException; + protected abstract void maybeDelayCloseOnAuthenticationFailure(); + + protected abstract void completeCloseOnAuthenticationFailure(); + protected abstract void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 2fe8eeffc3..6c27b4a048 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -376,17 +376,16 @@ public Map> newChannelIniti case SASL_PLAINTEXT: builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(), kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, false, - advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache)); + advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache)); break; case SSL: case SASL_SSL: builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(), kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, true, - advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache)); + advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache)); break; } }); - return builder.build(); } catch (Exception e){ log.error("KafkaProtocolHandler newChannelInitializers failed with ", e); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index abec8db920..999eda14b2 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -206,6 +206,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final String advertisedListeners; private final int defaultNumPartitions; public final int maxReadEntriesNum; + private final int failedAuthenticationDelayMs; private final String offsetsTopicName; private final String txnTopicName; private final Set allowedNamespaces; @@ -286,6 +287,7 @@ public KafkaRequestHandler(PulsarService pulsarService, this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath(); this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L; this.resumeThresholdPendingBytes = this.maxPendingBytes / 2; + this.failedAuthenticationDelayMs = kafkaConfig.getFailedAuthenticationDelayMs(); // update alive channel count stats RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet(); @@ -310,7 +312,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { // update active channel count stats RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet(); - log.info("channel inactive {}", ctx.channel()); close(); } @@ -349,6 +350,33 @@ protected void channelPrepare(ChannelHandlerContext ctx, } } + @Override + protected void maybeDelayCloseOnAuthenticationFailure() { + if (this.failedAuthenticationDelayMs > 0) { + this.ctx.executor().schedule( + this::handleCloseOnAuthenticationFailure, + this.failedAuthenticationDelayMs, + TimeUnit.MILLISECONDS); + } else { + handleCloseOnAuthenticationFailure(); + } + } + + private void handleCloseOnAuthenticationFailure() { + try { + this.completeCloseOnAuthenticationFailure(); + } finally { + this.close(); + } + } + + @Override + protected void completeCloseOnAuthenticationFailure() { + if (isActive.get() && authenticator != null) { + authenticator.sendAuthenticationFailureResponse(); + } + } + protected void handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest, CompletableFuture resultFuture) { if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersionRequest.getHeader().apiVersion())) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 8f63bca196..932d709d1c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -200,6 +200,14 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private int requestTimeoutMs = 30000; + @FieldContext( + category = CATEGORY_KOP, + doc = "Connection close delay on failed authentication: " + + "this is the time (in milliseconds) by which connection close " + + "will be delayed on authentication failure. " + ) + private int failedAuthenticationDelayMs = 300; + // Kafka SSL configs @FieldContext( category = CATEGORY_KOP_SSL, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java index 31a6991ca7..93f174dc7a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java @@ -79,7 +79,8 @@ public class SaslAuthenticator { private SaslServer saslServer; private Session session; private boolean enableKafkaSaslAuthenticateHeaders; - + private ByteBuf authenticationFailureResponse = null; + private ChannelHandlerContext ctx = null; private enum State { HANDSHAKE_OR_VERSIONS_REQUEST, @@ -108,6 +109,28 @@ public UnsupportedSaslMechanismException(String mechanism) { } } + /** + * Build a {@link ByteBuf} response on authenticate failure. The actual response is sent out when + * {@link #sendAuthenticationFailureResponse()} is called. + */ + private void buildResponseOnAuthenticateFailure(RequestHeader header, + AbstractRequest request, + AbstractResponse abstractResponse, + Exception e) { + this.authenticationFailureResponse = buildKafkaResponse(header, request, abstractResponse, e); + } + + /** + * Send any authentication failure response that may have been previously built. + */ + public void sendAuthenticationFailureResponse() { + if (authenticationFailureResponse == null) { + return; + } + this.sendKafkaResponse(authenticationFailureResponse); + authenticationFailureResponse = null; + } + private static void setCurrentAuthenticationService(AuthenticationService authenticationService) { if (SaslAuthenticator.authenticationService == null) { SaslAuthenticator.authenticationService = authenticationService; @@ -154,6 +177,8 @@ public void authenticate(ChannelHandlerContext ctx, throws AuthenticationException { checkArgument(requestBuf.readableBytes() > 0); log.info("Authenticate {} {} {}", ctx, saslServer, state); + + this.ctx = ctx; if (saslServer != null && saslServer.isComplete()) { setState(State.COMPLETE); return; @@ -307,11 +332,7 @@ private void handleKafkaRequest(ChannelHandlerContext ctx, try { createSaslServer(clientMechanism); } catch (AuthenticationException e) { - sendKafkaResponse(ctx, - header, - body, - null, - e); + this.authenticationFailureResponse = buildKafkaResponse(header, body, null, e); throw e; } @@ -325,6 +346,22 @@ private static void sendKafkaResponse(ChannelHandlerContext ctx, AbstractRequest request, AbstractResponse abstractResponse, Exception e) { + ByteBuf response = buildKafkaResponse(header, request, abstractResponse, e); + ctx.channel().eventLoop().execute(() -> { + ctx.channel().writeAndFlush(response); + }); + } + + private void sendKafkaResponse(ByteBuf response) { + ctx.channel().eventLoop().execute(() -> { + ctx.channel().writeAndFlush(response); + }); + } + + private static ByteBuf buildKafkaResponse(RequestHeader header, + AbstractRequest request, + AbstractResponse abstractResponse, + Exception e) { short version = header.apiVersion(); ApiKeys apiKey = header.apiKey(); AbstractResponse backResponse; @@ -339,14 +376,11 @@ private static void sendKafkaResponse(ChannelHandlerContext ctx, && !ApiKeys.API_VERSIONS.isVersionSupported(version)){ version = ApiKeys.API_VERSIONS.oldestVersion(); } - ByteBuf response = ResponseUtils.serializeResponse( + return ResponseUtils.serializeResponse( version, header.toResponseHeader(), backResponse ); - ctx.channel().eventLoop().execute(() -> { - ctx.channel().writeAndFlush(response); - }); } @VisibleForTesting @@ -415,11 +449,7 @@ private void handleSaslToken(ChannelHandlerContext ctx, AuthenticationException e = new AuthenticationException( "Unexpected Kafka request of type " + apiKey + " during SASL authentication"); registerRequestLatency.accept(apiKey.name, startProcessTime); - sendKafkaResponse(ctx, - header, - request, - null, - e); + buildResponseOnAuthenticateFailure(header, request, null, e); throw e; } if (!apiKey.isVersionSupported(version)) { @@ -448,11 +478,9 @@ private void handleSaslToken(ChannelHandlerContext ctx, } } catch (SaslException e) { registerRequestLatency.accept(apiKey.name, startProcessTime); - sendKafkaResponse(ctx, - header, - request, - new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()), - null); + buildResponseOnAuthenticateFailure(header, request, + new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()), null); + sendAuthenticationFailureResponse(); if (log.isDebugEnabled()) { log.debug("Authenticate failed for client, header {}, request {}, reason {}", header, saslAuthenticateRequest, e.getMessage()); @@ -475,9 +503,7 @@ private void handleApiVersionsRequest(ChannelHandlerContext ctx, } if (request.hasUnsupportedRequestVersion()) { registerRequestLatency.accept(header.apiKey().name, startProcessTime); - sendKafkaResponse(ctx, - header, - request, + sendKafkaResponse(ctx, header, request, request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception()), null); } else { @@ -532,9 +558,7 @@ private void handleApiVersionsRequest(ChannelHandlerContext ctx, log.debug("SASL mechanism '{}' requested by client is not supported", mechanism); } registerRequestLatency.accept(header.apiKey().name, startProcessTime); - sendKafkaResponse(ctx, - header, - request, + buildResponseOnAuthenticateFailure(header, request, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, allowedMechanisms), null); throw new UnsupportedSaslMechanismException(mechanism); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java index 3c02bff57c..23f0424178 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ShutdownableThread.java @@ -56,8 +56,8 @@ public boolean isShutdownComplete() { } public synchronized boolean initiateShutdown() { - if (isRunning()) { - log.info("{} Shutting down", logIdent); + if (isRunning() && log.isDebugEnabled()) { + log.debug("{} Shutting down", logIdent); } shutdownInitiated.countDown(); if (isInterruptible) { @@ -73,7 +73,9 @@ public synchronized boolean initiateShutdown() { */ public void awaitShutdown() throws InterruptedException { shutdownComplete.await(); - log.info("{} Shutdown completed", logIdent); + if (log.isDebugEnabled()) { + log.debug("{} Shutdown completed", logIdent); + } } /** @@ -98,7 +100,9 @@ public void pause(long timeout, TimeUnit unit) throws InterruptedException { @Override public void run() { - log.info("{} Starting", logIdent); + if (log.isDebugEnabled()) { + log.debug("{} Starting", logIdent); + } try { while (isRunning()) { doWork(); @@ -106,7 +110,9 @@ public void run() { } catch (FatalExitError e) { shutdownInitiated.countDown(); shutdownComplete.countDown(); - log.info("{} Stopped", logIdent); + if (log.isDebugEnabled()) { + log.debug("{} Stopped", logIdent); + } Exit.exit(e.statusCode()); } catch (Throwable cause) { if (isRunning()) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java new file mode 100644 index 0000000000..7759d9ddad --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DelayAuthorizationFailedCloseTest.java @@ -0,0 +1,204 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.kafka.client.api.KafkaVersion; +import io.streamnative.kafka.client.api.ProducerConfiguration; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.Properties; +import javax.crypto.SecretKey; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.KafkaChannel; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test delay close handler when authorization failed. + */ +@Slf4j +public class DelayAuthorizationFailedCloseTest extends KopProtocolHandlerTestBase { + + private static final String TENANT = "DelayAuthorizationFailedCloseTest"; + private static final String NAMESPACE = "ns1"; + + private static final int FAILED_AUTHENTICATION_DELAY_MS = 300; + private static final String ADMIN_USER = "admin_user"; + protected static final int BUFFER_SIZE = 4 * 1024; + private static final long DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 1000; + + private Selector selector; + private Time time; + + @BeforeClass + @Override + protected void setup() throws Exception { + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + AuthenticationProviderToken provider = new AuthenticationProviderToken(); + + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); + ServiceConfiguration authConf = new ServiceConfiguration(); + authConf.setProperties(properties); + provider.initialize(authConf); + + String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); + + super.resetConfig(); + conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); + conf.setKafkaMetadataTenant("internal"); + conf.setKafkaMetadataNamespace("__kafka"); + conf.setKafkaTenant(TENANT); + conf.setKafkaNamespace(NAMESPACE); + + conf.setClusterName(super.configClusterName); + conf.setAuthorizationEnabled(true); + conf.setAuthenticationEnabled(true); + conf.setAuthorizationAllowWildcardsMatching(true); + conf.setSuperUserRoles(Sets.newHashSet(ADMIN_USER)); + conf.setAuthenticationProviders( + Sets.newHashSet(AuthenticationProviderToken.class.getName())); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + adminToken); + conf.setProperties(properties); + conf.setFailedAuthenticationDelayMs(FAILED_AUTHENTICATION_DELAY_MS); + + super.internalSetup(); + log.info("success internal setup"); + + if (!admin.namespaces().getNamespaces(TENANT).contains(TENANT + "/__kafka")) { + admin.namespaces().createNamespace(TENANT + "/__kafka"); + admin.namespaces().setNamespaceReplicationClusters(TENANT + "/__kafka", Sets.newHashSet("test")); + admin.namespaces().setRetention(TENANT + "/__kafka", + new RetentionPolicies(-1, -1)); + } + log.info("created namespaces, init handler"); + + time = Time.SYSTEM; + Metrics metrics = new Metrics(time); + ProducerConfiguration producerConfiguration = producerConfiguration(); + ChannelBuilder channelBuilder = + ClientUtils.createChannelBuilder(new ProducerConfig(producerConfiguration.toProperties())); + String clientId = "clientId"; + selector = new Selector( + DEFAULT_CONNECTION_MAX_IDLE_MS, + metrics, + time, + "test-selector", + channelBuilder, + new LogContext(String.format("[Test Selector clientId=%s] ", clientId))); + } + + @Override + protected void createAdmin() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 20000) + void testClientConnectionClose() throws IOException { + String id = "0"; + blockingConnect(id); + KafkaChannel channel = selector.channel(id); + assertTrue(channel.isConnected()); + + log.info("Channel is connected: [{}]", channel.isConnected()); + + long startTimeMs = time.milliseconds(); + + // Send Metadata request + MetadataRequest.Builder builder = MetadataRequest.Builder.allTopics(); + AbstractRequest request = builder.build(); + selector.send(request.toSend(id, + new RequestHeader(builder.apiKey(), request.version(), "fake_client_id", 0))); + + Awaitility.await().until(() -> { + poll(selector); + return !selector.channels().isEmpty(); + }); + + // Wait until handler close. + Awaitility.await().until(() -> { + poll(selector); + return selector.channels().isEmpty(); + }); + + assertTrue(time.milliseconds() >= startTimeMs + FAILED_AUTHENTICATION_DELAY_MS); + } + + + protected ProducerConfiguration producerConfiguration() { + return ProducerConfiguration.builder() + .bootstrapServers("localhost:" + getKafkaBrokerPort()) + .keySerializer(KafkaVersion.DEFAULT.getStringSerializer()) + .valueSerializer(KafkaVersion.DEFAULT.getStringSerializer()) + .build(); + } + + // connect and wait for the connection to complete + private void blockingConnect(String node) throws IOException { + blockingConnect(node, new InetSocketAddress("localhost", getKafkaBrokerPort())); + } + + protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException { + selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE); + while (!selector.connected().contains(node)) { + selector.poll(10000L); + } + while (!selector.isChannelReady(node)) { + selector.poll(10000L); + } + } + + private void poll(Selector selector) { + try { + selector.poll(50); + } catch (IOException e) { + Assert.fail("Caught unexpected exception " + e); + } + } +}