Skip to content

Commit

Permalink
[Storage][DataMovement] Adjust local file buffer size (#47043)
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Nov 12, 2024
1 parent 94018d3 commit 338bbe7
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ internal class JobPartPlanFile : IDisposable
/// </summary>
public readonly SemaphoreSlim WriteLock;

private const int DefaultBufferSize = 81920;

private JobPartPlanFile()
{
WriteLock = new SemaphoreSlim(1);
Expand Down Expand Up @@ -66,7 +64,10 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
{
using (FileStream fileStream = File.Create(result.FileName.ToString()))
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
await headerStream.CopyToAsync(
fileStream,
DataMovementConstants.DefaultStreamCopyBufferSize,
cancellationToken).ConfigureAwait(false);
}
}
catch (Exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ internal class JobPlanFile : IDisposable
/// </summary>
public readonly SemaphoreSlim WriteLock;

private const int DefaultBufferSize = 81920;

private JobPlanFile(string id, string filePath)
{
Id = id;
Expand All @@ -63,7 +61,10 @@ public static async Task<JobPlanFile> CreateJobPlanFileAsync(
{
using (FileStream fileStream = File.Create(jobPlanFile.FilePath))
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
await headerStream.CopyToAsync(
fileStream,
DataMovementConstants.DefaultStreamCopyBufferSize,
cancellationToken).ConfigureAwait(false);
}
}
catch (Exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,9 @@ protected internal override async Task CopyFromStreamAsync(
{
fileStream.Seek(position, SeekOrigin.Begin);
}
await stream.CopyToAsync(
fileStream,
(int)streamLength,
cancellationToken)
.ConfigureAwait(false);

int bufferSize = Math.Min((int)streamLength, DataMovementConstants.DefaultStreamCopyBufferSize);
await stream.CopyToAsync(fileStream, bufferSize, cancellationToken).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public override async Task<Stream> ReadJobPlanFileAsync(
int length,
CancellationToken cancellationToken = default)
{
int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize;
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, maxArraySize);
int bufferSize = length > 0 ? length : DataMovementConstants.DefaultStreamCopyBufferSize;
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferSize);

CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
Expand All @@ -164,7 +164,7 @@ public override async Task<Stream> ReadJobPlanFileAsync(
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath))
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
{
await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false);
await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false);
}

copiedStream.Position = 0;
Expand Down Expand Up @@ -193,16 +193,16 @@ public override async Task<Stream> ReadJobPartPlanFileAsync(
{
if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile))
{
int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize;
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, maxArraySize);
int bufferSize = length > 0 ? length : DataMovementConstants.DefaultStreamCopyBufferSize;
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferSize);

await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath))
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
{
await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false);
await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false);
}

copiedStream.Position = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class DataMovementConstants
internal const int MaxJobPartReaders = 64;
internal const int MaxJobChunkTasks = 3000;
internal const int StatusCheckInSec = 10;
internal const int DefaultArrayPoolArraySize = 4 * 1024;
internal const int DefaultStreamCopyBufferSize = 81920; // Use the .NET default

internal const long DefaultInitialTransferSize = 32 * Constants.MB;
internal const long DefaultChunkSize = 4 * Constants.MB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public async Task WriteChunkToTempFile(string chunkFilePath, Stream source)
{
await source.CopyToAsync(
fileStream,
Constants.DefaultDownloadCopyBufferSize,
DataMovementConstants.DefaultStreamCopyBufferSize,
_cancellationToken)
.ConfigureAwait(false);
}
Expand Down

0 comments on commit 338bbe7

Please sign in to comment.