diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml index c9200f744bac8..488bec433021b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml @@ -36,8 +36,8 @@ com.azure - azure-core - 1.6.0 + azure-core-experimental + 1.0.0-beta.1 com.azure diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java index 5656e9e588bc9..0c00c696453e3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java @@ -3,38 +3,9 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.AmqpMessageConstant; -import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpException; -import com.azure.core.amqp.implementation.AmqpConstants; import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.TracerProvider; -import com.azure.core.util.Context; -import com.azure.core.util.logging.ClientLogger; -import com.azure.core.util.tracing.ProcessKind; -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.message.Message; -import reactor.core.publisher.Signal; - -import java.nio.BufferOverflowException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Objects; -import java.util.Optional; - -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; -import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; -import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; -import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; -import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; -import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE; /** * A class for aggregating {@link EventData} into a single, size-limited, batch. It is treated as a single message when @@ -45,59 +16,10 @@ * @see EventHubClientBuilder See EventHubClientBuilder for examples of building an asynchronous or synchronous * producer. */ -public final class EventDataBatch { - private final ClientLogger logger = new ClientLogger(EventDataBatch.class); - private final Object lock = new Object(); - private final int maxMessageSize; - private final String partitionKey; - private final ErrorContextProvider contextProvider; - private final List events; - private final byte[] eventBytes; - private final String partitionId; - private int sizeInBytes; - private final TracerProvider tracerProvider; - private final String entityPath; - private final String hostname; - +public final class EventDataBatch extends EventDataBatchBase { EventDataBatch(int maxMessageSize, String partitionId, String partitionKey, ErrorContextProvider contextProvider, TracerProvider tracerProvider, String entityPath, String hostname) { - this.maxMessageSize = maxMessageSize; - this.partitionKey = partitionKey; - this.partitionId = partitionId; - this.contextProvider = contextProvider; - this.events = new LinkedList<>(); - this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB - this.eventBytes = new byte[maxMessageSize]; - this.tracerProvider = tracerProvider; - this.entityPath = entityPath; - this.hostname = hostname; - } - - /** - * Gets the number of {@link EventData events} in the batch. - * - * @return The number of {@link EventData events} in the batch. - */ - public int getCount() { - return events.size(); - } - - /** - * Gets the maximum size, in bytes, of the {@link EventDataBatch}. - * - * @return The maximum size, in bytes, of the {@link EventDataBatch}. - */ - public int getMaxSizeInBytes() { - return maxMessageSize; - } - - /** - * Gets the size of the {@link EventDataBatch} in bytes. - * - * @return the size of the {@link EventDataBatch} in bytes. - */ - public int getSizeInBytes() { - return this.sizeInBytes; + super(maxMessageSize, partitionId, partitionKey, contextProvider, tracerProvider, entityPath, hostname); } /** @@ -110,177 +32,6 @@ public int getSizeInBytes() { * @throws AmqpException if {@code eventData} is larger than the maximum size of the {@link EventDataBatch}. */ public boolean tryAdd(final EventData eventData) { - if (eventData == null) { - throw logger.logExceptionAsWarning(new IllegalArgumentException("eventData cannot be null")); - } - EventData event = tracerProvider.isEnabled() ? traceMessageSpan(eventData) : eventData; - - final int size; - try { - size = getSize(event, events.isEmpty()); - } catch (BufferOverflowException exception) { - throw logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, - String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb", - maxMessageSize / 1024), - contextProvider.getErrorContext())); - } - - synchronized (lock) { - if (this.sizeInBytes + size > this.maxMessageSize) { - return false; - } - - this.sizeInBytes += size; - } - - this.events.add(event); - return true; - } - - /** - * Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the message. - * - * @param eventData The Event to add tracing span for. - * @return the updated event data object. - */ - private EventData traceMessageSpan(EventData eventData) { - Optional eventContextData = eventData.getContext().getData(SPAN_CONTEXT_KEY); - if (eventContextData.isPresent()) { - // if message has context (in case of retries), don't start a message span or add a new context - return eventData; - } else { - // Starting the span makes the sampling decision (nothing is logged at this time) - Context eventContext = eventData.getContext() - .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE) - .addData(ENTITY_PATH_KEY, this.entityPath) - .addData(HOST_NAME_KEY, this.hostname); - Context eventSpanContext = tracerProvider.startSpan(eventContext, ProcessKind.MESSAGE); - Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); - if (eventDiagnosticIdOptional.isPresent()) { - eventData.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString()); - tracerProvider.endSpan(eventSpanContext, Signal.complete()); - eventData.addContext(SPAN_CONTEXT_KEY, eventSpanContext); - } - } - - return eventData; - } - - List getEvents() { - return events; - } - - String getPartitionKey() { - return partitionKey; - } - - String getPartitionId() { - return partitionId; - } - - private int getSize(final EventData eventData, final boolean isFirst) { - Objects.requireNonNull(eventData, "'eventData' cannot be null."); - - final Message amqpMessage = createAmqpMessage(eventData, partitionKey); - int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size - eventSize += 16; // data section overhead - - if (isFirst) { - amqpMessage.setBody(null); - amqpMessage.setApplicationProperties(null); - amqpMessage.setProperties(null); - amqpMessage.setDeliveryAnnotations(null); - - eventSize += amqpMessage.encode(this.eventBytes, 0, maxMessageSize); - } - - return eventSize; - } - - /* - * Creates the AMQP message represented by the event data - */ - private Message createAmqpMessage(EventData event, String partitionKey) { - final Message message = Proton.message(); - - if (event.getProperties() != null && !event.getProperties().isEmpty()) { - final ApplicationProperties applicationProperties = new ApplicationProperties(event.getProperties()); - message.setApplicationProperties(applicationProperties); - } - - if (event.getSystemProperties() != null) { - event.getSystemProperties().forEach((key, value) -> { - if (EventData.RESERVED_SYSTEM_PROPERTIES.contains(key)) { - return; - } - - final AmqpMessageConstant constant = AmqpMessageConstant.fromString(key); - - if (constant != null) { - switch (constant) { - case MESSAGE_ID: - message.setMessageId(value); - break; - case USER_ID: - message.setUserId((byte[]) value); - break; - case TO: - message.setAddress((String) value); - break; - case SUBJECT: - message.setSubject((String) value); - break; - case REPLY_TO: - message.setReplyTo((String) value); - break; - case CORRELATION_ID: - message.setCorrelationId(value); - break; - case CONTENT_TYPE: - message.setContentType((String) value); - break; - case CONTENT_ENCODING: - message.setContentEncoding((String) value); - break; - case ABSOLUTE_EXPIRY_TIME: - message.setExpiryTime((long) value); - break; - case CREATION_TIME: - message.setCreationTime((long) value); - break; - case GROUP_ID: - message.setGroupId((String) value); - break; - case GROUP_SEQUENCE: - message.setGroupSequence((long) value); - break; - case REPLY_TO_GROUP_ID: - message.setReplyToGroupId((String) value); - break; - default: - throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US, - "Property is not a recognized reserved property name: %s", key))); - } - } else { - final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null) - ? new MessageAnnotations(new HashMap<>()) - : message.getMessageAnnotations(); - messageAnnotations.getValue().put(Symbol.getSymbol(key), value); - message.setMessageAnnotations(messageAnnotations); - } - }); - } - - if (partitionKey != null) { - final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null) - ? new MessageAnnotations(new HashMap<>()) - : message.getMessageAnnotations(); - messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey); - message.setMessageAnnotations(messageAnnotations); - } - - message.setBody(new Data(new Binary(event.getBody()))); - - return message; + return super.tryAdd(eventData); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatchBase.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatchBase.java new file mode 100644 index 0000000000000..6964ec79c1719 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatchBase.java @@ -0,0 +1,284 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.amqp.AmqpMessageConstant; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.AmqpConstants; +import com.azure.core.amqp.implementation.ErrorContextProvider; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.tracing.ProcessKind; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.message.Message; +import reactor.core.publisher.Signal; + +import java.nio.BufferOverflowException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Optional; + +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; +import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; +import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE; + +/** + * Base class containing common implementation for batch sending. + * + * Implemented by {@link ObjectBatch} and {@link EventDataBatch}. + */ +abstract class EventDataBatchBase { + private final ClientLogger logger = new ClientLogger(this.getClass()); + private final Object lock = new Object(); + private final int maxMessageSize; + private final String partitionKey; + private final ErrorContextProvider contextProvider; + private final List events; + private final byte[] eventBytes; + private final String partitionId; + private int sizeInBytes; + private final TracerProvider tracerProvider; + private final String entityPath; + private final String hostname; + + EventDataBatchBase(int maxMessageSize, String partitionId, String partitionKey, + ErrorContextProvider contextProvider, TracerProvider tracerProvider, String entityPath, + String hostname) { + this.maxMessageSize = maxMessageSize; + this.partitionKey = partitionKey; + this.partitionId = partitionId; + this.contextProvider = contextProvider; + this.events = new LinkedList<>(); + this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB + this.eventBytes = new byte[maxMessageSize]; + this.tracerProvider = tracerProvider; + this.entityPath = entityPath; + this.hostname = hostname; + } + + /** + * Gets the number of {@link EventData events} in the batch. + * + * @return The number of {@link EventData events} in the batch. + */ + public int getCount() { + return events.size(); + } + + /** + * Gets the maximum size, in bytes, of the {@link EventDataBatch}. + * + * @return The maximum size, in bytes, of the {@link EventDataBatch}. + */ + public int getMaxSizeInBytes() { + return maxMessageSize; + } + + /** + * Gets the size of the {@link EventDataBatch} in bytes. + * + * @return the size of the {@link EventDataBatch} in bytes. + */ + public int getSizeInBytes() { + return this.sizeInBytes; + } + + /** + * Tries to add an {@link EventData event} to the batch. + * + * @param eventData The {@link EventData} to add to the batch. + * @return {@code true} if the event could be added to the batch; {@code false} if the event was too large to fit in + * the batch. + * @throws IllegalArgumentException if {@code eventData} is {@code null}. + * @throws AmqpException if {@code eventData} is larger than the maximum size of the {@link EventDataBatch}. + */ + public boolean tryAdd(final EventData eventData) { + if (eventData == null) { + throw logger.logExceptionAsWarning(new IllegalArgumentException("eventData cannot be null")); + } + EventData event = tracerProvider.isEnabled() ? traceMessageSpan(eventData) : eventData; + + final int size; + try { + size = getSize(event, events.isEmpty()); + } catch (BufferOverflowException exception) { + throw logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, + String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb", + maxMessageSize / 1024), + contextProvider.getErrorContext())); + } + + synchronized (lock) { + if (this.sizeInBytes + size > this.maxMessageSize) { + return false; + } + + this.sizeInBytes += size; + } + + this.events.add(event); + return true; + } + + + /** + * Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the message. + * + * @param eventData The Event to add tracing span for. + * @return the updated event data object. + */ + EventData traceMessageSpan(EventData eventData) { + Optional eventContextData = eventData.getContext().getData(SPAN_CONTEXT_KEY); + if (eventContextData.isPresent()) { + // if message has context (in case of retries), don't start a message span or add a new context + return eventData; + } else { + // Starting the span makes the sampling decision (nothing is logged at this time) + Context eventContext = eventData.getContext() + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE) + .addData(ENTITY_PATH_KEY, this.entityPath) + .addData(HOST_NAME_KEY, this.hostname); + Context eventSpanContext = tracerProvider.startSpan(eventContext, ProcessKind.MESSAGE); + Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); + if (eventDiagnosticIdOptional.isPresent()) { + eventData.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString()); + tracerProvider.endSpan(eventSpanContext, Signal.complete()); + eventData.addContext(SPAN_CONTEXT_KEY, eventSpanContext); + } + } + + return eventData; + } + + List getEvents() { + return events; + } + + String getPartitionKey() { + return partitionKey; + } + + String getPartitionId() { + return partitionId; + } + + int getSize(final EventData eventData, final boolean isFirst) { + Objects.requireNonNull(eventData, "'eventData' cannot be null."); + + final Message amqpMessage = createAmqpMessage(eventData, partitionKey); + int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size + eventSize += 16; // data section overhead + + if (isFirst) { + amqpMessage.setBody(null); + amqpMessage.setApplicationProperties(null); + amqpMessage.setProperties(null); + amqpMessage.setDeliveryAnnotations(null); + + eventSize += amqpMessage.encode(this.eventBytes, 0, maxMessageSize); + } + + return eventSize; + } + + /* + * Creates the AMQP message represented by the event data + */ + private Message createAmqpMessage(EventData event, String partitionKey) { + final Message message = Proton.message(); + + if (event.getProperties() != null && !event.getProperties().isEmpty()) { + final ApplicationProperties applicationProperties = new ApplicationProperties(event.getProperties()); + message.setApplicationProperties(applicationProperties); + } + + if (event.getSystemProperties() != null) { + event.getSystemProperties().forEach((key, value) -> { + if (EventData.RESERVED_SYSTEM_PROPERTIES.contains(key)) { + return; + } + + final AmqpMessageConstant constant = AmqpMessageConstant.fromString(key); + + if (constant != null) { + switch (constant) { + case MESSAGE_ID: + message.setMessageId(value); + break; + case USER_ID: + message.setUserId((byte[]) value); + break; + case TO: + message.setAddress((String) value); + break; + case SUBJECT: + message.setSubject((String) value); + break; + case REPLY_TO: + message.setReplyTo((String) value); + break; + case CORRELATION_ID: + message.setCorrelationId(value); + break; + case CONTENT_TYPE: + message.setContentType((String) value); + break; + case CONTENT_ENCODING: + message.setContentEncoding((String) value); + break; + case ABSOLUTE_EXPIRY_TIME: + message.setExpiryTime((long) value); + break; + case CREATION_TIME: + message.setCreationTime((long) value); + break; + case GROUP_ID: + message.setGroupId((String) value); + break; + case GROUP_SEQUENCE: + message.setGroupSequence((long) value); + break; + case REPLY_TO_GROUP_ID: + message.setReplyToGroupId((String) value); + break; + default: + throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US, + "Property is not a recognized reserved property name: %s", key))); + } + } else { + final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null) + ? new MessageAnnotations(new HashMap<>()) + : message.getMessageAnnotations(); + messageAnnotations.getValue().put(Symbol.getSymbol(key), value); + message.setMessageAnnotations(messageAnnotations); + } + }); + } + + if (partitionKey != null) { + final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null) + ? new MessageAnnotations(new HashMap<>()) + : message.getMessageAnnotations(); + messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey); + message.setMessageAnnotations(messageAnnotations); + } + + message.setBody(new Data(new Binary(event.getBody()))); + + return message; + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java index 3a0598a08da51..27191d8d1f985 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java @@ -5,6 +5,7 @@ import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.experimental.serializer.ObjectSerializer; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; @@ -32,9 +33,11 @@ class EventHubAsyncClient implements Closeable { private final boolean isSharedConnection; private final Runnable onClientClose; private final TracerProvider tracerProvider; + private final ObjectSerializer serializer; EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, TracerProvider tracerProvider, - MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose) { + MessageSerializer messageSerializer, ObjectSerializer serializer, Scheduler scheduler, + boolean isSharedConnection, Runnable onClientClose) { this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null."); this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); this.connectionProcessor = Objects.requireNonNull(connectionProcessor, @@ -43,6 +46,7 @@ class EventHubAsyncClient implements Closeable { this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null."); this.isSharedConnection = isSharedConnection; + this.serializer = serializer; } /** @@ -104,8 +108,8 @@ Mono getPartitionProperties(String partitionId) { */ EventHubProducerAsyncClient createProducer() { return new EventHubProducerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(), - connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer, scheduler, - isSharedConnection, onClientClose); + connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer, + serializer, scheduler, isSharedConnection, onClientClose); } /** @@ -129,8 +133,8 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou } return new EventHubConsumerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(), - connectionProcessor, messageSerializer, consumerGroup, prefetchCount, scheduler, isSharedConnection, - onClientClose); + connectionProcessor, messageSerializer, serializer, consumerGroup, prefetchCount, scheduler, + isSharedConnection, onClientClose); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 009168ce8e933..76b66e4b5f17e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -20,6 +20,7 @@ import com.azure.core.annotation.ServiceClientBuilder; import com.azure.core.credential.TokenCredential; import com.azure.core.exception.AzureException; +import com.azure.core.experimental.serializer.ObjectSerializer; import com.azure.core.util.Configuration; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; @@ -135,6 +136,7 @@ public class EventHubClientBuilder { private String consumerGroup; private EventHubConnectionProcessor eventHubConnectionProcessor; private int prefetchCount; + private ObjectSerializer serializer; /** * Keeps track of the open clients that were created from this builder when there is a shared connection. @@ -361,6 +363,18 @@ public EventHubClientBuilder prefetchCount(int prefetchCount) { return this; } + /** + * Set ObjectSerializer implementation to be used for creating ObjectBatch. + * + * @param serializer ObjectSerializer implementation + * + * @return updated builder instance + */ + public EventHubClientBuilder serializer(ObjectSerializer serializer) { + this.serializer = serializer; + return this; + } + /** * Package-private method that sets the scheduler for the created Event Hub client. * @@ -485,7 +499,7 @@ EventHubAsyncClient buildAsyncClient() { final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); - return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, scheduler, + return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, serializer, scheduler, isSharedConnection.get(), this::onClientClose); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java index 335fc7d3a9735..ae986f147a2c0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java @@ -11,6 +11,7 @@ import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceClient; import com.azure.core.annotation.ServiceMethod; +import com.azure.core.experimental.serializer.ObjectSerializer; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; @@ -70,6 +71,7 @@ public class EventHubConsumerAsyncClient implements Closeable { private final String eventHubName; private final EventHubConnectionProcessor connectionProcessor; private final MessageSerializer messageSerializer; + private final ObjectSerializer serializer; private final String consumerGroup; private final int prefetchCount; private final Scheduler scheduler; @@ -83,12 +85,15 @@ public class EventHubConsumerAsyncClient implements Closeable { new ConcurrentHashMap<>(); EventHubConsumerAsyncClient(String fullyQualifiedNamespace, String eventHubName, - EventHubConnectionProcessor connectionProcessor, MessageSerializer messageSerializer, String consumerGroup, - int prefetchCount, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClosed) { + EventHubConnectionProcessor connectionProcessor, + MessageSerializer messageSerializer, ObjectSerializer serializer, + String consumerGroup, int prefetchCount, Scheduler scheduler, + boolean isSharedConnection, Runnable onClientClosed) { this.fullyQualifiedNamespace = fullyQualifiedNamespace; this.eventHubName = eventHubName; this.connectionProcessor = connectionProcessor; this.messageSerializer = messageSerializer; + this.serializer = serializer; this.consumerGroup = consumerGroup; this.prefetchCount = prefetchCount; this.scheduler = scheduler; @@ -364,7 +369,7 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName, new AmqpReceiveLinkProcessor(prefetchCount, retryPolicy, connectionProcessor)); return new EventHubPartitionAsyncConsumer(linkMessageProcessor, messageSerializer, getFullyQualifiedNamespace(), - getEventHubName(), consumerGroup, partitionId, initialPosition, - receiveOptions.getTrackLastEnqueuedEventProperties(), scheduler); + getEventHubName(), consumerGroup, partitionId, serializer, + initialPosition, receiveOptions.getTrackLastEnqueuedEventProperties(), scheduler); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java index d0b070343d4d6..950d4c532674c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java @@ -4,6 +4,7 @@ package com.azure.messaging.eventhubs; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.experimental.serializer.ObjectSerializer; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor; import com.azure.messaging.eventhubs.models.EventPosition; @@ -39,13 +40,16 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable { private final Scheduler scheduler; private final EmitterProcessor emitterProcessor; private final EventPosition initialPosition; + private final ObjectSerializer serializer; private volatile Long currentOffset; EventHubPartitionAsyncConsumer(AmqpReceiveLinkProcessor amqpReceiveLinkProcessor, - MessageSerializer messageSerializer, String fullyQualifiedNamespace, String eventHubName, String consumerGroup, - String partitionId, AtomicReference> currentEventPosition, - boolean trackLastEnqueuedEventProperties, Scheduler scheduler) { + MessageSerializer messageSerializer, String fullyQualifiedNamespace, + String eventHubName, String consumerGroup, String partitionId, + ObjectSerializer serializer, + AtomicReference> currentEventPosition, + boolean trackLastEnqueuedEventProperties, Scheduler scheduler) { this.initialPosition = Objects.requireNonNull(currentEventPosition.get().get(), "'currentEventPosition.get().get()' cannot be null."); this.amqpReceiveLinkProcessor = amqpReceiveLinkProcessor; @@ -54,6 +58,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable { this.eventHubName = eventHubName; this.consumerGroup = consumerGroup; this.partitionId = partitionId; + this.serializer = serializer; this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties; this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null."); @@ -138,6 +143,6 @@ private PartitionEvent onMessageReceived(Message message) { final PartitionContext partitionContext = new PartitionContext(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); - return new PartitionEvent(partitionContext, event, lastEnqueuedEventProperties.get()); + return new PartitionEvent(partitionContext, event, lastEnqueuedEventProperties.get(), this.serializer); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index f2a81b0ba5bd5..f7906c6a573a5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -15,6 +15,7 @@ import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceClient; import com.azure.core.annotation.ServiceMethod; +import com.azure.core.experimental.serializer.ObjectSerializer; import com.azure.core.util.Context; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; @@ -110,6 +111,7 @@ public class EventHubProducerAsyncClient implements Closeable { private final Scheduler scheduler; private final boolean isSharedConnection; private final Runnable onClientClose; + private final ObjectSerializer serializer; /** * Creates a new instance of this {@link EventHubProducerAsyncClient} that can send messages to a single partition @@ -117,8 +119,10 @@ public class EventHubProducerAsyncClient implements Closeable { * load balance the messages amongst available partitions. */ EventHubProducerAsyncClient(String fullyQualifiedNamespace, String eventHubName, - EventHubConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider, - MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose) { + EventHubConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, + TracerProvider tracerProvider, MessageSerializer messageSerializer, + ObjectSerializer serializer, Scheduler scheduler, boolean isSharedConnection, + Runnable onClientClose) { this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null."); @@ -132,6 +136,7 @@ public class EventHubProducerAsyncClient implements Closeable { this.retryPolicy = getRetryPolicy(retryOptions); this.scheduler = scheduler; this.isSharedConnection = isSharedConnection; + this.serializer = serializer; } /** @@ -208,6 +213,11 @@ public Mono createBatch(CreateBatchOptions options) { return monoError(logger, new NullPointerException("'options' cannot be null.")); } + Mono optionsError = validateBatchOptions(options); + if (optionsError != null) { + return optionsError; + } + final String partitionKey = options.getPartitionKey(); final String partitionId = options.getPartitionId(); final int batchMaxSize = options.getMaximumSizeInBytes(); @@ -248,6 +258,71 @@ public Mono createBatch(CreateBatchOptions options) { })); } + /** + * Creates an {@link ObjectBatch} that can fit as many serialized objects as events as the transport allows. + * @param objectType type of object in the batch + * @param object type + * + * @return A new {@link ObjectBatch} that can fit as many serialized objects as events as the transport allows. + */ + public Mono> createBatch(Class objectType) { + return createBatch(objectType, DEFAULT_BATCH_OPTIONS); + } + + /** + * Creates an {@link ObjectBatch} configured with the options specified. + * + * @param objectType type of object in the batch + * @param object type + * @param options A set of options used to configure the {@link ObjectBatch}. + * @return A new {@link ObjectBatch} that can fit as many events as the transport allows. + * @throws NullPointerException if {@code options} is null. + */ + public Mono> createBatch(Class objectType, CreateBatchOptions options) { + if (objectType == null) { + return monoError(logger, new IllegalArgumentException("'objectType' cannot be null.")); + } + if (serializer == null) { + return monoError(logger, + new NullPointerException("No serializer set for performing object serialization for ObjectBatch.")); + } + if (options == null) { + return monoError(logger, new NullPointerException("'options' cannot be null.")); + } + + Mono> optionsError = validateBatchOptions(options); + if (optionsError != null) { + return optionsError; + } + + final String partitionKey = options.getPartitionKey(); + final String partitionId = options.getPartitionId(); + final int batchMaxSize = options.getMaximumSizeInBytes(); + + return getSendLink(partitionId) + .flatMap(link -> link.getLinkSize() + .flatMap(size -> { + final int maximumLinkSize = size > 0 + ? size + : MAX_MESSAGE_LENGTH_BYTES; + + if (batchMaxSize > maximumLinkSize) { + return monoError(logger, + new IllegalArgumentException(String.format(Locale.US, + "BatchOptions.maximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", + batchMaxSize, maximumLinkSize))); + } + + final int batchSize = batchMaxSize > 0 + ? batchMaxSize + : maximumLinkSize; + + return Mono.just(new ObjectBatch<>(batchSize, partitionId, partitionKey, objectType, + link::getErrorContext, tracerProvider, serializer, + link.getEntityPath(), link.getHostname())); + })); + } + /** * Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size * allowed, an exception will be triggered and the send will fail. @@ -510,6 +585,22 @@ private Mono getSendLink(String partitionId) { .flatMap(connection -> connection.createSendLink(linkName, entityPath, retryOptions)); } + private Mono validateBatchOptions(CreateBatchOptions options) { + if (!CoreUtils.isNullOrEmpty(options.getPartitionKey()) + && !CoreUtils.isNullOrEmpty(options.getPartitionId())) { + return monoError(logger, new IllegalArgumentException(String.format(Locale.US, + "CreateBatchOptions.getPartitionKey() and CreateBatchOptions.getPartitionId() are both set. " + + "Only one or the other can be used. partitionKey: '%s'. partitionId: '%s'", + options.getPartitionKey(), options.getPartitionId()))); + } else if (!CoreUtils.isNullOrEmpty(options.getPartitionKey()) + && options.getPartitionKey().length() > MAX_PARTITION_KEY_LENGTH) { + return monoError(logger, new IllegalArgumentException(String.format(Locale.US, + "Partition key '%s' exceeds the maximum allowed length: '%s'.", options.getPartitionKey(), + MAX_PARTITION_KEY_LENGTH))); + } + return null; + } + /** * Disposes of the {@link EventHubProducerAsyncClient}. If the client had a dedicated connection, the underlying * connection is also closed. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/ObjectBatch.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/ObjectBatch.java new file mode 100644 index 0000000000000..587e98d31f68d --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/ObjectBatch.java @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.ErrorContextProvider; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.experimental.serializer.ObjectSerializer; +import reactor.core.publisher.Mono; + +import java.io.ByteArrayOutputStream; +import java.util.Objects; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * A class for aggregating Java objects into a single, size-limited, batch. Objects are serialized into EventData + * objects and are added to the batch. It is treated as a single message when sent to the Azure Event Hubs service. + * + * @param type of objects in the batch. Multi-type batches are not permitted. + */ +public final class ObjectBatch extends EventDataBatchBase { + private final ClientLogger logger = new ClientLogger(ObjectBatch.class); + private final Class batchType; + private final ObjectSerializer serializer; + + ObjectBatch(int maxMessageSize, String partitionId, String partitionKey, Class batchType, + ErrorContextProvider contextProvider, TracerProvider tracerProvider, + ObjectSerializer serializer, String entityPath, String hostname) { + super(maxMessageSize, partitionId, partitionKey, contextProvider, tracerProvider, entityPath, hostname); + this.batchType = Objects.requireNonNull(batchType, "'batchType' cannot be null."); + this.serializer = Objects.requireNonNull(serializer, "'serializer' cannot be null."); + } + + /** + * Tries to asynchronously serialize an object into an EventData payload and add the EventData to the batch. + * + * @param object The object to add to the batch. + * @return {@code true} if the object could be added to the batch; {@code false} if the serialized + * object was too large to fit in the batch. + * @throws IllegalArgumentException if object is {@code null}. + * @throws AmqpException if serialized object as {@link EventData} is larger than the maximum size + * of the {@link EventDataBatch}. + */ + public Mono tryAdd(final T object) { + if (object == null) { + return monoError(logger, new IllegalArgumentException("object cannot be null")); + } + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + return serializer.serialize(outputStream, object).map(s -> tryAdd(new EventData(s.toByteArray()))); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java index 838025b1eff4e..cc7f9b2dcf61c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java @@ -4,10 +4,16 @@ package com.azure.messaging.eventhubs.models; import com.azure.core.annotation.Immutable; +import com.azure.core.experimental.serializer.ObjectSerializer; +import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.EventData; +import reactor.core.publisher.Mono; +import java.io.ByteArrayInputStream; import java.util.Objects; +import static com.azure.core.util.FluxUtil.monoError; + /** * A container for {@link EventData} along with the partition information for this event data. */ @@ -17,6 +23,21 @@ public class PartitionEvent { private final PartitionContext partitionContext; private final EventData eventData; private final LastEnqueuedEventProperties lastEnqueuedEventProperties; + private final ObjectSerializer serializer; + private final ClientLogger logger = new ClientLogger(PartitionEvent.class); + + /** + * Creates an instance of PartitionEvent. + * + * @param partitionContext The partition information associated with the event data. + * @param eventData The event data received from the Event Hub. + * @param lastEnqueuedEventProperties The properties of the last enqueued event in the partition. + * @throws NullPointerException if {@code partitionContext} or {@code eventData} is {@code null}. + */ + public PartitionEvent(final PartitionContext partitionContext, final EventData eventData, + LastEnqueuedEventProperties lastEnqueuedEventProperties) { + this(partitionContext, eventData, lastEnqueuedEventProperties, null); + } /** * Creates an instance of PartitionEvent. @@ -24,13 +45,15 @@ public class PartitionEvent { * @param partitionContext The partition information associated with the event data. * @param eventData The event data received from the Event Hub. * @param lastEnqueuedEventProperties The properties of the last enqueued event in the partition. + * @param serializer ObjectSerializer implementation for deserializing event data payload. May be null. * @throws NullPointerException if {@code partitionContext} or {@code eventData} is {@code null}. */ public PartitionEvent(final PartitionContext partitionContext, final EventData eventData, - LastEnqueuedEventProperties lastEnqueuedEventProperties) { + LastEnqueuedEventProperties lastEnqueuedEventProperties, ObjectSerializer serializer) { this.partitionContext = Objects.requireNonNull(partitionContext, "'partitionContext' cannot be null"); this.eventData = Objects.requireNonNull(eventData, "'eventData' cannot be null"); this.lastEnqueuedEventProperties = lastEnqueuedEventProperties; + this.serializer = serializer; } /** @@ -59,4 +82,23 @@ public EventData getData() { public LastEnqueuedEventProperties getLastEnqueuedEventProperties() { return lastEnqueuedEventProperties; } + + /** + * Deserializes event payload into object. + * + * @param objectType Class object of type T + * @param object type for deserialization + * @return deserialized object as type T + */ + public Mono getDeserializedObject(Class objectType) { + if (this.serializer == null) { + return monoError(logger, + new NullPointerException("No serializer set for deserializing EventData payload.")); + } + if (objectType == null) { + return monoError(logger, new IllegalArgumentException("objectType cannot be null.")); + } + + return serializer.deserialize(new ByteArrayInputStream(eventData.getBody()), objectType); + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java index 0386dee075a72..e743ad96cb64a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java @@ -3,6 +3,7 @@ module com.azure.messaging.eventhubs { requires transitive com.azure.core; + requires transitive com.azure.core.experimental; requires transitive com.azure.core.amqp; requires com.microsoft.azure.qpid.protonj.extensions; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchBaseTest.java similarity index 98% rename from sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java rename to sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchBaseTest.java index ea85afedd71f0..8a8a529ff7ab7 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchBaseTest.java @@ -20,7 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; -public class EventDataBatchTest { +public class EventDataBatchBaseTest { private static final String PARTITION_KEY = "PartitionIDCopyFromProducerOption"; @Mock diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java index c37f1686c710d..ce9461c31e179 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java @@ -136,7 +136,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS .subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-name", connectionOptions.getRetry())); - consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, null, CONSUMER_GROUP, PREFETCH, parallelScheduler, false, onClientClosed); } @@ -156,7 +156,7 @@ void teardown() { @Test void lastEnqueuedEventInformationIsNull() { final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, parallelScheduler, false, onClientClosed); + connectionProcessor, messageSerializer, null, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, parallelScheduler, false, onClientClosed); final int numberOfEvents = 10; when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); final int numberToReceive = 3; @@ -179,7 +179,7 @@ void lastEnqueuedEventInformationIsNull() { void lastEnqueuedEventInformationCreated() { // Arrange final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, Schedulers.parallel(), false, onClientClosed); + connectionProcessor, messageSerializer, null, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, Schedulers.parallel(), false, onClientClosed); final int numberOfEvents = 10; final ReceiveOptions receiveOptions = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); @@ -232,7 +232,7 @@ void receivesNumberOfEventsAllowsBlock() throws InterruptedException { // Scheduling on elastic to simulate a user passed in scheduler (this is the default in EventHubClientBuilder). final EventHubConsumerAsyncClient myConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.elastic(), false, onClientClosed); + connectionProcessor, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.elastic(), false, onClientClosed); final Flux eventsFlux = myConsumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) .take(numberOfEvents); @@ -294,7 +294,7 @@ void returnsNewListener() { any(ReceiveOptions.class))).thenReturn(Mono.just(link2), Mono.just(link3)); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, testScheduler, false, onClientClosed); + eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, testScheduler, false, onClientClosed); // Act & Assert StepVerifier.create(asyncClient.receiveFromPartition(PARTITION_ID, EventPosition.earliest()).take(numberOfEvents)) @@ -517,7 +517,7 @@ void receivesMultiplePartitions() { .thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions))); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, parallelScheduler, false, onClientClosed); + eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, parallelScheduler, false, onClientClosed); EmitterProcessor processor2 = EmitterProcessor.create(); FluxSink processor2sink = processor2.sink(); @@ -592,7 +592,7 @@ void receivesMultiplePartitionsWhenOneCloses() { .thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions))); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); + eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); EmitterProcessor processor2 = EmitterProcessor.create(); FluxSink processor2sink = processor2.sink(); @@ -652,7 +652,7 @@ void doesNotCloseSharedConnection() { EventHubConnectionProcessor eventHubConnection = Flux.create(sink -> sink.next(connection1)) .subscribeWith(new EventHubConnectionProcessor(HOSTNAME, EVENT_HUB_NAME, retryOptions)); EventHubConsumerAsyncClient sharedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), true, onClientClosed); + eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), true, onClientClosed); // Act sharedConsumer.close(); @@ -672,7 +672,7 @@ void closesDedicatedConnection() { // Arrange EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubConsumerAsyncClient dedicatedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - hubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); + hubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); // Act dedicatedConsumer.close(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java index b05a875f23617..6aa9c9cb85fdb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java @@ -118,7 +118,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS })); asyncConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); + connectionProcessor, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); consumer = new EventHubConsumerClient(asyncConsumer, Duration.ofSeconds(10)); } @@ -142,8 +142,8 @@ public static void dispose() { public void lastEnqueuedEventInformationIsNull() { // Arrange final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient( - HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, - PREFETCH, Schedulers.parallel(), false, onClientClosed); + HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, null, + CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed); final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5)); final int numberOfEvents = 10; sendMessages(sink, numberOfEvents, PARTITION_ID); @@ -172,8 +172,8 @@ public void lastEnqueuedEventInformationCreated() { // Arrange final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient( - HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, - Schedulers.parallel(), false, onClientClosed); + HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, null, CONSUMER_GROUP, + PREFETCH, Schedulers.parallel(), false, onClientClosed); final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5)); final int numberOfEvents = 10; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java index 3510200950493..64cd339cbd64e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java @@ -8,6 +8,7 @@ import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.implementation.AmqpReceiveLink; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.experimental.serializer.ObjectSerializer; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor; @@ -30,9 +31,12 @@ import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import java.io.InputStream; +import java.io.OutputStream; import java.time.Duration; import java.time.Instant; import java.util.Date; @@ -117,7 +121,7 @@ void receivesMessages(boolean trackLastEnqueuedProperties) { // Arrange linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection)); consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME, - CONSUMER_GROUP, PARTITION_ID, currentPosition, trackLastEnqueuedProperties, Schedulers.parallel()); + CONSUMER_GROUP, PARTITION_ID, null, currentPosition, trackLastEnqueuedProperties, Schedulers.parallel()); final EventData event1 = new EventData("Foo"); final EventData event2 = new EventData("Bar"); @@ -165,7 +169,7 @@ void receiveMultipleTimes() { // Arrange linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection)); consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME, - CONSUMER_GROUP, PARTITION_ID, currentPosition, false, Schedulers.parallel()); + CONSUMER_GROUP, PARTITION_ID, null, currentPosition, false, Schedulers.parallel()); final Message message3 = mock(Message.class); final String secondOffset = "54"; @@ -224,6 +228,57 @@ void receiveMultipleTimes() { Assertions.assertFalse(actual.isInclusive()); } + @Test + void receiveAndDeserialize() { + // just a test value + Object o = 0; + ObjectSerializer testSerializer = new ObjectSerializer() { + @Override + public Mono deserialize(InputStream stream, Class clazz) { + if (clazz.isInstance(o)) { + return Mono.just(clazz.cast(o)); + } + return null; + } + + @Override + public Mono serialize(S stream, Object value) { + return null; + } + }; + + // Arrange + linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection)); + consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME, + CONSUMER_GROUP, PARTITION_ID, testSerializer, currentPosition, false, Schedulers.parallel()); + + final EventData event = new EventData("Foo"); + final LastEnqueuedEventProperties last = new LastEnqueuedEventProperties(10L, 15L, + Instant.ofEpochMilli(1243454), Instant.ofEpochMilli(1240004)); + + when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event); + when(messageSerializer.deserialize(same(message1), eq(LastEnqueuedEventProperties.class))).thenReturn(last); + + // Act & Assert + StepVerifier.create(consumer.receive()) + .then(() -> { + messageProcessorSink.next(message1); + }) + .assertNext(partitionEvent -> { + verifyPartitionContext(partitionEvent.getPartitionContext()); + verifyLastEnqueuedInformation(false, last, + partitionEvent.getLastEnqueuedEventProperties()); + Assertions.assertSame(event, partitionEvent.getData()); + Assertions.assertSame(Integer.class.cast(o), partitionEvent.getDeserializedObject(Integer.class).block()); + }) + .thenCancel() + .verify(); + + // The emitter processor is not closed until the partition consumer is. + Assertions.assertFalse(linkProcessor.isTerminated()); + Assertions.assertSame(originalPosition, currentPosition.get().get()); + } + /** * Verifies that the consumer closes and completes any listeners on a shutdown signal. @@ -233,7 +288,7 @@ void listensToShutdownSignals() throws InterruptedException { // Arrange linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection)); consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME, - CONSUMER_GROUP, PARTITION_ID, currentPosition, false, Schedulers.parallel()); + CONSUMER_GROUP, PARTITION_ID, null, currentPosition, false, Schedulers.parallel()); final Message message3 = mock(Message.class); final String secondOffset = "54"; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java index 85eaacce57ff4..6ab8b2d20f3a9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java @@ -146,7 +146,7 @@ void setup(TestInfo testInfo) { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-path", connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, testScheduler, false, onClientClosed); + tracerProvider, messageSerializer, null, testScheduler, false, onClientClosed); when(sendLink.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); when(sendLink2.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); @@ -238,7 +238,7 @@ void sendSingleMessageWithBlock() throws InterruptedException { final Semaphore semaphore = new Semaphore(1); // In our actual client builder, we allow this. final EventHubProducerAsyncClient flexibleProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, testScheduler, false, onClientClosed); // EC is the prefix they use when creating a link that sends to the service round-robin. @@ -319,7 +319,7 @@ void sendStartSpanSingleMessage() { final SendOptions sendOptions = new SendOptions() .setPartitionId(partitionId); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); when(connection.createSendLink( @@ -377,7 +377,7 @@ void sendMessageRetrySpanTest() { TracerProvider tracerProvider = new TracerProvider(tracers); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final String failureKey = "fail"; final EventData testData = new EventData("test"); @@ -516,7 +516,7 @@ void startMessageSpansOnCreateBatch() { final List tracers = Collections.singletonList(tracer1); TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final AmqpSendLink link = mock(AmqpSendLink.class); @@ -822,7 +822,7 @@ void doesNotCloseSharedConnection() { // Arrange EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient sharedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + hubConnection, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), true, onClientClosed); // Act @@ -841,7 +841,7 @@ void closesDedicatedConnection() { // Arrange EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + hubConnection, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); // Act @@ -860,7 +860,7 @@ void closesDedicatedConnectionOnlyOnce() { // Arrange EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + hubConnection, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); // Act @@ -897,7 +897,7 @@ void reopensOnFailure() { new EventHubConnectionProcessor(EVENT_HUB_NAME, connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -972,7 +972,7 @@ void closesOnNonTransientFailure() { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), EVENT_HUB_NAME, connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1048,7 +1048,7 @@ void resendMessageOnTransientLinkFailure() { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), EVENT_HUB_NAME, connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java index 9e0455d89957b..02ec09aaeb2c1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java @@ -105,7 +105,7 @@ public void setup() { .subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-path", connectionOptions.getRetry())); asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); } @@ -157,7 +157,7 @@ public void sendStartSpanSingleMessage() { final List tracers = Collections.singletonList(tracer1); final TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)); @@ -219,7 +219,7 @@ public void sendMessageRetrySpanTest() { .thenReturn(Mono.just(sendLink)); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)) .addContext(SPAN_CONTEXT_KEY, Context.NONE); @@ -267,7 +267,7 @@ public void sendEventsExceedsBatchSize() { when(sendLink.getLinkSize()).thenReturn(Mono.just(1024)); TracerProvider tracerProvider = new TracerProvider(Collections.emptyList()); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); //Act & Assert @@ -369,7 +369,7 @@ public void startsMessageSpanOnEventBatch() { final List tracers = Collections.singletonList(tracer1); final TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), + connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java index a79be245862d5..86941734582ba 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java @@ -139,7 +139,7 @@ private static Stream checkpointStoreSupplier() { private PartitionEvent getEvent(EventData event) { PartitionContext context = new PartitionContext("ns", "foo", "bar", "baz"); - return new PartitionEvent(context, event, null); + return new PartitionEvent(context, event, null, null); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index 05888a75f6cb4..ae309f330f9e2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -474,7 +474,7 @@ public void testSingleEventReceiveHeartBeat() throws InterruptedException { private PartitionEvent getEvent(EventData event) { PartitionContext context = new PartitionContext("test-ns", "foo", "bar", "baz"); - return new PartitionEvent(context, event, null); + return new PartitionEvent(context, event, null, null); } private static final class TestPartitionProcessor extends PartitionProcessor { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ObjectBatchTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ObjectBatchTest.java new file mode 100644 index 0000000000000..3cff1916bd214 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ObjectBatchTest.java @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.amqp.implementation.ErrorContextProvider; +import com.azure.core.experimental.serializer.ObjectSerializer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; + +import java.io.InputStream; +import java.io.OutputStream; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ObjectBatchTest { + @Mock + private ErrorContextProvider errorContextProvider; + + @BeforeEach + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void nullObject() { + assertThrows(IllegalArgumentException.class, () -> { + final ObjectBatch batch = new ObjectBatch<>( + 1024, + null, + null, + Object.class, + errorContextProvider, + null, + new ObjectSerializer() { + @Override + public Mono deserialize(InputStream stream, Class clazz) { + return null; + } + + @Override + public Mono serialize(S stream, Object value) { + return null; + } + }, + null, + null); + batch.tryAdd(null); + }); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java index 698aee42498ea..4e5c09b72b313 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java @@ -113,7 +113,7 @@ public void testSingleEventProcessor() { .thenReturn(Flux.interval(Duration.ofSeconds(1)).map(index -> { final PartitionContext partitionContext = new PartitionContext("ns", "foo", "bar", "bazz"); int i = index.intValue() % eventDataList.size(); - return new PartitionEvent(partitionContext, eventDataList.get(i), null); + return new PartitionEvent(partitionContext, eventDataList.get(i), null, null); })); PartitionBasedLoadBalancer partitionBasedLoadBalancer = createPartitionLoadBalancer("owner1"); @@ -149,7 +149,7 @@ public void testTwoEventProcessors() { .thenReturn(Flux.interval(Duration.ofSeconds(1)).map(index -> { final PartitionContext partitionContext = new PartitionContext("ns", "foo", "bar", "bazz"); int i = index.intValue() % eventDataList.size(); - return new PartitionEvent(partitionContext, eventDataList.get(i), null); + return new PartitionEvent(partitionContext, eventDataList.get(i), null, null); })); PartitionBasedLoadBalancer partitionBasedLoadBalancer1 = createPartitionLoadBalancer("owner1"); @@ -185,7 +185,7 @@ public void testPartitionStealing() { .thenReturn(Flux.interval(Duration.ofSeconds(1)).map(index -> { final PartitionContext partitionContext = new PartitionContext("ns", "foo", "bar", "bazz"); int i = index.intValue() % eventDataList.size(); - return new PartitionEvent(partitionContext, eventDataList.get(i), null); + return new PartitionEvent(partitionContext, eventDataList.get(i), null, null); })); PartitionBasedLoadBalancer partitionBasedLoadBalancer1 = createPartitionLoadBalancer("owner1"); @@ -225,7 +225,7 @@ public void testMoreEventProcessorsThanPartitions() { .thenReturn(Flux.interval(Duration.ofSeconds(1)).map(index -> { final PartitionContext partitionContext = new PartitionContext("ns", "foo", "bar", "bazz"); int i = index.intValue() % eventDataList.size(); - return new PartitionEvent(partitionContext, eventDataList.get(i), null); + return new PartitionEvent(partitionContext, eventDataList.get(i), null, null); })); List loadBalancers = new ArrayList<>(); @@ -263,7 +263,7 @@ public void testEventProcessorInactive() { .thenReturn(Flux.interval(Duration.ofSeconds(1)).map(index -> { final PartitionContext partitionContext = new PartitionContext("ns", "foo", "bar", "bazz"); final int i = index.intValue() % eventDataList.size(); - return new PartitionEvent(partitionContext, eventDataList.get(i), null); + return new PartitionEvent(partitionContext, eventDataList.get(i), null, null); })); List loadBalancers = new ArrayList<>(); @@ -422,7 +422,7 @@ public void testEmptyOwnerId() { .thenReturn(Flux.interval(Duration.ofSeconds(1)).map(index -> { final PartitionContext partitionContext = new PartitionContext("ns", "foo", "bar", "bazz"); final int i = index.intValue() % eventDataList.size(); - return new PartitionEvent(partitionContext, eventDataList.get(i), null); + return new PartitionEvent(partitionContext, eventDataList.get(i), null, null); })); String ownerName = "owner1"; diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroAsyncSerializer.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroAsyncSerializer.java index 7d8edfb020bac..9d11d6bd76317 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroAsyncSerializer.java +++ b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroAsyncSerializer.java @@ -3,7 +3,6 @@ package com.azure.data.schemaregistry.avro; -import com.azure.data.schemaregistry.AbstractDataSerializer; import com.azure.data.schemaregistry.SerializationException; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -14,7 +13,7 @@ /** * Asynchronous registry-based serializer implementation. */ -public class SchemaRegistryAvroAsyncSerializer extends AbstractDataSerializer { +public class SchemaRegistryAvroAsyncSerializer { private static final int DEFAULT_THREAD_POOL_SIZE = 8; private final SchemaRegistryAvroSerializer serializer;