
[FEATURE] Support delay close on authentication failed (#709)
Fix #668

## Motivation
Performing authentication is one of the most expensive operations carried out on the network threads, so much so that it can end up saturating them and preventing any useful work from getting done.

## Modifications
* Support delaying the connection close on authentication failure (a standalone sketch of the pattern appears right after this list).
* Change log levels.
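
The first modification boils down to a small pattern: when authentication fails and the configured delay is positive, both the error response and the connection close are deferred onto the channel's executor instead of happening immediately. Below is a minimal, standalone sketch of that idea, assuming a plain Netty handler; `DelayedAuthFailureClose`, `failureResponse`, and `delayMs` are illustrative names, not KoP APIs.

```java
import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

/**
 * Minimal sketch of the delayed-close idea, assuming a Netty handler that has
 * already built (but not yet written) an authentication failure response.
 * Class and field names are illustrative only, not KoP APIs.
 */
final class DelayedAuthFailureClose {

    private final ChannelHandlerContext ctx;
    private final ByteBuf failureResponse; // serialized error, built when authentication failed
    private final int delayMs;             // e.g. the value of failedAuthenticationDelayMs

    DelayedAuthFailureClose(ChannelHandlerContext ctx, ByteBuf failureResponse, int delayMs) {
        this.ctx = ctx;
        this.failureResponse = failureResponse;
        this.delayMs = delayMs;
    }

    void closeOnAuthenticationFailure() {
        if (delayMs > 0) {
            // Defer both the error response and the close, slowing down clients
            // that retry after a failed authentication (mirrors Kafka's
            // connection.failed.authentication.delay.ms behavior).
            ctx.executor().schedule(this::sendResponseAndClose, delayMs, TimeUnit.MILLISECONDS);
        } else {
            sendResponseAndClose();
        }
    }

    private void sendResponseAndClose() {
        try {
            ctx.writeAndFlush(failureResponse);
        } finally {
            // Always release the channel, even if the write fails.
            ctx.close();
        }
    }
}
```

In the actual change below, the scheduling lives in `KafkaRequestHandler#maybeDelayCloseOnAuthenticationFailure()` and the buffered failure response in `SaslAuthenticator`.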
Demogorgon314 authored and BewareMyPower committed Sep 8, 2021
1 parent c68a086 commit 8fb10fd
Showing 9 changed files with 314 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/configuration.md
@@ -53,6 +53,7 @@ This section lists configurations that may affect the performance.
| ----------------- | ------------------------------------------------------------ | ------- |
| maxQueuedRequests | Limit the queue size for requests, like `queued.max.requests` in Kafka server. | 500 |
| requestTimeoutMs | Limit the timeout in milliseconds for requests, like `request.timeout.ms` in Kafka client.<br>If a request is not processed within the timeout, KoP returns an error response to the client. | 30000 |
| failedAuthenticationDelayMs | Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure, like `connection.failed.authentication.delay.ms` in Kafka server. | 300 |

> **NOTE**
>
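As an illustration, an operator who wants a longer back-off could set `failedAuthenticationDelayMs=1000` (one second) in the KoP configuration; the 300 ms shown above is only the default. The semantics follow `connection.failed.authentication.delay.ms` on a Kafka broker, as noted in the table.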
KafkaChannelInitializer.java
@@ -92,9 +92,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig,
groupCoordinator, transactionCoordinator, adminManager, localBrokerDataCache,
enableTls, advertisedEndPoint, statsLogger));
new KafkaRequestHandler(pulsarService, kafkaConfig,
groupCoordinator, transactionCoordinator, adminManager, localBrokerDataCache,
enableTls, advertisedEndPoint, statsLogger));
}

}
KafkaCommandDecoder.java
@@ -184,12 +184,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// execute channelPrepare to complete authentication
if (isActive.get() && !channelReady()) {
try {

channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency);
return;
} catch (AuthenticationException e) {
log.error("unexpected error in authenticate:", e);
close();
log.error("Failed authentication with [{}] ({})", this.remoteAddress, e.getMessage());
maybeDelayCloseOnAuthenticationFailure();
return;
} finally {
buffer.release();
@@ -445,6 +444,10 @@ protected abstract void channelPrepare(ChannelHandlerContext ctx,
BiConsumer<String, Long> registerRequestLatency)
throws AuthenticationException;

protected abstract void maybeDelayCloseOnAuthenticationFailure();

protected abstract void completeCloseOnAuthenticationFailure();

protected abstract void
handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

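With these hunks, `KafkaCommandDecoder` no longer closes the channel itself when `channelPrepare` throws an `AuthenticationException`; it logs the failure and calls the new `maybeDelayCloseOnAuthenticationFailure()` hook, while `completeCloseOnAuthenticationFailure()` gives the concrete handler a chance to flush a previously built failure response just before the channel is finally closed.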
KafkaProtocolHandler.java
@@ -376,17 +376,16 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, false,
advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, transactionCoordinator, adminManager, true,
advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
advertisedEndPoint, rootStatsLogger.scope(SERVER_SCOPE), localBrokerDataCache));
break;
}
});

return builder.build();
} catch (Exception e){
log.error("KafkaProtocolHandler newChannelInitializers failed with ", e);
KafkaRequestHandler.java
@@ -206,6 +206,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final String advertisedListeners;
private final int defaultNumPartitions;
public final int maxReadEntriesNum;
private final int failedAuthenticationDelayMs;
private final String offsetsTopicName;
private final String txnTopicName;
private final Set<String> allowedNamespaces;
@@ -286,6 +287,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
this.resumeThresholdPendingBytes = this.maxPendingBytes / 2;
this.failedAuthenticationDelayMs = kafkaConfig.getFailedAuthenticationDelayMs();

// update alive channel count stats
RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
@@ -310,7 +312,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

// update active channel count stats
RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
log.info("channel inactive {}", ctx.channel());

close();
}
@@ -349,6 +350,33 @@ protected void channelPrepare(ChannelHandlerContext ctx,
}
}

@Override
protected void maybeDelayCloseOnAuthenticationFailure() {
if (this.failedAuthenticationDelayMs > 0) {
this.ctx.executor().schedule(
this::handleCloseOnAuthenticationFailure,
this.failedAuthenticationDelayMs,
TimeUnit.MILLISECONDS);
} else {
handleCloseOnAuthenticationFailure();
}
}

private void handleCloseOnAuthenticationFailure() {
try {
this.completeCloseOnAuthenticationFailure();
} finally {
this.close();
}
}

@Override
protected void completeCloseOnAuthenticationFailure() {
if (isActive.get() && authenticator != null) {
authenticator.sendAuthenticationFailureResponse();
}
}

protected void handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest,
CompletableFuture<AbstractResponse> resultFuture) {
if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersionRequest.getHeader().apiVersion())) {
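Note that the delayed close is scheduled on `ctx.executor()`, i.e. the event loop that already owns the channel, so the deferred `completeCloseOnAuthenticationFailure()` and `close()` run on the same thread as the rest of the handler and need no extra synchronization; `close()` is still invoked in a `finally` block so the channel is released even if writing the failure response throws.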
KafkaServiceConfiguration.java
@@ -200,6 +200,14 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private int requestTimeoutMs = 30000;

@FieldContext(
category = CATEGORY_KOP,
doc = "Connection close delay on failed authentication: "
+ "this is the time (in milliseconds) by which connection close "
+ "will be delayed on authentication failure. "
)
private int failedAuthenticationDelayMs = 300;

// Kafka SSL configs
@FieldContext(
category = CATEGORY_KOP_SSL,
SaslAuthenticator.java
@@ -79,7 +79,8 @@ public class SaslAuthenticator {
private SaslServer saslServer;
private Session session;
private boolean enableKafkaSaslAuthenticateHeaders;

private ByteBuf authenticationFailureResponse = null;
private ChannelHandlerContext ctx = null;

private enum State {
HANDSHAKE_OR_VERSIONS_REQUEST,
@@ -108,6 +109,28 @@ public UnsupportedSaslMechanismException(String mechanism) {
}
}

/**
* Build a {@link ByteBuf} response on authenticate failure. The actual response is sent out when
* {@link #sendAuthenticationFailureResponse()} is called.
*/
private void buildResponseOnAuthenticateFailure(RequestHeader header,
AbstractRequest request,
AbstractResponse abstractResponse,
Exception e) {
this.authenticationFailureResponse = buildKafkaResponse(header, request, abstractResponse, e);
}

/**
* Send any authentication failure response that may have been previously built.
*/
public void sendAuthenticationFailureResponse() {
if (authenticationFailureResponse == null) {
return;
}
this.sendKafkaResponse(authenticationFailureResponse);
authenticationFailureResponse = null;
}

private static void setCurrentAuthenticationService(AuthenticationService authenticationService) {
if (SaslAuthenticator.authenticationService == null) {
SaslAuthenticator.authenticationService = authenticationService;
@@ -154,6 +177,8 @@ public void authenticate(ChannelHandlerContext ctx,
throws AuthenticationException {
checkArgument(requestBuf.readableBytes() > 0);
log.info("Authenticate {} {} {}", ctx, saslServer, state);

this.ctx = ctx;
if (saslServer != null && saslServer.isComplete()) {
setState(State.COMPLETE);
return;
@@ -307,11 +332,7 @@ private void handleKafkaRequest(ChannelHandlerContext ctx,
try {
createSaslServer(clientMechanism);
} catch (AuthenticationException e) {
sendKafkaResponse(ctx,
header,
body,
null,
e);
this.authenticationFailureResponse = buildKafkaResponse(header, body, null, e);
throw e;
}

@@ -325,6 +346,22 @@ private static void sendKafkaResponse(ChannelHandlerContext ctx,
AbstractRequest request,
AbstractResponse abstractResponse,
Exception e) {
ByteBuf response = buildKafkaResponse(header, request, abstractResponse, e);
ctx.channel().eventLoop().execute(() -> {
ctx.channel().writeAndFlush(response);
});
}

private void sendKafkaResponse(ByteBuf response) {
ctx.channel().eventLoop().execute(() -> {
ctx.channel().writeAndFlush(response);
});
}

private static ByteBuf buildKafkaResponse(RequestHeader header,
AbstractRequest request,
AbstractResponse abstractResponse,
Exception e) {
short version = header.apiVersion();
ApiKeys apiKey = header.apiKey();
AbstractResponse backResponse;
@@ -339,14 +376,11 @@ private static void sendKafkaResponse(ChannelHandlerContext ctx,
&& !ApiKeys.API_VERSIONS.isVersionSupported(version)){
version = ApiKeys.API_VERSIONS.oldestVersion();
}
ByteBuf response = ResponseUtils.serializeResponse(
return ResponseUtils.serializeResponse(
version,
header.toResponseHeader(),
backResponse
);
ctx.channel().eventLoop().execute(() -> {
ctx.channel().writeAndFlush(response);
});
}

@VisibleForTesting
@@ -415,11 +449,7 @@ private void handleSaslToken(ChannelHandlerContext ctx,
AuthenticationException e = new AuthenticationException(
"Unexpected Kafka request of type " + apiKey + " during SASL authentication");
registerRequestLatency.accept(apiKey.name, startProcessTime);
sendKafkaResponse(ctx,
header,
request,
null,
e);
buildResponseOnAuthenticateFailure(header, request, null, e);
throw e;
}
if (!apiKey.isVersionSupported(version)) {
@@ -448,11 +478,9 @@ private void handleSaslToken(ChannelHandlerContext ctx,
}
} catch (SaslException e) {
registerRequestLatency.accept(apiKey.name, startProcessTime);
sendKafkaResponse(ctx,
header,
request,
new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()),
null);
buildResponseOnAuthenticateFailure(header, request,
new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()), null);
sendAuthenticationFailureResponse();
if (log.isDebugEnabled()) {
log.debug("Authenticate failed for client, header {}, request {}, reason {}",
header, saslAuthenticateRequest, e.getMessage());
@@ -475,9 +503,7 @@ private void handleApiVersionsRequest(ChannelHandlerContext ctx,
}
if (request.hasUnsupportedRequestVersion()) {
registerRequestLatency.accept(header.apiKey().name, startProcessTime);
sendKafkaResponse(ctx,
header,
request,
sendKafkaResponse(ctx, header, request,
request.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception()),
null);
} else {
@@ -532,9 +558,7 @@ private void handleApiVersionsRequest(ChannelHandlerContext ctx,
log.debug("SASL mechanism '{}' requested by client is not supported", mechanism);
}
registerRequestLatency.accept(header.apiKey().name, startProcessTime);
sendKafkaResponse(ctx,
header,
request,
buildResponseOnAuthenticateFailure(header, request,
new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, allowedMechanisms),
null);
throw new UnsupportedSaslMechanismException(mechanism);
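In `SaslAuthenticator` the failure path is now split in two: `buildResponseOnAuthenticateFailure(...)` only serializes the error into a `ByteBuf` and stores it in `authenticationFailureResponse`, while `sendAuthenticationFailureResponse()` actually writes it out — either immediately, as in the `SaslException` path above, or later from `completeCloseOnAuthenticationFailure()` after the configured delay — and clears the buffer so the response is sent at most once.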
ShutdownableThread.java
@@ -56,8 +56,8 @@ public boolean isShutdownComplete() {
}

public synchronized boolean initiateShutdown() {
if (isRunning()) {
log.info("{} Shutting down", logIdent);
if (isRunning() && log.isDebugEnabled()) {
log.debug("{} Shutting down", logIdent);
}
shutdownInitiated.countDown();
if (isInterruptible) {
@@ -73,7 +73,9 @@ public void awaitShutdown() throws InterruptedException {
*/
public void awaitShutdown() throws InterruptedException {
shutdownComplete.await();
log.info("{} Shutdown completed", logIdent);
if (log.isDebugEnabled()) {
log.debug("{} Shutdown completed", logIdent);
}
}

/**
@@ -98,15 +100,19 @@ public void pause(long timeout, TimeUnit unit) throws InterruptedException {

@Override
public void run() {
log.info("{} Starting", logIdent);
if (log.isDebugEnabled()) {
log.debug("{} Starting", logIdent);
}
try {
while (isRunning()) {
doWork();
}
} catch (FatalExitError e) {
shutdownInitiated.countDown();
shutdownComplete.countDown();
log.info("{} Stopped", logIdent);
if (log.isDebugEnabled()) {
log.debug("{} Stopped", logIdent);
}
Exit.exit(e.statusCode());
} catch (Throwable cause) {
if (isRunning()) {
(Diff for the remaining changed file is not shown.)
