Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposes management node in azure-core-amqp #22095

Merged
merged 60 commits into from
Jun 7, 2021
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
8f8ebf4
Update AmqpConnection to have a getManagementNode.
conniey Jun 1, 2021
e9bca38
Adding AmqpManagementNode.
conniey Jun 1, 2021
4772c57
AmqpManagementNode can be closed asynchronously.
conniey Jun 2, 2021
de72055
Update AmqpConnection to use AsyncCloseable.
conniey Jun 1, 2021
cade086
Adding AsyncCloseable to AmqpLink.
conniey Jun 2, 2021
fdf337c
AmqpSession extends from AsyncCloseable.
conniey Jun 4, 2021
bfc6945
ClaimsBasedSecurityNode.java uses AsyncCloseable.
conniey Jun 4, 2021
95a9bad
Implements CbsNode's closeAsync() and adds tests.
conniey Jun 4, 2021
810c0bf
ReactorSession implements closeAsync()
conniey Jun 4, 2021
a1360ae
ReactorConnection uses closeAsync(). Renames dispose() to closeAsync(…
conniey Jun 4, 2021
13c575a
RequestResponseChannel. Remove close operation with message.
conniey Jun 4, 2021
0868c62
Adding import for Mono in AmqpLink
conniey Jun 4, 2021
f734470
Removing unused import
conniey Jun 4, 2021
f4f5290
Adding DeliveryOutcome models.
conniey Jun 3, 2021
c37553f
Adding delivery state enum
conniey Jun 3, 2021
06d2586
Add authorization scope to connection options.
conniey Jun 4, 2021
859c04c
Add utility to serialize and deserialize AmqpAnnotatedMessage
conniey Jun 4, 2021
7fd706a
Update AmqpManagementNode to expose delivery outcomes because they ca…
conniey Jun 4, 2021
24b84a0
Rename AmqpMessageUtils to MessageUtils
conniey Jun 4, 2021
9e82d11
Adding ReceivedDeliveryOutcome.
conniey Jun 4, 2021
acb6f66
Adding additional link to DeliveryState enum.
conniey Jun 4, 2021
cde64c7
Adding MessageUtil support for converting DeliveryOutcome and Outcomes.
conniey Jun 4, 2021
963f331
Fixing build breaks from ConnectionOptions.
conniey Jun 4, 2021
ede5ead
Adding management channel class.
conniey Jun 4, 2021
5ff8fe5
Adding management node into reactor connection.
conniey Jun 4, 2021
261a8ea
Update ExceptionUtil to return instead of throwing on unknown amqp er…
conniey Jun 4, 2021
7419dd9
Moving ManagementChannel formatting.
conniey Jun 4, 2021
5e27c83
Add javadocs to ReceivedDeliveryOutcome.
conniey Jun 4, 2021
a2e5b52
Fix possible NPE in MessageUtils.
conniey Jun 4, 2021
1316667
Add tests for ManagementChannel
conniey Jun 4, 2021
6e61aa3
Adding tests for message utils.
conniey Jun 5, 2021
807f028
Fixing checkstyle issues with imports.
conniey Jun 5, 2021
674faee
Fix checkstyle with logger throwing.
conniey Jun 5, 2021
9d0b395
Fixing spacing on AmqpConnection.
conniey Jun 5, 2021
344025a
Fix javadoc on ModifiedDeliveryOutcome
conniey Jun 5, 2021
5a98198
Fix SpotBug issues in MessageUtils.
conniey Jun 5, 2021
959dc28
ReactorConnection: Hook up dispose method.
conniey Jun 5, 2021
eea2f6b
EventHubs: Fixing instances of ConnectionOptions.
conniey Jun 5, 2021
cdcd920
Fix build errors using ConnectionOptions.
conniey Jun 5, 2021
9db7990
Change deliverystate to expandable enum
conniey Jun 3, 2021
a453928
Update MessageUtils to consider DeliveryState not as enum.
conniey Jun 5, 2021
64558fa
DeliveryStates are final.
conniey Jun 5, 2021
450ccfb
Fix spotbug errors.
conniey Jun 5, 2021
698abe8
Fix NPE when closing connection in SB.
conniey Jun 5, 2021
f23815b
Fix spotbug error.
conniey Jun 5, 2021
0d5c7e3
Fix documentation on TransactionalDeliveryOutcome
conniey Jun 6, 2021
112de02
Adding additional tests for MessageUtilsTest
conniey Jun 6, 2021
f74e702
Adding MessageUtilsTests.
conniey Jun 7, 2021
32fdcd7
Mapping String to Symbol for maps.
conniey Jun 7, 2021
8652c23
Adding more tests.
conniey Jun 7, 2021
e5f556e
Update sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/imp…
conniey Jun 7, 2021
c3f814c
Adding documentation to DeliveryOutcome.
conniey Jun 7, 2021
bdb581c
Updating CHANGELOG.
conniey Jun 7, 2021
5c07227
DeliveryState is final.
conniey Jun 7, 2021
ba29460
Add common methods to DeliveryState.
conniey Jun 7, 2021
593cc90
Updating DeliveryOutcome documentation.
conniey Jun 7, 2021
3d02781
Add documentation to management node.
conniey Jun 7, 2021
45c040a
Adds documentation to AmqpConnection
conniey Jun 7, 2021
64e1aa6
Adding test to get management node.
conniey Jun 7, 2021
0b6e749
Add final keyword.
conniey Jun 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### New Features
- Exposing CbsAuthorizationType.
- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
- Delivery outcomes and delivery states are added.

### Bug Fixes
- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.amqp;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -13,7 +14,7 @@
/**
* Represents a TCP connection between the client and a service that uses the AMQP protocol.
*/
public interface AmqpConnection extends Disposable {
public interface AmqpConnection extends Disposable, AsyncCloseable {
/**
* Gets the connection identifier.
*
Expand Down Expand Up @@ -53,6 +54,7 @@ public interface AmqpConnection extends Disposable {
* Creates a new session with the given session name.
*
* @param sessionName Name of the session.
*
* @return The AMQP session that was created.
*/
Mono<AmqpSession> createSession(String sessionName);
Expand All @@ -61,6 +63,7 @@ public interface AmqpConnection extends Disposable {
* Removes a session with the {@code sessionName} from the AMQP connection.
*
* @param sessionName Name of the session to remove.
*
* @return {@code true} if a session with the name was removed; {@code false} otherwise.
*/
boolean removeSession(String sessionName);
Expand All @@ -79,4 +82,26 @@ public interface AmqpConnection extends Disposable {
* @return A stream of shutdown signals that occur in the AMQP endpoint.
*/
Flux<AmqpShutdownSignal> getShutdownSignals();

/**
* Gets or creates the management node.
*
* @param entityPath Entity for which to get the management node of.
*
* @return A Mono that completes with the management node.
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved
*
* @throws UnsupportedOperationException if there is no implementation of fetching a management node.
*/
default Mono<AmqpManagementNode> getManagementNode(String entityPath) {
return Mono.error(new UnsupportedOperationException("This has not been implemented."));
}

/**
* Disposes of the AMQP connection.
*
* @return Mono that completes when the close operation is complete.
*/
default Mono<Void> closeAsync() {
return Mono.fromRunnable(this::dispose);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
package com.azure.core.amqp;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* Represents a unidirectional AMQP link.
*/
public interface AmqpLink extends Disposable {
public interface AmqpLink extends Disposable, AsyncCloseable {

/**
* Gets the name of the link.
*
Expand Down Expand Up @@ -39,4 +42,13 @@ public interface AmqpLink extends Disposable {
* @return A stream of endpoint states for the AMQP link.
*/
Flux<AmqpEndpointState> getEndpointStates();

/**
* Disposes of the AMQP link.
*
* @return A mono that completes when the link is disposed.
*/
default Mono<Void> closeAsync() {
return Mono.fromRunnable(() -> dispose());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.DeliveryOutcome;
import com.azure.core.util.AsyncCloseable;
import reactor.core.publisher.Mono;

/**
* An AMQP endpoint that allows users to perform management and metadata operations on it.
*/
public interface AmqpManagementNode extends AsyncCloseable {
/**
* Sends a message to the management node.
*
* @param message Message to send.
*
* @return Response from management node.
*/
Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message);
conniey marked this conversation as resolved.
Show resolved Hide resolved

/**
* Sends a message to the management node and associates the {@code deliveryOutcome} with that message.
*
* @param message Message to send.
* @param deliveryOutcome Delivery outcome to associate with the message.
*
* @return Response from management node.
*/
Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.amqp;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -13,7 +14,7 @@
/**
* An AMQP session representing bidirectional communication that supports multiple {@link AmqpLink AMQP links}.
*/
public interface AmqpSession extends Disposable {
public interface AmqpSession extends Disposable, AsyncCloseable {
/**
* Gets the name for this AMQP session.
*
Expand Down Expand Up @@ -91,4 +92,9 @@ public interface AmqpSession extends Disposable {
* @return A completable mono.
*/
Mono<Void> rollbackTransaction(AmqpTransaction transaction);

@Override
default Mono<Void> closeAsync() {
return Mono.fromRunnable(() -> dispose());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.amqp;

import com.azure.core.credential.TokenCredential;
import com.azure.core.util.AsyncCloseable;
import reactor.core.publisher.Mono;

import java.time.OffsetDateTime;
Expand All @@ -14,7 +15,7 @@
* @see <a href="https://www.oasis-open.org/committees/download.php/62097/amqp-cbs-v1.0-wd05.doc">
* AMPQ Claims-based Security v1.0</a>
*/
public interface ClaimsBasedSecurityNode extends AutoCloseable {
public interface ClaimsBasedSecurityNode extends AutoCloseable, AsyncCloseable {
/**
* Authorizes the caller with the CBS node to access resources for the {@code audience}.
*
Expand All @@ -31,4 +32,9 @@ public interface ClaimsBasedSecurityNode extends AutoCloseable {
*/
@Override
void close();

@Override
default Mono<Void> closeAsync() {
return Mono.fromRunnable(() -> close());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.azure.core.amqp.models.CbsAuthorizationType;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
Expand All @@ -35,7 +34,6 @@ public class ClaimsBasedSecurityChannel implements ClaimsBasedSecurityNode {
private static final String PUT_TOKEN_OPERATION = "operation";
private static final String PUT_TOKEN_OPERATION_VALUE = "put-token";

private final ClientLogger logger = new ClientLogger(ClaimsBasedSecurityChannel.class);
private final TokenCredential credential;
private final Mono<RequestResponseChannel> cbsChannelMono;
private final CbsAuthorizationType authorizationType;
Expand Down Expand Up @@ -87,9 +85,11 @@ public Mono<OffsetDateTime> authorize(String tokenAudience, String scopes) {

@Override
public void close() {
final RequestResponseChannel channel = cbsChannelMono.block(retryOptions.getTryTimeout());
if (channel != null) {
channel.closeAsync().block();
}
closeAsync().block(retryOptions.getTryTimeout());
}

@Override
public Mono<Void> closeAsync() {
return cbsChannelMono.flatMap(channel -> channel.closeAsync());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@
*/
@Immutable
public class ConnectionOptions {
// These name version keys are used in our properties files to specify client product and version information.
static final String NAME_KEY = "name";
static final String VERSION_KEY = "version";
static final String UNKNOWN = "UNKNOWN";

private final TokenCredential tokenCredential;
private final AmqpTransportType transport;
private final AmqpRetryOptions retryOptions;
private final ProxyOptions proxyOptions;
private final Scheduler scheduler;
private final String fullyQualifiedNamespace;
private final CbsAuthorizationType authorizationType;
private final String authorizationScope;
private final ClientOptions clientOptions;
private final String product;
private final String clientVersion;
Expand Down Expand Up @@ -62,10 +58,10 @@ public class ConnectionOptions {
* {@code proxyOptions} or {@code verifyMode} is null.
*/
public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCredential,
CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions,
ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
CbsAuthorizationType authorizationType, String authorizationScope, AmqpTransportType transport,
AmqpRetryOptions retryOptions, ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
SslDomain.VerifyMode verifyMode, String product, String clientVersion) {
this(fullyQualifiedNamespace, tokenCredential, authorizationType, transport, retryOptions,
this(fullyQualifiedNamespace, tokenCredential, authorizationType, authorizationScope, transport, retryOptions,
proxyOptions, scheduler, clientOptions, verifyMode, product, clientVersion, fullyQualifiedNamespace,
getPort(transport));
}
Expand Down Expand Up @@ -94,14 +90,15 @@ public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCr
* {@code clientOptions}, {@code hostname}, or {@code verifyMode} is null.
*/
public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCredential,
CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions,
ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
CbsAuthorizationType authorizationType, String authorizationScope, AmqpTransportType transport,
AmqpRetryOptions retryOptions, ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
SslDomain.VerifyMode verifyMode, String product, String clientVersion, String hostname, int port) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' is required.");
this.tokenCredential = Objects.requireNonNull(tokenCredential, "'tokenCredential' is required.");
this.authorizationType = Objects.requireNonNull(authorizationType, "'authorizationType' is required.");
this.authorizationScope = Objects.requireNonNull(authorizationScope, "'authorizationScope' is required.");
conniey marked this conversation as resolved.
Show resolved Hide resolved
this.transport = Objects.requireNonNull(transport, "'transport' is required.");
this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' is required.");
this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' is required.");
Expand All @@ -115,6 +112,15 @@ public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCr
this.clientVersion = Objects.requireNonNull(clientVersion, "'clientVersion' cannot be null.");
}

/**
* Gets the scope to use when authorizing.
*
* @return The scope to use when authorizing.
*/
public String getAuthorizationScope() {
return authorizationScope;
}

/**
* Gets the authorisation type for the CBS node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;

import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -78,8 +77,9 @@ public static Exception toException(String errorCondition, String description, A
case NOT_FOUND:
return distinguishNotFound(description, errorContext);
default:
throw new IllegalArgumentException(String.format(Locale.ROOT, "This condition '%s' is not known.",
condition));
return new AmqpException(false, condition, String.format("errorCondition[%s]. description[%s] "
+ "Condition could not be mapped to a transient condition.",
errorCondition, description), errorContext);
}

return new AmqpException(isTransient, condition, description, errorContext);
Expand Down
Loading