Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address .NET 5-specific suggestions for System.Net.Http.Json #34355

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
<Compile Include="System\Net\Http\Json\JsonContent.netcoreapp.cs" />
<Compile Include="System\Net\Http\Json\TranscodingReadStream.netcoreapp.cs" />
<Compile Include="System\Net\Http\Json\TranscodingWriteStream.netcoreapp.cs" />
<Reference Include="System.Net.Mail" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Primitives" />
<Reference Include="System.Runtime" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<Compile Include="System\ArraySegmentExtensions.netstandard.cs" />
<Compile Include="System\Net\Http\Json\TranscodingReadStream.netstandard.cs" />
<Compile Include="System\Net\Http\Json\TranscodingWriteStream.netstandard.cs" />
<Reference Include="System.Buffers" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

namespace System.Net.Http.Json
{
internal sealed class TranscodingReadStream : Stream
/// <summary>
/// Adds a transcode-to-UTF-8 layer to the read operations on another stream.
/// </summary>
internal sealed partial class TranscodingReadStream : Stream
{
private static readonly int OverflowBufferSize = Encoding.UTF8.GetMaxByteCount(1); // The most number of bytes used to represent a single UTF char

Expand All @@ -25,9 +28,10 @@ internal sealed class TranscodingReadStream : Stream
private readonly Decoder _decoder;
private readonly Encoder _encoder;

private ArraySegment<byte> _byteBuffer;
private ArraySegment<char> _charBuffer;
private ArraySegment<byte> _overflowBuffer;
private byte[] _pooledBytes;
private byte[] _pooledOverflowBytes;
private char[] _pooledChars;

private bool _disposed;

public TranscodingReadStream(Stream input, Encoding sourceEncoding)
Expand All @@ -36,15 +40,17 @@ public TranscodingReadStream(Stream input, Encoding sourceEncoding)

// The "count" in the buffer is the size of any content from a previous read.
// Initialize them to 0 since nothing has been read so far.
_byteBuffer = new ArraySegment<byte>(ArrayPool<byte>.Shared.Rent(MaxByteBufferSize), 0, count: 0);
_pooledBytes = ArrayPool<byte>.Shared.Rent(MaxByteBufferSize);

// Attempt to allocate a char buffer that can tolerate the worst-case scenario for this
// encoding. This would allow the byte -> char conversion to complete in a single call.
// The conversion process is tolerant of char buffer that is not large enough to convert all the bytes at once.
int maxCharBufferSize = sourceEncoding.GetMaxCharCount(MaxByteBufferSize);
_charBuffer = new ArraySegment<char>(ArrayPool<char>.Shared.Rent(maxCharBufferSize), 0, count: 0);
_pooledChars = ArrayPool<char>.Shared.Rent(maxCharBufferSize);

_pooledOverflowBytes = ArrayPool<byte>.Shared.Rent(OverflowBufferSize);

_overflowBuffer = new ArraySegment<byte>(ArrayPool<byte>.Shared.Rent(OverflowBufferSize), 0, count: 0);
InitializeBuffers();

_decoder = sourceEncoding.GetDecoder();
_encoder = Encoding.UTF8.GetEncoder();
Expand All @@ -61,136 +67,9 @@ public override long Position
set => throw new NotSupportedException();
}

internal int ByteBufferCount => _byteBuffer.Count;
internal int CharBufferCount => _charBuffer.Count;
internal int OverflowCount => _overflowBuffer.Count;

// Synchronous reads are not supported by this stream; this override always throws NotSupportedException.
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();

/// <summary>
/// Validates the array-based arguments and hands the requested region to <c>ReadAsyncCore</c>.
/// </summary>
/// <param name="buffer">The array that receives the transcoded bytes.</param>
/// <param name="offset">The index in <paramref name="buffer"/> at which writing starts.</param>
/// <param name="count">The maximum number of bytes to write.</param>
/// <param name="cancellationToken">Token forwarded to the core read.</param>
/// <returns>A task producing the number of bytes written into <paramref name="buffer"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
/// <exception cref="ArgumentException">The region described by <paramref name="offset"/> and <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (buffer is null)
    {
        throw new ArgumentNullException(nameof(buffer));
    }

    if (offset < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(offset));
    }

    if (count < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(count));
    }

    // The requested region must lie entirely inside the array.
    int available = buffer.Length - offset;
    if (count > available)
    {
        throw new ArgumentException(SR.Argument_InvalidOffLen);
    }

    return ReadAsyncCore(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
}

/// <summary>
/// Produces UTF-8 transcoded bytes into <paramref name="readBuffer"/>, reading and decoding more input
/// from the underlying stream only when all previously buffered chars have been consumed.
/// </summary>
/// <param name="readBuffer">The destination segment for the transcoded bytes.</param>
/// <param name="cancellationToken">Token forwarded to the read on the underlying stream.</param>
/// <returns>
/// The number of bytes written to <paramref name="readBuffer"/>. Returns 0 when the destination is empty
/// or when the encoder produced no further output.
/// </returns>
private async Task<int> ReadAsyncCore(ArraySegment<byte> readBuffer, CancellationToken cancellationToken)
{
    if (readBuffer.Count == 0)
    {
        return 0;
    }

    if (_overflowBuffer.Count > 0)
    {
        int bytesToCopy = Math.Min(readBuffer.Count, _overflowBuffer.Count);
        _overflowBuffer.Slice(0, bytesToCopy).CopyTo(readBuffer);

        _overflowBuffer = _overflowBuffer.Slice(bytesToCopy);

        // If we have any overflow bytes, avoid complicating the remainder of the code, by returning as
        // soon as we copy any content.
        return bytesToCopy;
    }

    bool shouldFlushEncoder = false;
    // Only read more content from the input stream if we have exhausted all the buffered chars.
    if (_charBuffer.Count == 0)
    {
        int bytesRead = await ReadInputChars(cancellationToken).ConfigureAwait(false);
        shouldFlushEncoder = bytesRead == 0 && _byteBuffer.Count == 0;
    }

    bool completed = false;
    int charsRead = default;
    int bytesWritten = default;
    // Convert() could fail if the destination buffer cannot fit at least one encoded char.
    // If the destination buffer is smaller than GetMaxByteCount(1), we avoid encoding to the destination and we use the overflow buffer instead.
    if (readBuffer.Count > OverflowBufferSize || _charBuffer.Count == 0)
    {
        _encoder.Convert(_charBuffer.Array!, _charBuffer.Offset, _charBuffer.Count, readBuffer.Array!, readBuffer.Offset, readBuffer.Count,
            flush: shouldFlushEncoder, out charsRead, out bytesWritten, out completed);
    }

    _charBuffer = _charBuffer.Slice(charsRead);

    if (completed || bytesWritten > 0)
    {
        return bytesWritten;
    }

    // Assert before the first dereference of the overflow array rather than after it.
    Debug.Assert(_overflowBuffer.Array != null);

    _encoder.Convert(_charBuffer.Array!, _charBuffer.Offset, _charBuffer.Count, _overflowBuffer.Array, byteIndex: 0, _overflowBuffer.Array.Length,
        flush: shouldFlushEncoder, out int overFlowChars, out int overflowBytes, out completed);

    Debug.Assert(overflowBytes > 0 && overFlowChars > 0, "We expect writes to the overflow buffer to always succeed since it is large enough to accommodate at least one char.");

    _charBuffer = _charBuffer.Slice(overFlowChars);

    // readBuffer: [ 0, 0, ], overflowBuffer: [ 7, 13, 34, ]
    // Fill up the readBuffer as far as possible, so the result looks like so:
    // readBuffer: [ 7, 13 ], overflowBuffer: [ 34 ]
    //
    // The encoder may produce fewer bytes than the destination can hold (e.g. a single ASCII char left
    // and a 3-byte destination), so clamp the copy to what was actually written. Without the clamp the
    // remaining-overflow segment would be constructed with a negative count and the reported byte count
    // would exceed the bytes produced.
    int bytesToReturn = Math.Min(readBuffer.Count, overflowBytes);
    _overflowBuffer.Array.AsSpan(0, bytesToReturn).CopyTo(readBuffer);

    // Whatever did not fit (possibly nothing) stays in the overflow buffer for the next read.
    _overflowBuffer = new ArraySegment<byte>(_overflowBuffer.Array, bytesToReturn, overflowBytes - bytesToReturn);

    return bytesToReturn;
}

/// <summary>
/// Refills the byte buffer from the underlying stream and decodes its content into the char buffer.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the read on the underlying stream.</param>
/// <returns>The number of bytes obtained from the underlying stream by this call.</returns>
private async Task<int> ReadInputChars(CancellationToken cancellationToken)
{
    byte[]? rawBytes = _byteBuffer.Array;
    Debug.Assert(rawBytes != null);

    // Bytes left undecoded by the previous pass are shifted to the front of the array so that the
    // newly read content can be appended directly behind them.
    int carriedOver = _byteBuffer.Count;
    Buffer.BlockCopy(rawBytes, _byteBuffer.Offset, rawBytes, 0, carriedOver);

    int bytesRead = await _stream
        .ReadAsync(rawBytes, carriedOver, rawBytes.Length - carriedOver, cancellationToken)
        .ConfigureAwait(false);

    _byteBuffer = new ArraySegment<byte>(rawBytes, 0, carriedOver + bytesRead);

    char[]? rawChars = _charBuffer.Array;
    Debug.Assert(rawChars != null);
    Debug.Assert(_charBuffer.Count == 0, "We should only expect to read more input chars once all buffered content is read");

    // The decoder is flushed only when the underlying stream reported no more data.
    bool endOfInput = bytesRead == 0;
    _decoder.Convert(rawBytes, _byteBuffer.Offset, _byteBuffer.Count, rawChars, charIndex: 0, rawChars.Length,
        flush: endOfInput, out int bytesUsed, out int charsUsed, out _);

    // When flushing, every pending byte must have been consumed.
    Debug.Assert(bytesRead != 0 || _byteBuffer.Count - bytesUsed == 0);

    // Keep the undecoded tail of the bytes, and expose exactly the chars that were produced.
    _charBuffer = new ArraySegment<char>(rawChars, 0, charsUsed);
    _byteBuffer = _byteBuffer.Slice(bytesUsed);

    return bytesRead;
}

// Flushing is not supported by this stream; this override always throws NotSupportedException.
public override void Flush()
=> throw new NotSupportedException();

Expand All @@ -209,17 +88,17 @@ protected override void Dispose(bool disposing)
{
_disposed = true;

Debug.Assert(_charBuffer.Array != null);
ArrayPool<char>.Shared.Return(_charBuffer.Array);
_charBuffer = default;
Debug.Assert(_pooledChars != null);
ArrayPool<char>.Shared.Return(_pooledChars);
_pooledChars = null!;

Debug.Assert(_byteBuffer.Array != null);
ArrayPool<byte>.Shared.Return(_byteBuffer.Array);
_byteBuffer = default;
Debug.Assert(_pooledBytes != null);
ArrayPool<byte>.Shared.Return(_pooledBytes);
_pooledBytes = null!;

Debug.Assert(_overflowBuffer.Array != null);
ArrayPool<byte>.Shared.Return(_overflowBuffer.Array);
_overflowBuffer = default;
Debug.Assert(_pooledOverflowBytes != null);
ArrayPool<byte>.Shared.Return(_pooledOverflowBytes);
_pooledOverflowBytes = null!;

_stream.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net.Http.Json
{
internal sealed partial class TranscodingReadStream : Stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need a private implementation for .NET 5; you should contribute this as part of #30260

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GrabYourPitchforks are you actively working on that issue? It is marked for 5.0; is that still the plan?
I want to assume that these TranscodingStreams aren't going to be the fix that closes the above issue.

In that case, I think it makes more sense if I close this PR and wait for 30260 to close.

{
// Bytes read from the input stream that the decoder has not consumed yet.
private Memory<byte> _byteBuffer;
// Decoded chars that have not been encoded to UTF-8 yet.
private Memory<char> _charBuffer;
// Encoded bytes that did not fit into the caller's buffer on a previous read.
private Memory<byte> _overflowBuffer;

// Current number of pending items in each buffer.
// NOTE(review): presumably exposed as internal for test inspection; confirm against the test project.
internal int ByteBufferCount => _byteBuffer.Length;
internal int CharBufferCount => _charBuffer.Length;
internal int OverflowCount => _overflowBuffer.Length;

// Intentionally empty: the Memory<T> fields above default to empty, so nothing needs to be set up here.
private void InitializeBuffers() { }

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer));
}

if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}

if (buffer.Length - offset < count)
{
throw new ArgumentException(SR.Argument_InvalidOffLen);
}
Comment on lines +26 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Memory ctor performs these checks; shouldn't need them here unless parameter names are different.


return ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

/// <summary>
/// Produces UTF-8 transcoded bytes into <paramref name="buffer"/>, reading and decoding more input
/// from the underlying stream only when all previously buffered chars have been consumed.
/// </summary>
/// <param name="buffer">The destination for the transcoded bytes.</param>
/// <param name="cancellationToken">Token forwarded to the read on the underlying stream.</param>
/// <returns>
/// The number of bytes written to <paramref name="buffer"/>. Returns 0 when the destination is empty
/// or when the encoder produced no further output.
/// </returns>
public async override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
    if (buffer.IsEmpty)
    {
        return 0;
    }

    if (!_overflowBuffer.IsEmpty)
    {
        int bytesToCopy = Math.Min(buffer.Length, _overflowBuffer.Length);

        _overflowBuffer.Slice(0, bytesToCopy).CopyTo(buffer);
        _overflowBuffer = _overflowBuffer.Slice(bytesToCopy);

        // If we have any overflow bytes, avoid complicating the remainder of the code, by returning as
        // soon as we copy any content.
        return bytesToCopy;
    }

    bool shouldFlushEncoder = false;
    // Only read more content from the input stream if we have exhausted all the buffered chars.
    if (_charBuffer.IsEmpty)
    {
        int bytesRead = await ReadInputChars(cancellationToken).ConfigureAwait(false);
        shouldFlushEncoder = bytesRead == 0 && _byteBuffer.Length == 0;
    }

    bool completed = false;
    int charsRead = default;
    int bytesWritten = default;
    // Convert() could fail if the destination buffer cannot fit at least one encoded char.
    // If the destination buffer is smaller than GetMaxByteCount(1), we avoid encoding to the destination and we use the overflow buffer instead.
    if (buffer.Length > OverflowBufferSize || _charBuffer.IsEmpty)
    {
        _encoder.Convert(_charBuffer.Span, buffer.Span, flush: shouldFlushEncoder, out charsRead, out bytesWritten, out completed);
    }

    _charBuffer = _charBuffer.Slice(charsRead);

    if (completed || bytesWritten > 0)
    {
        return bytesWritten;
    }

    // If the buffer was too small, transcode to the overflow buffer.
    _overflowBuffer = new Memory<byte>(_pooledOverflowBytes);
    _encoder.Convert(_charBuffer.Span, _overflowBuffer.Span, flush: shouldFlushEncoder, out charsRead, out bytesWritten, out _);
    Debug.Assert(bytesWritten > 0 && charsRead > 0, "We expect writes to the overflow buffer to always succeed since it is large enough to accommodate at least one char.");

    _charBuffer = _charBuffer.Slice(charsRead);
    _overflowBuffer = _overflowBuffer.Slice(0, bytesWritten);

    // The encoder may have produced fewer bytes than the destination can hold (e.g. a single ASCII char
    // left and a 3-byte destination). Clamp the copy to what was actually written; slicing the overflow
    // buffer by buffer.Length unconditionally would throw and would over-report the bytes produced.
    int bytesToReturn = Math.Min(buffer.Length, bytesWritten);
    _overflowBuffer.Slice(0, bytesToReturn).CopyTo(buffer);

    // Whatever did not fit (possibly nothing) stays in the overflow buffer for the next read.
    _overflowBuffer = _overflowBuffer.Slice(bytesToReturn);

    return bytesToReturn;
}

/// <summary>
/// Refills the byte buffer from the underlying stream and decodes its content into the char buffer.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the read on the underlying stream.</param>
/// <returns>The number of bytes obtained from the underlying stream by this call.</returns>
private async ValueTask<int> ReadInputChars(CancellationToken cancellationToken)
{
    // Bytes left undecoded by the previous pass are moved to the front of the pooled array so that
    // the newly read content can be appended directly behind them.
    ReadOnlyMemory<byte> leftover = _byteBuffer;
    int leftoverLength = leftover.Length;

    Memory<byte> wholeByteArray = _pooledBytes;
    _byteBuffer = wholeByteArray;
    leftover.CopyTo(_byteBuffer);

    Memory<byte> freeSpace = _byteBuffer.Slice(leftoverLength);
    int bytesRead = await _stream.ReadAsync(freeSpace, cancellationToken).ConfigureAwait(false);

    Debug.Assert(_charBuffer.IsEmpty, "We should only expect to read more input chars once all buffered content is read");

    // Expose the carried-over plus freshly read bytes, and the entire pooled char array as destination.
    _byteBuffer = _byteBuffer.Slice(0, leftoverLength + bytesRead);
    Memory<char> wholeCharArray = _pooledChars;
    _charBuffer = wholeCharArray;

    // The decoder is flushed only when the underlying stream reported no more data.
    bool endOfInput = bytesRead == 0;
    _decoder.Convert(_byteBuffer.Span, _charBuffer.Span, flush: endOfInput, out int bytesUsed, out int charsUsed, out _);

    // When flushing, every pending byte must have been consumed.
    Debug.Assert(bytesRead != 0 || bytesUsed == _byteBuffer.Length);

    // Expose exactly the chars that were produced, and keep the undecoded tail of the bytes.
    _charBuffer = _charBuffer.Slice(0, charsUsed);
    _byteBuffer = _byteBuffer.Slice(bytesUsed);

    return bytesRead;
}
}
}
Loading