Skip to content

Commit

Permalink
Change receiveMessages API to return ServiceBusReceivedMessage (Azure…
Browse files Browse the repository at this point in the history
…#17100)

* Change receiveMessages API to return ServiceBusReceivedMessage

* Update spring sample
  • Loading branch information
srnagar authored Nov 3, 2020
1 parent 6e8a0f5 commit 55690a7
Show file tree
Hide file tree
Showing 43 changed files with 459 additions and 517 deletions.
16 changes: 8 additions & 8 deletions sdk/servicebus/azure-messaging-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()

// Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
// happens first.
IterableStream<ServiceBusReceivedMessageContext> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
messages.forEach(context -> {
ServiceBusReceivedMessage message = context.getMessage();
IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
messages.forEach(message -> {

System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(),
message.getBody().toString());
});
Expand All @@ -224,8 +224,8 @@ ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()

// receive() operation continuously fetches messages until the subscription is disposed.
// The stream is infinite, and completes when the subscription or receiver is closed.
Disposable subscription = receiver.receiveMessages().subscribe(context -> {
ServiceBusReceivedMessage message = context.getMessage();
Disposable subscription = receiver.receiveMessages().subscribe(message -> {

System.out.printf("Id: %s%n", message.getMessageId());
System.out.printf("Contents: %s%n", message.getBody().toString());
}, error -> {
Expand All @@ -250,10 +250,10 @@ overloads. The sample below completes a received message from synchronous
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L145-L151 -->
```java
// This fetches a batch of 10 messages or until the default operation timeout has elapsed.
receiver.receiveMessages(10).forEach(context -> {
ServiceBusReceivedMessage message = context.getMessage();

receiver.receiveMessages(10).forEach(message -> {
// Process message and then complete it.
System.out.println("Completing message " + message.getLockToken());

receiver.complete(message);
});
```
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-messaging-servicebus/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ counterpart `ServiceBusReceiverAsyncClient`.
```java

// Sample code that processes a single message
Consumer<ServiceBusProcessorContext> processMessage = messageContext -> {
Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
try {
System.out.println(messageContext.getMessage().getMessageId());
// other message processing code
Expand Down Expand Up @@ -323,7 +323,7 @@ The below code snippet shows you how to use the processor client to receive mess

```java
// Sample code that processes a single message
Consumer<ServiceBusProcessorContext> processMessage = messageContext -> {
Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
try {
System.out.println(messageContext.getMessage().getMessageId());
// other message processing code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
/**
* Flux operator that auto-completes or auto-abandons messages when control is returned successfully.
*/
final class FluxAutoComplete extends FluxOperator<ServiceBusReceivedMessageContext, ServiceBusReceivedMessageContext> {
final class FluxAutoComplete extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
private final Semaphore completionLock;
private final Function<ServiceBusReceivedMessageContext, Mono<Void>> onComplete;
private final Function<ServiceBusReceivedMessageContext, Mono<Void>> onAbandon;
private final Function<ServiceBusMessageContext, Mono<Void>> onComplete;
private final Function<ServiceBusMessageContext, Mono<Void>> onAbandon;
private final ClientLogger logger = new ClientLogger(FluxAutoComplete.class);

FluxAutoComplete(Flux<? extends ServiceBusReceivedMessageContext> upstream, Semaphore completionLock,
Function<ServiceBusReceivedMessageContext, Mono<Void>> onComplete,
Function<ServiceBusReceivedMessageContext, Mono<Void>> onAbandon) {
FluxAutoComplete(Flux<? extends ServiceBusMessageContext> upstream, Semaphore completionLock,
Function<ServiceBusMessageContext, Mono<Void>> onComplete,
Function<ServiceBusMessageContext, Mono<Void>> onAbandon) {
super(upstream);
this.completionLock = completionLock;
this.onComplete = Objects.requireNonNull(onComplete, "'onComplete' cannot be null.");
Expand All @@ -40,7 +40,7 @@ final class FluxAutoComplete extends FluxOperator<ServiceBusReceivedMessageConte
* @param coreSubscriber The subscriber interested in the published items.
*/
@Override
public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessageContext> coreSubscriber) {
public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");

final AutoCompleteSubscriber subscriber =
Expand All @@ -49,17 +49,17 @@ public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessageContext> c
source.subscribe(subscriber);
}

static final class AutoCompleteSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> {
private final CoreSubscriber<? super ServiceBusReceivedMessageContext> downstream;
private final Function<ServiceBusReceivedMessageContext, Mono<Void>> onComplete;
private final Function<ServiceBusReceivedMessageContext, Mono<Void>> onAbandon;
static final class AutoCompleteSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
private final Function<ServiceBusMessageContext, Mono<Void>> onComplete;
private final Function<ServiceBusMessageContext, Mono<Void>> onAbandon;
private final Semaphore semaphore;
private final ClientLogger logger;

AutoCompleteSubscriber(CoreSubscriber<? super ServiceBusReceivedMessageContext> downstream,
Semaphore completionLock,
Function<ServiceBusReceivedMessageContext, Mono<Void>> onComplete,
Function<ServiceBusReceivedMessageContext, Mono<Void>> onAbandon, ClientLogger logger) {
AutoCompleteSubscriber(CoreSubscriber<? super ServiceBusMessageContext> downstream,
Semaphore completionLock,
Function<ServiceBusMessageContext, Mono<Void>> onComplete,
Function<ServiceBusMessageContext, Mono<Void>> onAbandon, ClientLogger logger) {
this.downstream = downstream;
this.onComplete = onComplete;
this.onAbandon = onAbandon;
Expand All @@ -75,7 +75,7 @@ protected void hookOnSubscribe(Subscription subscription) {
}

@Override
protected void hookOnNext(ServiceBusReceivedMessageContext value) {
protected void hookOnNext(ServiceBusMessageContext value) {
final ServiceBusReceivedMessage message = value.getMessage();
final String sequenceNumber = message != null ? String.valueOf(message.getSequenceNumber()) : "n/a";

Expand Down Expand Up @@ -131,8 +131,8 @@ public Context currentContext() {
* @param message received message to apply function to.
* @param operation The operation name.
*/
private void applyWithCatch(Function<ServiceBusReceivedMessageContext, Mono<Void>> function,
ServiceBusReceivedMessageContext message, String operation) {
private void applyWithCatch(Function<ServiceBusMessageContext, Mono<Void>> function,
ServiceBusMessageContext message, String operation) {
try {
function.apply(message).block();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Receives messages from to upstream, subscribe lock renewal subscriber.
*/
final class FluxAutoLockRenew extends FluxOperator<ServiceBusReceivedMessageContext, ServiceBusReceivedMessageContext> {
final class FluxAutoLockRenew extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {

private final ClientLogger logger = new ClientLogger(FluxAutoLockRenew.class);

Expand All @@ -41,8 +41,9 @@ final class FluxAutoLockRenew extends FluxOperator<ServiceBusReceivedMessageCont
* @throws IllegalArgumentException If maxLockRenewalDuration is zero or negative.
*/
FluxAutoLockRenew(
Flux<? extends ServiceBusReceivedMessageContext> source, Duration maxAutoLockRenewDuration,
LockContainer<LockRenewalOperation> messageLockContainer, Function<String, Mono<OffsetDateTime>> onRenewLock) {
Flux<? extends ServiceBusMessageContext> source, Duration maxAutoLockRenewDuration,
LockContainer<LockRenewalOperation> messageLockContainer, Function<String,
Mono<OffsetDateTime>> onRenewLock) {
super(source);

this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
Expand All @@ -58,7 +59,7 @@ final class FluxAutoLockRenew extends FluxOperator<ServiceBusReceivedMessageCont
}

@Override
public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessageContext> coreSubscriber) {
public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");

final LockRenewSubscriber newLockRenewSubscriber = new LockRenewSubscriber(coreSubscriber, maxAutoLockRenewal,
Expand All @@ -70,17 +71,17 @@ public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessageContext> c
/**
* Receives messages from to upstream, pushes them downstream and start lock renewal.
*/
static final class LockRenewSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> {
private static final Consumer<ServiceBusReceivedMessageContext> LOCK_RENEW_NO_OP = messageContext -> { };
static final class LockRenewSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
private static final Consumer<ServiceBusMessageContext> LOCK_RENEW_NO_OP = messageContext -> { };

private final ClientLogger logger = new ClientLogger(LockRenewSubscriber.class);

private final Function<String, Mono<OffsetDateTime>> onRenewLock;
private final Duration maxAutoLockRenewal;
private final LockContainer<LockRenewalOperation> messageLockContainer;
private final CoreSubscriber<? super ServiceBusReceivedMessageContext> actual;
private final CoreSubscriber<? super ServiceBusMessageContext> actual;

LockRenewSubscriber(CoreSubscriber<? super ServiceBusReceivedMessageContext> actual,
LockRenewSubscriber(CoreSubscriber<? super ServiceBusMessageContext> actual,
Duration maxAutoLockRenewDuration, LockContainer<LockRenewalOperation> messageLockContainer,
Function<String, Mono<OffsetDateTime>> onRenewLock) {
this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
Expand Down Expand Up @@ -117,10 +118,10 @@ protected void hookOnError(Throwable throwable) {
}

@Override
protected void hookOnNext(ServiceBusReceivedMessageContext messageContext) {
protected void hookOnNext(ServiceBusMessageContext messageContext) {
final ServiceBusReceivedMessage message = messageContext.getMessage();

final Consumer<ServiceBusReceivedMessageContext> lockCleanup;
final Consumer<ServiceBusMessageContext> lockCleanup;

if (message != null) {
final String lockToken = message.getLockToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ public ServiceBusSenderClient buildClient() {
public final class ServiceBusSessionProcessorClientBuilder {
private final ServiceBusProcessorClientOptions processorClientOptions;
private final ServiceBusSessionReceiverClientBuilder sessionReceiverClientBuilder;
private Consumer<ServiceBusProcessorMessageContext> processMessage;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;

private ServiceBusSessionProcessorClientBuilder() {
Expand Down Expand Up @@ -739,7 +739,7 @@ public ServiceBusSessionProcessorClientBuilder topicName(String topicName) {
* @return The updated {@link ServiceBusProcessorClientBuilder} object.
*/
public ServiceBusSessionProcessorClientBuilder processMessage(
Consumer<ServiceBusProcessorMessageContext> processMessage) {
Consumer<ServiceBusReceivedMessageContext> processMessage) {
this.processMessage = processMessage;
return this;
}
Expand Down Expand Up @@ -1080,7 +1080,7 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
public final class ServiceBusProcessorClientBuilder {
private final ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder;
private final ServiceBusProcessorClientOptions processorClientOptions;
private Consumer<ServiceBusProcessorMessageContext> processMessage;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;

private ServiceBusProcessorClientBuilder() {
Expand Down Expand Up @@ -1159,7 +1159,7 @@ public ServiceBusProcessorClientBuilder topicName(String topicName) {
* @return The updated {@link ServiceBusProcessorClientBuilder} object.
*/
public ServiceBusProcessorClientBuilder processMessage(
Consumer<ServiceBusProcessorMessageContext> processMessage) {
Consumer<ServiceBusReceivedMessageContext> processMessage) {
this.processMessage = processMessage;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;

import java.util.Objects;

/**
* Represents the result of a receive message operation with context from Service Bus.
*/
final class ServiceBusMessageContext {
private final ServiceBusReceivedMessage message;
private final String sessionId;
private final Throwable error;

/**
* Creates an instance where a message was successfully received.
*
* @param message Message received.
*/
ServiceBusMessageContext(ServiceBusReceivedMessage message) {
this.message = Objects.requireNonNull(message, "'message' cannot be null.");
this.sessionId = message.getSessionId();
this.error = null;
}

/**
* Creates an instance where an error occurred such as session-lock-lost.
*
* @param sessionId Session id that the error occurred in.
* @param error AMQP exception that occurred in session.
*/
ServiceBusMessageContext(String sessionId, Throwable error) {
this.sessionId = Objects.requireNonNull(sessionId, "'sessionId' cannot be null.");
this.error = Objects.requireNonNull(error, "'error' cannot be null.");
this.message = null;
}

/**
* Gets the session id of the message or that the error occurred in.
*
* @return The session id associated with the error or message. {@code null} if there is no session.
*/
public String getSessionId() {
return sessionId;
}

/**
* Gets the throwable that occurred.
*
* @return The throwable that occurred or {@code null} if there was no error.
*/
public Throwable getThrowable() {
return error;
}

/**
* Gets the message received from Service Bus.
*
* @return The message received from Service Bus or {@code null} if an exception occurred.
*/
public ServiceBusReceivedMessage getMessage() {
return message;
}

/**
* Gets whether or not an error occurred while receiving the next message.
*
* @return {@code true} if there was an error when receiving the next message; {@code false} otherwise.
*/
public boolean hasError() {
return error != null;
}
}
Loading

0 comments on commit 55690a7

Please sign in to comment.