Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Support kop for v0 version of SASL_HANDSHAKE #676

Merged
merged 13 commits into from
Aug 27, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.streamnative.kafka.client.api.ConsumerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

/**
* The implementation of Kafka consumer 0.10.0.0.
Expand All @@ -35,4 +37,9 @@ public List<ConsumerRecord<K, V>> receive(long timeoutMs) {
poll(timeoutMs).forEach(record -> records.add(ConsumerRecord.createOldRecord(record)));
return records;
}

@Override
public Map<String, List<PartitionInfo>> listTopics(long timeoutMS) {
return listTopics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.streamnative.kafka.client.api.ConsumerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

/**
* The implementation of Kafka consumer 1.0.0.
Expand All @@ -35,4 +37,9 @@ public List<ConsumerRecord<K, V>> receive(long timeoutMs) {
poll(timeoutMs).forEach(record -> records.add(ConsumerRecord.create(record)));
return records;
}

@Override
public Map<String, List<PartitionInfo>> listTopics(long timeoutMS) {
return listTopics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import org.apache.kafka.common.PartitionInfo;

/**
* A common interface of Kafka consumer.
Expand Down Expand Up @@ -57,4 +59,6 @@ default List<ConsumerRecord<K, V>> receiveUntil(int maxNumMessages, long timeout
}
return records;
}

Map<String, List<PartitionInfo>> listTopics(long timeoutMS);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class ConsumerConfiguration {
private Object valueDeserializer;
@Builder.Default
private boolean fromEarliest = true;
private String securityProtocol;
private String saslMechanism;
private String userName;
private String password;
private String requestTimeoutMs;

public Properties toProperties() {
final Properties props = new Properties();
Expand All @@ -44,6 +49,21 @@ public Properties toProperties() {
props.put("value.deserializer", valueDeserializer);
}
props.put("auto.offset.reset", fromEarliest ? "earliest" : "latest");

if (securityProtocol != null) {
props.put("security.protocol", securityProtocol);
}
if (saslMechanism != null) {
props.put("sasl.mechanism", saslMechanism);
}
if (userName != null && password != null) {
final String kafkaAuth = String.format("username=\"%s\" password=\"%s\";", userName, password);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ kafkaAuth);
}
if (requestTimeoutMs != null) {
props.put("request.timeout.ms", requestTimeoutMs);
}
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public class ProducerConfiguration {
private String bootstrapServers;
private Object keySerializer;
private Object valueSerializer;
private String maxBlockMs;
private String securityProtocol;
private String saslMechanism;
private String userName;
private String password;

public Properties toProperties() {
final Properties props = new Properties();
Expand All @@ -37,6 +42,20 @@ public Properties toProperties() {
if (valueSerializer != null) {
props.put("value.serializer", valueSerializer);
}
if (maxBlockMs != null) {
props.put("max.block.ms", maxBlockMs);
}
if (securityProtocol != null) {
props.put("security.protocol", securityProtocol);
}
if (saslMechanism != null) {
props.put("sasl.mechanism", saslMechanism);
}
if (userName != null && password != null) {
final String kafkaAuth = String.format("username=\"%s\" password=\"%s\";", userName, password);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ kafkaAuth);
}
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import javax.naming.AuthenticationException;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -154,11 +155,47 @@ protected static ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeade
}
}

protected Boolean channelReady() {
return hasAuthenticated();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;

// Update parse request latency metrics
final BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> {
requestStats.getRequestParseLatencyStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(timeBeforeParse), TimeUnit.NANOSECONDS);
};

// Update handle request latency metrics
final BiConsumer<String, Long> registerRequestLatency = (apiName, startProcessTime) -> {
requestStats.getStatsLogger()
.scopeLabel(KopServerStats.REQUEST_SCOPE, apiName)
.getOpStatsLogger(KopServerStats.REQUEST_LATENCY)
.registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime),
TimeUnit.NANOSECONDS);
};

// If kop is enabled for authentication and the client
// has not completed the handshake authentication,
// execute channelPrepare to complete authentication
if (isActive.get() && !channelReady()) {
try {

channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency);
return;
} catch (AuthenticationException e) {
log.error("unexpected error in authenticate:", e);
close();
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
return;
} finally {
buffer.release();
}
}

Channel channel = ctx.channel();
SocketAddress remoteAddress = null;
if (null != channel) {
Expand All @@ -168,8 +205,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
final long timeBeforeParse = MathUtils.nowInNano();
KafkaHeaderAndRequest kafkaHeaderAndRequest = byteBufToRequest(buffer, remoteAddress);
// potentially blocking until there is room in the queue for the request.
requestStats.getRequestParseLatencyStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(timeBeforeParse), TimeUnit.NANOSECONDS);
registerRequestParseLatency.accept(timeBeforeParse, null);

try {
if (log.isDebugEnabled()) {
log.debug("[{}] Received kafka cmd {}, the request content is: {}",
Expand All @@ -189,11 +226,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// no need to call `writeAndFlushResponseToClient` again.
return;
}
requestStats.getStatsLogger()
.scopeLabel(KopServerStats.REQUEST_SCOPE, kafkaHeaderAndRequest.getHeader().apiKey().name)
.getOpStatsLogger(KopServerStats.REQUEST_LATENCY)
.registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessRequestTimestamp),
TimeUnit.NANOSECONDS);

registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey().name,
startProcessRequestTimestamp);

ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
Expand All @@ -206,10 +241,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (!isActive.get()) {
handleInactive(kafkaHeaderAndRequest, responseFuture);
} else {
if (!hasAuthenticated(kafkaHeaderAndRequest)) {
authenticate(kafkaHeaderAndRequest, responseFuture);
return;
}
switch (kafkaHeaderAndRequest.getHeader().apiKey()) {
case API_VERSIONS:
handleApiVersionsRequest(kafkaHeaderAndRequest, responseFuture);
Expand Down Expand Up @@ -295,9 +326,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
handleError(kafkaHeaderAndRequest, responseFuture);
}
}
} catch (AuthenticationException e) {
log.error("unexpected error in authenticate:", e);
close();
} catch (Exception e) {
log.error("error while handle command:", e);
close();
Expand Down Expand Up @@ -409,10 +437,12 @@ protected void writeAndFlushResponseToClient(Channel channel) {
}
}

protected abstract boolean hasAuthenticated(KafkaHeaderAndRequest kafkaHeaderAndRequest);
protected abstract boolean hasAuthenticated();

protected abstract void
authenticate(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response)
protected abstract void channelPrepare(ChannelHandlerContext ctx,
ByteBuf requestBuf,
BiConsumer<Long, Throwable> registerRequestParseLatency,
BiConsumer<String, Long> registerRequestLatency)
throws AuthenticationException;

protected abstract void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,18 @@ protected void close() {
}

@Override
protected boolean hasAuthenticated(KafkaHeaderAndRequest request) {
protected boolean hasAuthenticated() {
return authenticator == null || authenticator.complete();
}

@Override
protected void authenticate(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> responseFuture) throws AuthenticationException {
protected void channelPrepare(ChannelHandlerContext ctx,
ByteBuf requestBuf,
BiConsumer<Long, Throwable> registerRequestParseLatency,
BiConsumer<String, Long> registerRequestLatency)
throws AuthenticationException {
if (authenticator != null) {
authenticator.authenticate(
kafkaHeaderAndRequest.getHeader(), kafkaHeaderAndRequest.getRequest(), responseFuture);
authenticator.authenticate(ctx, requestBuf, registerRequestParseLatency, registerRequestLatency);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {

authorizationId = authState.getAuthRole();
complete = true;
return null;
return new byte[0];
} catch (AuthenticationException e) {
throw new SaslException(e.getMessage());
}
Expand Down
Loading