Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Fix the incompatibility with Pulsar 3.0.0.1-SNAPSHOT #1759

Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
kafkaConfig,
requestStats,
Time.SYSTEM,
brokerService.getEntryFilters(),
brokerService.getEntryFilterProvider().getBrokerEntryFilters(),
producePurgatory,
fetchPurgatory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.naming.AuthenticationException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -99,8 +101,10 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio
throw new SchemaStorageException("Pulsar is not configured for Token auth");
}
try {
AuthData authData = AuthData.of(password.getBytes(StandardCharsets.UTF_8));
final AuthenticationState authState = authenticationProvider
.newAuthState(AuthData.of(password.getBytes(StandardCharsets.UTF_8)), null, null);
.newAuthState(authData, null, null);
authState.authenticateAsync(authData).get(kafkaConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
final String role = authState.getAuthRole();

final String tenant;
Expand All @@ -118,7 +122,7 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio

performAuthorizationValidation(username, role, tenant);
return tenant;
} catch (AuthenticationException err) {
} catch (ExecutionException | InterruptedException | TimeoutException | AuthenticationException err) {
throw new SchemaStorageException(err);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
Expand All @@ -32,7 +31,6 @@
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
Expand All @@ -45,9 +43,9 @@ public abstract class AbstractEntryFormatter implements EntryFormatter {
public static final String IDENTITY_KEY = "entry.format";
public static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
private final Time time = Time.SYSTEM;
private final ImmutableList<EntryFilterWithClassLoader> entryfilters;
private final List<EntryFilter> entryfilters;

protected AbstractEntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
protected AbstractEntryFormatter(List<EntryFilter> entryfilters) {
this.entryfilters = entryfilters;
}

Expand Down Expand Up @@ -140,7 +138,7 @@ protected static boolean isKafkaEntryFormat(final MessageMetadata messageMetadat
}

protected EntryFilter.FilterResult filterOnlyByMsgMetadata(MessageMetadata msgMetadata, Entry entry,
List<EntryFilterWithClassLoader> entryFilters) {
List<EntryFilter> entryFilters) {
if (entryFilters == null || entryFilters.isEmpty()) {
return EntryFilter.FilterResult.ACCEPT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
*/
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import java.util.List;
import org.apache.pulsar.broker.service.plugin.EntryFilter;

/**
* Factory of EntryFormatter.
Expand All @@ -32,21 +31,18 @@ enum EntryFormat {
}

public static EntryFormatter create(final KafkaServiceConfiguration kafkaConfig,
final ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap,
final List<EntryFilter> entryFilters,
final String format) {
try {
EntryFormat entryFormat = Enum.valueOf(EntryFormat.class, format.toUpperCase());

ImmutableList<EntryFilterWithClassLoader> entryfilters =
entryfilterMap == null ? ImmutableList.of() : entryfilterMap.values().asList();

switch (entryFormat) {
case PULSAR:
return new PulsarEntryFormatter(entryfilters);
return new PulsarEntryFormatter(entryFilters);
case KAFKA:
return new KafkaV1EntryFormatter(entryfilters);
return new KafkaV1EntryFormatter(entryFilters);
case MIXED_KAFKA:
return new KafkaMixedEntryFormatter(entryfilters);
return new KafkaMixedEntryFormatter(entryFilters);
default:
throw new Exception("No EntryFormatter for " + entryFormat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
Expand All @@ -25,7 +24,7 @@
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;

Expand All @@ -37,7 +36,7 @@
@Slf4j
public class KafkaMixedEntryFormatter extends AbstractEntryFormatter {

protected KafkaMixedEntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
protected KafkaMixedEntryFormatter(List<EntryFilter> entryfilters) {
super(entryfilters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
*/
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;

Expand All @@ -32,7 +31,7 @@
@Slf4j
public class KafkaV1EntryFormatter extends AbstractEntryFormatter {

protected KafkaV1EntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
protected KafkaV1EntryFormatter(List<EntryFilter> entryfilters) {
super(entryfilters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.utils.PulsarMessageBuilder;
Expand All @@ -28,7 +27,7 @@
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MarkerType;
Expand All @@ -46,8 +45,8 @@ public class PulsarEntryFormatter extends AbstractEntryFormatter {
private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;

protected PulsarEntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
super(entryfilters);
protected PulsarEntryFormatter(List<EntryFilter> entryFilters) {
super(entryFilters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP;
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.SaslAuth;
import io.streamnative.pulsar.handlers.kop.utils.SaslUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.naming.AuthenticationException;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
Expand All @@ -31,7 +35,6 @@
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.api.AuthData;

/**
Expand All @@ -43,17 +46,19 @@ public class PlainSaslServer implements SaslServer {
public static final String PLAIN_MECHANISM = "PLAIN";

private final AuthenticationService authenticationService;
private final PulsarAdmin admin;
private final KafkaServiceConfiguration config;

private boolean complete;
private String authorizationId;
private String username;
private AuthenticationDataSource authDataSource;
private Set<String> proxyRoles;

public PlainSaslServer(AuthenticationService authenticationService, PulsarAdmin admin, Set<String> proxyRoles) {
public PlainSaslServer(AuthenticationService authenticationService,
KafkaServiceConfiguration config,
Set<String> proxyRoles) {
this.authenticationService = authenticationService;
this.admin = admin;
this.config = config;
this.proxyRoles = proxyRoles;
}

Expand All @@ -79,8 +84,9 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
}

try {
final AuthenticationState authState = authenticationProvider.newAuthState(
AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8)), null, null);
final AuthData authData = AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8));
final AuthenticationState authState = authenticationProvider.newAuthState(authData, null, null);
authState.authenticateAsync(authData).get(config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
final String role = authState.getAuthRole();
if (StringUtils.isEmpty(role)) {
throw new AuthenticationException("Role cannot be empty.");
Expand Down Expand Up @@ -109,7 +115,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
}
complete = true;
return new byte[0];
} catch (AuthenticationException e) {
} catch (AuthenticationException | ExecutionException | InterruptedException | TimeoutException e) {
throw new SaslException(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class SaslAuthenticator {
public static final String USER_NAME_PROP = "username";
public static final String AUTH_DATA_SOURCE_PROP = "authDataSource";
public static final String AUTHENTICATION_SERVER_OBJ = "authenticationServerObj";
public static final String REQUEST_TIMEOUT_MS = "requestTimeoutMs";

private static final byte[] EMPTY_BUFFER = new byte[0];

Expand All @@ -88,6 +89,7 @@ public class SaslAuthenticator {
private boolean enableKafkaSaslAuthenticateHeaders;
private ByteBuf authenticationFailureResponse = null;
private ChannelHandlerContext ctx = null;
private KafkaServiceConfiguration config;
private String defaultKafkaMetadataTenant;

private enum State {
Expand Down Expand Up @@ -174,6 +176,7 @@ public SaslAuthenticator(PulsarService pulsarService,
? createOAuth2CallbackHandler(config) : null;
this.enableKafkaSaslAuthenticateHeaders = false;
this.defaultKafkaMetadataTenant = config.getKafkaMetadataTenant();
this.config = config;
}

/**
Expand Down Expand Up @@ -288,6 +291,7 @@ private void setState(State state) {
oauth2Configs);
HashMap<String, Object> configs = new HashMap<>();
configs.put(AUTHENTICATION_SERVER_OBJ, this.getAuthenticationService());
configs.put(REQUEST_TIMEOUT_MS, config.getRequestTimeoutMs());
handler.configure(configs, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
Collections.singletonList(appConfigurationEntry));
return handler;
Expand All @@ -296,7 +300,7 @@ private void setState(State state) {
private void createSaslServer(final String mechanism) throws AuthenticationException {
// TODO: support more mechanisms, see https://github.com/streamnative/kop/issues/235
if (mechanism.equals(PlainSaslServer.PLAIN_MECHANISM)) {
saslServer = new PlainSaslServer(authenticationService, admin, proxyRoles);
saslServer = new PlainSaslServer(authenticationService, config, proxyRoles);
} else if (mechanism.equals(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)) {
if (this.oauth2CallbackHandler == null) {
throw new IllegalArgumentException("No OAuth2CallbackHandler found when mechanism is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.naming.AuthenticationException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
Expand All @@ -47,6 +50,8 @@ public class OauthValidatorCallbackHandler implements AuthenticateCallbackHandle

private ServerConfig config = null;
private AuthenticationService authenticationService;
private int requestTimeoutMs;


public OauthValidatorCallbackHandler() {}

Expand Down Expand Up @@ -78,6 +83,7 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf

this.authenticationService = (AuthenticationService) configs.get(SaslAuthenticator.AUTHENTICATION_SERVER_OBJ);
this.config = new ServerConfig(options);
this.requestTimeoutMs = (Integer) configs.get(SaslAuthenticator.REQUEST_TIMEOUT_MS);
}

@Override
Expand Down Expand Up @@ -126,8 +132,10 @@ protected void handleCallback(KopOAuthBearerValidatorCallback callback) {
final String tenant = tokenAndTenant.getRight();

try {
AuthData authData = AuthData.of(token.getBytes(StandardCharsets.UTF_8));
final AuthenticationState authState = authenticationProvider.newAuthState(
AuthData.of(token.getBytes(StandardCharsets.UTF_8)), null, null);
authData, null, null);
authState.authenticateAsync(authData).get(requestTimeoutMs, TimeUnit.MILLISECONDS);
final String role = authState.getAuthRole();
AuthenticationDataSource authDataSource = authState.getAuthDataSource();
callback.token(new KopOAuthBearerToken() {
Expand Down Expand Up @@ -168,7 +176,7 @@ public Long startTimeMs() {
return Long.MAX_VALUE;
}
});
} catch (AuthenticationException e) {
} catch (AuthenticationException | InterruptedException | ExecutionException | TimeoutException e) {
log.error("OAuth validator callback handler new auth state failed: ", e);
throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure(e.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.streamnative.pulsar.handlers.kop.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -79,7 +78,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.common.naming.TopicName;

/**
Expand Down Expand Up @@ -112,18 +111,18 @@ public class PartitionLog {
private final AtomicReference<CompletableFuture<EntryFormatter>> entryFormatter = new AtomicReference<>();
private final ProducerStateManager producerStateManager;

private final ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap;
private final List<EntryFilter> entryFilters;
private final boolean preciseTopicPublishRateLimitingEnable;

public PartitionLog(KafkaServiceConfiguration kafkaConfig,
RequestStats requestStats,
Time time,
TopicPartition topicPartition,
String fullPartitionName,
ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap,
List<EntryFilter> entryFilters,
ProducerStateManager producerStateManager) {
this.kafkaConfig = kafkaConfig;
this.entryfilterMap = entryfilterMap;
this.entryFilters = entryFilters;
this.requestStats = requestStats;
this.time = time;
this.topicPartition = topicPartition;
Expand Down Expand Up @@ -183,7 +182,7 @@ private EntryFormatter buildEntryFormatter(Map<String, String> topicProperties)
log.debug("entryFormat for {} is {} (topicProperties {})", fullPartitionName,
entryFormat, topicProperties);
}
return EntryFormatterFactory.create(kafkaConfig, entryfilterMap, entryFormat);
return EntryFormatterFactory.create(kafkaConfig, entryFilters, entryFormat);
}

@Data
Expand Down
Loading