diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index bb7fc1d..fa9ba29 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -5,7 +5,6 @@ namespace Microsoft.Azure.EventHubs.Processor { using System; using System.Collections.Generic; - using System.Text.RegularExpressions; using System.Threading.Tasks; using Newtonsoft.Json; using WindowsAzure.Storage; @@ -19,7 +18,7 @@ class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2); readonly CloudStorageAccount cloudStorageAccount; - readonly string leaseContainerName = null; + readonly string leaseContainerName; readonly string storageBlobPrefix; BlobRequestOptions renewRequestOptions; OperationContext operationContext = null; @@ -115,7 +114,7 @@ public Task CreateCheckpointStoreIfNotExistsAsync() public async Task GetCheckpointAsync(string partitionId) { - AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false); + AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false); Checkpoint checkpoint = null; if (lease != null && !string.IsNullOrEmpty(lease.Offset)) { @@ -126,7 +125,7 @@ public async Task GetCheckpointAsync(string partitionId) }; } - return checkpoint; + return checkpoint; } [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] @@ -137,8 +136,8 @@ public Task UpdateCheckpointAsync(Checkpoint checkpoint) public async Task CreateCheckpointIfNotExistsAsync(string partitionId) { - // Normally the lease will already be created, checkpoint store is initialized after lease store. - AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false); + // Normally the lease will already be created, checkpoint store is initialized after lease store. + AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false); Checkpoint checkpoint = new Checkpoint(partitionId, lease.Offset, lease.SequenceNumber); return checkpoint; @@ -220,7 +219,7 @@ public async Task DeleteLeaseStoreAsync() } while (innerContinuationToken != null); } - else if (blob is CloudBlockBlob) + else if (blob is CloudBlockBlob) { try { @@ -241,35 +240,37 @@ public async Task DeleteLeaseStoreAsync() public async Task GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageException { - AzureBlobLease retval = null; + AzureBlobLease retval = null; CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId); if (await leaseBlob.ExistsAsync(null, this.operationContext).ConfigureAwait(false)) - { + { retval = await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false); - } + } return retval; } public IEnumerable> GetAllLeases() { - List> leaseFutures = new List>(); - IEnumerable partitionIds = this.host.PartitionManager.GetPartitionIdsAsync().Result; + IEnumerable partitionIds = + this.host.PartitionManager.GetPartitionIdsAsync() + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + foreach (string id in partitionIds) { - leaseFutures.Add(GetLeaseAsync(id)); + yield return GetLeaseAsync(id); } - - return leaseFutures; } public async Task CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOException, StorageException { - AzureBlobLease returnLease; - try - { + AzureBlobLease returnLease; + try + { CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId); returnLease = new AzureBlobLease(partitionId, leaseBlob); string jsonLease = JsonConvert.SerializeObject(returnLease); @@ -277,37 +278,37 @@ public async Task CreateLeaseIfNotExistsAsync(string partitionId) // thro ProcessorEventSource.Log.AzureStorageManagerInfo( this.host.HostName, partitionId, - "CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName + + "CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName + " consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPrefix); await leaseBlob.UploadTextAsync( - jsonLease, - null, - AccessCondition.GenerateIfNoneMatchCondition("*"), - null, + jsonLease, + null, + AccessCondition.GenerateIfNoneMatchCondition("*"), + null, this.operationContext).ConfigureAwait(false); } - catch (StorageException se) - { - if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists || + catch (StorageException se) + { + if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists || se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseIdMissing) // occurs when somebody else already has leased the blob - { + { // The blob already exists. ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Lease already exists"); returnLease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false); } - else - { + else + { ProcessorEventSource.Log.AzureStorageManagerError( this.host.HostName, partitionId, "CreateLeaseIfNotExist StorageException - leaseContainerName: " + this.leaseContainerName + " consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPrefix, se.ToString()); - throw; + throw; } - } - - return returnLease; + } + + return returnLease; } public Task DeleteLeaseAsync(Lease lease) @@ -328,7 +329,7 @@ async Task AcquireLeaseCoreAsync(AzureBlobLease lease) bool retval = true; string newLeaseId = Guid.NewGuid().ToString(); string partitionId = lease.PartitionId; - try + try { string newToken; await leaseBlob.FetchAttributesAsync(null, null, this.operationContext).ConfigureAwait(false); @@ -348,9 +349,9 @@ async Task AcquireLeaseCoreAsync(AzureBlobLease lease) ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease"); newToken = await leaseBlob.ChangeLeaseAsync( - newLeaseId, - AccessCondition.GenerateLeaseCondition(lease.Token), - null, + newLeaseId, + AccessCondition.GenerateLeaseCondition(lease.Token), + null, this.operationContext).ConfigureAwait(false); } else @@ -380,12 +381,12 @@ await leaseBlob.UploadTextAsync( null, this.operationContext).ConfigureAwait(false); } - catch (StorageException se) + catch (StorageException se) { throw HandleStorageException(partitionId, se); } - - return retval; + + return retval; } public Task RenewLeaseAsync(Lease lease) @@ -398,19 +399,19 @@ async Task RenewLeaseCoreAsync(AzureBlobLease lease) CloudBlockBlob leaseBlob = lease.Blob; string partitionId = lease.PartitionId; - try + try { await leaseBlob.RenewLeaseAsync( AccessCondition.GenerateLeaseCondition(lease.Token), this.renewRequestOptions, this.operationContext).ConfigureAwait(false); } - catch (StorageException se) + catch (StorageException se) { throw HandleStorageException(partitionId, se); } - - return true; + + return true; } public Task ReleaseLeaseAsync(Lease lease) @@ -425,7 +426,7 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) CloudBlockBlob leaseBlob = lease.Blob; string partitionId = lease.PartitionId; - try + try { string leaseId = lease.Token; AzureBlobLease releasedCopy = new AzureBlobLease(lease) @@ -434,19 +435,19 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) Owner = string.Empty }; await leaseBlob.UploadTextAsync( - JsonConvert.SerializeObject(releasedCopy), - null, - AccessCondition.GenerateLeaseCondition(leaseId), + JsonConvert.SerializeObject(releasedCopy), + null, + AccessCondition.GenerateLeaseCondition(leaseId), null, this.operationContext).ConfigureAwait(false); await leaseBlob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(leaseId)).ConfigureAwait(false); } - catch (StorageException se) + catch (StorageException se) { throw HandleStorageException(partitionId, se); } - - return true; + + return true; } public Task UpdateLeaseAsync(Lease lease) @@ -456,7 +457,7 @@ public Task UpdateLeaseAsync(Lease lease) async Task UpdateLeaseCoreAsync(AzureBlobLease lease) { - if (lease == null) + if (lease == null) { return false; } @@ -465,7 +466,7 @@ async Task UpdateLeaseCoreAsync(AzureBlobLease lease) ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Updating lease"); string token = lease.Token; - if (string.IsNullOrEmpty(token)) + if (string.IsNullOrEmpty(token)) { return false; } @@ -474,23 +475,23 @@ async Task UpdateLeaseCoreAsync(AzureBlobLease lease) await this.RenewLeaseAsync(lease).ConfigureAwait(false); CloudBlockBlob leaseBlob = lease.Blob; - try + try { string jsonToUpload = JsonConvert.SerializeObject(lease); ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, $"Raw JSON uploading: {jsonToUpload}"); await leaseBlob.UploadTextAsync( - jsonToUpload, - null, - AccessCondition.GenerateLeaseCondition(token), + jsonToUpload, + null, + AccessCondition.GenerateLeaseCondition(token), null, this.operationContext).ConfigureAwait(false); } - catch (StorageException se) - { + catch (StorageException se) + { throw HandleStorageException(partitionId, se); - } - - return true; + } + + return true; } async Task DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException @@ -499,8 +500,8 @@ async Task DownloadLeaseAsync(string partitionId, CloudBlockBlob ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Raw JSON downloaded: " + jsonLease); AzureBlobLease rehydrated = (AzureBlobLease)JsonConvert.DeserializeObject(jsonLease, typeof(AzureBlobLease)); - AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob); - return blobLease; + AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob); + return blobLease; } Exception HandleStorageException(string partitionId, StorageException se)