Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR for EH + serializer integration #12205

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6e895f2
create draft PR
arerlend Jun 15, 2020
544a6e7
fix object batch
arerlend Jun 16, 2020
7e9ae15
temp object serializer
arerlend Jul 1, 2020
b5a1dec
Merge branch 'master' into arerlend.schemaregistry.eh-integration
arerlend Jul 1, 2020
82eb0ff
dep on core experimental
arerlend Jul 1, 2020
eda212d
temp
arerlend Jul 1, 2020
6a1d7e7
getDeserializedObject()
arerlend Jul 2, 2020
3560da2
null object in object batch test
arerlend Jul 2, 2020
7a7bce3
remove old SR dep
arerlend Jul 2, 2020
89e6cca
remove temporary interfaces
arerlend Jul 6, 2020
f3acc8c
rename objectSerializer builder method to serializer
arerlend Jul 6, 2020
c95145c
single class imports
arerlend Jul 6, 2020
4eed6e0
fix serializer builder javadoc
arerlend Jul 6, 2020
f78469a
rename abstract batch impl
arerlend Jul 6, 2020
c5d39fd
add EventDataBatchBase javadoc
arerlend Jul 6, 2020
7a0fc35
mono TryAdd
arerlend Jul 6, 2020
c9c1f29
fix modifiers
arerlend Jul 6, 2020
6ea6bb6
remove send mode
arerlend Jul 6, 2020
4fdfa93
fix object batch javadoc
arerlend Jul 6, 2020
b976d2d
partition event deserialize to async
arerlend Jul 6, 2020
114f68d
uncomment azure core
arerlend Jul 6, 2020
45c819a
javadoc syntax
arerlend Jul 6, 2020
533df1e
remove temp interface
arerlend Jul 6, 2020
570fbea
Merge branch 'master' into arerlend.schemaregistry.eh-integration
arerlend Jul 7, 2020
d295b83
only core-experimental, not core
arerlend Jul 7, 2020
28c5f63
checkstyle
arerlend Jul 7, 2020
2e66eb1
update partition event deserialize
arerlend Jul 7, 2020
8947c09
monoError for object batch
arerlend Jul 8, 2020
7496de8
rename objectSerializer to serializer
arerlend Jul 8, 2020
03693c9
add serializer null check in object batch creation
arerlend Jul 8, 2020
447522a
cleanup
arerlend Jul 8, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.5.1</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<artifactId>azure-core-experimental</artifactId>
<version>1.0.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-experimental;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Signal;

import java.nio.BufferOverflowException;
import java.util.*;
arerlend marked this conversation as resolved.
Show resolved Hide resolved

import static com.azure.core.util.tracing.Tracer.*;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;

arerlend marked this conversation as resolved.
Show resolved Hide resolved
public abstract class Batch {
arerlend marked this conversation as resolved.
Show resolved Hide resolved
private final ClientLogger logger = new ClientLogger(this.getClass());
private final Object lock = new Object();
private final int maxMessageSize;
private final String partitionKey;
private final ErrorContextProvider contextProvider;
private final List<EventData> events;
private final byte[] eventBytes;
private final String partitionId;
private int sizeInBytes;
protected final TracerProvider tracerProvider;
arerlend marked this conversation as resolved.
Show resolved Hide resolved
private final String entityPath;
private final String hostname;

Batch(int maxMessageSize, String partitionId, String partitionKey, ErrorContextProvider contextProvider,
TracerProvider tracerProvider, String entityPath, String hostname) {
this.maxMessageSize = maxMessageSize;
this.partitionKey = partitionKey;
this.partitionId = partitionId;
this.contextProvider = contextProvider;
this.events = new LinkedList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
this.hostname = hostname;
}

/**
* Gets the number of {@link EventData events} in the batch.
*
* @return The number of {@link EventData events} in the batch.
*/
public int getCount() {
return events.size();
}

/**
* Gets the maximum size, in bytes, of the {@link EventDataBatch}.
*
* @return The maximum size, in bytes, of the {@link EventDataBatch}.
*/
public int getMaxSizeInBytes() {
return maxMessageSize;
}

/**
* Gets the size of the {@link EventDataBatch} in bytes.
*
* @return the size of the {@link EventDataBatch} in bytes.
*/
public int getSizeInBytes() {
return this.sizeInBytes;
}

/**
* Tries to add an {@link EventData event} to the batch.
*
* @param eventData The {@link EventData} to add to the batch.
* @return {@code true} if the event could be added to the batch; {@code false} if the event was too large to fit in
* the batch.
* @throws IllegalArgumentException if {@code eventData} is {@code null}.
* @throws AmqpException if {@code eventData} is larger than the maximum size of the {@link EventDataBatch}.
*/
public boolean tryAdd(final EventData eventData) {
if (eventData == null) {
throw logger.logExceptionAsWarning(new IllegalArgumentException("eventData cannot be null"));
}
EventData event = tracerProvider.isEnabled() ? traceMessageSpan(eventData) : eventData;

final int size;
try {
size = getSize(event, events.isEmpty());
} catch (BufferOverflowException exception) {
throw logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb",
maxMessageSize / 1024),
contextProvider.getErrorContext()));
}

synchronized (lock) {
if (this.sizeInBytes + size > this.maxMessageSize) {
return false;
}

this.sizeInBytes += size;
}

this.events.add(event);
return true;
}


/**
* Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the message.
*
* @param eventData The Event to add tracing span for.
* @return the updated event data object.
*/
EventData traceMessageSpan(EventData eventData) {
Optional<Object> eventContextData = eventData.getContext().getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return eventData;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context eventContext = eventData.getContext()
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, this.entityPath)
.addData(HOST_NAME_KEY, this.hostname);
Context eventSpanContext = tracerProvider.startSpan(eventContext, ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
eventData.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
eventData.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}

return eventData;
}

List<EventData> getEvents() {
return events;
}

String getPartitionKey() {
return partitionKey;
}

String getPartitionId() {
return partitionId;
}

int getSize(final EventData eventData, final boolean isFirst) {
Objects.requireNonNull(eventData, "'eventData' cannot be null.");

final Message amqpMessage = createAmqpMessage(eventData, partitionKey);
int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size
eventSize += 16; // data section overhead

if (isFirst) {
amqpMessage.setBody(null);
amqpMessage.setApplicationProperties(null);
amqpMessage.setProperties(null);
amqpMessage.setDeliveryAnnotations(null);

eventSize += amqpMessage.encode(this.eventBytes, 0, maxMessageSize);
}

return eventSize;
}

/*
* Creates the AMQP message represented by the event data
*/
private Message createAmqpMessage(EventData event, String partitionKey) {
final Message message = Proton.message();

if (event.getProperties() != null && !event.getProperties().isEmpty()) {
final ApplicationProperties applicationProperties = new ApplicationProperties(event.getProperties());
message.setApplicationProperties(applicationProperties);
}

if (event.getSystemProperties() != null) {
event.getSystemProperties().forEach((key, value) -> {
if (EventData.RESERVED_SYSTEM_PROPERTIES.contains(key)) {
return;
}

final AmqpMessageConstant constant = AmqpMessageConstant.fromString(key);

if (constant != null) {
switch (constant) {
case MESSAGE_ID:
message.setMessageId(value);
break;
case USER_ID:
message.setUserId((byte[]) value);
break;
case TO:
message.setAddress((String) value);
break;
case SUBJECT:
message.setSubject((String) value);
break;
case REPLY_TO:
message.setReplyTo((String) value);
break;
case CORRELATION_ID:
message.setCorrelationId(value);
break;
case CONTENT_TYPE:
message.setContentType((String) value);
break;
case CONTENT_ENCODING:
message.setContentEncoding((String) value);
break;
case ABSOLUTE_EXPIRY_TIME:
message.setExpiryTime((long) value);
break;
case CREATION_TIME:
message.setCreationTime((long) value);
break;
case GROUP_ID:
message.setGroupId((String) value);
break;
case GROUP_SEQUENCE:
message.setGroupSequence((long) value);
break;
case REPLY_TO_GROUP_ID:
message.setReplyToGroupId((String) value);
break;
default:
throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US,
"Property is not a recognized reserved property name: %s", key)));
}
} else {
final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
? new MessageAnnotations(new HashMap<>())
: message.getMessageAnnotations();
messageAnnotations.getValue().put(Symbol.getSymbol(key), value);
message.setMessageAnnotations(messageAnnotations);
}
});
}

if (partitionKey != null) {
final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
? new MessageAnnotations(new HashMap<>())
: message.getMessageAnnotations();
messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
message.setMessageAnnotations(messageAnnotations);
}

message.setBody(new Data(new Binary(event.getBody())));

return message;
}
}
Loading