Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eg sync stack testproxy #36656

Merged
merged 47 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
130895c
Add missing sync test cases
billwert Jun 12, 2023
5dc3fca
Enable sync-stack
billwert Jun 23, 2023
2daff85
Remove extra set of aeg-channel-name.
billwert Jun 23, 2023
6a0874a
enable AssertingHttpClient
billwert Jun 23, 2023
b80c1a8
add back missing Context arguments
billwert Jun 29, 2023
043e39a
Don't stop trying dev credentials on failures
billwert Jul 14, 2023
dbc2b6c
Tests now use test proxy and recordings are pushed to assets repo
billwert Sep 6, 2023
18dd529
resolve merge conflicts
srnagar Sep 14, 2023
f9ed9a0
address pr comments
srnagar Sep 14, 2023
64f5e98
fix merge conflicts
srnagar Sep 14, 2023
1b35edc
add exports
srnagar Sep 15, 2023
dcf148a
fix version
srnagar Sep 15, 2023
c6a2bb3
remove unused imports
srnagar Sep 15, 2023
26cc35a
use unreleased test version
srnagar Sep 19, 2023
dc41e49
fix tag
srnagar Sep 19, 2023
fb20989
fix tag
srnagar Sep 19, 2023
5560758
refactor
srnagar Sep 20, 2023
44e43fd
fix spring test after sync stack migration
srnagar Sep 20, 2023
1d2d820
log the test proxy response on error
srnagar Sep 20, 2023
48911db
fix header sanitizer in Service Bus
srnagar Sep 22, 2023
edbe426
fix regex for quantum job tests
srnagar Sep 24, 2023
f4d8493
Add missing sync test cases
billwert Jun 12, 2023
503813c
Enable sync-stack
billwert Jun 23, 2023
a1f0197
Remove extra set of aeg-channel-name.
billwert Jun 23, 2023
5087c06
enable AssertingHttpClient
billwert Jun 23, 2023
b224700
add back missing Context arguments
billwert Jun 29, 2023
e9a00d1
Tests now use test proxy and recordings are pushed to assets repo
billwert Sep 6, 2023
9b5a6eb
address pr comments
srnagar Sep 14, 2023
e01a753
add exports
srnagar Sep 15, 2023
7ec717e
fix version
srnagar Sep 15, 2023
67c932a
use unreleased test version
srnagar Sep 19, 2023
4cffdee
fix tag
srnagar Sep 19, 2023
66939e8
fix tag
srnagar Sep 19, 2023
a7ca1f7
refactor
srnagar Sep 20, 2023
799c12a
fix spring test after sync stack migration
srnagar Sep 20, 2023
507db27
log the test proxy response on error
srnagar Sep 20, 2023
baaf27d
fix header sanitizer in Service Bus
srnagar Sep 22, 2023
5ad9038
fix regex for quantum job tests
srnagar Sep 24, 2023
b29abdd
Put a semaphore around starting the proxy.
billwert Sep 25, 2023
531a43c
make the methods of `TestProxyManager` static and synchronized.
billwert Sep 27, 2023
189b6a3
Use the working directory to search for the root of the repo from.
billwert Sep 29, 2023
7d2ed33
add missing sanitizer setup
billwert Oct 6, 2023
3b8518e
base class has this so it's not needed here
billwert Oct 6, 2023
ceb9d57
removed this in merge accidentally
billwert Oct 6, 2023
10da9a6
resolve merge conflicts
srnagar Oct 9, 2023
6dc76f6
Merge branch 'main' into eg-sync-stack-testproxy
srnagar Oct 10, 2023
dca5125
pin azure-core version
srnagar Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/eventgrid/azure-messaging-eventgrid/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
### Other Changes

#### Dependency Updates
-

- Upgraded `azure-core` from `1.42.0` to version `1.43.0`.
- Upgraded `azure-core-http-netty` from `1.13.6` to version `1.13.7`.

Expand Down
6 changes: 6 additions & 0 deletions sdk/eventgrid/azure-messaging-eventgrid/assets.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/eventgrid/azure-messaging-eventgrid",
"Tag": "java/eventgrid/azure-messaging-eventgrid_7bb36a1579"
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.policy.AddHeadersFromContextPolicy;
import com.azure.core.http.rest.Response;
import com.azure.core.models.CloudEvent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventgrid.implementation.Constants;
Expand All @@ -39,7 +36,6 @@
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.azure.core.util.FluxUtil.monoError;
Expand Down Expand Up @@ -155,7 +151,6 @@
@ServiceClient(builder = EventGridPublisherClientBuilder.class, isAsync = true)
public final class EventGridPublisherAsyncClient<T> {

private static final String PARTNER_CHANNEL_HEADER_NAME = "aeg-channel-name";
private final String hostname;

private final EventGridPublisherClientImpl impl;
Expand Down Expand Up @@ -312,25 +307,9 @@ Mono<Response<Void>> sendEventsWithResponse(Iterable<T> events, String channelNa
if (context == null) {
context = Context.NONE;
}
if (!CoreUtils.isNullOrEmpty(channelName)) {
String requestHttpHeadersKey = AddHeadersFromContextPolicy.AZURE_REQUEST_HTTP_HEADERS_KEY;
Map<Object, Object> keyValues = context.getValues();
if (keyValues != null && keyValues.containsKey(requestHttpHeadersKey)) {
// if the given Context instance already contains custom headers,
// add partner channel header to HttpHeaders
Object value = keyValues.get(requestHttpHeadersKey);
if (value instanceof HttpHeaders) {
HttpHeaders headers = (HttpHeaders) value;
headers.add(PARTNER_CHANNEL_HEADER_NAME, channelName);
}
} else {
context = context.addData(requestHttpHeadersKey,
new HttpHeaders().add(PARTNER_CHANNEL_HEADER_NAME, channelName));
}
}

if (this.eventClass == CloudEvent.class) {
return this.sendCloudEventsWithResponse((Iterable<CloudEvent>) events, context);
return this.sendCloudEventsWithResponse((Iterable<CloudEvent>) events, channelName, context);
} else if (this.eventClass == EventGridEvent.class) {
return this.sendEventGridEventsWithResponse((Iterable<EventGridEvent>) events, context);
} else {
Expand Down Expand Up @@ -395,15 +374,15 @@ Mono<Response<Void>> sendEventGridEventsWithResponse(Iterable<EventGridEvent> ev
.flatMap(list -> this.impl.publishEventGridEventsWithResponseAsync(this.hostname, list, finalContext));
}

Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Context context) {
Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, String channelName, Context context) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
final Context finalContext = context != null ? context : Context.NONE;
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, null, finalContext));
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, channelName, finalContext));
}

Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<BinaryData> events, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,21 @@
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.models.CloudEvent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder;
import com.fasterxml.jackson.databind.util.RawValue;

import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A service client that publishes events to an EventGrid topic or domain. Use {@link EventGridPublisherClientBuilder}
Expand Down Expand Up @@ -124,10 +134,19 @@
*/
@ServiceClient(builder = EventGridPublisherClientBuilder.class)
public final class EventGridPublisherClient<T> {
private final ClientLogger logger = new ClientLogger(EventGridPublisherClient.class);

private final EventGridPublisherAsyncClient<T> asyncClient;
EventGridPublisherClient(EventGridPublisherAsyncClient<T> client) {
this.asyncClient = client;
private final EventGridPublisherClientImpl impl;
private final String hostname;
private final Class<T> eventClass;
EventGridPublisherClient(HttpPipeline pipeline, String hostname, EventGridServiceVersion serviceVersion,
Class<T> eventClass) {
this.hostname = hostname;
this.eventClass = eventClass;
this.impl = new EventGridPublisherClientImplBuilder()
.pipeline(pipeline)
.apiVersion(serviceVersion.getVersion())
.buildClient();
}

/**
Expand Down Expand Up @@ -173,8 +192,45 @@ public static String generateSas(String endpoint, AzureKeyCredential keyCredenti
* @throws NullPointerException if events is {@code null}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
@SuppressWarnings("unchecked")
public void sendEvents(Iterable<T> events) {
asyncClient.sendEvents(events).block();
if (this.eventClass == CloudEvent.class) {
this.sendCloudEvents((Iterable<CloudEvent>) events);
} else if (this.eventClass == EventGridEvent.class) {
this.sendEventGridEvents((Iterable<EventGridEvent>) events);
} else {
this.sendCustomEvents((Iterable<BinaryData>) events);
}
}

private void sendCustomEvents(Iterable<BinaryData> events) {
if (events == null) {
throw logger.logExceptionAsError(new NullPointerException("'events' cannot be null."));
}
List<Object> objectEvents = StreamSupport.stream(events.spliterator(), false)
.map(event -> (Object) new RawValue(event.toString()))
.collect(Collectors.toList());
this.impl.publishCustomEventEvents(this.hostname, objectEvents);
}

private void sendEventGridEvents(Iterable<EventGridEvent> events) {
if (events == null) {
throw logger.logExceptionAsError(new NullPointerException("'events' cannot be null."));
}
List<com.azure.messaging.eventgrid.implementation.models.EventGridEvent> eventGridEvents = StreamSupport.stream(events.spliterator(), false)
.map(EventGridEvent::toImpl)
.collect(Collectors.toList());
this.impl.publishEventGridEvents(this.hostname, eventGridEvents);
}

private void sendCloudEvents(Iterable<CloudEvent> events) {
if (events == null) {
throw logger.logExceptionAsError(new NullPointerException("'events' cannot be null."));
}
List<CloudEvent> cloudEvents = StreamSupport.stream(events.spliterator(), false)
.collect(Collectors.toList());
this.impl.publishCloudEventEvents(this.hostname, cloudEvents, null);

}

/**
Expand All @@ -187,9 +243,10 @@ public void sendEvents(Iterable<T> events) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendEventsWithResponse(Iterable<T> events, Context context) {
return asyncClient.sendEventsWithResponse(events, context).block();
return this.sendEventsWithResponse(events, null, context);
}


/**
* Publishes the given events to the set topic or domain and gives the response issued by EventGrid.
* @param events the events to publish.
Expand All @@ -202,9 +259,46 @@ public Response<Void> sendEventsWithResponse(Iterable<T> events, Context context
* @throws NullPointerException if events is {@code null}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendEventsWithResponse(Iterable<T> events, String channelName,
Context context) {
return asyncClient.sendEventsWithResponse(events, channelName, context).block();
@SuppressWarnings("unchecked")
public Response<Void> sendEventsWithResponse(Iterable<T> events, String channelName, Context context) {
if (this.eventClass == CloudEvent.class) {
return this.sendCloudEventsWithResponse((Iterable<CloudEvent>) events, channelName, context);
} else if (this.eventClass == EventGridEvent.class) {
return this.sendEventGridEventsWithResponse((Iterable<EventGridEvent>) events, context);
} else {
return this.sendCustomEventsWithResponse((Iterable<BinaryData>) events, context);
}
}

private Response<Void> sendCustomEventsWithResponse(Iterable<BinaryData> events, Context context) {
if (events == null) {
throw logger.logExceptionAsError(new NullPointerException("'events' cannot be null."));
}
List<Object> objectEvents = StreamSupport.stream(events.spliterator(), false)
.map(event -> (Object) new RawValue(event.toString()))
.collect(Collectors.toList());
return this.impl.publishCustomEventEventsWithResponse(this.hostname, objectEvents, context);
}

private Response<Void> sendEventGridEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
if (events == null) {
throw logger.logExceptionAsError(new NullPointerException("'events' cannot be null."));
}

List<com.azure.messaging.eventgrid.implementation.models.EventGridEvent> eventGridEvents = StreamSupport.stream(events.spliterator(), false)
.map(EventGridEvent::toImpl)
.collect(Collectors.toList());
return this.impl.publishEventGridEventsWithResponse(this.hostname, eventGridEvents, context);
}

private Response<Void> sendCloudEventsWithResponse(Iterable<CloudEvent> events, String channelName, Context context) {
if (events == null) {
throw logger.logExceptionAsError(new NullPointerException("'events' cannot be null."));
}

List<CloudEvent> cloudEvents = StreamSupport.stream(events.spliterator(), false)
.collect(Collectors.toList());
return this.impl.publishCloudEventEventsWithResponse(this.hostname, cloudEvents, channelName, context);
}

/**
Expand All @@ -215,7 +309,7 @@ public Response<Void> sendEventsWithResponse(Iterable<T> events, String channelN
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void sendEvent(T event) {

asyncClient.sendEvent(event).block();
List<T> events = Collections.singletonList(event);
this.sendEvents(events);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,21 @@ public EventGridPublisherClientBuilder() {
*/
private <T> EventGridPublisherAsyncClient<T> buildAsyncClient(Class<T> eventClass) {
Objects.requireNonNull(endpoint, "'endpoint' is required and can not be null.");

return new EventGridPublisherAsyncClient<T>((httpPipeline != null ? httpPipeline : getHttpPipeline()),
endpoint,
getEventGridServiceVersion(),
eventClass);
}

private EventGridServiceVersion getEventGridServiceVersion() {
EventGridServiceVersion buildServiceVersion = serviceVersion == null
? EventGridServiceVersion.getLatest()
: serviceVersion;
return buildServiceVersion;
}

if (httpPipeline != null) {
return new EventGridPublisherAsyncClient<T>(httpPipeline, endpoint, buildServiceVersion, eventClass);
}

private HttpPipeline getHttpPipeline() {
Configuration buildConfiguration = (configuration == null)
? Configuration.getGlobalConfiguration()
: configuration;
Expand Down Expand Up @@ -216,20 +223,21 @@ private <T> EventGridPublisherAsyncClient<T> buildAsyncClient(Class<T> eventClas
.clientOptions(clientOptions)
.tracer(tracer)
.build();


return new EventGridPublisherAsyncClient<T>(buildPipeline, endpoint, buildServiceVersion, eventClass);
return buildPipeline;
}

/**
* Build a publisher client with synchronous publishing methods and the current settings. Endpoint and a credential
* must be set (either keyCredential or sharedAccessSignatureCredential), all other settings have defaults and/or are optional.
* Note that currently the asynchronous client created by the method above is the recommended version for higher
* performance, as the synchronous client simply blocks on the same asynchronous calls.
* @return a publisher client with synchronous publishing methods.
*/
private <T> EventGridPublisherClient<T> buildClient(Class<T> eventClass) {
return new EventGridPublisherClient<T>(buildAsyncClient(eventClass));
Objects.requireNonNull(endpoint, "'endpoint' is required and can not be null.");

return new EventGridPublisherClient<T>((httpPipeline != null ? httpPipeline : getHttpPipeline()),
endpoint,
getEventGridServiceVersion(),
eventClass);
}

@Override
Expand Down
Loading