diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index e1a9ba15e60a..9c2fb0cf29b1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -1,7 +1,14 @@ # Release History -## 7.0.0-beta.2 (Unreleased) - +## 7.0.0-beta.2 (2020-05-07) + +- Add support for receiving messages from specific sessions +- Add support for processing messages from multiple sessions +- Add missing schedule and cancel APIs in ServiceBusSenderClient +- Add support to send a collection of messages at once +- Change return type from `ServiceBusReceivedMessage` to `ServiceBusReceivedMessageContext` when calling `receive()` +- Fix message settlement to occur on receive link +- Various bug fixes ## 7.0.0-beta.1 (2020-04-06) diff --git a/sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md b/sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md deleted file mode 100644 index 760ae8b962fb..000000000000 --- a/sdk/servicebus/azure-messaging-servicebus/CONTRIBUTING.md +++ /dev/null @@ -1,92 +0,0 @@ -# Contributing - -This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License -Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For -details, visit https://cla.microsoft.com. - -When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate -the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to -do this once across all repositories using our CLA. - -## Code of conduct - -This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). -For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact -[opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. - -## Getting started - -Before working on a contribution, it would be beneficial to familiarize yourself with the process and guidelines used -for the Azure SDKs so that your submission is consistent with the project standards and is ready to be accepted with -fewer changes requested. In particular, it is recommended to review: - -- [Azure SDK README][github-general], to learn more about the overall project and processes used. -- [Azure SDK Design Guidelines][design-guidelines], to understand the general guidelines for the SDKs across all - languages and platforms -- [Azure SDK Design Guidelines for Java][java-spec], to understand the guidelines specific to the Azure SDKs for Java. - -## Development environment setup - -### Prerequisites - -- Java Development Kit (JDK) with version 8 or above -- [Maven][maven] -- Git -- Microsoft Azure subscription - - You can create a free account at: https://azure.microsoft.com - -## Building all the client libraries - -Open a command prompt/terminal: -1. Execute `git clone https://github.com/Azure/azure-sdk-for-java.git` -2. Traverse to the repository root. -3. Execute `mvn compile -f pom.client.xml` -4. Install the tooling and build the product by executing: - * `mvn install -Dinclude-non-shipping-modules -DskipTests -Dgpg.skip -f pom.client.xml` - -## Building only the Azure SDK client library for Service Bus - -After building the tooling and solution once from the section, [Building all the client libraries](#building-all-the-client-libraries), you can build just the Azure SDK client library for Service Bus by -executing: -1. `mvn compile -f servicebus\azure-messaging-servicebus\pom.xml` - -## Running tests - -After following instructions in [Building all the client libraries](#building-all-the-client-libraries), you can run the -unit tests by executing: -1. `mvn test -f servicebus\azure-messaging-servicebus\pom.xml` - -For unit tests, there are no special considerations; these are self-contained and execute locally without any reliance -on external resources. These tests are run for all PR validations. - -### Running integration tests - -Integration tests have dependencies on live Azure resources and require setting up your development environment prior -to running. Known in the Azure SDK project commonly as "Live" tests, these tests only run when the -[`TestMode`][test-mode] is equal to [`TestMode.RECORD`][test-mode-record]. The Live tests read information from the -following environment variables: - -- AZURE_TEST_MODE - - Can be either "RECORD" or "PLAYBACK". If AZURE_TEST_MODE is equal to "RECORD", then the tests will run against the - live service. -- AZURE_SERVICEBUS_CONNECTION_STRING - - The connection string to the Azure Service Bus instance. This is required when running tests in RECORD mode. It must - contain the name of the Service Bus instance (the `EntityPath=` component must be in the connection string). - -## Logging output - -Log messages can be seen in the output window by: -1. Setting `AZURE_LOG_LEVEL` to the desired verbosity. Log levels can be found in [ClientLogger][log-level] -1. Adding an implementation of [slf4j][slf4j] to the classpath. Implementations can be found under section "[Binding with a - logging framework at deployment time][slf4j-implementations]". - - -[design-guidelines]: https://azuresdkspecs.z5.web.core.windows.net/DesignGuidelines.html -[github-general]: https://github.com/Azure/azure-sdk -[java-spec]: https://azuresdkspecs.z5.web.core.windows.net/JavaSpec.html -[log-level]: https://github.com/Azure/azure-sdk-for-java/blob/master/core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java#L40 -[maven]: https://maven.apache.org/ -[slf4j]: https://www.slf4j.org/ -[slf4j-implementations]: https://www.slf4j.org/manual.html#swapping -[test-mode]: https://github.com/Azure/azure-sdk-for-java/blob/master/core/azure-core-test/src/main/java/com/azure/core/test/TestMode.java -[test-mode-record]: https://github.com/Azure/azure-sdk-for-java/blob/master/core/azure-core-test/src/main/java/com/azure/core/test/TestMode.java#L12 diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 942d82b4e46e..9c9b489e3172 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -1,39 +1,40 @@ # Azure Service Bus client library for Java Microsoft Azure Service Bus is a fully managed enterprise integration message broker. Service Bus can decouple -applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data -and state. Data is transferred between different applications and services using messages. If you would like to know -more about Azure Service Bus, you may wish to review: [What is Service Bus](https://docs.microsoft.com/en-us/azure/service-bus-messaging)? +applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data and +state. Data is transferred between different applications and services using messages. If you would like to know more +about Azure Service Bus, you may wish to review: [What is Service Bus][product_docs] The Azure Service Bus client library allows for sending and receiving of Azure Service Bus messages and may be used to: - -- Messaging: Transfer business data, such as sales or purchase orders, journals, or inventory movements. -- Decouple applications: Improve reliability and scalability of applications and services. Client and service don't +- Transfer business data, such as sales or purchase orders, journals, or inventory movements. +- Decouple applications to improve reliability and scalability of applications and services. Clients and services don't have to be online at the same time. -- Topics and subscriptions: Enable 1:n relationships between publishers and subscribers. -- Message sessions. Implement work-flows that require message ordering or message deferral. +- Enable 1:n relationships between publishers and subscribers. +- Implement workflows that require message ordering or message deferral. -[Source code][source_code] | [API reference documentation][api_documentation] | [Samples][sample_readme] +[Source code][source_code] | [API reference documentation][api_documentation] +| [Product documentation][product_docs]| [Samples][sample_examples] ## Table of contents -- [Table of contents](#table-of-contents) -- [Getting started](#getting-started) - - [Prerequisites](#prerequisites) - - [Adding the package to your product](#adding-the-package-to-your-product) - - [Authenticate the client](#authenticate-the-client) -- [Key concepts](#key-concepts) -- [Examples](#examples) - - [Sending messages](#sending-messages) - - [Receiving messages](#receiving-messages) -- [Troubleshooting](#troubleshooting) - - [Enable client logging](#enable-client-logging) - - [Enable AMQP transport logging](#enable-amqp-transport-logging) - - [Common exceptions](#common-exceptions) - - [Handling transient AMQP exceptions](#handling-transient-amqp-exceptions) - - [Default SSL library](#default-ssl-library) -- [Next steps](#next-steps) -- [Contributing](#contributing) +- [Azure Service Bus client library for Java](#azure-service-bus-client-library-for-java) + - [Table of contents](#table-of-contents) + - [Getting started](#getting-started) + - [Prerequisites](#prerequisites) + - [Include the package](#include-the-package) + - [Authenticate the client](#authenticate-the-client) + - [Key concepts](#key-concepts) + - [Examples](#examples) + - [Send messages](#send-messages) + - [Receive messages](#receive-messages) + - [Settle messages](#settle-messages) + - [Send and receive from session enabled queues or topics](#send-and-receive-from-session-enabled-queues-or-topics) + - [Troubleshooting](#troubleshooting) + - [Enable client logging](#enable-client-logging) + - [Enable AMQP transport logging](#enable-amqp-transport-logging) + - [Common exceptions](#common-exceptions) + - [Next steps](#next-steps) + - [Contributing](#contributing) ## Getting started @@ -43,27 +44,58 @@ have to be online at the same time. - [Maven][maven] - Microsoft Azure subscription - You can create a free account at: https://azure.microsoft.com +- Azure Service Bus instance + - Step-by-step guide for [creating a Service Bus instance using Azure Portal][service_bus_create] -### Adding the package to your product +### Include the package [//]: # ({x-version-update-start;com.azure:azure-messaging-servicebus;current}) ```xml com.azure azure-messaging-servicebus - 7.0.0-beta.1 + 7.0.0-beta.2 ``` [//]: # ({x-version-update-end}) ### Authenticate the client -For the Service Bus client library to interact with an Service Bus, it will need to understand how to connect -and authorize with it. +For the Service Bus client library to interact with Service Bus, it will need to understand how to connect and authorize +with it. + +#### Create Service Bus clients using a connection string + +The easiest means for authenticating is to use a connection string, which automatically created when creating a Service Bus +namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to +[get a Service Bus connection string][service_bus_connection_string]. + +Both the asynchronous and synchronous Service Bus sender and receiver clients are instantiated using +`ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver, +respectively. + + +```java +ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sender() + .queueName("<< QUEUE NAME >>") + .buildClient(); +``` + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .buildAsyncClient(); +``` -#### Create an Service Bus client using Microsoft identity platform (formerly Azure Active Directory) +#### Create a Service Bus client using Microsoft Identity platform (formerly Azure Active Directory) -Azure SDK for Java supports an Azure Identity package, making it simple get credentials from Microsoft identity +Azure SDK for Java supports the Azure Identity package, making it simple to get credentials from the Microsoft identity platform. First, add the package: [//]: # ({x-version-update-start;com.azure:azure-identity;dependency}) @@ -76,20 +108,18 @@ platform. First, add the package: ``` [//]: # ({x-version-update-end}) -All the implemented ways to request a credential can be found under the `com.azure.identity.credential` package. The -sample below shows how to use an Azure Active Directory (AAD) application client secret to authorize with Azure Service Bus. +The implemented ways to request a credential are under the `com.azure.identity.credential` package. The sample below +shows how to use an Azure Active Directory (AAD) application client secret to authorize with Azure Service Bus. -#### Authorizing with AAD application client secret +##### Authorizing with DefaultAzureCredential -Follow the instructions in [Creating a service principal using Azure Portal][application_client_secret] to create a -service principal and a client secret. The corresponding `clientId` and `tenantId` for the service principal can be -obtained from the [App registration page][app_registration_page]. - -Set the values of the client ID, tenant ID, and client secret of the AAD application as environment variables: -AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET. +Authorization is easiest using [DefaultAzureCredential][wiki_identity]. It finds the best credential to use in its +running environment. For more information about using Azure Active Directory authorization with Service Bus, please +refer to [the associated documentation][aad_authorization]. Use the returned token credential to authenticate the client: - + + ```java TokenCredential credential = new DefaultAzureCredentialBuilder() .build(); @@ -100,92 +130,163 @@ ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .buildAsyncClient(); ``` -##### Service Bus Roles - -When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such -as the `Azure Service Bus Data Owner`, `Azure Service Bus Data Sender` and `Azure Service Bus Data Receiver` -[role][servicebus_roles]. -For more information about using Azure Active Directory authorization with Service Bus, please refer to -[the associated documentation][aad_authorization]. - ## Key concepts You can interact with the primary resource types within a Service Bus Namespace, of which multiple can exist and -on which actual message transmission takes place, the namespace often serving as an application container: - -* [Queue][queue_concept]: Allows for Sending and Receiving of messages, ordered first-in-first-out. Often used for -point-to-point communication. +on which actual message transmission takes place. The namespace often serves as an application container: -* [Topic][topic_concept]: As opposed to Queues, Topics are better suited to publish/subscribe scenarios. A topic can -be sent to, but requires a subscription, of which there can be multiple in parallel, to consume from. - -* [Subscription][subscription_concept]: The mechanism to consume from a Topic. Each subscription is independent, and -receaves a copy of each message sent to the topic. +* A **[queue][queue_concept]** allows for the sending and receiving of messages, ordered first-in-first-out. It is often + used for point-to-point communication. +* A **[topic][topic_concept]** is better suited to publisher and subscriber scenarios. A topic publishes messages to + subscriptions, of which, multiple can exist simultaneously. +* A **[subscription][subscription_concept]** receives messages from a topic. Each subscription is independent and + receives a copy of the message sent to the topic. ## Examples -### Create a sender or receiver using connection string - -The easiest means for doing so is to use a connection string, which is created automatically when creating an Service Bus -namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to -[get an Service Bus connection string][service_bus_connection_string]. +### Send messages -Both the asynchronous and synchronous Service Bus sender and receiver clients can be created using -`ServiceBusClientBuilder`.The examples are explained blow. +You'll need to create an asynchronous [`ServiceBusSenderAsyncClient`][ServiceBusSenderAsyncClient] or a synchronous +[`ServiceBusSenderClient`][ServiceBusSenderClient] to send messages. Each sender can send messages to either a queue or +a topic. -The snippet below creates an asynchronous Service Bus sender. +The snippet below creates a synchronous `ServiceBusSender` to publish a message to a queue. - + ```java -String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; -ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder() - .connectionString(connectionString) +ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sender() .queueName("<< QUEUE NAME >>") - .buildAsyncClient(); + .buildClient(); +List messages = Arrays.asList( + new ServiceBusMessage("Hello world".getBytes()).setMessageId("1"), + new ServiceBusMessage("Bonjour".getBytes()).setMessageId("2")); + +sender.send(messages); ``` -The snippet below creates an asynchronous Service Bus receiver. +### Receive messages + +You'll need to create an asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] or a synchronous +[`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages. Each receiver can consume messages from +either a queue or a topic subscription. + +#### Receive a batch of messages + +The snippet below creates a `ServiceBusReceiverClient` to receive messages from a topic subscription. It returns a batch +of messages when 10 messages are received or until 30 seconds have elapsed, whichever happens first. - + ```java -String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; -ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .connectionString(connectionString) +ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .receiver() .topicName("<< TOPIC NAME >>") .subscriptionName("<< SUBSCRIPTION NAME >>") + .receiveMode(ReceiveMode.PEEK_LOCK) + .buildClient(); + +IterableStream messages = receiver.receive(10, Duration.ofSeconds(30)); +messages.forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), + new String(message.getBody(), StandardCharsets.UTF_8)); +}); +``` + +#### Receive a stream of messages + +The asynchronous `ServiceBusReceiverAsyncClient` continuously fetches messages until the `subscription` is disposed. + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .queueName("<< QUEUE NAME >>") .buildAsyncClient(); + +Disposable subscription = receiver.receive().subscribe(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s%n", message.getMessageId()); + System.out.printf("Contents: %s%n", new String(message.getBody(), StandardCharsets.UTF_8)); +}, error -> { + System.err.println("Error occurred while receiving messages: " + error); + }, () -> { + System.out.println("Finished receiving messages."); + }); +``` + +### Settle messages + +When a message is received, it can be settled using any of the `complete()`, `abandon()`, `defer()`, or `deadLetter()` +overloads. The sample below completes a received message from synchronous `ServiceBusReceiverClient`. + + +```java +receiver.receive(10).forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + + // Process message and then complete it. + receiver.complete(message); +}); ``` -### Sending messages +### Send and receive from session enabled queues or topics -You'll need to create an asynchronous [`ServiceBusSenderAsyncClient`][ServiceBusSenderAsyncClient] or -a synchronous [`ServiceBusSenderClient`][ServiceBusSenderClient] to send message. Each sender can send message to either, a queue, -or topic. +> Using sessions requires you to create a session enabled queue or subscription. You can read more about how to +> configure this in "[Message sessions][message-sessions]". -* [Sending a message asynchronously][sample-send-async-message]. -* [Sending a message asynchronously using active directory credential][sample-send-async-aad-message]. -* [Send message in batch synchronously][sample-send-batch-messages]. +Unlike non-session-enabled queues or subscriptions, only a single receiver can read from a session at any time. When a +receiver fetches a session, Service Bus locks the session for that receiver, and it has exclusive access to messages in +that session. -### Receiving messages +#### Send message to a session -You'll need to create an asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] or -a synchronous [`ServiceBusReceiverClient`][ServiceBusReceiverClient]. Each receiver can receive message from either, a queue, -or subscriber. +Create a `ServiceBusSenderClient` for a session enabled queue or topic subscription. Setting `.setSessionId(String)` on +a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created. -* [Receiving a message asynchronously][sample-receive-async-message]. -* [Receiving a message asynchronously using active directory credential][sample-receive-async-aad-message]. -* [Receiving a message asynchronously and settling][sample-receive-message-and-settle]. -* [Receiving messages synchronously][sample-receive-message-synchronously]. + +```java +ServiceBusMessage message = new ServiceBusMessage("Hello world".getBytes()) + .setSessionId("greetings"); + +sender.send(message); +``` + +#### Receive messages from a session + +Receivers can fetch messages from a specific session or the first available, unlocked session. The first snippet creates +a receiver for a session with id "greetings". The second snippet creates a receiver that fetches the first available +session. + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") + .sessionId("greetings") + .buildAsyncClient(); +``` + + +```java +ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") + .buildAsyncClient(); +``` ## Troubleshooting ### Enable client logging -You can set the `AZURE_LOG_LEVEL` environment variable to view logging statements made in the client library. For -example, setting `AZURE_LOG_LEVEL=2` would show all informational, warning, and error log messages. The log levels can -be found here: [log levels][LogLevels]. +Azure SDK for Java offers a consistent logging story to help aid in troubleshooting application errors and expedite +their resolution. The logs produced will capture the flow of an application before reaching the terminal state to help +locate the root issue. View the [logging][logging] wiki for guidance about enabling logging. ### Enable AMQP transport logging @@ -224,66 +325,40 @@ services. When an AMQP exception is thrown, examining the error condition field exception occurred and if possible, how to mitigate this exception. A list of all the AMQP exceptions can be found in [OASIS AMQP Version 1.0 Transport Errors][oasis_amqp_v1_error]. -The [`AmqpErrorContext`][AmqpErrorContext] in the [`AmqpException`][AmqpException] provides information about the AMQP -session, link, or connection that the exception occurred in. This is useful to diagnose which level in the transport -this exception occurred at and whether it was an issue in one of the producers or consumers. - -#### Operation cancelled exception - -It occurs when the underlying AMQP layer encounters an abnormal link abort or the connection is disconnected in an -unexpected fashion. It is recommended to attempt to verify the current state and retry if necessary. - -### Handling transient AMQP exceptions - -If a transient AMQP exception occurs, the client library retries the operation as many times as the -[AmqpRetryOptions][AmqpRetryOptions] allows. Afterwards, the operation fails and an exception is propagated back to the -user. - -### Default SSL library - -All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL -operations. The Boring SSL library is an uber jar containing native libraries for Linux / macOS / Windows, and provides -better performance compared to the default SSL implementation within the JDK. For more information, including how to -reduce the dependency size, refer to the [performance tuning][performance_tuning] section of the wiki. +The recommended way to solve the specific exception the AMQP exception represents is to follow the +[Service Bus Messaging Exceptions][] guidance. ## Next steps Beyond those discussed, the Azure Service Bus client library offers support for many additional scenarios to help take -advantage of the full feature set of the Azure Service Bus service. In order to help explore some of the these scenarios, -the following set of sample is available [here][samples_readme]. +advantage of the full feature set of the Azure Service Bus service. In order to help explore some of these scenarios, +check out the [samples README][samples_readme]. ## Contributing If you would like to become an active contributor to this project please refer to our [Contribution -Guidelines](./CONTRIBUTING.md) for more information. +Guidelines](./../../../CONTRIBUTING.md) for more information. [aad_authorization]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application [amqp_transport_error]: https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-amqp-error [AmqpErrorCondition]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java -[AmqpErrorContext]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorContext.java -[AmqpException]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpException.java [api_documentation]: https://aka.ms/java-docs -[app_registration_page]: https://docs.microsoft.com/azure/active-directory/develop/howto-create-service-principal-portal#get-values-for-signing-in -[application_client_secret]: https://docs.microsoft.com/azure/active-directory/develop/howto-create-service-principal-portal#create-a-new-application-secret [java_8_sdk_javadocs]: https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html -[LogLevels]: ../../core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java +[logging]: https://github.com/Azure/azure-sdk-for-java/wiki/Logging-with-Azure-SDK [maven]: https://maven.apache.org/ +[message-sessions]: https://docs.microsoft.com/azure/service-bus-messaging/message-sessions [oasis_amqp_v1_error]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-error [oasis_amqp_v1]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html -[performance_tuning]: https://github.com/Azure/azure-sdk-for-java/wiki/Performance-Tuning +[product_docs]: https://docs.microsoft.com/azure/service-bus-messaging [qpid_proton_j_apache]: http://qpid.apache.org/proton/ [queue_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#queues [RetryOptions]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java -[sample_readme]: ./src/samples/README.md -[sample-receive-async-aad-message]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java -[sample-receive-async-message]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java -[sample-receive-message-and-settle]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java -[sample-receive-message-synchronously]: ./src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java -[sample-send-async-aad-message]: ./src/samples/java/com/azure/messaging/servicebus/SendMessageWithAzureIdentityAsyncSample.java -[sample-send-async-message]: ./src/samples/java/com/azure/messaging/servicebus/SendMessageAsyncSample.java -[sample-send-batch-messages]: ./src/samples/java/com/azure/messaging/servicebus/SendMessageBatchSyncSample.java +[sample_examples]: ./src/samples/java/com/azure/messaging/servicebus/ +[samples_readme]: ./src/samples/README.md [service_bus_connection_string]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string +[servicebus_create]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal +[servicebus_messaging_exceptions]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-exceptions [servicebus_roles]: https://docs.microsoft.com/azure/service-bus-messaging/authenticate-application#built-in-rbac-roles-for-azure-service-bus [ServiceBusReceiverAsyncClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java [ServiceBusReceiverClient]: ./src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -292,5 +367,6 @@ Guidelines](./CONTRIBUTING.md) for more information. [source_code]: ./ [subscription_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions [topic_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#topics +[wiki_identity]: https://github.com/Azure/azure-sdk-for-java/wiki/Identity-and-Authentication ![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fservicebus%2Fazure-messaging-servicebus%2FREADME.png) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md b/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md index e272b68c4a76..5b3df0220482 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/README.md @@ -18,7 +18,7 @@ Key concepts are explained in detail [here][sdk_readme_key_concepts]. ## Getting started Please refer to the [Getting Started][sdk_readme_getting_started] section. -### Obtaining an Event Hub instance connection string +### Obtaining a Service Bus namespace connection string Most of the samples authorize with Service Bus using a connection string generated for that Service Bus namespace. The connection string value can be obtained by: @@ -34,12 +34,18 @@ connection string value can be obtained by: - [Send messages using Azure Identity][SendMessageWithAzureIdentityAsyncSample] - [Send message batches synchronously][SendMessageBatchSyncSample] - [Schedule and cancel a message][MessageCancelScheduleAsyncSample] +- [Receive messages synchronously][ReceiveMessageSyncSample] - [Receive and auto-complete messages][ReceiveMessageAsyncSample] - [Receive messages using Azure Identity][ReceiveMessageAzureIdentityAsyncSample] - [Receive and settle messages via complete or abandon][ReceiveMessageAndSettleAsyncSample] -- [Receive messages synchronously][ReceiveMessageSyncSample] - [Peek at a message][PeekMessageAsyncSample] +### Message sessions +- [Send and receive messages from a session][SendAndReceiveSessionMessageSample] +- [Receive messages from multiple available sessions][ReceiveMultipleSessionsAsyncSample] +- [Receive messages from a specific session][ReceiveNamedSessionAsyncSample] +- [Receive messages from the first available session][ReceiveSingleSessionAsyncSample] + ## Troubleshooting See [Troubleshooting][sdk_readme_troubleshooting]. @@ -49,7 +55,7 @@ See [Next steps][sdk_readme_next_steps]. ## Contributing If you would like to become an active contributor to this project please refer to our [Contribution -Guidelines](../../CONTRIBUTING.md) for more information. +Guidelines](../../../../../CONTRIBUTING.md) for more information. [sdk_readme_key_concepts]: ../../README.md#key-concepts @@ -57,14 +63,18 @@ Guidelines](../../CONTRIBUTING.md) for more information. [sdk_readme_troubleshooting]: ../../README.md#troubleshooting [sdk_readme_next_steps]: ../../README.md#next-steps -[MessageCancelScheduleAsyncSample]: ./java/com/azure/messaging/servicebus/SendScheduledMessageAndCancelAsyncSample.java [PeekMessageAsyncSample]: ./java/com/azure/messaging/servicebus/PeekMessageAsyncSample.java [ReceiveMessageAndSettleAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java [ReceiveMessageAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java [ReceiveMessageAzureIdentityAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java [ReceiveMessageSyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java +[ReceiveMultipleSessionsAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java +[ReceiveNamedSessionAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java +[ReceiveSingleSessionAsyncSample]: ./java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java +[SendAndReceiveSessionMessageSample]: ./java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java [SendMessageAsyncSample]: ./java/com/azure/messaging/servicebus/SendMessageAsyncSample.java [SendMessageBatchSyncSample]: ./java/com/azure/messaging/servicebus/SendMessageBatchSyncSample.java [SendMessageWithAzureIdentityAsyncSample]: ./java/com/azure/messaging/servicebus/SendMessageWithAzureIdentityAsyncSample.java +[SendScheduledMessageAndCancelAsyncSample]: ./java/com/azure/messaging/servicebus/SendScheduledMessageAndCancelAsyncSample.java ![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fappconfiguration%2Fazure-messaging-servicebus%2Fsrc%2Fsamples%2FREADME.png) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index bc2ea9444be7..70c1cc118aa0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -4,9 +4,15 @@ package com.azure.messaging.servicebus; import com.azure.core.credential.TokenCredential; -import com.azure.identity.ClientSecretCredential; -import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.core.util.IterableStream; import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.Disposable; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; /** * WARNING: MODIFYING THIS FILE WILL REQUIRE CORRESPONDING UPDATES TO README.md FILE. LINE NUMBERS ARE USED TO EXTRACT @@ -20,21 +26,19 @@ public class ReadmeSamples { * Code sample for creating an asynchronous Service Bus sender. */ public void createAsynchronousServiceBusSender() { - String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; - ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder() - .connectionString(connectionString) + ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sender() .queueName("<< QUEUE NAME >>") - .buildAsyncClient(); + .buildClient(); } /** * Code sample for creating an asynchronous Service Bus receiver. */ public void createAsynchronousServiceBusReceiver() { - String connectionString = "<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>"; ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .connectionString(connectionString) + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .receiver() .topicName("<< TOPIC NAME >>") .subscriptionName("<< SUBSCRIPTION NAME >>") @@ -55,19 +59,118 @@ public void createAsynchronousServiceBusReceiverWithAzureIdentity() { } /** - * Code sample for creating an asynchronous Service Bus receiver using Aad. + * Sends messages to a queue. */ - public void createAsynchronousServiceBusReceiverWithAad() { - ClientSecretCredential credential = new ClientSecretCredentialBuilder() - .clientId("<< APPLICATION (CLIENT) ID >>") - .clientSecret("<< APPLICATION SECRET >>") - .tenantId("<< TENANT ID >>") - .build(); + public void sendMessage() { + ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sender() + .queueName("<< QUEUE NAME >>") + .buildClient(); + List messages = Arrays.asList( + new ServiceBusMessage("Hello world".getBytes()).setMessageId("1"), + new ServiceBusMessage("Bonjour".getBytes()).setMessageId("2")); + + sender.send(messages); + } + + /** + * Receives messages from a topic and subscription. + */ + public void receiveMessages() { + ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .receiveMode(ReceiveMode.PEEK_LOCK) + .buildClient(); + IterableStream messages = receiver.receive(10, Duration.ofSeconds(30)); + messages.forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), + new String(message.getBody(), StandardCharsets.UTF_8)); + }); + } + + /** + * Receives messages asynchronously. + */ + public void receiveMessagesAsync() { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .credential("<>", credential) + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .receiver() - .queueName("<>") + .queueName("<< QUEUE NAME >>") + .buildAsyncClient(); + + Disposable subscription = receiver.receive().subscribe(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + System.out.printf("Id: %s%n", message.getMessageId()); + System.out.printf("Contents: %s%n", new String(message.getBody(), StandardCharsets.UTF_8)); + }, error -> { + System.err.println("Error occurred while receiving messages: " + error); + }, () -> { + System.out.println("Finished receiving messages."); + }); + } + + /** + * Complete a message. + */ + public void completeMessage() { + ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .receiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .receiveMode(ReceiveMode.PEEK_LOCK) + .buildClient(); + + receiver.receive(10).forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); + + // Process message and then complete it. + receiver.complete(message); + }); + } + + /** + * Create a session message. + */ + public void createSessionMessage() { + ServiceBusSenderClient sender = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sender() + .queueName("<< QUEUE NAME >>") + .buildClient(); + + ServiceBusMessage message = new ServiceBusMessage("Hello world".getBytes()) + .setSessionId("greetings"); + + sender.send(message); + } + + /** + * Create session receiver for "greetings" + */ + public void namedSessionReceiver() { + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") + .sessionId("greetings") + .buildAsyncClient(); + } + + /** + * Create session receiver for the first available session. + */ + public void unnamedSessionReceiver() { + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") .buildAsyncClient(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java index c120a5a08f27..a23782c0fafc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java @@ -43,6 +43,8 @@ public static void main(String[] args) { System.out.println("Received Message Id: " + message.getMessageId()); System.out.println("Received Message: " + new String(message.getBody())); + + receiverClient.complete(message); }); // Close the receiver. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/MultiSessionReceiveAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java similarity index 82% rename from sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/MultiSessionReceiveAsyncSample.java rename to sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java index a868bcee7b9e..184805b6311e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/MultiSessionReceiveAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java @@ -9,18 +9,18 @@ import reactor.core.publisher.Mono; import java.security.SecureRandom; -import java.time.Duration; import java.util.concurrent.TimeUnit; /** - * Sample demonstrates how to receive and process multiple samples. + * Sample demonstrates how to receive and process multiple sessions. In the sample, at most 3 sessions are processed + * concurrently. When there are no more messages in a session, the receiver finds the next available session to + * process. */ -public class MultiSessionReceiveAsyncSample { +public class ReceiveMultipleSessionsAsyncSample { private static final SecureRandom RANDOM = new SecureRandom(); /** - * Main method to invoke this demo on how to receive from multiple sessions {@link ServiceBusReceivedMessage} from - * an Azure Service Bus Queue. + * Main method to invoke this demo on how to receive messages from multiple sessions in an Azure Service Bus Queue. * * @param args Unused arguments to the program. * @@ -36,8 +36,8 @@ public static void main(String[] args) throws InterruptedException { + "SharedAccessKey={key}"; // Create a receiver. - // "<>" will be the name of the Service Bus queue instance you created inside the Service Bus - // namespace. + // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the + // Service Bus namespace. ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() @@ -46,12 +46,11 @@ public static void main(String[] args) throws InterruptedException { .queueName("<>") .buildAsyncClient(); - // At most, the receiver will automatically renew the session lock until 120 seconds have elapsed. // By default, after messages are processed, they are completed (ie. removed from the queue/topic). Setting - // enableAutoComplete to true will tell the processor to complete or abandon the message depending + // enableAutoComplete to true will tell the processor to complete or abandon the message depending on whether or + // not processing the message results in an exception. ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false) - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(120)); + .setIsAutoCompleteEnabled(false); Disposable subscription = receiver.receive(options) .flatMap(context -> { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java new file mode 100644 index 000000000000..6b33ecbd6f95 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.Disposable; + +import java.util.concurrent.TimeUnit; + +/** + * Demonstrates how to receive messages from a named session. + */ +public class ReceiveNamedSessionAsyncSample { + /** + * Main method to invoke this demo on how to receive messages from a session with id "greetings" in an Azure Service + * Bus Queue. + * + * @param args Unused arguments to the program. + * + * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. + */ + public static void main(String[] args) throws InterruptedException { + + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key}"; + + // Create a receiver. + // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the + // Service Bus namespace. + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString(connectionString) + .sessionReceiver() + .sessionId("greetings") + .receiveMode(ReceiveMode.PEEK_LOCK) + .queueName("<>") + .buildAsyncClient(); + + Disposable subscription = receiver.receive() + .subscribe(context -> { + if (context.hasError()) { + System.out.printf("An error occurred in session %s. Error: %s%n", + context.getSessionId(), context.getThrowable()); + return; + } + + System.out.println("Processing message from session: " + context.getSessionId()); + + // Process message + // The message is automatically completed if no exceptions are thrown while processing message. + }, error -> { + System.err.println("Error occurred: " + error); + }); + + // Subscribe is not a blocking call so we sleep here so the program does not end. + TimeUnit.SECONDS.sleep(60); + + // Disposing of the subscription will cancel the receive() operation. + subscription.dispose(); + + // Close the receiver. + receiver.close(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java new file mode 100644 index 000000000000..6b1ec92583d6 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.Disposable; + +import java.util.concurrent.TimeUnit; + +/** + * Demonstrates how to receive from the first available session. + */ +public class ReceiveSingleSessionAsyncSample { + /** + * Main method to invoke this demo on how to receive messages from the first available session in a Service Bus + * topic subscription. + * + * @param args Unused arguments to the program. + * + * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. + */ + public static void main(String[] args) throws InterruptedException { + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key}"; + + // Create a receiver. + // "<>" will be the name of the Service Bus topic you created inside the Service Bus namespace. + // "<>" will be the name of the session-enabled subscription. + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString(connectionString) + .sessionReceiver() + .receiveMode(ReceiveMode.PEEK_LOCK) + .topicName("<>") + .subscriptionName("<>") + .buildAsyncClient(); + + // Messages are continuously received until there are no more messages in the current session. Then, the Flux + // completes. + Disposable subscription = receiver.receive() + .subscribe(context -> { + if (context.hasError()) { + System.out.printf("An error occurred in session %s. Error: %s%n", + context.getSessionId(), context.getThrowable()); + return; + } + + System.out.println("Processing message from session: " + context.getSessionId()); + + // Process message + // The message is automatically completed if no exceptions are thrown while processing message. + }, error -> { + System.err.println("Error occurred: " + error); + }); + + // Subscribe is not a blocking call so we sleep here so the program does not end. + TimeUnit.SECONDS.sleep(60); + + // Disposing of the subscription will cancel the receive() operation. + subscription.dispose(); + + // Close the receiver. + receiver.close(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java index d8691cb6ebc3..e458b17cc77a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java @@ -127,7 +127,7 @@ void autoCompletesAndAutoRenews() { logger.info("Now: {}", Instant.now()); try { - TimeUnit.SECONDS.sleep(8); + TimeUnit.SECONDS.sleep(15); } catch (InterruptedException ignored) { } logger.info("After: {}", Instant.now()); @@ -211,7 +211,7 @@ void autoRenewExpires() { @Test void autoRenewOperationErrors() { // Arrange - final Duration maxRenewDuration = Duration.ofSeconds(10); + final Duration maxRenewDuration = Duration.ofSeconds(20); final String lock1 = UUID.randomUUID().toString(); final String lock2 = UUID.randomUUID().toString(); when(message1.getLockToken()).thenReturn(lock1); @@ -234,7 +234,7 @@ void autoRenewOperationErrors() { logger.info("Now: {}", Instant.now()); try { - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(10); } catch (InterruptedException ignored) { } logger.info("After: {}", Instant.now()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 5ea87289d080..52aa09adf4a8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -23,6 +23,7 @@ import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.test.StepVerifier; @@ -65,8 +66,8 @@ class ServiceBusReceiveLinkProcessorTest { private ArgumentCaptor> creditSupplierCaptor; private final AmqpErrorContext amqpErrorContext = new AmqpErrorContext("test-context"); - private final DirectProcessor endpointProcessor = DirectProcessor.create(); - private final DirectProcessor messageProcessor = DirectProcessor.create(); + private final EmitterProcessor endpointProcessor = EmitterProcessor.create(); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(); private final FluxSink messageProcessorSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); private ServiceBusReceiveLinkProcessor linkProcessor; @@ -288,7 +289,9 @@ void newLinkOnRetryableError() { final ServiceBusReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor); final FluxSink endpointSink = endpointProcessor.sink(); - when(link2.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); + when(link2.getEndpointStates()).thenReturn(Flux.defer(() -> Flux.create(e -> { + e.next(AmqpEndpointState.ACTIVE); + }))); when(link2.receive()).thenReturn(Flux.just(message2)); final AmqpException amqpException = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-error", @@ -581,8 +584,8 @@ void receivesFromFirstLink() { } /** - * Verifies that when we request back pressure amounts, if it only requests a certain number of events, only - * that number is consumed. + * Verifies that when we request back pressure amounts, if it only requests a certain number of events, only that + * number is consumed. */ @Test void backpressureRequestOnlyEmitsThatAmount() { diff --git a/sdk/servicebus/tests.yml b/sdk/servicebus/tests.yml index a2032a72c0d9..0f9627249c51 100644 --- a/sdk/servicebus/tests.yml +++ b/sdk/servicebus/tests.yml @@ -10,4 +10,4 @@ jobs: safeName: azuremessagingservicebus EnvVars: AZURE_TEST_MODE: RECORD - AZURE_LOG_LEVEL: 3 + AZURE_LOG_LEVEL: 2