Skip to content

Commit

Permalink
Add support of auth&auth at discovery BinaryProto-lookup (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Dec 3, 2016
1 parent d04b36f commit 710755f
Show file tree
Hide file tree
Showing 18 changed files with 752 additions and 529 deletions.
26 changes: 26 additions & 0 deletions conf/discovery.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,38 @@
# Zookeeper quorum connection string (comma-separated)
zookeeperServers=

# Global zookeeper quorum connection string (comma-separated)
globalZookeeperServers=

# Port to use to server binary-proto request
servicePort=6650

# Port to use to server binary-proto-tls request
servicePortTls=6651

# Port that discovery service listen on
webServicePort=8080

# Port to use to server HTTPS request
webServicePortTls=8443

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false

### --- Authentication --- ###

# Enable authentication
authenticationEnabled=false

# Authentication provider name list, which is comma separated list of class names (comma-separated)
authenticationProviders=

# Enforce authorization
authorizationEnabled=false

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics (comma-separated)
superUserRoles=

##### --- TLS --- #####
# Enable TLS
Expand Down
14 changes: 1 addition & 13 deletions docs/BinaryProtocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,7 @@ consumer is created. There is no need to do this after reconnections.

The discovery of partitioned topics metadata works very similar to the topic
lookup. The client send a request to the service discovery address and the
response will contain either the actual metadata or a redirect to a certain
broker address.
response will contain actual metadata.

##### Command PartitionedTopicMetadata

Expand All @@ -560,14 +559,3 @@ CommandPartitionedTopicMetadataResponse {
"partitions" : 32
}
```

Example of response with redirection:

```json
CommandPartitionedTopicMetadataResponse {
"request_id" : 1,
"response" : "Redirect",
"brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
"brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651"
}
```
4 changes: 4 additions & 0 deletions docs/ClusterSetup.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ list of active brokers from ZooKeeper.

Add the ZK servers in `conf/discovery.conf`:
```shell
# Zookeeper quorum connection string
zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181

# Global zookeeper quorum connection string
globalZookeeperServers=zk1.us-west.example.com:2184,zk2.us-west.example.com:2184,zk3.us-west.example.com:2184
```

Start the service:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
Expand All @@ -34,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.naming.AuthenticationException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
Expand All @@ -52,6 +54,8 @@
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.authentication.AuthenticationDataSource;
import com.yahoo.pulsar.broker.authentication.AuthenticationProvider;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import com.yahoo.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -475,6 +479,7 @@ public void testDiscoveryLookup() throws Exception {

}


/**
* Verify discovery-service binary-proto lookup using tls
*
Expand Down Expand Up @@ -547,5 +552,204 @@ public void testDiscoveryLookupTls() throws Exception {
producer.close();

}


@Test
public void testDiscoveryLookupAuthAndAuthSuccess() throws Exception {

// (1) start discovery service
ServiceConfig config = new ServiceConfig();
config.setServicePort(nextFreePort());
config.setBindOnLocalhost(true);
// add Authentication Provider
Set<String> providersClassNames = Sets.newHashSet(MockAuthenticationProvider.class.getName());
config.setAuthenticationProviders(providersClassNames);
// enable authentication and authorization
config.setAuthenticationEnabled(true);
config.setAuthorizationEnabled(true);
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
discoveryService.start();

// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();
ClientConfiguration clientConfig = new ClientConfiguration();
// set authentication data
clientConfig.setAuthentication(new Authentication() {
@Override
public void close() throws IOException {
}
@Override
public String getAuthMethodName() {
return "auth";
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
return new AuthenticationDataProvider() {
};
}
@Override
public void configure(Map<String, String> authParams) {
}
@Override
public void start() throws PulsarClientException {
}
});

PulsarClient pulsarClient = PulsarClient.create(discoverySvcUrl, clientConfig);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1",
"my-subscriber-name", new ConsumerConfiguration());
Producer producer = pulsarClient.createProducer("persistent://my-property/use2/my-ns/my-topic1",
new ProducerConfiguration());
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Message msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
producer.close();
}

@Test
public void testDiscoveryLookupAuthenticationFailure() throws Exception {

// (1) start discovery service
ServiceConfig config = new ServiceConfig();
config.setServicePort(nextFreePort());
config.setBindOnLocalhost(true);
// set Authentication provider which fails authentication
Set<String> providersClassNames = Sets.newHashSet(MockAuthenticationProviderFail.class.getName());
config.setAuthenticationProviders(providersClassNames);
// enable authentication
config.setAuthenticationEnabled(true);
config.setAuthorizationEnabled(true);
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
discoveryService.start();
// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();
ClientConfiguration clientConfig = new ClientConfiguration();
// set authentication data
clientConfig.setAuthentication(new Authentication() {
@Override
public void close() throws IOException {
}
@Override
public String getAuthMethodName() {
return "auth";
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
return new AuthenticationDataProvider() {
};
}
@Override
public void configure(Map<String, String> authParams) {
}
@Override
public void start() throws PulsarClientException {
}
});
PulsarClient pulsarClient = PulsarClient.create(discoverySvcUrl, clientConfig);
try {
pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", "my-subscriber-name",
new ConsumerConfiguration());
Assert.fail("should have failed due to authentication");
} catch (PulsarClientException e) {
// Ok: expected
}
}

@Test
public void testDiscoveryLookupAuthorizationFailure() throws Exception {

// (1) start discovery service
ServiceConfig config = new ServiceConfig();
config.setServicePort(nextFreePort());
config.setBindOnLocalhost(true);
// set Authentication provider which returns "invalid" appid so, authorization fails
Set<String> providersClassNames = Sets.newHashSet(MockAuthorizationProviderFail.class.getName());
config.setAuthenticationProviders(providersClassNames);
// enable authentication
config.setAuthenticationEnabled(true);
config.setAuthorizationEnabled(true);
DiscoveryService discoveryService = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory();
discoveryService.start();
// (2) lookup using discovery service
final String discoverySvcUrl = discoveryService.getServiceUrl();
ClientConfiguration clientConfig = new ClientConfiguration();
// set authentication data
clientConfig.setAuthentication(new Authentication() {
@Override
public void close() throws IOException {
}
@Override
public String getAuthMethodName() {
return "auth";
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
return new AuthenticationDataProvider() {
};
}
@Override
public void configure(Map<String, String> authParams) {
}
@Override
public void start() throws PulsarClientException {
}
});
PulsarClient pulsarClient = PulsarClient.create(discoverySvcUrl, clientConfig);
try {
pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", "my-subscriber-name",
new ConsumerConfiguration());
Assert.fail("should have failed due to authentication");
} catch (PulsarClientException e) {
// Ok: expected
Assert.assertTrue(e instanceof PulsarClientException.LookupException);
}
}

/**** helper classes ****/

public static class MockAuthenticationProvider implements AuthenticationProvider {
@Override
public void close() throws IOException {
}
@Override
public void initialize(ServiceConfiguration config) throws IOException {
}
@Override
public String getAuthMethodName() {
return "auth";
}
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
return "appid1";
}
}

public static class MockAuthenticationProviderFail extends MockAuthenticationProvider {
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
throw new AuthenticationException("authentication failed");
}
}

public static class MockAuthorizationProviderFail extends MockAuthenticationProvider {
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
return "invalid";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,31 +137,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
ByteBuf request = Commands.newPartitionMetadataRequest(destination.toString(), requestId);
clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
try {
URI uri = null;
// (1) if redirect request for different broker lookup
if (lookupDataResult.redirect) {

if (useTls) {
uri = new URI(lookupDataResult.brokerUrlTls);
} else {
String serviceUrl = lookupDataResult.brokerUrl;
uri = new URI(serviceUrl);
}
InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
//(1.a) retry getPartitionedMetadata with different redirected broker
getPartitionedTopicMetadata(responseBrokerAddress, destination).thenAccept(metadata -> {
partitionFuture.complete(metadata);
}).exceptionally((lookupException) -> {
// lookup failed
log.warn("[{}] lookup failed : {}", destination.toString(), lookupException.getMessage(),
lookupException);
partitionFuture.completeExceptionally(lookupException);
return null;
});
} else {
// (2) received result partitions
partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
}
partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
format("Failed to parse partition-response redirect=%s , partitions with %s",
Expand Down Expand Up @@ -200,12 +176,9 @@ public LookupDataResult(String brokerUrl, String brokerUrlTls, boolean redirect,
this.redirect = redirect;
}

public LookupDataResult(int partitions, String brokerUrl, String brokerUrlTls, boolean redirect) {
public LookupDataResult(int partitions) {
super();
this.brokerUrl = brokerUrl;
this.brokerUrlTls = brokerUrlTls;
this.partitions = partitions;
this.redirect = redirect;
}

}
Expand Down
Loading

0 comments on commit 710755f

Please sign in to comment.