Skip to content

Commit

Permalink
[C#] Azure Storage Device improvements (#357)
Browse files Browse the repository at this point in the history
* Blob device improvements (from DF)
* Max Azure Page Blob size set at 8 GB
  • Loading branch information
badrishc authored Oct 25, 2020
1 parent 5d24bb8 commit 7e6c5fc
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 74 deletions.
152 changes: 114 additions & 38 deletions cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ public class AzureStorageDevice : StorageDeviceBase
private readonly string blobName;
private readonly bool underLease;

internal BlobRequestOptions BlobRequestOptions { get; private set; }
internal BlobRequestOptions BlobRequestOptionsWithoutRetry { get; private set; }
internal BlobRequestOptions BlobRequestOptionsWithRetry { get; private set; }

// Page Blobs permit blobs of max size 8 TB, but the emulator permits only 2 GB
private const long MAX_BLOB_SIZE = (long)(2 * 10e8);
// Azure Page Blobs have a fixed sector size of 512 bytes.
private const uint PAGE_BLOB_SECTOR_SIZE = 512;

// Max upload size must be at most 4MB
// we use an even smaller value to improve retry/timeout behavior in highly contended situations
private const uint MAX_UPLOAD_SIZE = 1024 * 1024;

// Max Azure Page Blob size (used when segment size is not specified): we set this at 8 GB
private const long MAX_PAGEBLOB_SIZE = 8L * 1024 * 1024 * 1024;

// Whether blob files are deleted on close
private readonly bool deleteOnClose;

Expand All @@ -55,7 +62,8 @@ public AzureStorageDevice(CloudBlobDirectory cloudBlobDirectory, string blobName
this.deleteOnClose = deleteOnClose;

this.BlobManager = blobManager ?? new DefaultBlobManager(this.underLease, this.blobDirectory);
this.BlobRequestOptions = BlobManager.GetBlobRequestOptions();
this.BlobRequestOptionsWithoutRetry = BlobManager.GetBlobRequestOptionsWithoutRetry();
this.BlobRequestOptionsWithRetry = BlobManager.GetBlobRequestOptionsWithRetry();

StartAsync().Wait();
}
Expand Down Expand Up @@ -89,7 +97,8 @@ public AzureStorageDevice(string connectionString, string containerName, string
this.deleteOnClose = deleteOnClose;

this.BlobManager = blobManager ?? new DefaultBlobManager(this.underLease, this.blobDirectory);
this.BlobRequestOptions = BlobManager.GetBlobRequestOptions();
this.BlobRequestOptionsWithoutRetry = BlobManager.GetBlobRequestOptionsWithoutRetry();
this.BlobRequestOptionsWithRetry = BlobManager.GetBlobRequestOptionsWithRetry();

StartAsync().Wait();
}
Expand All @@ -109,7 +118,8 @@ private async Task StartAsync()
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
}
var response = await this.blobDirectory.ListBlobsSegmentedAsync(useFlatBlobListing: false, blobListingDetails: BlobListingDetails.None, maxResults: 1000,
currentToken: continuationToken, options: this.BlobRequestOptions, operationContext: null).ConfigureAwait(false);
currentToken: continuationToken, options: this.BlobRequestOptionsWithRetry, operationContext: null)
.ConfigureAwait(BlobManager.ConfigureAwaitForStorage);

foreach (IListBlobItem item in response.Results)
{
Expand Down Expand Up @@ -219,21 +229,47 @@ private async Task WritePortionToBlobAsync(UnmanagedMemoryStream stream, CloudPa
{
try
{
if (this.underLease)
await BlobManager.AsyncStorageWriteMaxConcurrency.WaitAsync();

int numAttempts = 0;
long streamPosition = stream.Position;

while (true) // retry loop
{
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
}
numAttempts++;
try
{
if (this.underLease)
{
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
}

await blob.WritePagesAsync(stream, destinationAddress + offset,
contentChecksum: null, accessCondition: null, options: this.BlobRequestOptions, operationContext: null, cancellationToken: this.BlobManager.CancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
this.BlobManager?.HandleBlobError(nameof(WritePortionToBlobAsync), "could not write to page blob", blob?.Name, exception, true);
throw;
if (length > 0)
{
await blob.WritePagesAsync(stream, destinationAddress + offset,
contentChecksum: null, accessCondition: null, options: this.BlobRequestOptionsWithoutRetry, operationContext: null, cancellationToken: this.BlobManager.CancellationToken)
.ConfigureAwait(BlobManager.ConfigureAwaitForStorage);
}
break;
}
catch (StorageException e) when (this.underLease && IsTransientStorageError(e) && numAttempts < BlobManager.MaxRetries)
{
TimeSpan nextRetryIn = TimeSpan.FromSeconds(1 + Math.Pow(2, (numAttempts - 1)));
this.BlobManager?.HandleBlobError(nameof(WritePortionToBlobAsync), $"could not write to page blob, will retry in {nextRetryIn}s", blob?.Name, e, false);
await Task.Delay(nextRetryIn);
stream.Seek(streamPosition, SeekOrigin.Begin); // must go back to original position before retrying
continue;
}
catch (Exception exception) when (!IsFatal(exception))
{
this.BlobManager?.HandleBlobError(nameof(WritePortionToBlobAsync), "could not write to page blob", blob?.Name, exception, false);
throw;
}
};
}
finally
{
BlobManager.AsyncStorageWriteMaxConcurrency.Release();
stream.Dispose();
}
}
Expand All @@ -249,32 +285,55 @@ private async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, CloudPageBlob

try
{
if (this.underLease)
await BlobManager.AsyncStorageReadMaxConcurrency.WaitAsync();

int numAttempts = 0;

while (true) // retry loop
{
Debug.WriteLine($"confirm lease");
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
Debug.WriteLine($"confirm lease done");
}
numAttempts++;
try
{
if (this.underLease)
{
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
}

Debug.WriteLine($"starting download target={blob.Name} readLength={readLength} sourceAddress={sourceAddress}");
Debug.WriteLine($"starting download target={blob.Name} readLength={readLength} sourceAddress={sourceAddress}");

await blob.DownloadRangeToStreamAsync(stream, sourceAddress, readLength,
accessCondition: null, options: this.BlobRequestOptions, operationContext: null, cancellationToken: this.BlobManager.CancellationToken);
if (readLength > 0)
{
await blob.DownloadRangeToStreamAsync(stream, sourceAddress, readLength,
accessCondition: null, options: this.BlobRequestOptionsWithoutRetry, operationContext: null, cancellationToken: this.BlobManager.CancellationToken)
.ConfigureAwait(BlobManager.ConfigureAwaitForStorage);
}

Debug.WriteLine($"finished download target={blob.Name} readLength={readLength} sourceAddress={sourceAddress}");
Debug.WriteLine($"finished download target={blob.Name} readLength={readLength} sourceAddress={sourceAddress}");

if (stream.Position != readLength)
{
throw new InvalidDataException($"wrong amount of data received from page blob, expected={readLength}, actual={stream.Position}");
if (stream.Position != readLength)
{
throw new InvalidDataException($"wrong amount of data received from page blob, expected={readLength}, actual={stream.Position}");
}
break;
}
catch (StorageException e) when (this.underLease && IsTransientStorageError(e) && numAttempts < BlobManager.MaxRetries)
{
TimeSpan nextRetryIn = TimeSpan.FromSeconds(1 + Math.Pow(2, (numAttempts - 1)));
this.BlobManager?.HandleBlobError(nameof(ReadFromBlobAsync), $"could not write to page blob, will retry in {nextRetryIn}s", blob?.Name, e, false);
await Task.Delay(nextRetryIn);
stream.Seek(0, SeekOrigin.Begin); // must go back to original position before retrying
continue;
}
catch (Exception exception) when (!IsFatal(exception))
{
this.BlobManager?.HandleBlobError(nameof(ReadFromBlobAsync), "could not read from page blob", blob?.Name, exception, false);
throw;
}
}
}
catch (Exception exception)
{
this.BlobManager?.HandleBlobError(nameof(ReadFromBlobAsync), "could not read from page blob", blob?.Name, exception, true);
throw new FasterException(nameof(ReadFromBlobAsync) + "could not read from page blob " + blob?.Name, exception);
}
finally
{
BlobManager.AsyncStorageReadMaxConcurrency.Release();
stream.Dispose();
}
}
Expand Down Expand Up @@ -330,7 +389,7 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti
// If segment size is -1, which denotes absence, we request the largest possible blob. This is okay because
// page blobs are not backed by real pages on creation, and the given size is only a the physical limit of
// how large it can grow to.
var size = segmentSize == -1 ? MAX_BLOB_SIZE : segmentSize;
var size = segmentSize == -1 ? MAX_PAGEBLOB_SIZE : segmentSize;

// If no blob exists for the segment, we must first create the segment asynchronouly. (Create call takes ~70 ms by measurement)
// After creation is done, we can call write.
Expand Down Expand Up @@ -372,18 +431,35 @@ private unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, u
});
}

const int maxPortionSizeForPageBlobWrites = 4 * 1024 * 1024; // 4 MB is a limit on page blob write portions, apparently

private async Task WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite)
{
long offset = 0;
while (numBytesToWrite > 0)
{
var length = Math.Min(numBytesToWrite, maxPortionSizeForPageBlobWrites);
var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE);
await this.WritePortionToBlobUnsafeAsync(blob, sourceAddress, destinationAddress, offset, length).ConfigureAwait(false);
numBytesToWrite -= length;
offset += length;
}
}

private static bool IsTransientStorageError(StorageException e)
{
return (e.RequestInformation.HttpStatusCode == 408) //408 Request Timeout
|| (e.RequestInformation.HttpStatusCode == 429) //429 Too Many Requests
|| (e.RequestInformation.HttpStatusCode == 500) //500 Internal Server Error
|| (e.RequestInformation.HttpStatusCode == 502) //502 Bad Gateway
|| (e.RequestInformation.HttpStatusCode == 503) //503 Service Unavailable
|| (e.RequestInformation.HttpStatusCode == 504); //504 Gateway Timeout
}

private static bool IsFatal(Exception exception)
{
if (exception is OutOfMemoryException || exception is StackOverflowException)
{
return true;
}
return false;
}
}
}
}
2 changes: 1 addition & 1 deletion cs/src/devices/AzureStorageDevice/BlobEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async Task CreateAsync(long size, CloudPageBlob pageBlob)
}

await pageBlob.CreateAsync(size,
accessCondition: null, options: this.blobManager.GetBlobRequestOptions(), operationContext: null, this.blobManager.CancellationToken);
accessCondition: null, options: this.blobManager.GetBlobRequestOptionsWithRetry(), operationContext: null, this.blobManager.CancellationToken);

// At this point the blob is fully created. After this line all consequent writers will write immediately. We just
// need to clear the queue of pending writers.
Expand Down
Loading

0 comments on commit 7e6c5fc

Please sign in to comment.