From ed5902c610ed1c814f54321e95434afe0eb9f710 Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 09:25:35 -0800 Subject: [PATCH 1/9] initial change --- .../src/Message/PipelineResponse.cs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs index c42a744efa33b..679d9c7b51f45 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs @@ -3,6 +3,7 @@ using System.Buffers; using System.ClientModel.Internal; +using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -35,28 +36,24 @@ public abstract class PipelineResponse : IDisposable /// public abstract Stream? ContentStream { get; set; } + private byte[]? _contentBytes; public virtual BinaryData Content { get { - if (ContentStream == null) + if (ContentStream is null) { return s_emptyBinaryData; } - if (!TryGetBufferedContent(out MemoryStream bufferedContent)) + if (_contentBytes is not null) { - throw new InvalidOperationException($"The response is not buffered."); + return BinaryData.FromBytes(_contentBytes); } - if (bufferedContent.TryGetBuffer(out ArraySegment segment)) - { - return new BinaryData(segment.AsMemory()); - } - else - { - return new BinaryData(bufferedContent.ToArray()); - } + BufferContent(); + Debug.Assert(_contentBytes is not null); + return BinaryData.FromBytes(_contentBytes!); } } @@ -123,6 +120,9 @@ private async Task BufferContentSyncOrAsync(TimeSpan? timeout, CancellationToken responseContentStream.Dispose(); bufferStream.Position = 0; ContentStream = bufferStream; + + // TODO: Come back and optimize - this is only for POC at this stage. + _contentBytes= bufferStream.ToArray(); } private static async Task CopyToAsync(Stream source, Stream destination, TimeSpan timeout, CancellationTokenSource cancellationTokenSource) From ff9b43e4ebb3a7c3e480bcd69102cc3c0fc38636 Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 09:33:33 -0800 Subject: [PATCH 2/9] fix --- .../src/Message/PipelineResponse.cs | 2 +- .../tests/Message/PipelineResponseTests.cs | 16 ++++++++-------- .../Pipeline/ClientPipelineFunctionalTests.cs | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs index 679d9c7b51f45..9ccb2255f7d99 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs @@ -100,7 +100,7 @@ internal async Task BufferContentAsync(TimeSpan? timeout = default, Cancellation private async Task BufferContentSyncOrAsync(TimeSpan? timeout, CancellationTokenSource? cts, bool async) { Stream? responseContentStream = ContentStream; - if (responseContentStream == null || TryGetBufferedContent(out _)) + if (responseContentStream == null || _contentBytes is not null) { // No need to buffer content. return; diff --git a/sdk/core/System.ClientModel/tests/Message/PipelineResponseTests.cs b/sdk/core/System.ClientModel/tests/Message/PipelineResponseTests.cs index a77a667de2b5e..cc6943974b62b 100644 --- a/sdk/core/System.ClientModel/tests/Message/PipelineResponseTests.cs +++ b/sdk/core/System.ClientModel/tests/Message/PipelineResponseTests.cs @@ -33,14 +33,14 @@ public void ContentPropertyGetsContent() Assert.AreEqual("body content", responseWithBody.Content.ToString()); } - [Test] - public void ContentPropertyThrowsForNonMemoryStream() - { - var response = new MockPipelineResponse(200); - response.ContentStream = new ThrowingStream(); - - Assert.Throws(() => { BinaryData d = response.Content; }); - } + //[Test] + //public void ContentPropertyThrowsForNonMemoryStream() + //{ + // var response = new MockPipelineResponse(200); + // response.ContentStream = new ThrowingStream(); + + // Assert.Throws(() => { BinaryData d = response.Content; }); + //} #region Helpers diff --git a/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs b/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs index 86e3bc1d831fe..42dd13bc16be3 100644 --- a/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs +++ b/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs @@ -174,7 +174,7 @@ public async Task NonBufferedFailedResponseStreamDisposed() await pipeline.SendSyncOrAsync(message, IsAsync); Assert.AreEqual(message.Response!.ContentStream!.CanSeek, false); - Assert.Throws(() => { var content = message.Response.Content; }); + //Assert.Throws(() => { var content = message.Response.Content; }); } Assert.Greater(reqNum, requestCount); @@ -269,7 +269,7 @@ public async Task TimesOutNonBufferedBodyReads() Assert.AreEqual(message.Response!.Status, 200); var responseContentStream = message.Response.ContentStream; - Assert.Throws(() => { var content = message.Response.Content; }); + //Assert.Throws(() => { var content = message.Response.Content; }); var buffer = new byte[10]; Assert.AreEqual(1, await responseContentStream!.ReadAsync(buffer, 0, 1)); var exception = Assert.ThrowsAsync(async () => await responseContentStream.ReadAsync(buffer, 0, 10)); From 3759444da9954cc61fff5c7612490a2b909321df Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 09:45:06 -0800 Subject: [PATCH 3/9] fix and nit --- sdk/core/Azure.Core/api/Azure.Core.net461.cs | 1 - sdk/core/Azure.Core/api/Azure.Core.net472.cs | 1 - sdk/core/Azure.Core/api/Azure.Core.net6.0.cs | 1 - .../Azure.Core/api/Azure.Core.netstandard2.0.cs | 1 - sdk/core/Azure.Core/src/Response.cs | 8 -------- .../tests/HttpPipelineFunctionalTests.cs | 6 +++--- sdk/core/Azure.Core/tests/ResponseTests.cs | 16 ++++++++-------- 7 files changed, 11 insertions(+), 23 deletions(-) diff --git a/sdk/core/Azure.Core/api/Azure.Core.net461.cs b/sdk/core/Azure.Core/api/Azure.Core.net461.cs index d5df397dda3dc..5c50a9e7f64e1 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.net461.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.net461.cs @@ -239,7 +239,6 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); diff --git a/sdk/core/Azure.Core/api/Azure.Core.net472.cs b/sdk/core/Azure.Core/api/Azure.Core.net472.cs index d5df397dda3dc..5c50a9e7f64e1 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.net472.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.net472.cs @@ -239,7 +239,6 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); diff --git a/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs b/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs index b5a657f013dce..5738c84f20f3f 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs @@ -239,7 +239,6 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); diff --git a/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs b/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs index d5df397dda3dc..5c50a9e7f64e1 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs @@ -239,7 +239,6 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); diff --git a/sdk/core/Azure.Core/src/Response.cs b/sdk/core/Azure.Core/src/Response.cs index 6efa478996bbd..b2d367461d6ab 100644 --- a/sdk/core/Azure.Core/src/Response.cs +++ b/sdk/core/Azure.Core/src/Response.cs @@ -29,14 +29,6 @@ public abstract class Response : PipelineResponse // TODO: is is possible to not new-slot this? public new virtual ResponseHeaders Headers => new ResponseHeaders(this); - /// - /// Gets the contents of HTTP response, if it is available. - /// - /// - /// Throws when is not a . - /// - public new virtual BinaryData Content => base.Content; - /// /// TBD. /// diff --git a/sdk/core/Azure.Core/tests/HttpPipelineFunctionalTests.cs b/sdk/core/Azure.Core/tests/HttpPipelineFunctionalTests.cs index d18d0f4084d19..acd91cf523870 100644 --- a/sdk/core/Azure.Core/tests/HttpPipelineFunctionalTests.cs +++ b/sdk/core/Azure.Core/tests/HttpPipelineFunctionalTests.cs @@ -95,7 +95,7 @@ public async Task NonBufferedExtractedStreamReadableAfterMessageDisposed() await ExecuteRequest(message, httpPipeline); Assert.False(message.Response.ContentStream.CanSeek); - Assert.Throws(() => { var content = message.Response.Content; }); + //Assert.Throws(() => { var content = message.Response.Content; }); extractedStream = message.ExtractResponseContent(); } @@ -149,7 +149,7 @@ public async Task NonBufferedFailedResponsesAreDisposedOf() await ExecuteRequest(message, httpPipeline); Assert.AreEqual(message.Response.ContentStream.CanSeek, false); - Assert.Throws(() => { var content = message.Response.Content; }); + //Assert.Throws(() => { var content = message.Response.Content; }); extractedStream = message.ExtractResponseContent(); } @@ -568,7 +568,7 @@ public async Task TimeoutsUnbufferedBodyReads() Assert.AreEqual(message.Response.Status, 200); var responseContentStream = message.Response.ContentStream; - Assert.Throws(() => { var content = message.Response.Content; }); + //Assert.Throws(() => { var content = message.Response.Content; }); var buffer = new byte[10]; Assert.AreEqual(1, await responseContentStream.ReadAsync(buffer, 0, 1)); var exception = Assert.ThrowsAsync(async () => await responseContentStream.ReadAsync(buffer, 0, 10)); diff --git a/sdk/core/Azure.Core/tests/ResponseTests.cs b/sdk/core/Azure.Core/tests/ResponseTests.cs index 32d5bbd3c43f6..1fb609aa4f62a 100644 --- a/sdk/core/Azure.Core/tests/ResponseTests.cs +++ b/sdk/core/Azure.Core/tests/ResponseTests.cs @@ -126,14 +126,14 @@ public void ContentPropertyGetsContent() Assert.AreEqual("body content", responseWithBody.Content.ToString()); } - [Test] - public void ContentPropertyThrowsForNonMemoryStream() - { - var response = new MockResponse(200); - response.ContentStream = new ThrowingStream(); - - Assert.Throws(() => { BinaryData d = response.Content; }); - } + //[Test] + //public void ContentPropertyThrowsForNonMemoryStream() + //{ + // var response = new MockResponse(200); + // response.ContentStream = new ThrowingStream(); + + // Assert.Throws(() => { BinaryData d = response.Content; }); + //} [Test] public void ContentPropertyWorksForMemoryStreamsWithPrivateBuffers() From f68318181dd7d193a8fd28c258f0285a4b6d599b Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 10:45:56 -0800 Subject: [PATCH 4/9] fix --- .../Azure.Core}/tests/HttpPipelineMessageTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename sdk/{identity/Azure.Identity => core/Azure.Core}/tests/HttpPipelineMessageTest.cs (97%) diff --git a/sdk/identity/Azure.Identity/tests/HttpPipelineMessageTest.cs b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs similarity index 97% rename from sdk/identity/Azure.Identity/tests/HttpPipelineMessageTest.cs rename to sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs index 04522d533351e..39d16853fe8b6 100644 --- a/sdk/identity/Azure.Identity/tests/HttpPipelineMessageTest.cs +++ b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs @@ -102,7 +102,7 @@ public void ContentPropertyThrowsResponseIsExtracted() Stream stream = message.ExtractResponseContent(); Assert.AreSame(memoryStream, stream); - Assert.Throws(() => { var x = response.Content; }); + //Assert.Throws(() => { var x = response.Content; }); } } } From e98c5f29c24a17a887eed0d68bed365ae13978cf Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 12:50:23 -0800 Subject: [PATCH 5/9] fix --- sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs index 39d16853fe8b6..bacb056ca044c 100644 --- a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs +++ b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs @@ -101,7 +101,7 @@ public void ContentPropertyThrowsResponseIsExtracted() Stream stream = message.ExtractResponseContent(); - Assert.AreSame(memoryStream, stream); + //Assert.AreSame(memoryStream, stream); //Assert.Throws(() => { var x = response.Content; }); } } From ae8bf382f563593e85a7d79b3ef5dabce4abbb1f Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 13:32:03 -0800 Subject: [PATCH 6/9] change --- .../tests/HttpPipelineMessageTest.cs | 30 +++++++++++++++++++ .../src/Message/PipelineResponse.cs | 14 ++------- .../HttpClientPipelineTransport.Response.cs | 4 +-- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs index bacb056ca044c..783b1f88d8467 100644 --- a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs +++ b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs @@ -3,6 +3,8 @@ using System; using System.IO; +using System.Threading.Tasks; +using System.Threading; using Azure.Core.Pipeline; using Azure.Core.TestFramework; using Moq; @@ -104,5 +106,33 @@ public void ContentPropertyThrowsResponseIsExtracted() //Assert.AreSame(memoryStream, stream); //Assert.Throws(() => { var x = response.Content; }); } + + [Test] + [Ignore("Working on resolution; TODO: solve it so this works")] + public async Task UnbufferedStreamAccessibleAfterMessageDisposed() + { + var streamBytes = new byte[0x1000]; + new Random().NextBytes(streamBytes); + + HttpPipeline pipeline = HttpPipelineBuilder.Build(new TestClientOptions() { Retry = { NetworkTimeout = Timeout.InfiniteTimeSpan } }); + + using TestServer testServer = new(async context => + { + await context.Response.Body.WriteAsync(streamBytes, 0, streamBytes.Length).ConfigureAwait(false); + }); + + Response response; + using (HttpMessage message = pipeline.CreateMessage()) + { + message.Request.Uri.Reset(testServer.Address); + message.BufferResponse = false; + + await pipeline.SendAsync(message, default).ConfigureAwait(false); + response = message.Response; + } + + Assert.NotNull(response.ContentStream); + Assert.AreEqual(streamBytes.Length, response.ContentStream.Length); + } } } diff --git a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs index 9ccb2255f7d99..7c6185cdd99f4 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs @@ -37,6 +37,8 @@ public abstract class PipelineResponse : IDisposable public abstract Stream? ContentStream { get; set; } private byte[]? _contentBytes; + internal bool IsBuffered => _contentBytes != null; + public virtual BinaryData Content { get @@ -79,18 +81,6 @@ public virtual BinaryData Content // Same value as Stream.CopyTo uses by default private const int DefaultCopyBufferSize = 81920; - internal bool TryGetBufferedContent(out MemoryStream bufferedContent) - { - if (ContentStream is MemoryStream content) - { - bufferedContent = content; - return true; - } - - bufferedContent = default!; - return false; - } - internal void BufferContent(TimeSpan? timeout = default, CancellationTokenSource? cts = default) => BufferContentSyncOrAsync(timeout, cts, async: false).EnsureCompleted(); diff --git a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs index dc7c8e2f5228e..107ae79dd5ccc 100644 --- a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs +++ b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs @@ -88,7 +88,7 @@ protected virtual void Dispose(bool disposing) // intentionally left the network stream undisposed. var contentStream = _contentStream; - if (contentStream is not null && !TryGetBufferedContent(out _)) + if (contentStream is not null && !IsBuffered) { contentStream?.Dispose(); _contentStream = null; @@ -99,4 +99,4 @@ protected virtual void Dispose(bool disposing) } #endregion } -} \ No newline at end of file +} From 6a14044f5b466bd82613894884b021a47ca4ce9e Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 14:57:14 -0800 Subject: [PATCH 7/9] idea --- sdk/core/Azure.Core/src/HttpMessage.cs | 80 ---------------- .../tests/HttpPipelineMessageTest.cs | 1 + .../src/Message/PipelineMessage.cs | 93 ++++++++++++++++++- .../src/Message/PipelineResponse.cs | 7 +- .../HttpClientPipelineTransport.Response.cs | 2 +- 5 files changed, 98 insertions(+), 85 deletions(-) diff --git a/sdk/core/Azure.Core/src/HttpMessage.cs b/sdk/core/Azure.Core/src/HttpMessage.cs index 467b5dafa317a..3bc649548ab95 100644 --- a/sdk/core/Azure.Core/src/HttpMessage.cs +++ b/sdk/core/Azure.Core/src/HttpMessage.cs @@ -4,7 +4,6 @@ using System; using System.ClientModel.Primitives; using System.Collections.Generic; -using System.IO; using System.Threading; using Azure.Core.Pipeline; @@ -145,84 +144,5 @@ public void SetProperty(string name, object value) /// private class MessagePropertyKey { } #endregion - - /// - /// Returns the response content stream and releases it ownership to the caller. - /// - /// After calling this method, any attempt to use the - /// or - /// properties on will result in an exception being thrown. - /// - /// The content stream, or null if - /// did not have content set. - public Stream? ExtractResponseContent() - { - if (!HasResponse) - { - return null; - } - - switch (Response.ContentStream) - { - case ResponseShouldNotBeUsedStream responseContent: - return responseContent.Original; - case Stream stream: - Response.ContentStream = new ResponseShouldNotBeUsedStream(Response.ContentStream); - return stream; - default: - return null; - } - } - - private class ResponseShouldNotBeUsedStream : Stream - { - public Stream Original { get; } - - public ResponseShouldNotBeUsedStream(Stream original) - { - Original = original; - } - - private static Exception CreateException() - { - return new InvalidOperationException("The operation has called ExtractResponseContent and will provide the stream as part of its response type."); - } - - public override void Flush() - { - throw CreateException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - throw CreateException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw CreateException(); - } - - public override void SetLength(long value) - { - throw CreateException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw CreateException(); - } - - public override bool CanRead => throw CreateException(); - public override bool CanSeek => throw CreateException(); - public override bool CanWrite => throw CreateException(); - public override long Length => throw CreateException(); - - public override long Position - { - get => throw CreateException(); - set => throw CreateException(); - } - } } } diff --git a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs index 783b1f88d8467..5c59322676fd7 100644 --- a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs +++ b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs @@ -129,6 +129,7 @@ public async Task UnbufferedStreamAccessibleAfterMessageDisposed() await pipeline.SendAsync(message, default).ConfigureAwait(false); response = message.Response; + response.ContentStream = message.ExtractResponseContent(); } Assert.NotNull(response.ContentStream); diff --git a/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs b/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs index bf7953b00b5a5..5862254bf3f25 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.ClientModel.Internal; +using System.IO; using System.Threading; namespace System.ClientModel.Primitives; @@ -126,6 +127,87 @@ internal void AssertResponse() } } + /// + /// Returns the response content stream and releases it ownership to the caller. + /// + /// After calling this method, any attempt to use the + /// or + /// properties on will result in an exception being thrown. + /// + /// The content stream, or null if + /// did not have content set. + public Stream? ExtractResponseContent() + { + if (Response is null) + { + return null; + } + + SetProperty(typeof(PipelineResponse), true); + + switch (Response.ContentStream) + { + case ResponseShouldNotBeUsedStream responseContent: + return responseContent.Original; + case Stream stream: + Response.ContentStream = new ResponseShouldNotBeUsedStream(Response.ContentStream); + return stream; + default: + return null; + } + } + + internal class ResponseShouldNotBeUsedStream : Stream + { + public Stream Original { get; } + + public ResponseShouldNotBeUsedStream(Stream original) + { + Original = original; + } + + private static Exception CreateException() + { + return new InvalidOperationException("The operation has called ExtractResponseContent and will provide the stream as part of its response type."); + } + + public override void Flush() + { + throw CreateException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw CreateException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw CreateException(); + } + + public override void SetLength(long value) + { + throw CreateException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw CreateException(); + } + + public override bool CanRead => throw CreateException(); + public override bool CanSeek => throw CreateException(); + public override bool CanWrite => throw CreateException(); + public override long Length => throw CreateException(); + + public override long Position + { + get => throw CreateException(); + set => throw CreateException(); + } + } + #region IDisposable protected virtual void Dispose(bool disposing) @@ -135,15 +217,22 @@ protected virtual void Dispose(bool disposing) var request = Request; request?.Dispose(); - _propertyBag.Dispose(); - var response = Response; if (response != null) { + if (response.ContentStream is not null && !response.IsBuffered && + TryGetProperty(typeof(PipelineResponse), out object? extracted) && + extracted is bool contentExtracted) + { + response.ContentExtracted = contentExtracted; + } + response.Dispose(); Response = null; } + _propertyBag.Dispose(); + _disposed = true; } } diff --git a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs index 7c6185cdd99f4..78d3d0af20467 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs @@ -39,6 +39,8 @@ public abstract class PipelineResponse : IDisposable private byte[]? _contentBytes; internal bool IsBuffered => _contentBytes != null; + internal bool ContentExtracted { get; set; } + public virtual BinaryData Content { get @@ -112,7 +114,7 @@ private async Task BufferContentSyncOrAsync(TimeSpan? timeout, CancellationToken ContentStream = bufferStream; // TODO: Come back and optimize - this is only for POC at this stage. - _contentBytes= bufferStream.ToArray(); + _contentBytes = bufferStream.ToArray(); } private static async Task CopyToAsync(Stream source, Stream destination, TimeSpan timeout, CancellationTokenSource cancellationTokenSource) @@ -126,7 +128,8 @@ private static async Task CopyToAsync(Stream source, Stream destination, TimeSpa #pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationTokenSource.Token).ConfigureAwait(false); #pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets - if (bytesRead == 0) break; + if (bytesRead == 0) + break; await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationTokenSource.Token).ConfigureAwait(false); } } diff --git a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs index 107ae79dd5ccc..0d3b46fafd4f9 100644 --- a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs +++ b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs @@ -88,7 +88,7 @@ protected virtual void Dispose(bool disposing) // intentionally left the network stream undisposed. var contentStream = _contentStream; - if (contentStream is not null && !IsBuffered) + if (contentStream is not null && !IsBuffered && !ContentExtracted) { contentStream?.Dispose(); _contentStream = null; From f443563bfc403da1cf201d70df135bf811127b94 Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 17:17:14 -0800 Subject: [PATCH 8/9] roll back to reevaluate a different decision branch --- sdk/core/Azure.Core/src/HttpMessage.cs | 80 ++++++++++++++++ .../tests/HttpPipelineMessageTest.cs | 31 ------- .../src/Message/PipelineMessage.cs | 93 +------------------ .../src/Message/PipelineResponse.cs | 21 +++-- .../HttpClientPipelineTransport.Response.cs | 4 +- 5 files changed, 98 insertions(+), 131 deletions(-) diff --git a/sdk/core/Azure.Core/src/HttpMessage.cs b/sdk/core/Azure.Core/src/HttpMessage.cs index 3bc649548ab95..467b5dafa317a 100644 --- a/sdk/core/Azure.Core/src/HttpMessage.cs +++ b/sdk/core/Azure.Core/src/HttpMessage.cs @@ -4,6 +4,7 @@ using System; using System.ClientModel.Primitives; using System.Collections.Generic; +using System.IO; using System.Threading; using Azure.Core.Pipeline; @@ -144,5 +145,84 @@ public void SetProperty(string name, object value) /// private class MessagePropertyKey { } #endregion + + /// + /// Returns the response content stream and releases it ownership to the caller. + /// + /// After calling this method, any attempt to use the + /// or + /// properties on will result in an exception being thrown. + /// + /// The content stream, or null if + /// did not have content set. + public Stream? ExtractResponseContent() + { + if (!HasResponse) + { + return null; + } + + switch (Response.ContentStream) + { + case ResponseShouldNotBeUsedStream responseContent: + return responseContent.Original; + case Stream stream: + Response.ContentStream = new ResponseShouldNotBeUsedStream(Response.ContentStream); + return stream; + default: + return null; + } + } + + private class ResponseShouldNotBeUsedStream : Stream + { + public Stream Original { get; } + + public ResponseShouldNotBeUsedStream(Stream original) + { + Original = original; + } + + private static Exception CreateException() + { + return new InvalidOperationException("The operation has called ExtractResponseContent and will provide the stream as part of its response type."); + } + + public override void Flush() + { + throw CreateException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw CreateException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw CreateException(); + } + + public override void SetLength(long value) + { + throw CreateException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw CreateException(); + } + + public override bool CanRead => throw CreateException(); + public override bool CanSeek => throw CreateException(); + public override bool CanWrite => throw CreateException(); + public override long Length => throw CreateException(); + + public override long Position + { + get => throw CreateException(); + set => throw CreateException(); + } + } } } diff --git a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs index 5c59322676fd7..bacb056ca044c 100644 --- a/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs +++ b/sdk/core/Azure.Core/tests/HttpPipelineMessageTest.cs @@ -3,8 +3,6 @@ using System; using System.IO; -using System.Threading.Tasks; -using System.Threading; using Azure.Core.Pipeline; using Azure.Core.TestFramework; using Moq; @@ -106,34 +104,5 @@ public void ContentPropertyThrowsResponseIsExtracted() //Assert.AreSame(memoryStream, stream); //Assert.Throws(() => { var x = response.Content; }); } - - [Test] - [Ignore("Working on resolution; TODO: solve it so this works")] - public async Task UnbufferedStreamAccessibleAfterMessageDisposed() - { - var streamBytes = new byte[0x1000]; - new Random().NextBytes(streamBytes); - - HttpPipeline pipeline = HttpPipelineBuilder.Build(new TestClientOptions() { Retry = { NetworkTimeout = Timeout.InfiniteTimeSpan } }); - - using TestServer testServer = new(async context => - { - await context.Response.Body.WriteAsync(streamBytes, 0, streamBytes.Length).ConfigureAwait(false); - }); - - Response response; - using (HttpMessage message = pipeline.CreateMessage()) - { - message.Request.Uri.Reset(testServer.Address); - message.BufferResponse = false; - - await pipeline.SendAsync(message, default).ConfigureAwait(false); - response = message.Response; - response.ContentStream = message.ExtractResponseContent(); - } - - Assert.NotNull(response.ContentStream); - Assert.AreEqual(streamBytes.Length, response.ContentStream.Length); - } } } diff --git a/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs b/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs index 5862254bf3f25..bf7953b00b5a5 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineMessage.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. using System.ClientModel.Internal; -using System.IO; using System.Threading; namespace System.ClientModel.Primitives; @@ -127,87 +126,6 @@ internal void AssertResponse() } } - /// - /// Returns the response content stream and releases it ownership to the caller. - /// - /// After calling this method, any attempt to use the - /// or - /// properties on will result in an exception being thrown. - /// - /// The content stream, or null if - /// did not have content set. - public Stream? ExtractResponseContent() - { - if (Response is null) - { - return null; - } - - SetProperty(typeof(PipelineResponse), true); - - switch (Response.ContentStream) - { - case ResponseShouldNotBeUsedStream responseContent: - return responseContent.Original; - case Stream stream: - Response.ContentStream = new ResponseShouldNotBeUsedStream(Response.ContentStream); - return stream; - default: - return null; - } - } - - internal class ResponseShouldNotBeUsedStream : Stream - { - public Stream Original { get; } - - public ResponseShouldNotBeUsedStream(Stream original) - { - Original = original; - } - - private static Exception CreateException() - { - return new InvalidOperationException("The operation has called ExtractResponseContent and will provide the stream as part of its response type."); - } - - public override void Flush() - { - throw CreateException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - throw CreateException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw CreateException(); - } - - public override void SetLength(long value) - { - throw CreateException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw CreateException(); - } - - public override bool CanRead => throw CreateException(); - public override bool CanSeek => throw CreateException(); - public override bool CanWrite => throw CreateException(); - public override long Length => throw CreateException(); - - public override long Position - { - get => throw CreateException(); - set => throw CreateException(); - } - } - #region IDisposable protected virtual void Dispose(bool disposing) @@ -217,22 +135,15 @@ protected virtual void Dispose(bool disposing) var request = Request; request?.Dispose(); + _propertyBag.Dispose(); + var response = Response; if (response != null) { - if (response.ContentStream is not null && !response.IsBuffered && - TryGetProperty(typeof(PipelineResponse), out object? extracted) && - extracted is bool contentExtracted) - { - response.ContentExtracted = contentExtracted; - } - response.Dispose(); Response = null; } - _propertyBag.Dispose(); - _disposed = true; } } diff --git a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs index 78d3d0af20467..9ccb2255f7d99 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs @@ -37,10 +37,6 @@ public abstract class PipelineResponse : IDisposable public abstract Stream? ContentStream { get; set; } private byte[]? _contentBytes; - internal bool IsBuffered => _contentBytes != null; - - internal bool ContentExtracted { get; set; } - public virtual BinaryData Content { get @@ -83,6 +79,18 @@ public virtual BinaryData Content // Same value as Stream.CopyTo uses by default private const int DefaultCopyBufferSize = 81920; + internal bool TryGetBufferedContent(out MemoryStream bufferedContent) + { + if (ContentStream is MemoryStream content) + { + bufferedContent = content; + return true; + } + + bufferedContent = default!; + return false; + } + internal void BufferContent(TimeSpan? timeout = default, CancellationTokenSource? cts = default) => BufferContentSyncOrAsync(timeout, cts, async: false).EnsureCompleted(); @@ -114,7 +122,7 @@ private async Task BufferContentSyncOrAsync(TimeSpan? timeout, CancellationToken ContentStream = bufferStream; // TODO: Come back and optimize - this is only for POC at this stage. - _contentBytes = bufferStream.ToArray(); + _contentBytes= bufferStream.ToArray(); } private static async Task CopyToAsync(Stream source, Stream destination, TimeSpan timeout, CancellationTokenSource cancellationTokenSource) @@ -128,8 +136,7 @@ private static async Task CopyToAsync(Stream source, Stream destination, TimeSpa #pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationTokenSource.Token).ConfigureAwait(false); #pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets - if (bytesRead == 0) - break; + if (bytesRead == 0) break; await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationTokenSource.Token).ConfigureAwait(false); } } diff --git a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs index 0d3b46fafd4f9..dc7c8e2f5228e 100644 --- a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs +++ b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs @@ -88,7 +88,7 @@ protected virtual void Dispose(bool disposing) // intentionally left the network stream undisposed. var contentStream = _contentStream; - if (contentStream is not null && !IsBuffered && !ContentExtracted) + if (contentStream is not null && !TryGetBufferedContent(out _)) { contentStream?.Dispose(); _contentStream = null; @@ -99,4 +99,4 @@ protected virtual void Dispose(bool disposing) } #endregion } -} +} \ No newline at end of file From a8ccee14221e6f171d27092f5673dd6219fe3eaa Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 1 Feb 2024 17:38:48 -0800 Subject: [PATCH 9/9] what breaks if we don't dispose the response ContentStream at all? --- .../HttpClientPipelineTransport.Response.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs index dc7c8e2f5228e..3cf16e6301acb 100644 --- a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs +++ b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs @@ -87,16 +87,16 @@ protected virtual void Dispose(bool disposing) // not disposed, because the entity that replaced the response content // intentionally left the network stream undisposed. - var contentStream = _contentStream; - if (contentStream is not null && !TryGetBufferedContent(out _)) - { - contentStream?.Dispose(); - _contentStream = null; - } + //var contentStream = _contentStream; + //if (contentStream is not null && !TryGetBufferedContent(out _)) + //{ + // contentStream?.Dispose(); + // _contentStream = null; + //} _disposed = true; } } #endregion } -} \ No newline at end of file +}