Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Idempotent Producer Client #15125

Merged
merged 1 commit into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,10 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
public override int GetHashCode() { throw null; }
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Producer.PartitionPublishingProperties> ReadPartitionPublishingPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
Expand All @@ -490,6 +491,12 @@ public PartitionPublishingOptions() { }
public short? OwnerLevel { get { throw null; } set { } }
public long? ProducerGroupId { get { throw null; } set { } }
public int? StartingSequenceNumber { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
public partial class PartitionPublishingProperties
{
Expand All @@ -498,6 +505,12 @@ protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnab
public int? LastPublishedSequenceNumber { get { throw null; } }
public short? OwnerLevel { get { throw null; } }
public long? ProducerGroupId { get { throw null; } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
public partial class SendEventOptions
{
Expand Down
23 changes: 16 additions & 7 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ internal class AmqpEventBatch : TransportEventBatch
public override long SizeInBytes => _sizeBytes;

/// <summary>
/// The publishing sequence number assigned to the first event in the batch at the time
/// the batch was successfully published.
/// A flag that indicates whether space should be reserved for a publishing
/// sequence number when the event size is measured. If <c>false</c>, a sequence
/// number is not used in size calculations.
/// </summary>
///
/// <remarks>
/// The starting published sequence number is only populated and relevant when certain features
/// The sequence number is only populated and relevant when certain features
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public override int? StartingPublishedSequenceNumber { get; set; }
public override bool ReserveSpaceForSequenceNumber { get; }

/// <summary>
/// The count of events contained in the batch.
Expand Down Expand Up @@ -93,9 +94,11 @@ internal class AmqpEventBatch : TransportEventBatch
///
/// <param name="messageConverter">The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.</param>
/// <param name="options">The set of options to apply to the batch.</param>
/// <param name="reserveSpaceForSequenceNumber">A flag that indicates whether space should be reserved for a publishing sequence number when the event size is measured. If <c>false</c>, a sequence number is not used in size calculations.</param>
///
public AmqpEventBatch(AmqpMessageConverter messageConverter,
CreateBatchOptions options)
CreateBatchOptions options,
bool reserveSpaceForSequenceNumber)
{
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
Argument.AssertNotNull(options, nameof(options));
Expand All @@ -104,13 +107,13 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter,
MessageConverter = messageConverter;
Options = options;
MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
ReserveSpaceForSequenceNumber = reserveSpaceForSequenceNumber;

// Initialize the size by reserving space for the batch envelope.

using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.PartitionKey);
ReservedSize = envelope.SerializedMessageSize;
_sizeBytes = ReservedSize;

}

/// <summary>
Expand All @@ -127,7 +130,12 @@ public override bool TryAdd(EventData eventData)
Argument.AssertNotNull(eventData, nameof(eventData));
Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch));

AmqpMessage eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);
if (ReserveSpaceForSequenceNumber)
{
eventData.PendingPublishSequenceNumber = int.MaxValue;
}

var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);

try
{
Expand All @@ -152,6 +160,7 @@ public override bool TryAdd(EventData eventData)
}
finally
{
eventData.PendingPublishSequenceNumber = default;
eventMessage?.Dispose();
}
}
Expand Down
17 changes: 15 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -277,9 +278,9 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
// default to the maximum size allowed by the link.

options.MaximumSizeInBytes ??= MaximumMessageSize;

Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes));
return new AmqpEventBatch(MessageConverter, options);

return new AmqpEventBatch(MessageConverter, options, IsSequenceMeasurementRequired(ActiveFeatures));
}

/// <summary>
Expand Down Expand Up @@ -574,6 +575,18 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
return link;
}

/// <summary>
/// Determines if measuring a sequence number is required to accurately calculate
/// the size of an event.
/// </summary>
///
/// <param name="activeFeatures">The set of features which are active for the producer.</param>
///
/// <returns><c>true</c> if a sequence number should be measured; otherwise, <c>false</c>.</returns>
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious - what criteria determines whether or not this attribute should be used?

Copy link
Member Author

@jsquire jsquire Sep 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no objective criteria that I'm using. In this case, these are very simple methods that I'd consider writing as a just a set of statements that repeat in a few places, but am trying to do better about avoiding that repetition. Since the methods are stateless, simple, and used in a couple of places, hinting to the compiler seemed reasonable. To be transparent, I did not notice any difference in timings with or without them, so I'm guessing the compiler is optimizing the same regardless (or these are so trivial that the jumping isn't a meaningful measure).

private static bool IsSequenceMeasurementRequired(TransportProducerFeatures activeFeatures) => ((activeFeatures & TransportProducerFeatures.IdempotentPublishing) != 0);

/// <summary>
/// Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,17 @@ internal abstract class TransportEventBatch : IDisposable
public abstract long SizeInBytes { get; }

/// <summary>
/// The publishing sequence number assigned to the first event in the batch at the time
/// the batch was successfully published.
/// A flag that indicates whether space should be reserved for a publishing
/// sequence number when the event size is measured. If <c>false</c>, a sequence
/// number is not used in size calculations.
/// </summary>
///
/// <value>
/// The sequence number of the first event in the batch, if the batch was successfully
/// published by a sequence-aware producer. If the producer was not configured to apply
/// sequence numbering or if the batch has not yet been successfully published, this member
/// will be <c>null</c>.
///</value>
///
/// <remarks>
/// The starting published sequence number is only populated and relevant when certain features
/// The sequence number is only populated and relevant when certain features
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public abstract int? StartingPublishedSequenceNumber { get; set; }
public abstract bool ReserveSpaceForSequenceNumber { get; }

/// <summary>
/// The count of events contained in the batch.
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ protected EventData(ReadOnlyMemory<byte> eventBody,
/// Transitions the pending publishing sequence number to the published sequence number.
/// </summary>
///
internal void CommitPublishingSequenceNumber()
internal void CommitPublishingState()
{
PublishedSequenceNumber = PendingPublishSequenceNumber;
PendingPublishSequenceNumber = default;
Expand Down
28 changes: 14 additions & 14 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,6 @@ public long? MaximumSizeInBytes
}
}

/// <summary>
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
/// </summary>
///
/// <returns>A new copy of <see cref="CreateBatchOptions" />.</returns>
///
internal CreateBatchOptions Clone() =>
new CreateBatchOptions
{
PartitionId = PartitionId,
PartitionKey = PartitionKey,
_maximumSizeInBytes = MaximumSizeInBytes
};

/// <summary>
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
/// </summary>
Expand Down Expand Up @@ -82,5 +68,19 @@ internal CreateBatchOptions Clone() =>
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => base.ToString();

/// <summary>
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
/// </summary>
///
/// <returns>A new copy of <see cref="CreateBatchOptions" />.</returns>
///
internal new CreateBatchOptions Clone() =>
new CreateBatchOptions
{
PartitionId = PartitionId,
PartitionKey = PartitionKey,
_maximumSizeInBytes = _maximumSizeInBytes
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ public sealed class EventDataBatch : IDisposable
/// of the producer are enabled. For example, it is used by idempotent publishing.
/// </remarks>
///
public int? StartingPublishedSequenceNumber
{
get => InnerBatch.StartingPublishedSequenceNumber;
internal set => InnerBatch.StartingPublishedSequenceNumber = value;
}
public int? StartingPublishedSequenceNumber { get; internal set; }

/// <summary>
/// The count of events contained in the batch.
Expand Down
Loading