Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Support OAUTHBEARER mechanism for KoP (#400)
Browse files Browse the repository at this point in the history
This PR introduces OAUTHBEARER mechanism to KoP.

Two config items are added. One is the server callback handler to validate OAUTHBEARER token, the other is the properties configuration file that contains the server callback handler's configs. The default server callback handler is the same as Kafka, which validates unsecured JSON Web Tokens.

In addition, the authenticator is refactored that now it uses different `SaslServer` for different mechanisms to perform authentication.

Related tests are added to verify these configs and the default server callback handler:
- `KafkaServerConfiguration#testGetKopOauth2Configs`: unit test for `getKopOauth2Properties` method.
- `SaslOAuthBearerTest`: test `OAUTHBEARER` SASL mechanism with Kafka's default server callback handler.
- `CustomOAuthBearerCallbackHandlerTest`: test custom `AuthenticateCallbackHandler`.
  • Loading branch information
BewareMyPower authored Mar 12, 2021
1 parent 5ad37f3 commit 0484bc5
Show file tree
Hide file tree
Showing 11 changed files with 671 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
final boolean authenticationEnabled = pulsarService.getBrokerService().isAuthenticationEnabled()
&& !kafkaConfig.getSaslAllowedMechanisms().isEmpty();
this.authenticator = authenticationEnabled
? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms())
? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms(), kafkaConfig)
: null;
this.adminManager = new AdminManager(admin);
this.tlsEnabled = tlsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -40,6 +46,8 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
private static final int OffsetsRetentionMinutes = 7 * 24 * 60;
public static final int DefaultOffsetsTopicNumPartitions = 8;

public static final String DEFAULT_OAUTH2_CONFIG_FILE = "kop-oauth2.properties";

@Category
private static final String CATEGORY_KOP = "Kafka on Pulsar";

Expand Down Expand Up @@ -292,6 +300,22 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean enableTransactionCoordinator = false;

@FieldContext(
category = CATEGORY_KOP,
doc = "The fully qualified name of a SASL server callback handler class that implements the "
+ "AuthenticateCallbackHandler interface, which is used for OAuth2 authentication. "
+ "If it's not set, the class will be Kafka's default server callback handler for "
+ "OAUTHBEARER mechanism: OAuthBearerUnsecuredValidatorCallbackHandler."
)
private String kopOauth2AuthenticateCallbackHandler;

@FieldContext(
category = CATEGORY_KOP,
doc = "The properties configuration file of OAuth2 authentication. If it's not specified, use"
+ " kop-oauth2.properties file under resource directory"
)
private String kopOauth2ConfigFile;

public @NonNull String getKafkaAdvertisedListeners() {
if (kafkaAdvertisedListeners != null) {
return kafkaAdvertisedListeners;
Expand All @@ -307,4 +331,21 @@ public String getListeners() {
return (kafkaListeners != null) ? kafkaListeners : listeners;
}

private InputStream getKopOauth2ConfigInputStream() throws FileNotFoundException {
if (kopOauth2ConfigFile == null) {
return KafkaServiceConfiguration.class.getClassLoader().getResourceAsStream(DEFAULT_OAUTH2_CONFIG_FILE);
} else {
return new FileInputStream(kopOauth2ConfigFile);
}
}

public @NonNull Properties getKopOauth2Properties() {
final Properties props = new Properties();
try (InputStream inputStream = getKopOauth2ConfigInputStream()) {
props.load(inputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security;

import io.streamnative.pulsar.handlers.kop.SaslAuth;
import io.streamnative.pulsar.handlers.kop.utils.SaslUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import javax.naming.AuthenticationException;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.policies.data.AuthAction;

/**
* The SaslServer implementation for SASL/PLAIN.
*/
public class PlainSaslServer implements SaslServer {

public static final String PLAIN_MECHANISM = "PLAIN";

private final AuthenticationService authenticationService;
private final PulsarAdmin admin;

private boolean complete;
private String authorizationId;

public PlainSaslServer(AuthenticationService authenticationService, PulsarAdmin admin) {
this.authenticationService = authenticationService;
this.admin = admin;
}

@Override
public String getMechanismName() {
return PLAIN_MECHANISM;
}

@Override
public byte[] evaluateResponse(byte[] response) throws SaslException {
SaslAuth saslAuth;
try {
saslAuth = SaslUtils.parseSaslAuthBytes(response);
} catch (IOException e) {
throw new SaslException(e.getMessage());
}

AuthenticationProvider authenticationProvider =
authenticationService.getAuthenticationProvider(saslAuth.getAuthMethod());
if (authenticationProvider == null) {
throw new SaslException("No AuthenticationProvider found for method " + saslAuth.getAuthMethod());
}

try {
final AuthenticationState authState = authenticationProvider.newAuthState(
AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8)), null, null);
// TODO: we need to let KafkaRequestHandler do the authorization works. Here we just check the permissions
// of the namespace, which is the namespace. See https://github.com/streamnative/kop/issues/236
final String namespace = saslAuth.getUsername();
Map<String, Set<AuthAction>> permissions = admin.namespaces().getPermissions(namespace);
final String role = authState.getAuthRole();
if (!permissions.containsKey(role)) {
throw new AuthenticationException("Role: " + role + " is not allowed on namespace " + namespace);
}

authorizationId = role;
complete = true;
return null;
} catch (AuthenticationException | PulsarAdminException e) {
throw new SaslException(e.getMessage());
}
}

@Override
public boolean isComplete() {
return complete;
}

@Override
public String getAuthorizationID() {
return authorizationId;
}

@Override
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
if (!complete) {
throw new IllegalStateException("Authentication exchange has not completed");
}
return Arrays.copyOfRange(incoming, offset, offset + len);
}

@Override
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
if (!complete) {
throw new IllegalStateException("Authentication exchange has not completed");
}
return Arrays.copyOfRange(outgoing, offset, offset + len);
}

@Override
public Object getNegotiatedProperty(String propName) {
if (!complete) {
throw new IllegalStateException("Authentication exchange has not completed");
}
return null;
}

@Override
public void dispose() throws SaslException {
}
}
Loading

0 comments on commit 0484bc5

Please sign in to comment.