Skip to content

Commit

Permalink
[Event Hubs Client] Integrate v5.3.0-beta.1 Release Branch (Azure#15162)
Browse files Browse the repository at this point in the history
The focus of these changes is to integrate the release branch for Event Hubs
v5.3.0-beta.1.

The release introduces an option for the various event consumers allowing
the prefetch cache to be filled based on a size-based heuristic rather than
a count of events.  This feature is considered a special case, helpful in
scenarios where the size of events being read is not able to be known or
predicted upfront and limiting resource use is valued over consistent and
predictable performance.
  • Loading branch information
jsquire committed Sep 15, 2020
1 parent 3bbe65d commit 2e323b9
Show file tree
Hide file tree
Showing 34 changed files with 563 additions and 120 deletions.
10 changes: 9 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Release History

## 5.3.0-beta.1 (Unreleased)
## 5.3.0-beta.2 (Unreleased)

## 5.3.0-beta.1 (2020-09-15)

### Changes

#### New Features

- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance.

## 5.2.0 (2020-09-08)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public EventProcessorClientOptions() { }
public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<Version>5.3.0-beta.1</Version>
<Version>5.3.0-beta.2</Version>
<ApiCompatVersion>5.2.0</ApiCompatVersion>
<PackageTags>Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
Expand All @@ -12,12 +12,12 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.3.0-beta.1" /><!-- This override will be removed when v5.3.0 is released for GA -->
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
<PackageReference Include="System.Reflection.TypeExtensions" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
<PackageReference Include="System.Threading.Channels" />
<PackageReference Include="System.Threading.Tasks.Extensions" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,8 @@ private static EventProcessorOptions CreateOptions(EventProcessorClientOptions c
MaximumWaitTime = clientOptions.MaximumWaitTime,
TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties,
LoadBalancingStrategy = clientOptions.LoadBalancingStrategy,
PrefetchCount = clientOptions.PrefetchCount
PrefetchCount = clientOptions.PrefetchCount,
PrefetchSizeInBytes = clientOptions.PrefetchSizeInBytes
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class EventProcessorClientOptions
/// <summary>The prefetch count to use when reading events.</summary>
private int _prefetchCount = 300;

/// <summary>The prefetch size limit to use for the partition receiver.</summary>
private long? _prefetchSizeInBytes = default;

/// <summary>The set of options to use for configuring the connection to the Event Hubs service.</summary>
private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions();

Expand Down Expand Up @@ -135,9 +138,9 @@ public int CacheEventCount
}

/// <summary>
/// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to
/// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than
/// readers needing to wait for service operations to complete.
/// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to
/// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from
/// from a local cache rather than waiting on a service request.
/// </summary>
///
/// <value>
Expand All @@ -147,8 +150,8 @@ public int CacheEventCount
/// </value>
///
/// <remarks>
/// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The
/// larger the size of the cache, the more efficiently service operations can be buffered in the background to
/// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service.
/// The larger the size of the cache, the more efficiently service operations can be buffered in the background to
/// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O.
///
/// For scenarios where the size of events is small and many events are flowing through the system, using a larger
Expand All @@ -172,6 +175,38 @@ public int PrefetchCount
}
}

/// <summary>
/// The desired number of bytes to attempt to eagerly request from the Event Hubs service and queued locally without regard to
/// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from
/// from a local cache rather than waiting on a service request.
/// </summary>
///
/// <value>
/// <para>When set to <c>null</c>, the option is considered disabled; otherwise, it will be considered enabled and take
/// precedence over any value specified for the <see cref="PrefetchCount" />The <see cref="PrefetchSizeInBytes" /> is an
/// advanced control that developers can use to help tune performance in some scenarios; it is recommended to prefer using
/// the <see cref="PrefetchCount" /> over this option where possible for more accurate control and more predictable throughput.</para>
///
/// <para>This size should be considered a statement of intent rather than a guaranteed limit; the local cache may be larger or
/// smaller than the number of bytes specified, and will always contain at least one event when the <see cref="PrefetchSizeInBytes" />
/// is specified. A heuristic is used to predict the average event size to use for size calculations, which should be expected to fluctuate
/// as traffic passes through the system. Consequently, the resulting resource use will fluctuate as well.</para>
/// </value>
///
public long? PrefetchSizeInBytes
{
get => _prefetchSizeInBytes;

set
{
if (value.HasValue)
{
Argument.AssertAtLeast(value.Value, 0, nameof(PrefetchSizeInBytes));
}
_prefetchSizeInBytes = value;
}
}

/// <summary>
/// Gets or sets the options used for configuring the connection to the Event Hubs service.
/// </summary>
Expand Down Expand Up @@ -246,6 +281,7 @@ internal EventProcessorClientOptions Clone() =>
_maximumWaitTime = _maximumWaitTime,
_cacheEventCount = _cacheEventCount,
_prefetchCount = _prefetchCount,
_prefetchSizeInBytes = PrefetchSizeInBytes,
_connectionOptions = ConnectionOptions.Clone(),
_retryOptions = RetryOptions.Clone()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Microsoft.Azure.Management.EventHub" />
<PackageReference Include="Microsoft.Azure.Management.Storage" />
<PackageReference Include="Microsoft.Azure.Management.ResourceManager" />
<PackageReference Include="Microsoft.Azure.Services.AppAuthentication" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="System.Net.WebSockets.Client" />
<PackageReference Include="System.ValueTuple" />
<PackageReference Include="Moq" />
<PackageReference Include="NUnit" />
<PackageReference Include="NUnit3TestAdapter" />
<PackageReference Include="Moq" />
<PackageReference Include="Polly" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
<PackageReference Include="Microsoft.Azure.Management.EventHub" />
<PackageReference Include="System.Net.WebSockets.Client" />
<PackageReference Include="System.Threading.Tasks.Extensions" />
<PackageReference Include="Microsoft.Azure.Management.Storage" />
<PackageReference Include="Microsoft.Azure.Management.ResourceManager" />
<PackageReference Include="Microsoft.Azure.Services.AppAuthentication" />
<PackageReference Include="System.ValueTuple" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void CloneProducesACopy()
MaximumWaitTime = TimeSpan.FromMinutes(65),
CacheEventCount = 1,
PrefetchCount = 0,
PrefetchSizeInBytes = 200,
RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(1), Delay = TimeSpan.FromMinutes(4) },
ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }
};
Expand All @@ -46,6 +47,7 @@ public void CloneProducesACopy()
Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time of the clone should match.");
Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache size of the clone should match.");
Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match.");
Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch byte size of the clone should match.");
Assert.That(clone.ConnectionOptions.TransportType, Is.EqualTo(options.ConnectionOptions.TransportType), "The connection options of the clone should copy properties.");
Assert.That(clone.ConnectionOptions, Is.Not.SameAs(options.ConnectionOptions), "The connection options of the clone should be a copy, not the same instance.");
Assert.That(clone.RetryOptions.IsEquivalentTo(options.RetryOptions), Is.True, "The retry options of the clone should be considered equal.");
Expand Down Expand Up @@ -113,6 +115,39 @@ public void PrefetchCountAllowsZero()
Assert.That(() => new EventProcessorClientOptions { PrefetchCount = 0 }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PrefetchSizeInBytes" />
/// property.
/// </summary>
///
[Test]
public void PrefetchSizeInBytesIsValidated()
{
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = -1 }, Throws.InstanceOf<ArgumentException>());
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PrefetchSizeInBytes" />
/// property.
/// </summary>
///
[Test]
public void PrefetchSizeInBytesAllowsZero()
{
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = 0 }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PrefetchSizeInBytes" />
/// property.
/// </summary>
///
[Test]
public void PrefetchSizeInBytesAllowsNull()
{
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = null }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.ConnectionOptions" />
/// property.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void assertOptionsMatch(EventProcessorOptions expected,
Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor.");
Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor.");
Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor.");
Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor.");
}

var clientOptions = new EventProcessorClientOptions
Expand All @@ -131,7 +132,9 @@ void assertOptionsMatch(EventProcessorOptions expected,
RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 },
Identifier = "OMG, HAI!",
MaximumWaitTime = TimeSpan.FromDays(54),
TrackLastEnqueuedEventProperties = true
TrackLastEnqueuedEventProperties = true,
PrefetchCount = 5,
PrefetchSizeInBytes = 500
};

var expectedOptions = InvokeCreateOptions(clientOptions);
Expand Down Expand Up @@ -1408,7 +1411,8 @@ public void ClientOptionsCanBeTranslated()
MaximumWaitTime = TimeSpan.FromDays(54),
TrackLastEnqueuedEventProperties = true,
LoadBalancingStrategy = LoadBalancingStrategy.Greedy,
PrefetchCount = 9990
PrefetchCount = 9990,
PrefetchSizeInBytes = 400
};

var defaultOptions = new EventProcessorOptions();
Expand All @@ -1424,6 +1428,7 @@ public void ClientOptionsCanBeTranslated()
Assert.That(processorOptions.TrackLastEnqueuedEventProperties, Is.EqualTo(clientOptions.TrackLastEnqueuedEventProperties), "The flag for last event tracking should have been set.");
Assert.That(processorOptions.LoadBalancingStrategy, Is.EqualTo(clientOptions.LoadBalancingStrategy), "The load balancing strategy should have been set.");
Assert.That(processorOptions.PrefetchCount, Is.EqualTo(clientOptions.PrefetchCount), "The prefetch count should have been set.");
Assert.That(processorOptions.PrefetchSizeInBytes, Is.EqualTo(clientOptions.PrefetchSizeInBytes), "The prefetch byte size should have been set.");

Assert.That(processorOptions.DefaultStartingPosition, Is.EqualTo(defaultOptions.DefaultStartingPosition), "The default starting position should not have been set.");
Assert.That(processorOptions.LoadBalancingUpdateInterval, Is.EqualTo(defaultOptions.LoadBalancingUpdateInterval), "The load balancing interval should not have been set.");
Expand Down
10 changes: 9 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Release History

## 5.3.0-beta.1 (Unreleased)
## 5.3.0-beta.2 (Unreleased)

## 5.3.0-beta.1 (2020-09-15)

### Changes

#### New Features

- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance.

## 5.2.0 (2020-09-08)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public ReadEventOptions() { }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public long? OwnerLevel { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
Expand Down Expand Up @@ -268,6 +269,7 @@ public EventProcessorOptions() { }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down Expand Up @@ -358,6 +360,7 @@ public PartitionReceiverOptions() { }
public System.TimeSpan? DefaultMaximumReceiveWaitTime { get { throw null; } set { } }
public long? OwnerLevel { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ public override TransportProducer CreateProducer(string partitionId,
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
/// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested. If <c>null</c> a default will be used.</param>
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
///
/// <returns>A <see cref="TransportConsumer" /> configured in the requested manner.</returns>
///
Expand All @@ -433,7 +434,8 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
EventHubsRetryPolicy retryPolicy,
bool trackLastEnqueuedEventProperties,
long? ownerLevel,
uint? prefetchCount)
uint? prefetchCount,
long? prefetchSizeInBytes)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));

Expand All @@ -446,6 +448,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
trackLastEnqueuedEventProperties,
ownerLevel,
prefetchCount,
prefetchSizeInBytes,
ConnectionScope,
MessageConverter,
retryPolicy
Expand Down
Loading

0 comments on commit 2e323b9

Please sign in to comment.