Skip to content

Commit

Permalink
Implement a Service Bus Shared Access Key Credential (#21227)
Browse files Browse the repository at this point in the history
* Fix issue#16465 Implement a Service Bus Shared Access Key Credential 202105061716 by LiHong
  • Loading branch information
v-hongli1 authored Jun 14, 2021
1 parent 548badd commit 7177fbe
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.core.amqp.models.CbsAuthorizationType;
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
Expand Down Expand Up @@ -235,6 +237,55 @@ public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenC
return this;
}

/**
* Sets the credential for the Service Bus resource.
*
* @param fullyQualifiedNamespace for the Service Bus.
* @param credential {@link AzureNamedKeyCredential} to be used for authentication.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
Objects.requireNonNull(credential, "'credential' cannot be null.");

this.credentials = new ServiceBusSharedKeyCredential(credential.getAzureNamedKey().getName(),
credential.getAzureNamedKey().getKey(), ServiceBusConstants.TOKEN_VALIDITY);

if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
throw logger.logExceptionAsError(
new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
}

return this;
}

/**
* Sets the credential for the Service Bus resource.
*
* @param fullyQualifiedNamespace for the Service Bus.
* @param credential {@link AzureSasCredential} to be used for authentication.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
Objects.requireNonNull(credential, "'credential' cannot be null.");

this.credentials = new ServiceBusSharedKeyCredential(credential.getSignature());

if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
throw logger.logExceptionAsError(
new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
}

return this;
}

/**
* Sets the proxy configuration to use for {@link ServiceBusSenderAsyncClient}. When a proxy is configured, {@link
* AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.test.TestBase;
import com.azure.core.test.TestMode;
import com.azure.core.util.AsyncCloseable;
Expand All @@ -19,6 +20,7 @@
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -114,8 +116,20 @@ public TestMode getTestMode() {
return CoreUtils.isNullOrEmpty(getConnectionString()) ? TestMode.PLAYBACK : TestMode.RECORD;
}

public String getConnectionString() {
return TestUtils.getConnectionString();
public static String getConnectionString() {
return TestUtils.getConnectionString(false);
}

public static String getConnectionString(boolean withSas) {
return TestUtils.getConnectionString(withSas);
}

protected static ConnectionStringProperties getConnectionStringProperties() {
return new ConnectionStringProperties(getConnectionString(false));
}

protected static ConnectionStringProperties getConnectionStringProperties(boolean withSas) {
return new ConnectionStringProperties(getConnectionString(withSas));
}

public String getFullyQualifiedDomainName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.util.Configuration;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSenderClientBuilder;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
Expand All @@ -16,6 +20,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.test.StepVerifier;

import java.net.InetSocketAddress;
import java.net.Proxy;
Expand All @@ -28,7 +33,9 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ServiceBusClientBuilderTest {
import static java.nio.charset.StandardCharsets.UTF_8;

class ServiceBusClientBuilderTest extends IntegrationTestBase {
private static final String NAMESPACE_NAME = "dummyNamespaceName";
private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/";
private static final String ENDPOINT_FORMAT = "sb://%s.%s";
Expand All @@ -51,6 +58,12 @@ class ServiceBusClientBuilderTest {
ENDPOINT, SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, QUEUE_NAME);
private static final Proxy PROXY_ADDRESS = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(PROXY_HOST, Integer.parseInt(PROXY_PORT)));

private static final String TEST_MESSAGE = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \nUt sodales efficitur sapien ut posuere. Morbi sed tellus est. Proin eu erat purus. Proin massa nunc, condimentum id iaculis dignissim, consectetur et odio. Cras suscipit sem eu libero aliquam tincidunt. Nullam ut arcu suscipit, eleifend velit in, cursus libero. Ut eleifend facilisis odio sit amet feugiat. Phasellus at nunc sit amet elit sagittis commodo ac in nisi. Fusce vitae aliquam quam. Integer vel nibh euismod, tempus elit vitae, pharetra est. Duis vulputate enim a elementum dignissim. Morbi dictum enim id elit scelerisque, in elementum nulla pharetra. \nAenean aliquet aliquet condimentum. Proin dapibus dui id libero tempus feugiat. Sed commodo ligula a lectus mattis, vitae tincidunt velit auctor. Fusce quis semper dui. Phasellus eu efficitur sem. Ut non sem sit amet enim condimentum venenatis id dictum massa. Nullam sagittis lacus a neque sodales, et ultrices arcu mattis. Aliquam erat volutpat. \nAenean fringilla quam elit, id mattis purus vestibulum nec. Praesent porta eros in dapibus molestie. Vestibulum orci libero, tincidunt et turpis eget, condimentum lobortis enim. Fusce suscipit ante et mauris consequat cursus nec laoreet lorem. Maecenas in sollicitudin diam, non tincidunt purus. Nunc mauris purus, laoreet eget interdum vitae, placerat a sapien. In mi risus, blandit eu facilisis nec, molestie suscipit leo. Pellentesque molestie urna vitae dui faucibus bibendum. \nDonec quis ipsum ultricies, imperdiet ex vel, scelerisque eros. Ut at urna arcu. Vestibulum rutrum odio dolor, vitae cursus nunc pulvinar vel. Donec accumsan sapien in malesuada tempor. Maecenas in condimentum eros. Sed vestibulum facilisis massa a iaculis. Etiam et nibh felis. Donec maximus, sem quis vestibulum gravida, turpis risus congue dolor, pharetra tincidunt lectus nisi at velit.";

ServiceBusClientBuilderTest() {
super(new ClientLogger(ServiceBusClientBuilderTest.class));
}

@Test
void deadLetterqueueClient() {
// Arrange
Expand Down Expand Up @@ -243,6 +256,103 @@ public void testConnectionStringWithSas() {
.connectionString("Endpoint=sb://sb-name.servicebus.windows.net/;EntityPath=sb-name"));
}

@Test
public void testBatchSendEventByAzureNameKeyCredential() {
ConnectionStringProperties properties = getConnectionStringProperties();
String fullyQualifiedNamespace = getFullyQualifiedDomainName();
String sharedAccessKeyName = properties.getSharedAccessKeyName();
String sharedAccessKey = properties.getSharedAccessKey();
String queueName = getQueueName(0);

final ServiceBusMessage testData = new ServiceBusMessage(TEST_MESSAGE.getBytes(UTF_8));

ServiceBusSenderAsyncClient senderAsyncClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))
.sender()
.queueName(queueName)
.buildAsyncClient();
try {
StepVerifier.create(
senderAsyncClient.createMessageBatch().flatMap(batch -> {
assertTrue(batch.tryAddMessage(testData));
return senderAsyncClient.sendMessages(batch);
})
).verifyComplete();
} finally {
senderAsyncClient.close();
}
}


@Test
public void testBatchSendEventByAzureSasCredential() {
ConnectionStringProperties properties = getConnectionStringProperties(true);
String fullyQualifiedNamespace = getFullyQualifiedDomainName();
String sharedAccessSignature = properties.getSharedAccessSignature();
String queueName = getQueueName(0);

final ServiceBusMessage testData = new ServiceBusMessage(TEST_MESSAGE.getBytes(UTF_8));

ServiceBusSenderAsyncClient senderAsyncClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace,
new AzureSasCredential(sharedAccessSignature))
.sender()
.queueName(queueName)
.buildAsyncClient();
try {
StepVerifier.create(
senderAsyncClient.createMessageBatch().flatMap(batch -> {
assertTrue(batch.tryAddMessage(testData));
return senderAsyncClient.sendMessages(batch);
})
).verifyComplete();
} finally {
senderAsyncClient.close();
}
}

@Test
public void testConnectionWithAzureNameKeyCredential() {
String fullyQualifiedNamespace = "sb-name.servicebus.windows.net";
String sharedAccessKeyName = "SharedAccessKeyName test-value";
String sharedAccessKey = "SharedAccessKey test-value";

assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder()
.credential(null,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder()
.credential("",
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, (AzureNamedKeyCredential) null));

}

@Test
public void testConnectionWithAzureSasCredential() {
String fullyQualifiedNamespace = "sb-name.servicebus.windows.net";
String sharedAccessSignature = "SharedAccessSignature test-value";

assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder()
.credential(null, new AzureSasCredential(sharedAccessSignature)));

assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder()
.credential("", new AzureSasCredential(sharedAccessSignature)));

assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, new AzureSasCredential(sharedAccessSignature)));

assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, (AzureSasCredential) null));

}

private static Stream<Arguments> getProxyConfigurations() {
return Stream.of(
Arguments.of("http://localhost:8080", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,27 @@
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.*;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.junit.jupiter.api.AfterEach;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Captor;
import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import org.mockito.Mockito;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ public class TestUtils {
*
* @return The namespace connection string.
*/
public static String getConnectionString() {
public static String getConnectionString(boolean withSas) {
if (withSas) {
return System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING_WITH_SAS");
}
return System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ void getSubscriptionRuntimePropertiesUnauthorizedClient(HttpClient httpClient) {
// Arrange
final String connectionString = interceptorManager.isPlaybackMode()
? "Endpoint=sb://foo.servicebus.windows.net;SharedAccessKeyName=dummyKey;SharedAccessKey=dummyAccessKey"
: TestUtils.getConnectionString();
: TestUtils.getConnectionString(false);

final String connectionStringUpdated = connectionString.replace("SharedAccessKey=",
"SharedAccessKey=fake-key-");
Expand Down Expand Up @@ -990,7 +990,7 @@ void updateRuleResponse(HttpClient httpClient) {
private ServiceBusAdministrationAsyncClient createClient(HttpClient httpClient) {
final String connectionString = interceptorManager.isPlaybackMode()
? "Endpoint=sb://foo.servicebus.windows.net;SharedAccessKeyName=dummyKey;SharedAccessKey=dummyAccessKey"
: TestUtils.getConnectionString();
: TestUtils.getConnectionString(false);

final ServiceBusAdministrationClientBuilder builder = new ServiceBusAdministrationClientBuilder()
.httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void listQueuesImplementation(HttpClient httpClient) {
private ServiceBusManagementClientImpl createClient(HttpClient httpClient) {
final String connectionString = interceptorManager.isPlaybackMode()
? "Endpoint=sb://foo.servicebus.windows.net;SharedAccessKeyName=dummyKey;SharedAccessKey=dummyAccessKey"
: TestUtils.getConnectionString();
: TestUtils.getConnectionString(false);
final ConnectionStringProperties properties = new ConnectionStringProperties(connectionString);
final ServiceBusSharedKeyCredential credential = new ServiceBusSharedKeyCredential(
properties.getSharedAccessKeyName(), properties.getSharedAccessKey());
Expand Down

0 comments on commit 7177fbe

Please sign in to comment.