Skip to content

Commit

Permalink
Remove Result for AzureStorageCheckpointLeaseManager GetAllLeases (#346)
Browse files Browse the repository at this point in the history
* Remove Result for async call

* Get awaiter get result for GetAllLeases

* Remove useless using

* Remove useless initializator

* Replace Task Run Call

* Remove Task Run
  • Loading branch information
David Revoledo authored and serkantkaraca committed Nov 12, 2018
1 parent ee96547 commit 800b20c
Showing 1 changed file with 65 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Newtonsoft.Json;
using WindowsAzure.Storage;
Expand All @@ -19,7 +18,7 @@ class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager

static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2);
readonly CloudStorageAccount cloudStorageAccount;
readonly string leaseContainerName = null;
readonly string leaseContainerName;
readonly string storageBlobPrefix;
BlobRequestOptions renewRequestOptions;
OperationContext operationContext = null;
Expand Down Expand Up @@ -115,7 +114,7 @@ public Task<bool> CreateCheckpointStoreIfNotExistsAsync()

public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
{
AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
Checkpoint checkpoint = null;
if (lease != null && !string.IsNullOrEmpty(lease.Offset))
{
Expand All @@ -126,7 +125,7 @@ public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
};
}

return checkpoint;
return checkpoint;
}

[Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
Expand All @@ -137,8 +136,8 @@ public Task UpdateCheckpointAsync(Checkpoint checkpoint)

public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
{
// Normally the lease will already be created, checkpoint store is initialized after lease store.
AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false);
// Normally the lease will already be created, checkpoint store is initialized after lease store.
AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false);
Checkpoint checkpoint = new Checkpoint(partitionId, lease.Offset, lease.SequenceNumber);

return checkpoint;
Expand Down Expand Up @@ -220,7 +219,7 @@ public async Task<bool> DeleteLeaseStoreAsync()
}
while (innerContinuationToken != null);
}
else if (blob is CloudBlockBlob)
else if (blob is CloudBlockBlob)
{
try
{
Expand All @@ -241,73 +240,75 @@ public async Task<bool> DeleteLeaseStoreAsync()

public async Task<Lease> GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
{
AzureBlobLease retval = null;
AzureBlobLease retval = null;

CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);

if (await leaseBlob.ExistsAsync(null, this.operationContext).ConfigureAwait(false))
{
{
retval = await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
}
}

return retval;
}

public IEnumerable<Task<Lease>> GetAllLeases()
{
List<Task<Lease>> leaseFutures = new List<Task<Lease>>();
IEnumerable<string> partitionIds = this.host.PartitionManager.GetPartitionIdsAsync().Result;
IEnumerable<string> partitionIds =
this.host.PartitionManager.GetPartitionIdsAsync()
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();

foreach (string id in partitionIds)
{
leaseFutures.Add(GetLeaseAsync(id));
yield return GetLeaseAsync(id);
}

return leaseFutures;
}

public async Task<Lease> CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
{
AzureBlobLease returnLease;
try
{
AzureBlobLease returnLease;
try
{
CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);
returnLease = new AzureBlobLease(partitionId, leaseBlob);
string jsonLease = JsonConvert.SerializeObject(returnLease);

ProcessorEventSource.Log.AzureStorageManagerInfo(
this.host.HostName,
partitionId,
"CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName +
"CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName +
" consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPrefix);
await leaseBlob.UploadTextAsync(
jsonLease,
null,
AccessCondition.GenerateIfNoneMatchCondition("*"),
null,
jsonLease,
null,
AccessCondition.GenerateIfNoneMatchCondition("*"),
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
{
if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists ||
catch (StorageException se)
{
if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists ||
se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseIdMissing) // occurs when somebody else already has leased the blob
{
{
// The blob already exists.
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Lease already exists");
returnLease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
}
else
{
else
{
ProcessorEventSource.Log.AzureStorageManagerError(
this.host.HostName,
partitionId,
"CreateLeaseIfNotExist StorageException - leaseContainerName: " + this.leaseContainerName +
" consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPrefix,
se.ToString());
throw;
throw;
}
}
return returnLease;
}

return returnLease;
}

public Task DeleteLeaseAsync(Lease lease)
Expand All @@ -328,7 +329,7 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
bool retval = true;
string newLeaseId = Guid.NewGuid().ToString();
string partitionId = lease.PartitionId;
try
try
{
string newToken;
await leaseBlob.FetchAttributesAsync(null, null, this.operationContext).ConfigureAwait(false);
Expand All @@ -348,9 +349,9 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease");
newToken = await leaseBlob.ChangeLeaseAsync(
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
}
else
Expand Down Expand Up @@ -380,12 +381,12 @@ await leaseBlob.UploadTextAsync(
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return retval;

return retval;
}

public Task<bool> RenewLeaseAsync(Lease lease)
Expand All @@ -398,19 +399,19 @@ async Task<bool> RenewLeaseCoreAsync(AzureBlobLease lease)
CloudBlockBlob leaseBlob = lease.Blob;
string partitionId = lease.PartitionId;

try
try
{
await leaseBlob.RenewLeaseAsync(
AccessCondition.GenerateLeaseCondition(lease.Token),
this.renewRequestOptions,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return true;

return true;
}

public Task<bool> ReleaseLeaseAsync(Lease lease)
Expand All @@ -425,7 +426,7 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
CloudBlockBlob leaseBlob = lease.Blob;
string partitionId = lease.PartitionId;

try
try
{
string leaseId = lease.Token;
AzureBlobLease releasedCopy = new AzureBlobLease(lease)
Expand All @@ -434,19 +435,19 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
Owner = string.Empty
};
await leaseBlob.UploadTextAsync(
JsonConvert.SerializeObject(releasedCopy),
null,
AccessCondition.GenerateLeaseCondition(leaseId),
JsonConvert.SerializeObject(releasedCopy),
null,
AccessCondition.GenerateLeaseCondition(leaseId),
null,
this.operationContext).ConfigureAwait(false);
await leaseBlob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(leaseId)).ConfigureAwait(false);
}
catch (StorageException se)
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return true;

return true;
}

public Task<bool> UpdateLeaseAsync(Lease lease)
Expand All @@ -456,7 +457,7 @@ public Task<bool> UpdateLeaseAsync(Lease lease)

async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
{
if (lease == null)
if (lease == null)
{
return false;
}
Expand All @@ -465,7 +466,7 @@ async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Updating lease");

string token = lease.Token;
if (string.IsNullOrEmpty(token))
if (string.IsNullOrEmpty(token))
{
return false;
}
Expand All @@ -474,23 +475,23 @@ async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
await this.RenewLeaseAsync(lease).ConfigureAwait(false);

CloudBlockBlob leaseBlob = lease.Blob;
try
try
{
string jsonToUpload = JsonConvert.SerializeObject(lease);
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, $"Raw JSON uploading: {jsonToUpload}");
await leaseBlob.UploadTextAsync(
jsonToUpload,
null,
AccessCondition.GenerateLeaseCondition(token),
jsonToUpload,
null,
AccessCondition.GenerateLeaseCondition(token),
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
{
catch (StorageException se)
{
throw HandleStorageException(partitionId, se);
}
return true;
}

return true;
}

async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException
Expand All @@ -499,8 +500,8 @@ async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Raw JSON downloaded: " + jsonLease);
AzureBlobLease rehydrated = (AzureBlobLease)JsonConvert.DeserializeObject(jsonLease, typeof(AzureBlobLease));
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
return blobLease;
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
return blobLease;
}

Exception HandleStorageException(string partitionId, StorageException se)
Expand Down

0 comments on commit 800b20c

Please sign in to comment.