From 27b91560d20d994b8abf2d3a9e91e9bb2d1d0f0d Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 08:37:15 -0700 Subject: [PATCH 01/11] Adding README samples and update README.md --- .../azure-messaging-servicebus/README.md | 333 +++++++++++------- .../messaging/servicebus/ReadmeSamples.java | 137 ++++++- 2 files changed, 324 insertions(+), 146 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 942d82b4e46e..facb7b167576 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -1,39 +1,40 @@ # Azure Service Bus client library for Java Microsoft Azure Service Bus is a fully managed enterprise integration message broker. Service Bus can decouple -applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data -and state. Data is transferred between different applications and services using messages. If you would like to know -more about Azure Service Bus, you may wish to review: [What is Service Bus](https://docs.microsoft.com/en-us/azure/service-bus-messaging)? +applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data and +state. Data is transferred between different applications and services using messages. If you would like to know more +about Azure Service Bus, you may wish to review: [What is Service Bus][product_docs]? The Azure Service Bus client library allows for sending and receiving of Azure Service Bus messages and may be used to: - -- Messaging: Transfer business data, such as sales or purchase orders, journals, or inventory movements. -- Decouple applications: Improve reliability and scalability of applications and services. Client and service don't +- Transfer business data, such as sales or purchase orders, journals, or inventory movements. +- Decouple applications to improve reliability and scalability of applications and services. Clients and services don't have to be online at the same time. -- Topics and subscriptions: Enable 1:n relationships between publishers and subscribers. -- Message sessions. Implement work-flows that require message ordering or message deferral. +- Enable 1:n relationships between publishers and subscribers. +- Implement workflows that require message ordering or message deferral. -[Source code][source_code] | [API reference documentation][api_documentation] | [Samples][sample_readme] +[Source code][source_code] | [API reference documentation][api_documentation] +| [Product documentation][product_docs]| [Samples][sample_examples] ## Table of contents -- [Table of contents](#table-of-contents) -- [Getting started](#getting-started) - - [Prerequisites](#prerequisites) - - [Adding the package to your product](#adding-the-package-to-your-product) - - [Authenticate the client](#authenticate-the-client) -- [Key concepts](#key-concepts) -- [Examples](#examples) - - [Sending messages](#sending-messages) - - [Receiving messages](#receiving-messages) -- [Troubleshooting](#troubleshooting) - - [Enable client logging](#enable-client-logging) - - [Enable AMQP transport logging](#enable-amqp-transport-logging) - - [Common exceptions](#common-exceptions) - - [Handling transient AMQP exceptions](#handling-transient-amqp-exceptions) - - [Default SSL library](#default-ssl-library) -- [Next steps](#next-steps) -- [Contributing](#contributing) +- [Azure Service Bus client library for Java](#azure-service-bus-client-library-for-java) + - [Table of contents](#table-of-contents) + - [Getting started](#getting-started) + - [Prerequisites](#prerequisites) + - [Adding the package to your product](#adding-the-package-to-your-product) + - [Authenticate the client](#authenticate-the-client) + - [Key concepts](#key-concepts) + - [Examples](#examples) + - [Send messages](#send-messages) + - [Receive messages](#receive-messages) + - [Settle messages](#settle-messages) + - [Send and receive from session enabled queues or topics](#send-and-receive-from-session-enabled-queues-or-topics) + - [Troubleshooting](#troubleshooting) + - [Enable client logging](#enable-client-logging) + - [Enable AMQP transport logging](#enable-amqp-transport-logging) + - [Common exceptions](#common-exceptions) + - [Next steps](#next-steps) + - [Contributing](#contributing) ## Getting started @@ -43,6 +44,8 @@ have to be online at the same time. - [Maven][maven] - Microsoft Azure subscription - You can create a free account at: https://azure.microsoft.com +- Azure Service Bus instance + - Step-by-step guide for [creating a Service Bus instance using Azure Portal][service_bus_create] ### Adding the package to your product @@ -58,12 +61,41 @@ have to be online at the same time. ### Authenticate the client -For the Service Bus client library to interact with an Service Bus, it will need to understand how to connect -and authorize with it. +For the Service Bus client library to interact with Service Bus, it will need to understand how to connect and authorize +with it. + +#### Create Service Bus clients using a connection string + +The easiest means for doing so is to use a connection string, which automatically created when creating a Service Bus +namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to +[get a Service Bus connection string][service_bus_connection_string]. + +Both the asynchronous and synchronous Service Bus sender and receiver clients are instantiated using +`ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver, +respectively. + + +```java +ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sender() + .queueName("<< QUEUE NAME >>") + .buildClient(); +``` + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .buildAsyncClient(); +``` -#### Create an Service Bus client using Microsoft identity platform (formerly Azure Active Directory) +#### Create a Service Bus client using Microsoft identity platform (formerly Azure Active Directory) -Azure SDK for Java supports an Azure Identity package, making it simple get credentials from Microsoft identity +Azure SDK for Java supports an Azure Identity package, making it simple to get credentials from the Microsoft identity platform. First, add the package: [//]: # ({x-version-update-start;com.azure:azure-identity;dependency}) @@ -76,17 +108,14 @@ platform. First, add the package: ``` [//]: # ({x-version-update-end}) -All the implemented ways to request a credential can be found under the `com.azure.identity.credential` package. The -sample below shows how to use an Azure Active Directory (AAD) application client secret to authorize with Azure Service Bus. - -#### Authorizing with AAD application client secret +The implemented ways to request a credential are under the `com.azure.identity.credential` package. The sample below +shows how to use an Azure Active Directory (AAD) application client secret to authorize with Azure Service Bus. -Follow the instructions in [Creating a service principal using Azure Portal][application_client_secret] to create a -service principal and a client secret. The corresponding `clientId` and `tenantId` for the service principal can be -obtained from the [App registration page][app_registration_page]. +##### Authorizing with DefaultAzureCredential -Set the values of the client ID, tenant ID, and client secret of the AAD application as environment variables: -AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET. +Authorization is easiest using [DefaultAzureCredential][wiki_identity]. It finds the best credential to use in its +running environment. For more information about using Azure Active Directory authorization with Service Bus, please +refer to [the associated documentation][aad_authorization]. Use the returned token credential to authenticate the client: @@ -100,92 +129,163 @@ ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .buildAsyncClient(); ``` -##### Service Bus Roles - -When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such -as the `Azure Service Bus Data Owner`, `Azure Service Bus Data Sender` and `Azure Service Bus Data Receiver` -[role][servicebus_roles]. -For more information about using Azure Active Directory authorization with Service Bus, please refer to -[the associated documentation][aad_authorization]. - ## Key concepts You can interact with the primary resource types within a Service Bus Namespace, of which multiple can exist and -on which actual message transmission takes place, the namespace often serving as an application container: - -* [Queue][queue_concept]: Allows for Sending and Receiving of messages, ordered first-in-first-out. Often used for -point-to-point communication. +on which actual message transmission takes place. The namespace often serves as an application container: -* [Topic][topic_concept]: As opposed to Queues, Topics are better suited to publish/subscribe scenarios. A topic can -be sent to, but requires a subscription, of which there can be multiple in parallel, to consume from. - -* [Subscription][subscription_concept]: The mechanism to consume from a Topic. Each subscription is independent, and -receaves a copy of each message sent to the topic. +* A **[queue][queue_concept]** allows for the sending and receiving of messages, ordered first-in-first-out. It is often + used for point-to-point communication. +* A **[topic][topic_concept]** is better suited to publisher and subscriber scenarios. A topic publishes messages to + _subscriptions_, of which, multiple can exist simultaneously. +* A **[subscription][subscription_concept]** receives messages from a topic. Each subscription is independent and + receives a copy of the message sent to the topic. ## Examples -### Create a sender or receiver using connection string - -The easiest means for doing so is to use a connection string, which is created automatically when creating an Service Bus -namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to -[get an Service Bus connection string][service_bus_connection_string]. +### Send messages -Both the asynchronous and synchronous Service Bus sender and receiver clients can be created using -`ServiceBusClientBuilder`.The examples are explained blow. +You'll need to create an asynchronous [`ServiceBusSenderAsyncClient`][ServiceBusSenderAsyncClient] or a synchronous +[`ServiceBusSenderClient`][ServiceBusSenderClient] to send messages. Each sender can send messages to either a queue or +a topic. -The snippet below creates an asynchronous Service Bus sender. +The snippet below creates a synchronous `ServiceBusSender` to publish a message to a queue. - + ```java -String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; -ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder() - .connectionString(connectionString) +ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sender() .queueName("<< QUEUE NAME >>") - .buildAsyncClient(); + .buildClient(); +List messages = Arrays.asList( + new ServiceBusMessage("Hello world".getBytes()).setMessageId("1"), + new ServiceBusMessage("Bonjour".getBytes()).setMessageId("2")); + +sender.send(messages); ``` -The snippet below creates an asynchronous Service Bus receiver. +### Receive messages + +You'll need to create an asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] or a synchronous +[`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages. Each receiver can consume messages from +either a queue or a topic subscription. + +#### Receive a batch of messages + +The snippet below creates a `ServiceBusReceiverClient` to receive messages from a topic subscription. It returns a batch +of messages when 10 messages are received or until 30 seconds have elapsed, whichever happens first. - + ```java -String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; -ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .connectionString(connectionString) +ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .receiver() .topicName("<< TOPIC NAME >>") .subscriptionName("<< SUBSCRIPTION NAME >>") + .receiveMode(ReceiveMode.PEEK_LOCK) + .buildClient(); + +IterableStream messages = receiver.receive(10, Duration.ofSeconds(30)); +messages.forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), + new String(message.getBody(), StandardCharsets.UTF_8)); +}); +``` + +#### Receive a stream of messages + +The asynchronous `ServiceBusReceiverAsyncClient` continuously fetches messages until the `subscription` is disposed. + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .queueName("<< QUEUE NAME >>") .buildAsyncClient(); + +Disposable subscription = receiver.receive().subscribe(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s%n", message.getMessageId()); + System.out.printf("Contents: %s%n", new String(message.getBody(), StandardCharsets.UTF_8)); +}, error -> { + System.err.println("Error occurred while receiving messages: " + error); +}, () -> { + System.out.println("Finished receiving messages."); +}); ``` -### Sending messages +### Settle messages + +When a message is received, it can be settled using any of the `complete()`, `abandon()`, `defer()`, or `deadLetter()` +overloads. The sample below completes a received message from synchronous `ServiceBusReceiverClient`. -You'll need to create an asynchronous [`ServiceBusSenderAsyncClient`][ServiceBusSenderAsyncClient] or -a synchronous [`ServiceBusSenderClient`][ServiceBusSenderClient] to send message. Each sender can send message to either, a queue, -or topic. + +```java +receiver.receive(10).forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); -* [Sending a message asynchronously][sample-send-async-message]. -* [Sending a message asynchronously using active directory credential][sample-send-async-aad-message]. -* [Send message in batch synchronously][sample-send-batch-messages]. + // Process message and then complete it. + receiver.complete(message); +}); +``` -### Receiving messages +### Send and receive from session enabled queues or topics + +> Using sessions requires you to create a session enabled queue or subscription. You can read more about how to +> configure this in "[Message sessions][message-sessions]". + +Unlike non-session-enabled queues or subscriptions, only a single receiver can read from a session at any time. When a +receiver fetches a session, Service Bus locks the session for that receiver, and it has exclusive access to messages in +that session. + +#### Send message to a session + +Create a `ServiceBusSenderClient` for a session enabled queue or topic subscription. Setting `.setSessionId(String)` on +a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created. + + +```java +ServiceBusMessage message = new ServiceBusMessage("Hello world".getBytes()) + .setSessionId("greetings"); + +sender.send(message); +``` -You'll need to create an asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] or -a synchronous [`ServiceBusReceiverClient`][ServiceBusReceiverClient]. Each receiver can receive message from either, a queue, -or subscriber. +#### Receive messages from a session -* [Receiving a message asynchronously][sample-receive-async-message]. -* [Receiving a message asynchronously using active directory credential][sample-receive-async-aad-message]. -* [Receiving a message asynchronously and settling][sample-receive-message-and-settle]. -* [Receiving messages synchronously][sample-receive-message-synchronously]. +Receivers can fetch messages from a specific session or the first available, unlocked session. The first snippet creates +a receiver for a session with id "greetings". The second snippet creates a receiver that fetches the first available +session. + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .topicName("<< QUEUE NAME >>") + .sessionId("greetings") + .buildAsyncClient(); +``` + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .topicName("<< QUEUE NAME >>") + .buildAsyncClient(); +``` ## Troubleshooting ### Enable client logging -You can set the `AZURE_LOG_LEVEL` environment variable to view logging statements made in the client library. For -example, setting `AZURE_LOG_LEVEL=2` would show all informational, warning, and error log messages. The log levels can -be found here: [log levels][LogLevels]. +Azure SDK for Java offers a consistent logging story to help aid in troubleshooting application errors and expedite +their resolution. The logs produced will capture the flow of an application before reaching the terminal state to help +locate the root issue. View the [logging][logging] wiki for guidance about enabling logging. ### Enable AMQP transport logging @@ -224,33 +324,14 @@ services. When an AMQP exception is thrown, examining the error condition field exception occurred and if possible, how to mitigate this exception. A list of all the AMQP exceptions can be found in [OASIS AMQP Version 1.0 Transport Errors][oasis_amqp_v1_error]. -The [`AmqpErrorContext`][AmqpErrorContext] in the [`AmqpException`][AmqpException] provides information about the AMQP -session, link, or connection that the exception occurred in. This is useful to diagnose which level in the transport -this exception occurred at and whether it was an issue in one of the producers or consumers. - -#### Operation cancelled exception - -It occurs when the underlying AMQP layer encounters an abnormal link abort or the connection is disconnected in an -unexpected fashion. It is recommended to attempt to verify the current state and retry if necessary. - -### Handling transient AMQP exceptions - -If a transient AMQP exception occurs, the client library retries the operation as many times as the -[AmqpRetryOptions][AmqpRetryOptions] allows. Afterwards, the operation fails and an exception is propagated back to the -user. - -### Default SSL library - -All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL -operations. The Boring SSL library is an uber jar containing native libraries for Linux / macOS / Windows, and provides -better performance compared to the default SSL implementation within the JDK. For more information, including how to -reduce the dependency size, refer to the [performance tuning][performance_tuning] section of the wiki. +The recommended way to solve the specific exception the AMQP exception represents is to follow the +[Service Bus Messaging Exceptions][servicebus_messaging_exceptions] guidance. ## Next steps Beyond those discussed, the Azure Service Bus client library offers support for many additional scenarios to help take -advantage of the full feature set of the Azure Service Bus service. In order to help explore some of the these scenarios, -the following set of sample is available [here][samples_readme]. +advantage of the full feature set of the Azure Service Bus service. In order to help explore some of these scenarios, +check out the [samples README][samples_readme]. ## Contributing @@ -261,29 +342,22 @@ Guidelines](./CONTRIBUTING.md) for more information. [aad_authorization]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application [amqp_transport_error]: https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-amqp-error [AmqpErrorCondition]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java -[AmqpErrorContext]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorContext.java -[AmqpException]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpException.java [api_documentation]: https://aka.ms/java-docs -[app_registration_page]: https://docs.microsoft.com/azure/active-directory/develop/howto-create-service-principal-portal#get-values-for-signing-in -[application_client_secret]: https://docs.microsoft.com/azure/active-directory/develop/howto-create-service-principal-portal#create-a-new-application-secret [java_8_sdk_javadocs]: https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html -[LogLevels]: ../../core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java +[logging]: https://github.com/Azure/azure-sdk-for-java/wiki/Logging-with-Azure-SDK [maven]: https://maven.apache.org/ +[message-sessions]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions [oasis_amqp_v1_error]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-error [oasis_amqp_v1]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html -[performance_tuning]: https://github.com/Azure/azure-sdk-for-java/wiki/Performance-Tuning +[product_docs]: https://docs.microsoft.com/azure/service-bus-messaging [qpid_proton_j_apache]: http://qpid.apache.org/proton/ [queue_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#queues [RetryOptions]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java -[sample_readme]: ./src/samples/README.md -[sample-receive-async-aad-message]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java -[sample-receive-async-message]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java -[sample-receive-message-and-settle]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java -[sample-receive-message-synchronously]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java -[sample-send-async-aad-message]: ./src/samples/java/com/azure/messaging/servicebus/SendMessageWithAzureIdentityAsyncSample.java -[sample-send-async-message]: ./src/samples/java/com/azure/messaging/servicebus/SendMessageAsyncSample.java -[sample-send-batch-messages]: ./src/samples/java/com/azure/messaging/servicebus/SendMessageBatchSyncSample.java +[sample_examples]: ./src/samples/java/com/azure/messaging/servicebus/ +[samples_readme]: ./src/samples/README.md [service_bus_connection_string]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string +[servicebus_create]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal +[servicebus_messsaging_exceptions]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-exceptions [servicebus_roles]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application#built-in-rbac-roles-for-azure-service-bus [ServiceBusReceiverAsyncClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java [ServiceBusReceiverClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -292,5 +366,6 @@ Guidelines](./CONTRIBUTING.md) for more information. [source_code]: ./ [subscription_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions [topic_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#topics +[wiki_identity]: https://github.com/Azure/azure-sdk-for-java/wiki/Identity-and-Authentication ![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fservicebus%2Fazure-messaging-servicebus%2FREADME.png) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index bc2ea9444be7..8c0abcb52407 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -4,9 +4,15 @@ package com.azure.messaging.servicebus; import com.azure.core.credential.TokenCredential; -import com.azure.identity.ClientSecretCredential; -import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.core.util.IterableStream; import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.Disposable; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; /** * WARNING: MODIFYING THIS FILE WILL REQUIRE CORRESPONDING UPDATES TO README.md FILE. LINE NUMBERS ARE USED TO EXTRACT @@ -20,21 +26,19 @@ public class ReadmeSamples { * Code sample for creating an asynchronous Service Bus sender. */ public void createAsynchronousServiceBusSender() { - String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; - ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder() - .connectionString(connectionString) + ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sender() .queueName("<< QUEUE NAME >>") - .buildAsyncClient(); + .buildClient(); } /** * Code sample for creating an asynchronous Service Bus receiver. */ public void createAsynchronousServiceBusReceiver() { - String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .connectionString(connectionString) + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .receiver() .topicName("<< TOPIC NAME >>") .subscriptionName("<< SUBSCRIPTION NAME >>") @@ -55,19 +59,118 @@ public void createAsynchronousServiceBusReceiverWithAzureIdentity() { } /** - * Code sample for creating an asynchronous Service Bus receiver using Aad. + * Sends messages to a queue. */ - public void createAsynchronousServiceBusReceiverWithAad() { - ClientSecretCredential credential = new ClientSecretCredentialBuilder() - .clientId("<< APPLICATION (CLIENT) ID >>") - .clientSecret("<< APPLICATION SECRET >>") - .tenantId("<< TENANT ID >>") - .build(); + public void sendMessage() { + ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sender() + .queueName("<< QUEUE NAME >>") + .buildClient(); + List messages = Arrays.asList( + new ServiceBusMessage("Hello world".getBytes()).setMessageId("1"), + new ServiceBusMessage("Bonjour".getBytes()).setMessageId("2")); + + sender.send(messages); + } + + /** + * Receives messages from a topic and subscription. + */ + public void receiveMessages() { + ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .receiveMode(ReceiveMode.PEEK_LOCK) + .buildClient(); + IterableStream messages = receiver.receive(10, Duration.ofSeconds(30)); + messages.forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), + new String(message.getBody(), StandardCharsets.UTF_8)); + }); + } + + /** + * Receives messages asynchronously. + */ + public void receiveMessagesAsync() { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .credential("<>", credential) + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .receiver() - .queueName("<>") + .queueName("<< QUEUE NAME >>") + .buildAsyncClient(); + + Disposable subscription = receiver.receive().subscribe(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s%n", message.getMessageId()); + System.out.printf("Contents: %s%n", new String(message.getBody(), StandardCharsets.UTF_8)); + }, error -> { + System.err.println("Error occurred while receiving messages: " + error); + }, () -> { + System.out.println("Finished receiving messages."); + }); + } + + /** + * Complete a message. + */ + public void completeMessage() { + ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .receiveMode(ReceiveMode.PEEK_LOCK) + .buildClient(); + + receiver.receive(10).forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + + // Process message and then complete it. + receiver.complete(message); + }); + } + + /** + * Create a session message. + */ + public void createSessionMessage() { + ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sender() + .queueName("<< QUEUE NAME >>") + .buildClient(); + + ServiceBusMessage message = new ServiceBusMessage("Hello world".getBytes()) + .setSessionId("greetings"); + + sender.send(message); + } + + /** + * Create session receiver for "greetings" + */ + public void namedSessionReceiver() { + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .topicName("<< QUEUE NAME >>") + .sessionId("greetings") + .buildAsyncClient(); + } + + /** + * Create session receiver for "greetings" + */ + public void unnamedSessionReceiver() { + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .topicName("<< QUEUE NAME >>") .buildAsyncClient(); } } From 034728686326f9c0e4ae107831f49b5a4a648f1c Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 08:53:07 -0700 Subject: [PATCH 02/11] Remove unmaintained CONTRIBUTING and link to one in master. --- .../azure-messaging-servicebus/CHANGELOG.md | 11 ++- .../CONTRIBUTING.md | 92 ------------------- .../azure-messaging-servicebus/README.md | 2 +- 3 files changed, 10 insertions(+), 95 deletions(-) delete mode 100644 sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index e1a9ba15e60a..9c2fb0cf29b1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -1,7 +1,14 @@ # Release History -## 7.0.0-beta.2 (Unreleased) - +## 7.0.0-beta.2 (2020-05-07) + +- Add support for receiving messages from specific sessions +- Add support for processing messages from multiple sessions +- Add missing schedule and cancel APIs in ServiceBusSenderClient +- Add support to send a collection of messages at once +- Change return type from `ServiceBusReceivedMessage` to `ServiceBusReceivedMessageContext` when calling `receive()` +- Fix message settlement to occur on receive link +- Various bug fixes ## 7.0.0-beta.1 (2020-04-06) diff --git a/sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md b/sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md deleted file mode 100644 index 760ae8b962fb..000000000000 --- a/sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md +++ /dev/null @@ -1,92 +0,0 @@ -# Contributing - -This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License -Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For -details, visit https://cla.microsoft.com. - -When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate -the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to -do this once across all repositories using our CLA. - -## Code of conduct - -This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). -For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact -[opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. - -## Getting started - -Before working on a contribution, it would be beneficial to familiarize yourself with the process and guidelines used -for the Azure SDKs so that your submission is consistent with the project standards and is ready to be accepted with -fewer changes requested. In particular, it is recommended to review: - -- [Azure SDK README][github-general], to learn more about the overall project and processes used. -- [Azure SDK Design Guidelines][design-guidelines], to understand the general guidelines for the SDKs across all - languages and platforms -- [Azure SDK Design Guidelines for Java][java-spec], to understand the guidelines specific to the Azure SDKs for Java. - -## Development environment setup - -### Prerequisites - -- Java Development Kit (JDK) with version 8 or above -- [Maven][maven] -- Git -- Microsoft Azure subscription - - You can create a free account at: https://azure.microsoft.com - -## Building all the client libraries - -Open a command prompt/terminal: -1. Execute `git clone https://github.com/Azure/azure-sdk-for-java.git` -2. Traverse to the repository root. -3. Execute `mvn compile -f pom.client.xml` -4. Install the tooling and build the product by executing: - * `mvn install -Dinclude-non-shipping-modules -DskipTests -Dgpg.skip -f pom.client.xml` - -## Building only the Azure SDK client library for Service Bus - -After building the tooling and solution once from the section, [Building all the client libraries](#building-all-the-client-libraries), you can build just the Azure SDK client library for Service Bus by -executing: -1. `mvn compile -f servicebus\azure-messaging-servicebus\pom.xml` - -## Running tests - -After following instructions in [Building all the client libraries](#building-all-the-client-libraries), you can run the -unit tests by executing: -1. `mvn test -f servicebus\azure-messaging-servicebus\pom.xml` - -For unit tests, there are no special considerations; these are self-contained and execute locally without any reliance -on external resources. These tests are run for all PR validations. - -### Running integration tests - -Integration tests have dependencies on live Azure resources and require setting up your development environment prior -to running. Known in the Azure SDK project commonly as "Live" tests, these tests only run when the -[`TestMode`][test-mode] is equal to [`TestMode.RECORD`][test-mode-record]. The Live tests read information from the -following environment variables: - -- AZURE_TEST_MODE - - Can be either "RECORD" or "PLAYBACK". If AZURE_TEST_MODE is equal to "RECORD", then the tests will run against the - live service. -- AZURE_SERVICEBUS_CONNECTION_STRING - - The connection string to the Azure Service Bus instance. This is required when running tests in RECORD mode. It must - contain the name of the Service Bus instance (the `EntityPath=` component must be in the connection string). - -## Logging output - -Log messages can be seen in the output window by: -1. Setting `AZURE_LOG_LEVEL` to the desired verbosity. Log levels can be found in [ClientLogger][log-level] -1. Adding an implementation of [slf4j][slf4j] to the classpath. Implementations can be found under section "[Binding with a - logging framework at deployment time][slf4j-implementations]". - - -[design-guidelines]: https://azuresdkspecs.z5.web.core.windows.net/DesignGuidelines.html -[github-general]: https://github.com/Azure/azure-sdk -[java-spec]: https://azuresdkspecs.z5.web.core.windows.net/JavaSpec.html -[log-level]: https://github.com/Azure/azure-sdk-for-java/blob/master/core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java#L40 -[maven]: https://maven.apache.org/ -[slf4j]: https://www.slf4j.org/ -[slf4j-implementations]: https://www.slf4j.org/manual.html#swapping -[test-mode]: https://github.com/Azure/azure-sdk-for-java/blob/master/core/azure-core-test/src/main/java/com/azure/core/test/TestMode.java -[test-mode-record]: https://github.com/Azure/azure-sdk-for-java/blob/master/core/azure-core-test/src/main/java/com/azure/core/test/TestMode.java#L12 diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index facb7b167576..64632f679ed8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -336,7 +336,7 @@ check out the [samples README][samples_readme]. ## Contributing If you would like to become an active contributor to this project please refer to our [Contribution -Guidelines](./CONTRIBUTING.md) for more information. +Guidelines](./../../../CONTRIBUTING.md) for more information. [aad_authorization]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application From 5e566e0434aa2548931959d10859e7999a8869be Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 10:06:22 -0700 Subject: [PATCH 03/11] Fixing samples and fixing them. --- .../src/samples/README.md | 18 +++-- .../servicebus/ReceiveMessageSyncSample.java | 2 + ...> ReceiveMultipleSessionsAsyncSample.java} | 21 +++--- .../ReceiveNamedSessionAsyncSample.java | 68 ++++++++++++++++++ .../ReceiveSingleSessionAsyncSample.java | 69 +++++++++++++++++++ 5 files changed, 163 insertions(+), 15 deletions(-) rename sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/{MultiSessionReceiveAsyncSample.java => ReceiveMultipleSessionsAsyncSample.java} (82%) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md b/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md index e272b68c4a76..5b3df0220482 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md @@ -18,7 +18,7 @@ Key concepts are explained in detail [here][sdk_readme_key_concepts]. ## Getting started Please refer to the [Getting Started][sdk_readme_getting_started] section. -### Obtaining an Event Hub instance connection string +### Obtaining a Service Bus namespace connection string Most of the samples authorize with Service Bus using a connection string generated for that Service Bus namespace. The connection string value can be obtained by: @@ -34,12 +34,18 @@ connection string value can be obtained by: - [Send messages using Azure Identity][SendMessageWithAzureIdentityAsyncSample] - [Send message batches synchronously][SendMessageBatchSyncSample] - [Schedule and cancel a message][MessageCancelScheduleAsyncSample] +- [Receive messages synchronously][ReceiveMessageSyncSample] - [Receive and auto-complete messages][ReceiveMessageAsyncSample] - [Receive messages using Azure Identity][ReceiveMessageAzureIdentityAsyncSample] - [Receive and settle messages via complete or abandon][ReceiveMessageAndSettleAsyncSample] -- [Receive messages synchronously][ReceiveMessageSyncSample] - [Peek at a message][PeekMessageAsyncSample] +### Message sessions +- [Send and receive messages from a session][SendAndReceiveSessionMessageSample] +- [Receive messages from multiple available sessions][ReceiveMultipleSessionsAsyncSample] +- [Receive messages from a specific session][ReceiveNamedSessionAsyncSample] +- [Receive messages from the first available session][ReceiveSingleSessionAsyncSample] + ## Troubleshooting See [Troubleshooting][sdk_readme_troubleshooting]. @@ -49,7 +55,7 @@ See [Next steps][sdk_readme_next_steps]. ## Contributing If you would like to become an active contributor to this project please refer to our [Contribution -Guidelines](../../CONTRIBUTING.md) for more information. +Guidelines](../../../../../CONTRIBUTING.md) for more information. [sdk_readme_key_concepts]: ../../README.md#key-concepts @@ -57,14 +63,18 @@ Guidelines](../../CONTRIBUTING.md) for more information. [sdk_readme_troubleshooting]: ../../README.md#troubleshooting [sdk_readme_next_steps]: ../../README.md#next-steps -[MessageCancelScheduleAsyncSample]: ./java/com/azure/messaging/servicebus/SendScheduledMessageAndCancelAsyncSample.java [PeekMessageAsyncSample]: ./java/com/azure/messaging/servicebus/PeekMessageAsyncSample.java [ReceiveMessageAndSettleAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java [ReceiveMessageAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java [ReceiveMessageAzureIdentityAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java [ReceiveMessageSyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java +[ReceiveMultipleSessionsAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java +[ReceiveNamedSessionAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java +[ReceiveSingleSessionAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java +[SendAndReceiveSessionMessageSample]: ./java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java [SendMessageAsyncSample]: ./java/com/azure/messaging/servicebus/SendMessageAsyncSample.java [SendMessageBatchSyncSample]: ./java/com/azure/messaging/servicebus/SendMessageBatchSyncSample.java [SendMessageWithAzureIdentityAsyncSample]: ./java/com/azure/messaging/servicebus/SendMessageWithAzureIdentityAsyncSample.java +[SendScheduledMessageAndCancelAsyncSample]: ./java/com/azure/messaging/servicebus/SendScheduledMessageAndCancelAsyncSample.java ![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fappconfiguration%2Fazure-messaging-servicebus%2Fsrc%2Fsamples%2FREADME.png) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java index c120a5a08f27..a23782c0fafc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java @@ -43,6 +43,8 @@ public static void main(String[] args) { System.out.println("Received Message Id: " + message.getMessageId()); System.out.println("Received Message: " + new String(message.getBody())); + + receiverClient.complete(message); }); // Close the receiver. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/MultiSessionReceiveAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java similarity index 82% rename from sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/MultiSessionReceiveAsyncSample.java rename to sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java index a868bcee7b9e..184805b6311e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/MultiSessionReceiveAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java @@ -9,18 +9,18 @@ import reactor.core.publisher.Mono; import java.security.SecureRandom; -import java.time.Duration; import java.util.concurrent.TimeUnit; /** - * Sample demonstrates how to receive and process multiple samples. + * Sample demonstrates how to receive and process multiple sessions. In the sample, at most 3 sessions are processed + * concurrently. When there are no more messages in a session, the receiver finds the next available session to + * process. */ -public class MultiSessionReceiveAsyncSample { +public class ReceiveMultipleSessionsAsyncSample { private static final SecureRandom RANDOM = new SecureRandom(); /** - * Main method to invoke this demo on how to receive from multiple sessions {@link ServiceBusReceivedMessage} from - * an Azure Service Bus Queue. + * Main method to invoke this demo on how to receive messages from multiple sessions in an Azure Service Bus Queue. * * @param args Unused arguments to the program. * @@ -36,8 +36,8 @@ public static void main(String[] args) throws InterruptedException { + "SharedAccessKey={key}"; // Create a receiver. - // "<>" will be the name of the Service Bus queue instance you created inside the Service Bus - // namespace. + // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the + // Service Bus namespace. ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() @@ -46,12 +46,11 @@ public static void main(String[] args) throws InterruptedException { .queueName("<>") .buildAsyncClient(); - // At most, the receiver will automatically renew the session lock until 120 seconds have elapsed. // By default, after messages are processed, they are completed (ie. removed from the queue/topic). Setting - // enableAutoComplete to true will tell the processor to complete or abandon the message depending + // enableAutoComplete to true will tell the processor to complete or abandon the message depending on whether or + // not processing the message results in an exception. ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false) - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(120)); + .setIsAutoCompleteEnabled(false); Disposable subscription = receiver.receive(options) .flatMap(context -> { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java new file mode 100644 index 000000000000..71cee20693b0 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.Disposable; + +import java.util.concurrent.TimeUnit; + +/** + * Demonstrates how to receive messages from a named session. + */ +public class ReceiveNamedSessionAsyncSample { + /** + * Main method to invoke this demo on how to receive messages from a session with id "greetings" in an Azure Service + * Bus Queue. + * + * @param args Unused arguments to the program. + * + * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. + */ + public static void main(String[] args) throws InterruptedException { + + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key}"; + + // Create a receiver. + // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the + // Service Bus namespace. + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString(connectionString) + .sessionReceiver() + .sessionId("greetings") + .receiveMode(ReceiveMode.PEEK_LOCK) + .queueName("<>") + .buildAsyncClient(); + + Disposable subscription = receiver.receive() + .subscribe(context -> { + if (context.hasError()) { + System.out.printf("An error occurred in session %s. Error: %s%n", + context.getSessionId(), context.getThrowable()); + return; + } + + System.out.println("Processing message from session: " + context.getSessionId()); + + // Process message + // The message is automatically completed if no exceptions are thrown while processing message. + }, error -> { + System.err.println("Error occurred: " + error); + }); + + // Subscribe is not a blocking call so we sleep here so the program does not end. + TimeUnit.SECONDS.sleep(60); + + // Disposing of the subscription will cancel the receive() operation. + subscription.dispose(); + + // Close the receiver. + receiver.close(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java new file mode 100644 index 000000000000..8b0efccd3cdc --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.Disposable; + +import java.util.concurrent.TimeUnit; + +/** + * Demonstrates how to receive from the first available session. + */ +public class ReceiveSingleSessionAsyncSample { + /** + * Main method to invoke this demo on how to receive messages from the first available session in a Service Bus + * topic subscription. + * + * @param args Unused arguments to the program. + * + * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. + */ + public static void main(String[] args) throws InterruptedException { + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key}"; + + // Create a receiver. + // "<>" will be the name of the Service Bus topic you created inside the Service Bus namespace. + // "<>" will be the name of the session-enabled subscription. + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString(connectionString) + .sessionReceiver() + .receiveMode(ReceiveMode.PEEK_LOCK) + .topicName("<>") + .subscriptionName("<>") + .buildAsyncClient(); + + // Messages are continuously received until there are no more messages in the current session. Then, the Flux + // completes. + Disposable subscription = receiver.receive() + .subscribe(context -> { + if (context.hasError()) { + System.out.printf("An error occurred in session %s. Error: %s%n", + context.getSessionId(), context.getThrowable()); + return; + } + + System.out.println("Processing message from session: " + context.getSessionId()); + + // Process message + // The message is automatically completed if no exceptions are thrown while processing message. + }, error -> { + System.err.println("Error occurred: " + error); + }); + + // Subscribe is not a blocking call so we sleep here so the program does not end. + TimeUnit.SECONDS.sleep(60); + + // Disposing of the subscription will cancel the receive() operation. + subscription.dispose(); + + // Close the receiver. + receiver.close(); + } +} From 6738033234ba6007a4438d1feda80d51f3952a77 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 10:15:00 -0700 Subject: [PATCH 04/11] Fixing checkstyles --- sdk/servicebus/azure-messaging-servicebus/README.md | 8 ++++---- .../com/azure/messaging/servicebus/ReadmeSamples.java | 8 ++++---- .../servicebus/ReceiveNamedSessionAsyncSample.java | 4 ++-- .../servicebus/ReceiveSingleSessionAsyncSample.java | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 64632f679ed8..a71bf0081dba 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -211,10 +211,10 @@ Disposable subscription = receiver.receive().subscribe(context -> { System.out.printf("Id: %s%n", message.getMessageId()); System.out.printf("Contents: %s%n", new String(message.getBody(), StandardCharsets.UTF_8)); }, error -> { - System.err.println("Error occurred while receiving messages: " + error); -}, () -> { - System.out.println("Finished receiving messages."); -}); + System.err.println("Error occurred while receiving messages: " + error); + }, () -> { + System.out.println("Finished receiving messages."); + }); ``` ### Settle messages diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index 8c0abcb52407..2b43a7837b98 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -109,10 +109,10 @@ public void receiveMessagesAsync() { System.out.printf("Id: %s%n", message.getMessageId()); System.out.printf("Contents: %s%n", new String(message.getBody(), StandardCharsets.UTF_8)); }, error -> { - System.err.println("Error occurred while receiving messages: " + error); - }, () -> { - System.out.println("Finished receiving messages."); - }); + System.err.println("Error occurred while receiving messages: " + error); + }, () -> { + System.out.println("Finished receiving messages."); + }); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java index 71cee20693b0..6b33ecbd6f95 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java @@ -53,8 +53,8 @@ public static void main(String[] args) throws InterruptedException { // Process message // The message is automatically completed if no exceptions are thrown while processing message. }, error -> { - System.err.println("Error occurred: " + error); - }); + System.err.println("Error occurred: " + error); + }); // Subscribe is not a blocking call so we sleep here so the program does not end. TimeUnit.SECONDS.sleep(60); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java index 8b0efccd3cdc..6b1ec92583d6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java @@ -54,8 +54,8 @@ public static void main(String[] args) throws InterruptedException { // Process message // The message is automatically completed if no exceptions are thrown while processing message. }, error -> { - System.err.println("Error occurred: " + error); - }); + System.err.println("Error occurred: " + error); + }); // Subscribe is not a blocking call so we sleep here so the program does not end. TimeUnit.SECONDS.sleep(60); From f134e2217ba0d1f4792a42ebfb2bc8379e44fea3 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 10:32:01 -0700 Subject: [PATCH 05/11] Update header --- sdk/servicebus/azure-messaging-servicebus/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index a71bf0081dba..5067a5ef66ad 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -21,7 +21,7 @@ have to be online at the same time. - [Table of contents](#table-of-contents) - [Getting started](#getting-started) - [Prerequisites](#prerequisites) - - [Adding the package to your product](#adding-the-package-to-your-product) + - [Include the package](#include-the-package) - [Authenticate the client](#authenticate-the-client) - [Key concepts](#key-concepts) - [Examples](#examples) @@ -47,14 +47,14 @@ have to be online at the same time. - Azure Service Bus instance - Step-by-step guide for [creating a Service Bus instance using Azure Portal][service_bus_create] -### Adding the package to your product +### Include the package [//]: # ({x-version-update-start;com.azure:azure-messaging-servicebus;current}) ```xml com.azure azure-messaging-servicebus - 7.0.0-beta.1 + 7.0.0-beta.2 ``` [//]: # ({x-version-update-end}) From 5bff8cd661890d9ccf6128a9e9e006f22c8b2a91 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 10:54:18 -0700 Subject: [PATCH 06/11] README fixes. --- .../azure-messaging-servicebus/README.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 5067a5ef66ad..742aa67c74e6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -93,7 +93,7 @@ ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .buildAsyncClient(); ``` -#### Create a Service Bus client using Microsoft identity platform (formerly Azure Active Directory) +#### Create a Service Bus client using Microsoft Identity platform (formerly Azure Active Directory) Azure SDK for Java supports an Azure Identity package, making it simple to get credentials from the Microsoft identity platform. First, add the package: @@ -118,6 +118,7 @@ running environment. For more information about using Azure Active Directory aut refer to [the associated documentation][aad_authorization]. Use the returned token credential to authenticate the client: + ```java TokenCredential credential = new DefaultAzureCredentialBuilder() @@ -222,7 +223,7 @@ Disposable subscription = receiver.receive().subscribe(context -> { When a message is received, it can be settled using any of the `complete()`, `abandon()`, `defer()`, or `deadLetter()` overloads. The sample below completes a received message from synchronous `ServiceBusReceiverClient`. - + ```java receiver.receive(10).forEach(context -> { ServiceBusReceivedMessage message = context.getMessage(); @@ -246,7 +247,7 @@ that session. Create a `ServiceBusSenderClient` for a session enabled queue or topic subscription. Setting `.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created. - + ```java ServiceBusMessage message = new ServiceBusMessage("Hello world".getBytes()) .setSessionId("greetings"); @@ -260,7 +261,7 @@ Receivers can fetch messages from a specific session or the first available, unl a receiver for a session with id "greetings". The second snippet creates a receiver that fetches the first available session. - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -270,7 +271,7 @@ ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .buildAsyncClient(); ``` - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -346,7 +347,7 @@ Guidelines](./../../../CONTRIBUTING.md) for more information. [java_8_sdk_javadocs]: https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html [logging]: https://github.com/Azure/azure-sdk-for-java/wiki/Logging-with-Azure-SDK [maven]: https://maven.apache.org/ -[message-sessions]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions +[message-sessions]: https://docs.microsoft.com/azure/service-bus-messaging/message-sessions [oasis_amqp_v1_error]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-error [oasis_amqp_v1]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html [product_docs]: https://docs.microsoft.com/azure/service-bus-messaging @@ -356,7 +357,7 @@ Guidelines](./../../../CONTRIBUTING.md) for more information. [sample_examples]: ./src/samples/java/com/azure/messaging/servicebus/ [samples_readme]: ./src/samples/README.md [service_bus_connection_string]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string -[servicebus_create]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal +[servicebus_create]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal [servicebus_messsaging_exceptions]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-exceptions [servicebus_roles]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application#built-in-rbac-roles-for-azure-service-bus [ServiceBusReceiverAsyncClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java From 4202045cba41529e9c64d9adb86e6d258d2055ea Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 12:01:19 -0700 Subject: [PATCH 07/11] Fix readmes --- .../azure-messaging-servicebus/README.md | 24 +++++++++---------- .../messaging/servicebus/ReadmeSamples.java | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 742aa67c74e6..c91116b2e36e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -74,7 +74,7 @@ Both the asynchronous and synchronous Service Bus sender and receiver clients ar `ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver, respectively. - + ```java ServiceBusSenderClient sender = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -83,7 +83,7 @@ ServiceBusSenderClient sender = new ServiceBusClientBuilder() .buildClient(); ``` - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -119,7 +119,7 @@ refer to [the associated documentation][aad_authorization]. Use the returned token credential to authenticate the client: - + ```java TokenCredential credential = new DefaultAzureCredentialBuilder() .build(); @@ -152,7 +152,7 @@ a topic. The snippet below creates a synchronous `ServiceBusSender` to publish a message to a queue. - + ```java ServiceBusSenderClient sender = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -177,7 +177,7 @@ either a queue or a topic subscription. The snippet below creates a `ServiceBusReceiverClient` to receive messages from a topic subscription. It returns a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever happens first. - + ```java ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -199,7 +199,7 @@ messages.forEach(context -> { The asynchronous `ServiceBusReceiverAsyncClient` continuously fetches messages until the `subscription` is disposed. - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -223,7 +223,7 @@ Disposable subscription = receiver.receive().subscribe(context -> { When a message is received, it can be settled using any of the `complete()`, `abandon()`, `defer()`, or `deadLetter()` overloads. The sample below completes a received message from synchronous `ServiceBusReceiverClient`. - + ```java receiver.receive(10).forEach(context -> { ServiceBusReceivedMessage message = context.getMessage(); @@ -247,7 +247,7 @@ that session. Create a `ServiceBusSenderClient` for a session enabled queue or topic subscription. Setting `.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created. - + ```java ServiceBusMessage message = new ServiceBusMessage("Hello world".getBytes()) .setSessionId("greetings"); @@ -261,22 +261,22 @@ Receivers can fetch messages from a specific session or the first available, unl a receiver for a session with id "greetings". The second snippet creates a receiver that fetches the first available session. - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() - .topicName("<< QUEUE NAME >>") + .queueName("<< QUEUE NAME >>") .sessionId("greetings") .buildAsyncClient(); ``` - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() - .topicName("<< QUEUE NAME >>") + .queueName("<< QUEUE NAME >>") .buildAsyncClient(); ``` diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index 2b43a7837b98..c93dc38cceb4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -158,7 +158,7 @@ public void namedSessionReceiver() { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() - .topicName("<< QUEUE NAME >>") + .queueName("<< QUEUE NAME >>") .sessionId("greetings") .buildAsyncClient(); } @@ -170,7 +170,7 @@ public void unnamedSessionReceiver() { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() - .topicName("<< QUEUE NAME >>") + .queueName("<< QUEUE NAME >>") .buildAsyncClient(); } } From a8020c49fbc2ea5da16544f87df42c2894af3ca2 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 12:31:31 -0700 Subject: [PATCH 08/11] Fix tests. --- .../ServiceBusMessageProcessorTest.java | 6 +++--- .../ServiceBusReceiveLinkProcessorTest.java | 13 ++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java index d8691cb6ebc3..e458b17cc77a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java @@ -127,7 +127,7 @@ void autoCompletesAndAutoRenews() { logger.info("Now: {}", Instant.now()); try { - TimeUnit.SECONDS.sleep(8); + TimeUnit.SECONDS.sleep(15); } catch (InterruptedException ignored) { } logger.info("After: {}", Instant.now()); @@ -211,7 +211,7 @@ void autoRenewExpires() { @Test void autoRenewOperationErrors() { // Arrange - final Duration maxRenewDuration = Duration.ofSeconds(10); + final Duration maxRenewDuration = Duration.ofSeconds(20); final String lock1 = UUID.randomUUID().toString(); final String lock2 = UUID.randomUUID().toString(); when(message1.getLockToken()).thenReturn(lock1); @@ -234,7 +234,7 @@ void autoRenewOperationErrors() { logger.info("Now: {}", Instant.now()); try { - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(10); } catch (InterruptedException ignored) { } logger.info("After: {}", Instant.now()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 5ea87289d080..52aa09adf4a8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -23,6 +23,7 @@ import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.test.StepVerifier; @@ -65,8 +66,8 @@ class ServiceBusReceiveLinkProcessorTest { private ArgumentCaptor> creditSupplierCaptor; private final AmqpErrorContext amqpErrorContext = new AmqpErrorContext("test-context"); - private final DirectProcessor endpointProcessor = DirectProcessor.create(); - private final DirectProcessor messageProcessor = DirectProcessor.create(); + private final EmitterProcessor endpointProcessor = EmitterProcessor.create(); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(); private final FluxSink messageProcessorSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); private ServiceBusReceiveLinkProcessor linkProcessor; @@ -288,7 +289,9 @@ void newLinkOnRetryableError() { final ServiceBusReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor); final FluxSink endpointSink = endpointProcessor.sink(); - when(link2.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); + when(link2.getEndpointStates()).thenReturn(Flux.defer(() -> Flux.create(e -> { + e.next(AmqpEndpointState.ACTIVE); + }))); when(link2.receive()).thenReturn(Flux.just(message2)); final AmqpException amqpException = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-error", @@ -581,8 +584,8 @@ void receivesFromFirstLink() { } /** - * Verifies that when we request back pressure amounts, if it only requests a certain number of events, only - * that number is consumed. + * Verifies that when we request back pressure amounts, if it only requests a certain number of events, only that + * number is consumed. */ @Test void backpressureRequestOnlyEmitsThatAmount() { From 8b81d9665a9c252d2bfb16ef23c68d4d1a659676 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 13:00:11 -0700 Subject: [PATCH 09/11] Add informational logging. --- sdk/servicebus/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/tests.yml b/sdk/servicebus/tests.yml index a2032a72c0d9..0f9627249c51 100644 --- a/sdk/servicebus/tests.yml +++ b/sdk/servicebus/tests.yml @@ -10,4 +10,4 @@ jobs: safeName: azuremessagingservicebus EnvVars: AZURE_TEST_MODE: RECORD - AZURE_LOG_LEVEL: 3 + AZURE_LOG_LEVEL: 2 From e33e132541a83ffc826e6d6d40ba91a506911d69 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 13:12:00 -0700 Subject: [PATCH 10/11] Fixing README.md --- sdk/servicebus/azure-messaging-servicebus/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index c91116b2e36e..fcc0ffd2d395 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -74,7 +74,7 @@ Both the asynchronous and synchronous Service Bus sender and receiver clients ar `ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver, respectively. - + ```java ServiceBusSenderClient sender = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") From 7686853a9b38e6c3093008e03cf7068ed582a46f Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 6 May 2020 20:50:17 -0700 Subject: [PATCH 11/11] README feedback --- sdk/servicebus/azure-messaging-servicebus/README.md | 12 ++++++------ .../azure/messaging/servicebus/ReadmeSamples.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index fcc0ffd2d395..9c9b489e3172 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -3,7 +3,7 @@ Microsoft Azure Service Bus is a fully managed enterprise integration message broker. Service Bus can decouple applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data and state. Data is transferred between different applications and services using messages. If you would like to know more -about Azure Service Bus, you may wish to review: [What is Service Bus][product_docs]? +about Azure Service Bus, you may wish to review: [What is Service Bus][product_docs] The Azure Service Bus client library allows for sending and receiving of Azure Service Bus messages and may be used to: - Transfer business data, such as sales or purchase orders, journals, or inventory movements. @@ -66,7 +66,7 @@ with it. #### Create Service Bus clients using a connection string -The easiest means for doing so is to use a connection string, which automatically created when creating a Service Bus +The easiest means for authenticating is to use a connection string, which automatically created when creating a Service Bus namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to [get a Service Bus connection string][service_bus_connection_string]. @@ -95,7 +95,7 @@ ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() #### Create a Service Bus client using Microsoft Identity platform (formerly Azure Active Directory) -Azure SDK for Java supports an Azure Identity package, making it simple to get credentials from the Microsoft identity +Azure SDK for Java supports the Azure Identity package, making it simple to get credentials from the Microsoft identity platform. First, add the package: [//]: # ({x-version-update-start;com.azure:azure-identity;dependency}) @@ -138,7 +138,7 @@ on which actual message transmission takes place. The namespace often serves as * A **[queue][queue_concept]** allows for the sending and receiving of messages, ordered first-in-first-out. It is often used for point-to-point communication. * A **[topic][topic_concept]** is better suited to publisher and subscriber scenarios. A topic publishes messages to - _subscriptions_, of which, multiple can exist simultaneously. + subscriptions, of which, multiple can exist simultaneously. * A **[subscription][subscription_concept]** receives messages from a topic. Each subscription is independent and receives a copy of the message sent to the topic. @@ -326,7 +326,7 @@ exception occurred and if possible, how to mitigate this exception. A list of al [OASIS AMQP Version 1.0 Transport Errors][oasis_amqp_v1_error]. The recommended way to solve the specific exception the AMQP exception represents is to follow the -[Service Bus Messaging Exceptions][servicebus_messaging_exceptions] guidance. +[Service Bus Messaging Exceptions][] guidance. ## Next steps @@ -358,7 +358,7 @@ Guidelines](./../../../CONTRIBUTING.md) for more information. [samples_readme]: ./src/samples/README.md [service_bus_connection_string]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string [servicebus_create]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal -[servicebus_messsaging_exceptions]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-exceptions +[servicebus_messaging_exceptions]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-exceptions [servicebus_roles]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application#built-in-rbac-roles-for-azure-service-bus [ServiceBusReceiverAsyncClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java [ServiceBusReceiverClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index c93dc38cceb4..70c1cc118aa0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -164,7 +164,7 @@ public void namedSessionReceiver() { } /** - * Create session receiver for "greetings" + * Create session receiver for the first available session. */ public void unnamedSessionReceiver() { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()