Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Result for AzureStorageCheckpointLeaseManager GetAllLeases #346

Merged
merged 6 commits into from
Nov 12, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Newtonsoft.Json;
using WindowsAzure.Storage;
Expand All @@ -19,7 +18,7 @@ class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager

static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2);
readonly CloudStorageAccount cloudStorageAccount;
readonly string leaseContainerName = null;
readonly string leaseContainerName;
readonly string storageBlobPrefix;
BlobRequestOptions renewRequestOptions;
OperationContext operationContext = null;
Expand Down Expand Up @@ -115,7 +114,7 @@ public Task<bool> CreateCheckpointStoreIfNotExistsAsync()

public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
{
AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
Checkpoint checkpoint = null;
if (lease != null && !string.IsNullOrEmpty(lease.Offset))
{
Expand All @@ -126,7 +125,7 @@ public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
};
}

return checkpoint;
return checkpoint;
}

[Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
Expand All @@ -137,8 +136,8 @@ public Task UpdateCheckpointAsync(Checkpoint checkpoint)

public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
{
// Normally the lease will already be created, checkpoint store is initialized after lease store.
AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false);
// Normally the lease will already be created, checkpoint store is initialized after lease store.
AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false);
Checkpoint checkpoint = new Checkpoint(partitionId, lease.Offset, lease.SequenceNumber);

return checkpoint;
Expand Down Expand Up @@ -220,7 +219,7 @@ public async Task<bool> DeleteLeaseStoreAsync()
}
while (innerContinuationToken != null);
}
else if (blob is CloudBlockBlob)
else if (blob is CloudBlockBlob)
{
try
{
Expand All @@ -241,73 +240,75 @@ public async Task<bool> DeleteLeaseStoreAsync()

public async Task<Lease> GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
{
AzureBlobLease retval = null;
AzureBlobLease retval = null;

CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);

if (await leaseBlob.ExistsAsync(null, this.operationContext).ConfigureAwait(false))
{
{
retval = await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
}
}

return retval;
}

public IEnumerable<Task<Lease>> GetAllLeases()
{
List<Task<Lease>> leaseFutures = new List<Task<Lease>>();
IEnumerable<string> partitionIds = this.host.PartitionManager.GetPartitionIdsAsync().Result;
IEnumerable<string> partitionIds =
this.host.PartitionManager.GetPartitionIdsAsync()
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
Copy link
Member

@serkantkaraca serkantkaraca Nov 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Below is same as Task.Run
this.host.PartitionManager.GetPartitionIdsAsync().ConfigureAwait(false).GetAwaiter().GetResult(); #Resolved

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@serkantkaraca done , also changed to yield return insted I think is ok for now if I found more of these async call in a sync context I can create some helper method to Run this keeping in considerations things like the library Nito.Asyc https://github.com/StephenCleary/AsyncEx/blob/db32fd5db0d1051e867b36ae039ea13d2c36eb91/src/Nito.AsyncEx.Tasks/SynchronousTaskExtensions.cs#L11


foreach (string id in partitionIds)
{
leaseFutures.Add(GetLeaseAsync(id));
yield return GetLeaseAsync(id);
}

return leaseFutures;
}

public async Task<Lease> CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
{
AzureBlobLease returnLease;
try
{
AzureBlobLease returnLease;
try
{
CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);
returnLease = new AzureBlobLease(partitionId, leaseBlob);
string jsonLease = JsonConvert.SerializeObject(returnLease);

ProcessorEventSource.Log.AzureStorageManagerInfo(
this.host.HostName,
partitionId,
"CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName +
"CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName +
" consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPrefix);
await leaseBlob.UploadTextAsync(
jsonLease,
null,
AccessCondition.GenerateIfNoneMatchCondition("*"),
null,
jsonLease,
null,
AccessCondition.GenerateIfNoneMatchCondition("*"),
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
{
if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists ||
catch (StorageException se)
{
if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists ||
se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseIdMissing) // occurs when somebody else already has leased the blob
{
{
// The blob already exists.
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Lease already exists");
returnLease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
}
else
{
else
{
ProcessorEventSource.Log.AzureStorageManagerError(
this.host.HostName,
partitionId,
"CreateLeaseIfNotExist StorageException - leaseContainerName: " + this.leaseContainerName +
" consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPrefix,
se.ToString());
throw;
throw;
}
}
return returnLease;
}

return returnLease;
}

public Task DeleteLeaseAsync(Lease lease)
Expand All @@ -328,7 +329,7 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
bool retval = true;
string newLeaseId = Guid.NewGuid().ToString();
string partitionId = lease.PartitionId;
try
try
{
string newToken;
await leaseBlob.FetchAttributesAsync(null, null, this.operationContext).ConfigureAwait(false);
Expand All @@ -348,9 +349,9 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease");
newToken = await leaseBlob.ChangeLeaseAsync(
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
}
else
Expand Down Expand Up @@ -380,12 +381,12 @@ await leaseBlob.UploadTextAsync(
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return retval;

return retval;
}

public Task<bool> RenewLeaseAsync(Lease lease)
Expand All @@ -398,19 +399,19 @@ async Task<bool> RenewLeaseCoreAsync(AzureBlobLease lease)
CloudBlockBlob leaseBlob = lease.Blob;
string partitionId = lease.PartitionId;

try
try
{
await leaseBlob.RenewLeaseAsync(
AccessCondition.GenerateLeaseCondition(lease.Token),
this.renewRequestOptions,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return true;

return true;
}

public Task<bool> ReleaseLeaseAsync(Lease lease)
Expand All @@ -425,7 +426,7 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
CloudBlockBlob leaseBlob = lease.Blob;
string partitionId = lease.PartitionId;

try
try
{
string leaseId = lease.Token;
AzureBlobLease releasedCopy = new AzureBlobLease(lease)
Expand All @@ -434,19 +435,19 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
Owner = string.Empty
};
await leaseBlob.UploadTextAsync(
JsonConvert.SerializeObject(releasedCopy),
null,
AccessCondition.GenerateLeaseCondition(leaseId),
JsonConvert.SerializeObject(releasedCopy),
null,
AccessCondition.GenerateLeaseCondition(leaseId),
null,
this.operationContext).ConfigureAwait(false);
await leaseBlob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(leaseId)).ConfigureAwait(false);
}
catch (StorageException se)
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return true;

return true;
}

public Task<bool> UpdateLeaseAsync(Lease lease)
Expand All @@ -456,7 +457,7 @@ public Task<bool> UpdateLeaseAsync(Lease lease)

async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
{
if (lease == null)
if (lease == null)
{
return false;
}
Expand All @@ -465,7 +466,7 @@ async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Updating lease");

string token = lease.Token;
if (string.IsNullOrEmpty(token))
if (string.IsNullOrEmpty(token))
{
return false;
}
Expand All @@ -474,23 +475,23 @@ async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
await this.RenewLeaseAsync(lease).ConfigureAwait(false);

CloudBlockBlob leaseBlob = lease.Blob;
try
try
{
string jsonToUpload = JsonConvert.SerializeObject(lease);
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, $"Raw JSON uploading: {jsonToUpload}");
await leaseBlob.UploadTextAsync(
jsonToUpload,
null,
AccessCondition.GenerateLeaseCondition(token),
jsonToUpload,
null,
AccessCondition.GenerateLeaseCondition(token),
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
{
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return true;
}

return true;
}

async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException
Expand All @@ -499,8 +500,8 @@ async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Raw JSON downloaded: " + jsonLease);
AzureBlobLease rehydrated = (AzureBlobLease)JsonConvert.DeserializeObject(jsonLease, typeof(AzureBlobLease));
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
return blobLease;
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
return blobLease;
}

Exception HandleStorageException(string partitionId, StorageException se)
Expand Down