From 5a4550fd6ebdc4c78fddbbd91d00243d7a74e97a Mon Sep 17 00:00:00 2001 From: NewellClark Date: Mon, 18 Jan 2021 00:17:08 -0500 Subject: [PATCH 1/6] System.IO.Compression.DeflateStream.CopyToStream -Implemented memory-based WriteAsync in DeflateStream.CopyToStream class. This required implementing a memory-based overload of System.IO.Inflater.SetInput(). Previously, Inflater used a GCHandle to pin the array that was passed into SetInput. I converted it to use a MemoryHandle, and changed the array-based overload of SetInput to delegate to the new Memory-based overload. --- .../Compression/DeflateZLib/DeflateStream.cs | 23 +++++++++--- .../IO/Compression/DeflateZLib/Inflater.cs | 35 ++++++++++++++++--- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs index c8cd4071f5bc00..773f158a3640dd 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs @@ -907,14 +907,14 @@ public void CopyFromSourceToDestination() } } - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { // Validate inputs Debug.Assert(buffer != _arrayPoolBuffer); _deflateStream.EnsureNotDisposed(); if (count <= 0) { - return; + return Task.CompletedTask; } else if (count > buffer.Length - offset) { @@ -923,9 +923,22 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc throw new InvalidDataException(SR.GenericInvalidData); } - Debug.Assert(_deflateStream._inflater != null); - // Feed the data from base stream into the decompression engine. - _deflateStream._inflater.SetInput(buffer, offset, count); + return WriteAsyncCore(buffer.AsMemory().Slice(offset, count), cancellationToken).AsTask(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + _deflateStream.EnsureNotDisposed(); + + return WriteAsyncCore(buffer, cancellationToken); + } + + private async ValueTask WriteAsyncCore(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + Debug.Assert(_deflateStream._inflater is not null); + + // Feed the data from base stream into decompression engine. + _deflateStream._inflater.SetInput(buffer); // While there's more decompressed data available, forward it to the buffer stream. while (!_deflateStream._inflater.Finished()) diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs index f6d17eb58559d2..8b6f9013a59718 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.InteropServices; @@ -20,7 +21,7 @@ internal sealed class Inflater : IDisposable private bool _isDisposed; // Prevents multiple disposals private readonly int _windowBits; // The WindowBits parameter passed to Inflater construction private ZLibNative.ZLibStreamHandle _zlibStream; // The handle to the primary underlying zlib stream - private GCHandle _inputBufferHandle; // The handle to the buffer that provides input to _zlibStream + private InputBufferHandle _inputBufferHandle; // The handle to the buffer that provides input to _zlibStream private readonly long _uncompressedSize; private long _currentInflatedCount; @@ -182,18 +183,42 @@ public void SetInput(byte[] inputBuffer, int startIndex, int count) Debug.Assert(startIndex >= 0 && count >= 0 && count + startIndex <= inputBuffer.Length); Debug.Assert(!_inputBufferHandle.IsAllocated); - if (0 == count) + SetInput(inputBuffer.AsMemory().Slice(startIndex, count)); + } + + public void SetInput(ReadOnlyMemory inputBuffer) + { + Debug.Assert(NeedsInput(), "We have something left in previous input!"); + Debug.Assert(!_inputBufferHandle.IsAllocated); + + if (inputBuffer.Length == 0) return; lock (SyncLock) { - _inputBufferHandle = GCHandle.Alloc(inputBuffer, GCHandleType.Pinned); - _zlibStream.NextIn = _inputBufferHandle.AddrOfPinnedObject() + startIndex; - _zlibStream.AvailIn = (uint)count; + _inputBufferHandle = new InputBufferHandle(inputBuffer.Pin()); + _zlibStream.NextIn = _inputBufferHandle.Pointer; + _zlibStream.AvailIn = (uint)inputBuffer.Length; _finished = false; } } + private struct InputBufferHandle + { + private MemoryHandle _memoryHandle; + + public InputBufferHandle(MemoryHandle memoryHandle) + { + _memoryHandle = memoryHandle; + } + + public unsafe IntPtr Pointer => (IntPtr)_memoryHandle.Pointer; + + public bool IsAllocated => Pointer != IntPtr.Zero; + + public void Free() => _memoryHandle.Dispose(); + } + private void Dispose(bool disposing) { if (!_isDisposed) From e4c08b89be45cde5e160eea003c2694e6899100c Mon Sep 17 00:00:00 2001 From: NewellClark Date: Tue, 19 Jan 2021 11:16:33 -0500 Subject: [PATCH 2/6] Implement suggested changes --- .../IO/Compression/DeflateZLib/Inflater.cs | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs index 8b6f9013a59718..12d9d2861ec0e4 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs @@ -21,7 +21,7 @@ internal sealed class Inflater : IDisposable private bool _isDisposed; // Prevents multiple disposals private readonly int _windowBits; // The WindowBits parameter passed to Inflater construction private ZLibNative.ZLibStreamHandle _zlibStream; // The handle to the primary underlying zlib stream - private InputBufferHandle _inputBufferHandle; // The handle to the buffer that provides input to _zlibStream + private MemoryHandle _inputBufferHandle; // The handle to the buffer that provides input to _zlibStream private readonly long _uncompressedSize; private long _currentInflatedCount; @@ -111,7 +111,7 @@ public unsafe int InflateVerified(byte* bufPtr, int length) finally { // Before returning, make sure to release input buffer if necessary: - if (0 == _zlibStream.AvailIn && _inputBufferHandle.IsAllocated) + if (0 == _zlibStream.AvailIn && IsInputBufferHandleAllocated) { DeallocateInputBufferHandle(); } @@ -122,7 +122,7 @@ private unsafe void ReadOutput(byte* bufPtr, int length, out int bytesRead) { if (ReadInflateOutput(bufPtr, length, ZLibNative.FlushCode.NoFlush, out bytesRead) == ZLibNative.ErrorCode.StreamEnd) { - if (!NeedsInput() && IsGzipStream() && _inputBufferHandle.IsAllocated) + if (!NeedsInput() && IsGzipStream() && IsInputBufferHandleAllocated) { _finished = ResetStreamForLeftoverInput(); } @@ -143,7 +143,7 @@ private unsafe bool ResetStreamForLeftoverInput() { Debug.Assert(!NeedsInput()); Debug.Assert(IsGzipStream()); - Debug.Assert(_inputBufferHandle.IsAllocated); + Debug.Assert(IsInputBufferHandleAllocated); lock (SyncLock) { @@ -181,44 +181,28 @@ public void SetInput(byte[] inputBuffer, int startIndex, int count) Debug.Assert(NeedsInput(), "We have something left in previous input!"); Debug.Assert(inputBuffer != null); Debug.Assert(startIndex >= 0 && count >= 0 && count + startIndex <= inputBuffer.Length); - Debug.Assert(!_inputBufferHandle.IsAllocated); + Debug.Assert(!IsInputBufferHandleAllocated); SetInput(inputBuffer.AsMemory().Slice(startIndex, count)); } - public void SetInput(ReadOnlyMemory inputBuffer) + public unsafe void SetInput(ReadOnlyMemory inputBuffer) { Debug.Assert(NeedsInput(), "We have something left in previous input!"); - Debug.Assert(!_inputBufferHandle.IsAllocated); + Debug.Assert(!IsInputBufferHandleAllocated); - if (inputBuffer.Length == 0) + if (inputBuffer.IsEmpty) return; lock (SyncLock) { - _inputBufferHandle = new InputBufferHandle(inputBuffer.Pin()); - _zlibStream.NextIn = _inputBufferHandle.Pointer; + _inputBufferHandle = inputBuffer.Pin(); + _zlibStream.NextIn = (IntPtr)_inputBufferHandle.Pointer; _zlibStream.AvailIn = (uint)inputBuffer.Length; _finished = false; } } - private struct InputBufferHandle - { - private MemoryHandle _memoryHandle; - - public InputBufferHandle(MemoryHandle memoryHandle) - { - _memoryHandle = memoryHandle; - } - - public unsafe IntPtr Pointer => (IntPtr)_memoryHandle.Pointer; - - public bool IsAllocated => Pointer != IntPtr.Zero; - - public void Free() => _memoryHandle.Dispose(); - } - private void Dispose(bool disposing) { if (!_isDisposed) @@ -226,7 +210,7 @@ private void Dispose(bool disposing) if (disposing) _zlibStream.Dispose(); - if (_inputBufferHandle.IsAllocated) + if (IsInputBufferHandleAllocated) DeallocateInputBufferHandle(); _isDisposed = true; @@ -338,14 +322,16 @@ private ZLibNative.ErrorCode Inflate(ZLibNative.FlushCode flushCode) /// private void DeallocateInputBufferHandle() { - Debug.Assert(_inputBufferHandle.IsAllocated); + Debug.Assert(IsInputBufferHandleAllocated); lock (SyncLock) { _zlibStream.AvailIn = 0; _zlibStream.NextIn = ZLibNative.ZNullPtr; - _inputBufferHandle.Free(); + _inputBufferHandle.Dispose(); } } + + private unsafe bool IsInputBufferHandleAllocated => _inputBufferHandle.Pointer != (void*)0; } } From eb07c36017221ee5d75e96f1c52e5e429280506d Mon Sep 17 00:00:00 2001 From: NewellClark Date: Tue, 19 Jan 2021 11:29:27 -0500 Subject: [PATCH 3/6] Memorify RequestStream - Memory overrides for System.Net.RequestStream - Memory overrides for System.Net.NetworkStreamWrapper --- .../src/System/Net/NetworkStreamWrapper.cs | 10 ++++++++++ .../src/System/Net/RequestStream.cs | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/src/libraries/System.Net.Requests/src/System/Net/NetworkStreamWrapper.cs b/src/libraries/System.Net.Requests/src/System/Net/NetworkStreamWrapper.cs index f4799c227ed4fb..9f2cc673c5ffd8 100644 --- a/src/libraries/System.Net.Requests/src/System/Net/NetworkStreamWrapper.cs +++ b/src/libraries/System.Net.Requests/src/System/Net/NetworkStreamWrapper.cs @@ -189,6 +189,11 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel return _networkStream.ReadAsync(buffer, offset, count, cancellationToken); } + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + return _networkStream.ReadAsync(buffer, cancellationToken); + } + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int size, AsyncCallback? callback, object? state) { return _networkStream.BeginWrite(buffer, offset, size, callback, state); @@ -204,6 +209,11 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati return _networkStream.WriteAsync(buffer, offset, count, cancellationToken); } + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return _networkStream.WriteAsync(buffer, cancellationToken); + } + public override void Flush() { _networkStream.Flush(); diff --git a/src/libraries/System.Net.Requests/src/System/Net/RequestStream.cs b/src/libraries/System.Net.Requests/src/System/Net/RequestStream.cs index d5f5a26fe85e1d..5323c2ac836f04 100644 --- a/src/libraries/System.Net.Requests/src/System/Net/RequestStream.cs +++ b/src/libraries/System.Net.Requests/src/System/Net/RequestStream.cs @@ -105,6 +105,11 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati return _buffer.WriteAsync(buffer, offset, count, cancellationToken); } + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return _buffer.WriteAsync(buffer, cancellationToken); + } + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? asyncCallback, object? asyncState) { ValidateBufferArguments(buffer, offset, count); From 27b9f3b3abe46e3a93f48eea22137620bf9f5722 Mon Sep 17 00:00:00 2001 From: NewellClark Date: Tue, 19 Jan 2021 14:12:40 -0500 Subject: [PATCH 4/6] Spanified ChunkedMemoryStream WriteAsync is implemented in terms of Write, so I went ahead and implemented Write(ReadOnlySpan). For some reason, AsSpan() isn't available in this file. --- .../src/System/IO/ChunkedMemoryStream.cs | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs b/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs index 98f0bacc18323c..57535a0dce8ce2 100644 --- a/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs +++ b/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs @@ -35,6 +35,13 @@ public byte[] ToArray() public override void Write(byte[] buffer, int offset, int count) { + Write(new ReadOnlySpan(buffer, offset, count)); + } + + public override void Write(ReadOnlySpan buffer) + { + int offset = 0; + int count = buffer.Length; while (count > 0) { if (_currentChunk != null) @@ -43,7 +50,9 @@ public override void Write(byte[] buffer, int offset, int count) if (remaining > 0) { int toCopy = Math.Min(remaining, count); - Buffer.BlockCopy(buffer, offset, _currentChunk._buffer, _currentChunk._freeOffset, toCopy); + ReadOnlySpan source = buffer.Slice(offset, toCopy); + Span destination = new Span(_currentChunk._buffer, _currentChunk._freeOffset, toCopy); + source.CopyTo(destination); count -= toCopy; offset += toCopy; _totalLength += toCopy; @@ -67,6 +76,17 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati return Task.CompletedTask; } + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + Write(buffer.Span); + return ValueTask.CompletedTask; + } + private void AppendChunk(long count) { int nextChunkLength = _currentChunk != null ? _currentChunk._buffer.Length * 2 : InitialChunkDefaultSize; From 5115f684a6cf070ef3fb600ba57e741e69f92518 Mon Sep 17 00:00:00 2001 From: NewellClark Date: Thu, 21 Jan 2021 19:04:41 -0500 Subject: [PATCH 5/6] Apply suggested changes --- .../Common/src/System/IO/ChunkedMemoryStream.cs | 16 ++++++---------- .../IO/Compression/DeflateZLib/DeflateStream.cs | 2 +- .../IO/Compression/DeflateZLib/Inflater.cs | 4 ++-- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs b/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs index 57535a0dce8ce2..3e070c065b563b 100644 --- a/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs +++ b/src/libraries/Common/src/System/IO/ChunkedMemoryStream.cs @@ -5,6 +5,7 @@ using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using System; namespace System.IO { @@ -40,28 +41,23 @@ public override void Write(byte[] buffer, int offset, int count) public override void Write(ReadOnlySpan buffer) { - int offset = 0; - int count = buffer.Length; - while (count > 0) + while (!buffer.IsEmpty) { if (_currentChunk != null) { int remaining = _currentChunk._buffer.Length - _currentChunk._freeOffset; if (remaining > 0) { - int toCopy = Math.Min(remaining, count); - ReadOnlySpan source = buffer.Slice(offset, toCopy); - Span destination = new Span(_currentChunk._buffer, _currentChunk._freeOffset, toCopy); - source.CopyTo(destination); - count -= toCopy; - offset += toCopy; + int toCopy = Math.Min(remaining, buffer.Length); + buffer.Slice(0, toCopy).CopyTo(new Span(_currentChunk._buffer, _currentChunk._freeOffset, toCopy)); + buffer = buffer.Slice(toCopy); _totalLength += toCopy; _currentChunk._freeOffset += toCopy; continue; } } - AppendChunk(count); + AppendChunk(buffer.Length); } } diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs index 773f158a3640dd..485cea5105ff19 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs @@ -920,7 +920,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati { // The buffer stream is either malicious or poorly implemented and returned a number of // bytes larger than the buffer supplied to it. - throw new InvalidDataException(SR.GenericInvalidData); + return Task.FromException(new InvalidDataException(SR.GenericInvalidData)); } return WriteAsyncCore(buffer.AsMemory().Slice(offset, count), cancellationToken).AsTask(); diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs index 12d9d2861ec0e4..00d241e4ddae20 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs @@ -21,7 +21,7 @@ internal sealed class Inflater : IDisposable private bool _isDisposed; // Prevents multiple disposals private readonly int _windowBits; // The WindowBits parameter passed to Inflater construction private ZLibNative.ZLibStreamHandle _zlibStream; // The handle to the primary underlying zlib stream - private MemoryHandle _inputBufferHandle; // The handle to the buffer that provides input to _zlibStream + private MemoryHandle _inputBufferHandle; // The handle to the buffer that provides input to _zlibStream private readonly long _uncompressedSize; private long _currentInflatedCount; @@ -183,7 +183,7 @@ public void SetInput(byte[] inputBuffer, int startIndex, int count) Debug.Assert(startIndex >= 0 && count >= 0 && count + startIndex <= inputBuffer.Length); Debug.Assert(!IsInputBufferHandleAllocated); - SetInput(inputBuffer.AsMemory().Slice(startIndex, count)); + SetInput(inputBuffer.AsMemory(startIndex, count)); } public unsafe void SetInput(ReadOnlyMemory inputBuffer) From cc0ec94d59995c4fb81583246e4fca9443d51e50 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Fri, 5 Mar 2021 14:31:20 +0100 Subject: [PATCH 6/6] Apply suggestions from code review --- .../src/System/IO/Compression/DeflateZLib/DeflateStream.cs | 2 +- .../src/System/IO/Compression/DeflateZLib/Inflater.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs index 485cea5105ff19..d9739b087b7041 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs @@ -923,7 +923,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati return Task.FromException(new InvalidDataException(SR.GenericInvalidData)); } - return WriteAsyncCore(buffer.AsMemory().Slice(offset, count), cancellationToken).AsTask(); + return WriteAsyncCore(buffer.AsMemory(offset, count), cancellationToken).AsTask(); } public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) diff --git a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs index 00d241e4ddae20..4544353ac5406b 100644 --- a/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs +++ b/src/libraries/System.IO.Compression/src/System/IO/Compression/DeflateZLib/Inflater.cs @@ -332,6 +332,6 @@ private void DeallocateInputBufferHandle() } } - private unsafe bool IsInputBufferHandleAllocated => _inputBufferHandle.Pointer != (void*)0; + private unsafe bool IsInputBufferHandleAllocated => _inputBufferHandle.Pointer != default; } }