Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/storage/stream position deadlock bug #14301

Merged
1 change: 1 addition & 0 deletions sdk/storage/Azure.Storage.Blobs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.6.0-preview.1 (Unreleased)
- Fixed bug where BlobClient.Upload(), BlockBlobClient.Upload(), AppendBlobClient.AppendBlock(), and PageBlobClient.UploadPages() would deadlock if the content stream's position was not 0.

## 12.5.1 (2020-08-18)
- Bug in TaskExtensions.EnsureCompleted method that causes it to unconditionally throw an exception in the environments with synchronization context
Expand Down
3 changes: 2 additions & 1 deletion sdk/storage/Azure.Storage.Blobs/src/AppendBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,15 @@ internal async Task<Response<BlobAppendInfo>> AppendBlockInternal(
try
{
BlobErrors.VerifyHttpsCustomerProvidedKey(Uri, CustomerProvidedKey);
Errors.VerifyStreamPosition(content);

content = content?.WithNoDispose().WithProgress(progressHandler);
return await BlobRestClient.AppendBlob.AppendBlockAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: content?.Length - content?.Position ?? 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - consider adding explicit parens as most folks aren't as clear on precedence for newer operators like ?? and the interaction with lifted operators.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've fixed this everywhere.

version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
leaseId: conditions?.LeaseId,
Expand Down
8 changes: 5 additions & 3 deletions sdk/storage/Azure.Storage.Blobs/src/BlockBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -716,12 +716,14 @@ internal virtual async Task<Response<BlobContentInfo>> UploadInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content);

return await BlobRestClient.BlockBlob.UploadAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: content?.Length - content?.Position ?? 0,
version: Version.ToVersionString(),
blobContentType: blobHttpHeaders?.ContentType,
blobContentEncoding: blobHttpHeaders?.ContentEncoding,
Expand Down Expand Up @@ -2190,9 +2192,9 @@ internal static PartitionedUploader<BlobUploadOptions, BlobContentInfo>.Behavior
progressHandler,
async,
cancellationToken).ConfigureAwait(false),
CommitPartitionedUpload = async (partitions, args, async, cancellationToken)
CommitPartitionedUpload = async (partitions, initialPosition, args, async, cancellationToken)
=> await client.CommitBlockListInternal(
partitions.Select(partition => StorageExtensions.GenerateBlockId(partition.Offset)),
partitions.Select(partition => StorageExtensions.GenerateBlockId(partition.Offset - initialPosition)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling to wrap my head around this line in particular. Why does the blockId change based on the initialPosition? This whole PR is kind of like mapping between coordinate systems. We've got the data position relative to the local Stream and the data position relative to the blob in the cloud. I would have thought partitions would be in blob units rather than Stream units but maybe that's where I'm getting hung up?

args.HttpHeaders,
args.Metadata,
args.Tags,
Expand Down
5 changes: 3 additions & 2 deletions sdk/storage/Azure.Storage.Blobs/src/PageBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,15 +1083,16 @@ internal async Task<Response<PageInfo>> UploadPagesInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content);
content = content?.WithNoDispose().WithProgress(progressHandler);
var range = new HttpRange(offset, content?.Length ?? null);
var range = new HttpRange(offset, content?.Length - content?.Position ?? null);

return await BlobRestClient.PageBlob.UploadPagesAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: content?.Length - content?.Position ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
timeout: default,
Expand Down
51 changes: 51 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/AppendBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,57 @@ public async Task AppendBlockAsync_ProgressReporting()
Assert.AreEqual(blobSize, progress.List[progress.List.Count - 1]);
}

[Test]
public async Task AppendBlockAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blobName = GetNewBlobName();
AppendBlobClient blob = InstrumentClient(test.Container.GetAppendBlobClient(blobName));
await blob.CreateAsync();
const int blobSize = Constants.KB;
var data = GetRandomBuffer(blobSize);

// Act
using Stream stream = new MemoryStream(data);
stream.Position = stream.Length;

await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.AppendBlockAsync(stream),
e => Assert.AreEqual("stream.Position must be less than stream.Length", e.Message));
}

[Test]
public async Task AppendBlockAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blobName = GetNewBlobName();
AppendBlobClient blob = InstrumentClient(test.Container.GetAppendBlobClient(blobName));
await blob.CreateAsync();
const int blobSize = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(blobSize);
byte[] expectedData = new byte[blobSize - position];
Array.Copy(data, position, expectedData, 0, blobSize - position);

// Act
using Stream stream = new MemoryStream(data)
{
Position = position
};
await blob.AppendBlockAsync(stream);

// Assert
Response<BlobDownloadInfo> result = await blob.DownloadAsync();
var dataResult = new MemoryStream();
await result.Value.Content.CopyToAsync(dataResult);
Assert.AreEqual(blobSize - position, dataResult.Length);
TestHelper.AssertSequenceEqual(expectedData, dataResult.ToArray());
}

[Test]
public async Task AppendBlockFromUriAsync_Min()
{
Expand Down
88 changes: 88 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/BlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,94 @@ public async Task UploadAsync_File_UploadsBlock()
Assert.AreEqual(BlobType.Block, properties.Value.BlobType);
}

[Test]
public async Task UploadAsync_Stream_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = Constants.KB;
byte[] data = GetRandomBuffer(size);

using Stream stream = new MemoryStream(data)
{
Position = size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadAsync(
content: stream),
e => Assert.AreEqual("stream.Position must be less than stream.Length", e.Message));
}

[Test]
public async Task UploadAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadAsync(content: stream);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task UploadAsync_NonZeroStreamPositionMultipleBlocks()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = 2 * Constants.KB;
long position = 300;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};


BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
MaximumTransferSize = 512,
InitialTransferSize = 512
}
};

// Act
await blob.UploadAsync(
content: stream,
options: options);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
[TestCase(1)]
[TestCase(4)]
Expand Down
90 changes: 90 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/BlockBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,96 @@ public async Task UploadAsync_VersionId()
}
}

[Test]
public async Task UploadAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blockBlobName = GetNewBlobName();
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(blockBlobName));
byte[] data = GetRandomBuffer(Size);

using Stream stream = new MemoryStream(data)
{
Position = Size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadAsync(
content: stream),
e => Assert.AreEqual("stream.Position must be less than stream.Length", e.Message));
}

[Test]
public async Task UploadAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadAsync(content: stream);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task UploadAsync_NonZeroStreamPositionMultipleBlocks()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = 2 * Constants.KB;
long position = 300;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};


BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
MaximumTransferSize = 512,
InitialTransferSize = 512
}
};

// Act
await blob.UploadAsync(
content: stream,
options: options);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task GetBlockBlobClient_AsciiName()
{
Expand Down
55 changes: 55 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/PageBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,61 @@ await blob.UploadPagesAsync(
Assert.AreEqual(blobSize, progress.List[progress.List.Count - 1]);
}

[Test]
public async Task UploadPagesAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
PageBlobClient blob = await CreatePageBlobClientAsync(test.Container, 4 * Constants.KB);
long size = Constants.KB;
byte[] data = GetRandomBuffer(size);

using Stream stream = new MemoryStream(data)
{
Position = size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadPagesAsync(
content: stream,
offset: 0),
e => Assert.AreEqual("stream.Position must be less than stream.Length", e.Message));
}

[Test]
public async Task UploadPagesAsync_NonZeroStreamPosition()
{
// Arrange
await using DisposingContainer test = await GetTestContainerAsync();

long size = Constants.KB;
long position = 512;
PageBlobClient blob = await CreatePageBlobClientAsync(test.Container, size - position);
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadPagesAsync(
content: stream,
offset: 0);

// Assert
Response<BlobDownloadInfo> response = await blob.DownloadAsync();

var actualData = new byte[512];
using var actualStream = new MemoryStream(actualData);
await response.Value.Content.CopyToAsync(actualStream);
TestHelper.AssertSequenceEqual(expectedData, actualData);
}

[Test]
public async Task ClearPagesAsync()
{
Expand Down
Loading