Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add response.Content to "move buffering" #16

Draft
wants to merge 22 commits into
base: core2-move-buffering-to-transport
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net461.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net472.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net6.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
80 changes: 80 additions & 0 deletions sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace Azure.Core.Buffers
{
internal static class AzureBaseBuffersExtensions
{
// Same value as Stream.CopyTo uses by default
private const int DefaultCopyBufferSize = 81920;

public static async Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellation = default)
{
Argument.AssertNotNull(stream, nameof(stream));
Expand Down Expand Up @@ -87,5 +90,82 @@ public static async Task WriteAsync(this Stream stream, ReadOnlySequence<byte> b
ArrayPool<byte>.Shared.Return(array);
}
}

public static async Task CopyToAsync(this Stream source, Stream destination, CancellationToken cancellationToken)
{
//using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//cts.CancelAfter(timeout);

//// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
//// register callback to dispose the stream on cancellation.
//if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
//{
// cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
//}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
while (true)
{
#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets
if (bytesRead == 0)
break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
}
}
//catch (Exception ex) when (ex is ObjectDisposedException
// or IOException
// or OperationCanceledException
// or NotSupportedException)
//{
// CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
// throw;
//}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void CopyTo(this Stream source, Stream destination, CancellationToken cancellationToken)
{
//using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//cts.CancelAfter(timeout);

//// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
//// register callback to dispose the stream on cancellation.
//if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
//{
// cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
//}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
//catch (Exception ex) when (ex is ObjectDisposedException
// or IOException
// or OperationCanceledException
// or NotSupportedException)
//{
// CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
// throw;
//}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
Expand Down Expand Up @@ -41,6 +44,21 @@ public override Stream? ContentStream
set => _pipelineResponse.ContentStream = value;
}

public override BinaryData Content
{
get
{
ResetContentStreamPosition(_pipelineResponse);
return _pipelineResponse.Content;
}
}

public override BinaryData ReadContent(CancellationToken cancellationToken = default)
=> _pipelineResponse.ReadContent(cancellationToken);

public override async ValueTask<BinaryData> ReadContentAsync(CancellationToken cancellationToken = default)
=> await base.ReadContentAsync(cancellationToken).ConfigureAwait(false);

protected internal override bool ContainsHeader(string name)
=> _pipelineResponse.Headers.TryGetValue(name, out _);

Expand All @@ -61,8 +79,24 @@ protected internal override bool TryGetHeaderValues(string name, [NotNullWhen(tr
public override void Dispose()
{
PipelineResponse response = _pipelineResponse;
ResetContentStreamPosition(response);
response?.Dispose();
}

private void ResetContentStreamPosition(PipelineResponse response)
{
if (response.ContentStream is MemoryStream stream && stream.Position != 0)
{
// Azure.Core Response has a contract that ContentStream can be read
// without setting position back to 0. This means if ReadContent is
// called after such a read, the buffer will contain empty BinaryData.

// So that the ClientModel response implementations don't throw,
// set the position back to 0 if Azure.Core Response default
// ReadContent was called.
stream.Position = 0;
}
}
}
}
}
4 changes: 2 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ public override async ValueTask ProcessAsync(HttpMessage message)
{
if (message.HasResponse)
{
throw new RequestFailedException(message.Response, e.InnerException);
throw await RequestFailedException.CreateAsync(message.Response, innerException: e.InnerException).ConfigureAwait(false);
}
else
{
throw new RequestFailedException(e.Message, e.InnerException);
throw new RequestFailedException(e.Message, innerException: e.InnerException);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/HttpWebRequestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ public override Stream? ContentStream
}
}

// TODO: Implement Content and ReadContent

public override string ClientRequestId { get; set; }

public override void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPip
ContentTypeUtilities.TryGetTextEncoding(response.Headers.ContentType, out Encoding? responseTextEncoding);

bool wrapResponseContent = response.ContentStream != null &&
response.ContentStream?.CanSeek == false &&
message.BufferResponse == false &&
logWrapper.IsEnabled(isError);

double elapsed = (after - before) / (double)Stopwatch.Frequency;
Expand Down
Loading