Skip to content

Commit

Permalink
Update to latest event hubs client SDK (#385)
Browse files Browse the repository at this point in the history
* update to latest event hubs client SDK.

* minor editing and renaming, catch startup cancellation, and ignore cancellation exceptions after cancellation

* address PR feedback

* Apply suggestions from code review

Co-authored-by: David Justo <david.justo.1996@gmail.com>

* optimization: cancel the wait when shutting down

* avoid race when opening partitions that are canceled immediately

* add test timeouts and fix handling of instant cancellation

* better logging of exceptions in partition shutdown

* adding more timeouts

* suggestion from PR review

Co-authored-by: David Justo <david.justo.1996@gmail.com>

* add more timeout wrappers

* remove dead code

* undo addition of redundant, coarse timeouts for tests that already have timeouts with nontrivial duration

* simplify code that seemed to be hanging in unit tests

* add statements to print progress to some of the unit tests, to facilitate analysis of hangs in CI

---------

Co-authored-by: David Justo <david.justo.1996@gmail.com>
  • Loading branch information
sebastianburckhardt and davidmrdavid authored Sep 26, 2024
1 parent 2fd17d0 commit 6e10bdf
Show file tree
Hide file tree
Showing 33 changed files with 1,681 additions and 1,360 deletions.
84 changes: 16 additions & 68 deletions src/DurableTask.Netherite/Connections/ConnectionInfoExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ namespace DurableTask.Netherite
using System.Security.Cryptography;
using System.Web;
using DurableTask.Netherite.EventHubsTransport;
using Microsoft.Azure.EventHubs;
using Azure.Core;
using System.Runtime.CompilerServices;
using Microsoft.Azure.EventHubs.Processor;
using Newtonsoft.Json.Serialization;
using DurableTask.Netherite.Faster;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;

/// <summary>
/// Utilities for constructing various SDK objects from a connection information.
Expand Down Expand Up @@ -106,96 +105,45 @@ public static Azure.Storage.Blobs.BlobServiceClient GetAzureStorageV12BlobServic
}
}


/// <summary>
/// Creates an Event Hub client for the given connection info.
/// Creates an Event Hub connection.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="eventHub">The event hub name.</param>
/// <returns></returns>
public static EventHubClient CreateEventHubClient(this ConnectionInfo connectionInfo, string eventHub)
public static EventHubConnection CreateEventHubConnection(this ConnectionInfo connectionInfo, string eventHub)
{
if (connectionInfo.ConnectionString != null)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionInfo.ConnectionString)
{
EntityPath = eventHub
};
return EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
return new Azure.Messaging.EventHubs.EventHubConnection(connectionInfo.ConnectionString, eventHub);
}
else
{
Uri uri = new Uri($"sb://{connectionInfo.HostName}");
var tokenProvider = new EventHubsTokenProvider(connectionInfo);
return EventHubClient.CreateWithTokenProvider(uri, eventHub, tokenProvider);
return new Azure.Messaging.EventHubs.EventHubConnection(
fullyQualifiedNamespace: connectionInfo.HostName,
eventHubName: eventHub,
credential: connectionInfo.TokenCredential,
connectionOptions: null);
}
}

/// <summary>
/// Creates an event processor host for the given connection info.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="hostName">The host name.</param>
/// <param name="eventHubPath">The event hub name.</param>
/// <param name="consumerGroupName">The consumer group name.</param>
/// <param name="checkpointStorage">A connection info for the checkpoint storage.</param>
/// <param name="leaseContainerName">The name of the lease container.</param>
/// <param name="storageBlobPrefix">A prefix for storing the blobs.</param>
/// <param name="args">The constructor arguments.</param>
/// <returns>An event processor host.</returns>
public static async Task<EventProcessorHost> GetEventProcessorHostAsync(
this ConnectionInfo connectionInfo,
string hostName,
string eventHubPath,
string consumerGroupName,
ConnectionInfo checkpointStorage,
string leaseContainerName,
string storageBlobPrefix)
{
public static EventProcessorHost CreateEventProcessorHost(
this ConnectionInfo connectionInfo,
EventProcessorHost.ConstructorArguments args)
{
if (connectionInfo.ConnectionString != null)
{
return new EventProcessorHost(
hostName,
eventHubPath,
consumerGroupName,
connectionInfo.ConnectionString,
checkpointStorage.ConnectionString,
leaseContainerName,
storageBlobPrefix);
return new EventProcessorHost(args, connectionInfo.ConnectionString);
}
else
{
var storageAccount = await checkpointStorage.GetAzureStorageV11AccountAsync();
return new EventProcessorHost(
new Uri($"sb://{connectionInfo.HostName}"),
eventHubPath,
consumerGroupName,
(ITokenProvider) (new EventHubsTokenProvider(connectionInfo)),
storageAccount,
leaseContainerName,
storageBlobPrefix);
}
}

class EventHubsTokenProvider : Microsoft.Azure.EventHubs.ITokenProvider
{
readonly ConnectionInfo info;

public EventHubsTokenProvider(ConnectionInfo info)
{
this.info = info;
}

static TimeSpan NextRefresh(AccessToken token)
{
DateTimeOffset now = DateTimeOffset.UtcNow;
return token.ExpiresOn - now - TimeSpan.FromMinutes(1); // refresh it a bit early.
}

async Task<SecurityToken> ITokenProvider.GetTokenAsync(string appliesTo, TimeSpan timeout)
{
TokenRequestContext request = new(this.info.Scopes);
AccessToken accessToken = await this.info.TokenCredential.GetTokenAsync(request, CancellationToken.None);
return new JsonSecurityToken(accessToken.Token, appliesTo);
return new EventProcessorHost(args, connectionInfo.HostName, connectionInfo.TokenCredential);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.33.0" />
<PackageReference Include="Azure.Core" Version="1.38.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.8.0" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.2" />
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.11.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.16.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace DurableTask.Netherite
using System.Collections.Generic;
using System.Runtime;
using System.Text;
using Microsoft.Azure.EventHubs;
using Azure.Messaging.EventHubs;

/// <summary>
/// Encapsulates how the transport connection string setting is interpreted.
Expand Down Expand Up @@ -63,8 +63,8 @@ public static string EventHubsNamespaceName(string transportConnectionString)
{
try
{
var builder = new EventHubsConnectionStringBuilder(transportConnectionString);
var host = builder.Endpoint.Host;
var properties = EventHubsConnectionStringProperties.Parse(transportConnectionString);
var host = properties.Endpoint.Host;
return host.Substring(0, host.IndexOf('.'));
}
catch(Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public AzureStorageDevice(string blobName, BlobUtilsV12.BlobDirectory blockBlobD
this.pageBlobDirectory = pageBlobDirectory;
this.blobName = blobName;
this.PartitionErrorHandler = blobManager.PartitionErrorHandler;
this.PartitionErrorHandler.Token.Register(this.CancelAllRequests);
this.BlobManager = blobManager;
this.underLease = underLease;
this.hangCheckTimer = new Timer(this.DetectHangs, null, 0, 20000);
this.singleWriterSemaphore = underLease ? new SemaphoreSlim(1) : null;
this.limit = TimeSpan.FromSeconds(90);
this.PartitionErrorHandler.Token.Register(this.CancelAllRequests);
}

/// <inheritdoc/>
Expand All @@ -106,7 +106,7 @@ public async Task StartAsync()
var prefix = $"{this.blockBlobDirectory}{this.blobName}.";

string continuationToken = null;
IEnumerable<BlobItem> pageResults = null;
List<BlobItem> pageResults = null;

do
{
Expand All @@ -124,25 +124,15 @@ await this.BlobManager.PerformWithRetriesAsync(
{
var client = this.pageBlobDirectory.Client.WithRetries;

var enumerator = client.GetBlobsAsync(
Azure.AsyncPageable<BlobItem> pageable = client.GetBlobsAsync(
prefix: prefix,
cancellationToken: this.PartitionErrorHandler.Token)
.AsPages(continuationToken, 100)
.GetAsyncEnumerator(cancellationToken: this.PartitionErrorHandler.Token);
cancellationToken: this.PartitionErrorHandler.Token);

if (await enumerator.MoveNextAsync())
{
var page = enumerator.Current;
pageResults = page.Values;
continuationToken = page.ContinuationToken;
return page.Values.Count; // not accurate, in terms of bytes, but still useful for tracing purposes
}
else
{
pageResults = Enumerable.Empty<BlobItem>();
continuationToken = null;
return 0;
};
IAsyncEnumerable<Azure.Page<BlobItem>> pages = pageable.AsPages(continuationToken, 100);
Azure.Page<BlobItem> firstPage = await pages.FirstAsync();
pageResults = firstPage.Values.ToList();
continuationToken = firstPage.ContinuationToken;
return pageResults.Count; // not accurate, in terms of bytes, but still useful for tracing purposes
});

foreach (var item in pageResults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Netherite.Faster;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Storage;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -53,7 +52,7 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,

foreach (var eventData in hubMessages)
{
var seqno = eventData.SystemProperties.SequenceNumber;
var seqno = eventData.SequenceNumber;

if (nextPacketToReceive != null)
{
Expand All @@ -75,19 +74,19 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,

try
{
Packet.Deserialize(eventData.Body, out evt, out blobReference, guid);
Packet.Deserialize(eventData.EventBody.ToStream(), out evt, out blobReference, guid);
}
catch (Exception)
{
this.traceHelper.LogError("{context} could not deserialize packet #{seqno} ({size} bytes)", this.traceContext, seqno, eventData.Body.Count);
this.traceHelper.LogError("{context} could not deserialize packet #{seqno} ({size} bytes)", this.traceContext, seqno, eventData.EventBody.ToMemory().Length);
throw;
}

if (blobReference == null)
{
if (evt == null)
{
this.lowestTraceLevel?.LogTrace("{context} ignored packet #{seqno} ({size} bytes) because its guid does not match taskhub/client", this.traceContext, seqno, eventData.Body.Count);
this.lowestTraceLevel?.LogTrace("{context} ignored packet #{seqno} ({size} bytes) because its guid does not match taskhub/client", this.traceContext, seqno, eventData.EventBody.ToMemory().Length);
ignoredPacketCount++;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Netherite.Abstractions;
using DurableTask.Netherite.Faster;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;

class BlobBatchSender
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,8 @@

namespace DurableTask.Netherite.EventHubsTransport
{
using DurableTask.Core.Common;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Azure.Messaging.EventHubs;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,12 +14,12 @@ class EventHubsClientSender
readonly EventHubsSender<ClientEvent>[] channels;
int roundRobin;

public EventHubsClientSender(TransportAbstraction.IHost host, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
public EventHubsClientSender(TransportAbstraction.IHost host, Guid clientId, (EventHubConnection connection, string partitionId)[] partitions, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
{
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
for (int i = 0; i < senders.Length; i++)
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[partitions.Length];
for (int i = 0; i < partitions.Length; i++)
{
this.channels[i] = new EventHubsSender<ClientEvent>(host, clientId.ToByteArray(), senders[i], shutdownToken, traceHelper, settings);
this.channels[i] = new EventHubsSender<ClientEvent>(host, clientId.ToByteArray(), partitions[i].connection, partitions[i].partitionId, shutdownToken, traceHelper, settings);
}
}

Expand All @@ -44,7 +39,7 @@ public void Submit(ClientEvent toSend)

public Task WaitForShutdownAsync()
{
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()).ToList());
}
}
}
Loading

0 comments on commit 6e10bdf

Please sign in to comment.