Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/storage/stream position deadlock bug #14301

Merged
1 change: 1 addition & 0 deletions sdk/storage/Azure.Storage.Blobs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.6.0-preview.1 (Unreleased)
- Fixed bug where BlobClient.Upload(), BlockBlobClient.Upload(), AppendBlobClient.AppendBlock(), and PageBlobClient.UploadPages() would deadlock if the content stream's position was not 0.
- Fixed bug in BlobBaseClient.OpenRead() causing us to do more download called than necessary.

## 12.5.1 (2020-08-18)
Expand Down
3 changes: 2 additions & 1 deletion sdk/storage/Azure.Storage.Blobs/src/AppendBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,15 @@ internal async Task<Response<BlobAppendInfo>> AppendBlockInternal(
try
{
BlobErrors.VerifyHttpsCustomerProvidedKey(Uri, CustomerProvidedKey);
Errors.VerifyStreamPosition(content, nameof(content));

content = content?.WithNoDispose().WithProgress(progressHandler);
return await BlobRestClient.AppendBlob.AppendBlockAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
leaseId: conditions?.LeaseId,
Expand Down
7 changes: 5 additions & 2 deletions sdk/storage/Azure.Storage.Blobs/src/BlockBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -716,12 +716,14 @@ internal virtual async Task<Response<BlobContentInfo>> UploadInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content, nameof(content));

return await BlobRestClient.BlockBlob.UploadAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
blobContentType: blobHttpHeaders?.ContentType,
blobContentEncoding: blobHttpHeaders?.ContentEncoding,
Expand Down Expand Up @@ -967,14 +969,15 @@ internal virtual async Task<Response<BlockInfo>> StageBlockInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content, nameof(content));
content = content.WithNoDispose().WithProgress(progressHandler);
return await BlobRestClient.BlockBlob.StageBlockAsync(
ClientDiagnostics,
Pipeline,
Uri,
blockId: base64BlockId,
body: content,
contentLength: content.Length,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
leaseId: conditions?.LeaseId,
Expand Down
5 changes: 3 additions & 2 deletions sdk/storage/Azure.Storage.Blobs/src/PageBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,15 +1083,16 @@ internal async Task<Response<PageInfo>> UploadPagesInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content, nameof(content));
content = content?.WithNoDispose().WithProgress(progressHandler);
var range = new HttpRange(offset, content?.Length ?? null);
var range = new HttpRange(offset, (content?.Length - content?.Position) ?? null);

return await BlobRestClient.PageBlob.UploadPagesAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
timeout: default,
Expand Down
51 changes: 51 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/AppendBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,57 @@ public async Task AppendBlockAsync_ProgressReporting()
Assert.AreEqual(blobSize, progress.List[progress.List.Count - 1]);
}

[Test]
public async Task AppendBlockAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blobName = GetNewBlobName();
AppendBlobClient blob = InstrumentClient(test.Container.GetAppendBlobClient(blobName));
await blob.CreateAsync();
const int blobSize = Constants.KB;
var data = GetRandomBuffer(blobSize);

// Act
using Stream stream = new MemoryStream(data);
stream.Position = stream.Length;

await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.AppendBlockAsync(stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task AppendBlockAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blobName = GetNewBlobName();
AppendBlobClient blob = InstrumentClient(test.Container.GetAppendBlobClient(blobName));
await blob.CreateAsync();
const int blobSize = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(blobSize);
byte[] expectedData = new byte[blobSize - position];
Array.Copy(data, position, expectedData, 0, blobSize - position);

// Act
using Stream stream = new MemoryStream(data)
{
Position = position
};
await blob.AppendBlockAsync(stream);

// Assert
Response<BlobDownloadInfo> result = await blob.DownloadAsync();
var dataResult = new MemoryStream();
await result.Value.Content.CopyToAsync(dataResult);
Assert.AreEqual(blobSize - position, dataResult.Length);
TestHelper.AssertSequenceEqual(expectedData, dataResult.ToArray());
}

[Test]
public async Task AppendBlockFromUriAsync_Min()
{
Expand Down
88 changes: 88 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/BlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,94 @@ public async Task UploadAsync_File_UploadsBlock()
Assert.AreEqual(BlobType.Block, properties.Value.BlobType);
}

[Test]
public async Task UploadAsync_Stream_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = Constants.KB;
byte[] data = GetRandomBuffer(size);

using Stream stream = new MemoryStream(data)
{
Position = size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadAsync(
content: stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task UploadAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadAsync(content: stream);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task UploadAsync_NonZeroStreamPositionMultipleBlocks()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = 2 * Constants.KB;
long position = 300;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};


BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
MaximumTransferSize = 512,
InitialTransferSize = 512
}
};

// Act
await blob.UploadAsync(
content: stream,
options: options);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
[TestCase(1)]
[TestCase(4)]
Expand Down
150 changes: 150 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/BlockBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,66 @@ public async Task StageBlockAsync_ProgressReporting()
Assert.AreEqual(100 * Constants.MB, progress.List[progress.List.Count - 1]);
}

[Test]
public async Task StageBlockAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blockBlobName = GetNewBlobName();
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(blockBlobName));
byte[] data = GetRandomBuffer(Size);

using Stream stream = new MemoryStream(data)
{
Position = Size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.StageBlockAsync(
base64BlockId: ToBase64(GetNewBlockName()),
content: stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task StageBlockAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

string blockId = ToBase64(GetNewBlockName());

// Act
await blob.StageBlockAsync(
blockId,
content: stream);

await blob.CommitBlockListAsync(new List<string>()
{
blockId
});

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task StageBlockFromUriAsync_Min()
{
Expand Down Expand Up @@ -2060,6 +2120,96 @@ public async Task UploadAsync_VersionId()
}
}

[Test]
public async Task UploadAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blockBlobName = GetNewBlobName();
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(blockBlobName));
byte[] data = GetRandomBuffer(Size);

using Stream stream = new MemoryStream(data)
{
Position = Size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadAsync(
content: stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task UploadAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadAsync(content: stream);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task UploadAsync_NonZeroStreamPositionMultipleBlocks()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = 2 * Constants.KB;
long position = 300;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};


BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
MaximumTransferSize = 512,
InitialTransferSize = 512
}
};

// Act
await blob.UploadAsync(
content: stream,
options: options);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task GetBlockBlobClient_AsciiName()
{
Expand Down
Loading