Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ internal sealed class StreamPipeReader : PipeReader

private CancellationTokenSource? _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;

private BufferSegment? _readHead;
private int _readIndex;
Expand Down Expand Up @@ -231,12 +230,6 @@ private ValueTask<ReadResult> ReadInternalAsync(int? minimumSize, CancellationTo
}
}

if (_isStreamCompleted)
{
ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
return new ValueTask<ReadResult>(completedResult);
}

return Core(this, minimumSize, tokenSource, cancellationToken);

#if NET
Expand All @@ -253,6 +246,7 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSiz
using (reg)
{
var isCanceled = false;
bool isCompleted = false;
try
{
// This optimization only makes sense if we don't have anything buffered
Expand All @@ -277,7 +271,7 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSiz

if (length == 0)
{
reader._isStreamCompleted = true;
isCompleted = true;
break;
}
} while (minimumSize != null && reader._bufferedBytes < minimumSize);
Expand All @@ -302,7 +296,7 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSiz
}
}

return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, isCompleted);
}
}
}
Expand Down Expand Up @@ -362,11 +356,6 @@ public override async Task CopyToAsync(PipeWriter destination, CancellationToken
}
}

if (_isStreamCompleted)
{
return;
}

await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -423,11 +412,6 @@ public override async Task CopyToAsync(Stream destination, CancellationToken can
}
}

if (_isStreamCompleted)
{
return;
}

await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -465,7 +449,7 @@ public override bool TryRead(out ReadResult result)
private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
{
bool isCancellationRequested = source.IsCancellationRequested;
if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
if (isCancellationRequested || (_bufferedBytes > 0 && !_examinedEverything))
{
if (isCancellationRequested)
{
Expand All @@ -474,7 +458,7 @@ private bool TryReadInternal(CancellationTokenSource source, out ReadResult resu

ReadOnlySequence<byte> buffer = GetCurrentReadOnlySequence();

result = new ReadResult(buffer, isCancellationRequested, _isStreamCompleted);
result = new ReadResult(buffer, isCancellationRequested, false);
return true;
}

Expand Down
82 changes: 34 additions & 48 deletions src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,14 @@ public async Task ReadWithDifferentSettings(int bytesInBuffer, int bufferSize, i
[Fact]
public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
{
var stream = new ThrowAfterZeroByteReadStream();
byte[] helloBytes = "Hello World"u8.ToArray();
var stream = new MemoryStream(helloBytes);
PipeReader reader = PipeReader.Create(stream);
ReadResult readResult = await reader.ReadAsync();
Assert.True(readResult.Buffer.IsEmpty);
Assert.Equal(helloBytes.Length, readResult.Buffer.Length);
reader.AdvanceTo(readResult.Buffer.End);

readResult = await reader.ReadAsync();
Assert.True(readResult.IsCompleted);
reader.AdvanceTo(readResult.Buffer.End);

Expand All @@ -166,12 +170,38 @@ public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
}

[Fact]
public async Task BufferingDataPastEndOfStreamCanBeReadAgain()
public async Task ReadAsyncAfterReceivingCompletedReadResultAndResettingStreamPositionWorks()
{
byte[] helloBytes = "Hello World"u8.ToArray();
var stream = new ThrowAfterZeroByteReadStream(helloBytes);
var stream = new MemoryStream(helloBytes);
PipeReader reader = PipeReader.Create(stream);
ReadResult readResult = await reader.ReadAsync();
Assert.Equal(helloBytes, readResult.Buffer.ToArray());
Assert.False(readResult.IsCompleted);
reader.AdvanceTo(readResult.Buffer.End);

readResult = await reader.ReadAsync();
Assert.True(readResult.IsCompleted);

// Reset the stream position to the beginning
stream.Position = 0;

readResult = await reader.ReadAsync();
Assert.Equal(helloBytes, readResult.Buffer.ToArray());
Assert.False(readResult.IsCompleted);
reader.AdvanceTo(readResult.Buffer.End);

readResult = await reader.ReadAsync();
Assert.True(readResult.IsCompleted);
reader.Complete();
}

[Fact]
public async Task BufferingDataPastEndOfStreamCanBeReadAgain()
{
byte[] helloBytes = "Hello World"u8.ToArray();
var stream = new MemoryStream(helloBytes);
PipeReader reader = PipeReader.Create(stream);

ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
Expand Down Expand Up @@ -669,50 +699,6 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
#endif
}

private class ThrowAfterZeroByteReadStream : MemoryStream
{
public ThrowAfterZeroByteReadStream()
{

}

public ThrowAfterZeroByteReadStream(byte[] buffer) : base(buffer)
{

}

private bool _throwOnNextCallToRead;
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (_throwOnNextCallToRead)
{
throw new Exception();
}
var bytes = await base.ReadAsync(buffer, offset, count, cancellationToken);
if (bytes == 0)
{
_throwOnNextCallToRead = true;
}
return bytes;
}

#if NET
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
if (_throwOnNextCallToRead)
{
throw new Exception();
}
var bytes = await base.ReadAsync(destination, cancellationToken);
if (bytes == 0)
{
_throwOnNextCallToRead = true;
}
return bytes;
}
#endif
}

private class DisposalTrackingStream : MemoryStream
{
public bool DisposeCalled { get; private set; }
Expand Down
Loading