Skip to content

Commit

Permalink
Release DotNext Core library 5.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Mar 20, 2024
1 parent faa56e3 commit 40e1dd9
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 55 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Release Notes
====

# 03-20-2024
<a href="https://www.nuget.org/packages/dotnext/5.3.1">DotNext 5.3.1</a>
* Provided support of thread-local storage for `StreamSource.AsSharedStream`
* Remove type cast for `Func.Constant` static method

# 03-19-2024
<a href="https://www.nuget.org/packages/dotnext/5.3.0">DotNext 5.3.0</a>
* Added `StreamSource.AsSharedStream` extension method that allows to obtain read-only stream over memory block which position is local for each consuming async flow or thread. In other words, the stream can be shared between async flows for independent reads.
Expand Down
25 changes: 4 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,11 @@ All these things are implemented in 100% managed code on top of existing .NET AP
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)

# What's new
Release Date: 03-19-2024
Release Date: 03-20-2024

<a href="https://www.nuget.org/packages/dotnext/5.3.0">DotNext 5.3.0</a>
* Added `StreamSource.AsSharedStream` extension method that allows to obtain read-only stream over memory block which position is local for each consuming async flow or thread. In other words, the stream can be shared between async flows for independent reads.

<a href="https://www.nuget.org/packages/dotnext.metaprogramming/5.3.0">DotNext.Metaprogramming 5.3.0</a>
* Updated dependencies

<a href="https://www.nuget.org/packages/dotnext.unsafe/5.3.0">DotNext.Unsafe 5.3.0</a>
* Updated dependencies

<a href="https://www.nuget.org/packages/dotnext.threading/5.3.0">DotNext.Threading 5.3.0</a>
* Improved performance of `IndexPool.Take` method

<a href="https://www.nuget.org/packages/dotnext.io/5.3.0">DotNext.IO 5.3.0</a>
* Updated dependencies

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.3.0">DotNext.Net.Cluster 5.3.0</a>
* Smallish performance improvements of WAL

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.3.0">DotNext.AspNetCore.Cluster 5.3.0</a>
* Smallish performance improvements of WAL
<a href="https://www.nuget.org/packages/dotnext/5.3.1">DotNext 5.3.1</a>
* Provided support of thread-local storage for `StreamSource.AsSharedStream`
* Remove type cast for `Func.Constant` static method

Changelog for previous versions located [here](./CHANGELOG.md).

Expand Down
9 changes: 3 additions & 6 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ stages:
nobuild: false
testRunTitle: 'Debug on Windows'
publishTestResults: true
- task: PublishCodeCoverageResults@1
- task: PublishCodeCoverageResults@2
displayName: 'Publish code coverage'
inputs:
codeCoverageTool: Cobertura
summaryFileLocation: $(Agent.TempDirectory)/*/coverage.cobertura.xml
- job: Linux
pool:
Expand All @@ -57,10 +56,9 @@ stages:
nobuild: false
testRunTitle: 'Debug on Linux'
publishTestResults: true
- task: PublishCodeCoverageResults@1
- task: PublishCodeCoverageResults@2
displayName: 'Publish code coverage'
inputs:
codeCoverageTool: Cobertura
summaryFileLocation: $(Agent.TempDirectory)/*/coverage.cobertura.xml
- job: MacOS
pool:
Expand All @@ -83,10 +81,9 @@ stages:
nobuild: false
testRunTitle: 'Debug on MacOS'
publishTestResults: true
- task: PublishCodeCoverageResults@1
- task: PublishCodeCoverageResults@2
displayName: 'Publish code coverage'
inputs:
codeCoverageTool: Cobertura
summaryFileLocation: $(Agent.TempDirectory)/*/coverage.cobertura.xml
- stage: BuildPackages
condition: and(succeeded('Tests'), eq(variables.isMain, true))
Expand Down
12 changes: 7 additions & 5 deletions src/DotNext.Tests/IO/StreamSourceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ static ValueTask WriteToBuffer(ReadOnlyMemory<byte> block, ArrayBufferWriter<byt
[Fact]
public static async Task SharedStreamConcurrentReadAsync()
{
byte[] expected = [10, 20, 30, 40, 50, 60];
byte[] expected = RandomBytes(512);

await using var stream = StreamSource.AsSharedStream(new(expected));

Expand All @@ -632,12 +632,14 @@ static async Task<byte[]> ReadStreamAsync(Stream source)
}
}

[Fact]
public static void SharedStreamConcurrentRead()
[Theory]
[InlineData(false)]
[InlineData(true)]
public static void SharedStreamConcurrentRead(bool compatWithAsync)
{
byte[] expected = [10, 20, 30, 40, 50, 60];
byte[] expected = RandomBytes(512);

using var stream = StreamSource.AsSharedStream(new(expected));
using var stream = StreamSource.AsSharedStream(new(expected), compatWithAsync);

var thread1 = new Thread(ReadStream);
var thread2 = new Thread(ReadStream);
Expand Down
2 changes: 1 addition & 1 deletion src/DotNext/DotNext.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<Authors>.NET Foundation and Contributors</Authors>
<Company />
<Product>.NEXT Family of Libraries</Product>
<VersionPrefix>5.3.0</VersionPrefix>
<VersionPrefix>5.3.1</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyName>DotNext</AssemblyName>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
14 changes: 12 additions & 2 deletions src/DotNext/Func.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,15 @@ public static Func<T> Constant<T>(T obj)
// slow path - allocates a new delegate
return obj is null
? Default!
: obj.UnboxAny<T>;
: typeof(T).IsValueType
? new BoxedConstant<T>() { Value = obj }.GetValue
: Unsafe.As<T, object>(ref obj).UnboxRefType<T>;

static T? Default() => default;
}

private static T UnboxAny<T>(this object obj) => (T)obj;
private static T UnboxRefType<T>(this object obj)
=> Unsafe.As<object, T>(ref obj);

private static Func<bool> Constant(bool value)
{
Expand Down Expand Up @@ -481,4 +484,11 @@ public static Result<TResult> TryInvoke<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10,

return result;
}

private sealed class BoxedConstant<T> : StrongBox<T>
{
internal T GetValue() => Value!;

public override string? ToString() => Value?.ToString();
}
}
72 changes: 55 additions & 17 deletions src/DotNext/IO/SharedReadOnlyMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,24 @@ namespace DotNext.IO;

using static Buffers.Memory;

internal sealed class SharedReadOnlyMemoryStream(ReadOnlySequence<byte> sequence) : ReadOnlyStream
internal abstract class SharedReadOnlyMemoryStream(ReadOnlySequence<byte> sequence) : ReadOnlyStream
{
// don't use BoxedValue due to limitations of AsyncLocal
private readonly AsyncLocal<StrongBox<SequencePosition>> position = new();

[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private SequencePosition LocalPosition
private protected abstract SequencePosition LocalPosition
{
get => position.Value?.Value ?? sequence.Start;
set => (position.Value ??= new()).Value = value;
get;
set;
}

private protected SequencePosition StartPosition => sequence.Start;

private ReadOnlySequence<byte> GetRemainingSequence(out SequencePosition start)
=> sequence.Slice(start = LocalPosition);

public override bool CanSeek => true;
public sealed override bool CanSeek => true;

public override long Length => sequence.Length;
public sealed override long Length => sequence.Length;

public override long Position
public sealed override long Position
{
get => sequence.GetOffset(LocalPosition);
set
Expand All @@ -36,7 +34,7 @@ public override long Position
}
}

public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken token)
public sealed override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken token)
{
ValidateCopyToArguments(destination, bufferSize);

Expand All @@ -46,7 +44,7 @@ public override async Task CopyToAsync(Stream destination, int bufferSize, Cance
LocalPosition = sequence.End;
}

public override void CopyTo(Stream destination, int bufferSize)
public sealed override void CopyTo(Stream destination, int bufferSize)
{
ValidateCopyToArguments(destination, bufferSize);

Expand All @@ -56,16 +54,16 @@ public override void CopyTo(Stream destination, int bufferSize)
LocalPosition = sequence.End;
}

public override void SetLength(long value) => throw new NotSupportedException();
public sealed override void SetLength(long value) => throw new NotSupportedException();

public override int Read(Span<byte> buffer)
public sealed override int Read(Span<byte> buffer)
{
GetRemainingSequence(out var startPos).CopyTo(buffer, out var writtenCount);
LocalPosition = sequence.GetPosition(writtenCount, startPos);
return writtenCount;
}

public override long Seek(long offset, SeekOrigin origin)
public sealed override long Seek(long offset, SeekOrigin origin)
{
var newPosition = origin switch
{
Expand All @@ -84,5 +82,45 @@ public override long Seek(long offset, SeekOrigin origin)
return newPosition;
}

public override string ToString() => sequence.ToString();
public sealed override string ToString() => sequence.ToString();

internal static SharedReadOnlyMemoryStream CreateAsyncLocalStream(ReadOnlySequence<byte> sequence)
=> new AsyncLocalStream(sequence);

internal static SharedReadOnlyMemoryStream CreateThreadLocalStream(ReadOnlySequence<byte> sequence)
=> new ThreadLocalStream(sequence);
}

file sealed class AsyncLocalStream(ReadOnlySequence<byte> sequence) : SharedReadOnlyMemoryStream(sequence)
{
// don't use BoxedValue due to limitations of AsyncLocal
private readonly AsyncLocal<StrongBox<SequencePosition>> position = new();

[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private protected override SequencePosition LocalPosition
{
get => position.Value?.Value ?? StartPosition;
set => (position.Value ??= new()).Value = value;
}
}

file sealed class ThreadLocalStream(ReadOnlySequence<byte> sequence) : SharedReadOnlyMemoryStream(sequence)
{
private readonly ThreadLocal<SequencePosition> position = new(Func.Constant(sequence.Start), trackAllValues: false);

private protected override SequencePosition LocalPosition
{
get => position.Value;
set => position.Value = value;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
position.Dispose();
}

base.Dispose(disposing);
}
}
14 changes: 11 additions & 3 deletions src/DotNext/IO/StreamSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,24 @@ public static Stream AsStream(this ReadOnlyMemory<byte> memory)
=> AsStream(new ReadOnlySequence<byte>(memory));

/// <summary>
/// Gets read-only stream that can be shared across async flows for independent reads.
/// Gets read-only stream that can be shared across async flows or threads for independent reads.
/// </summary>
/// <remarks>
/// You need to set a position explicitly before using stream for each parallel async flow.
/// <see cref="Stream.SetLength(long)"/> is not supported to avoid different views of the same stream.
/// </remarks>
/// <param name="sequence">The sequence of bytes.</param>
/// <param name="compatWithAsync">
/// <see langword="true"/> to create a stream than can be shared across async flows and different threads;
/// <see langword="false"/> to create a stream that is safe to share between different threads only.
/// </param>
/// <returns>The stream over sequence of bytes.</returns>
public static Stream AsSharedStream(this ReadOnlySequence<byte> sequence)
=> sequence.IsEmpty ? Stream.Null : new SharedReadOnlyMemoryStream(sequence);
public static Stream AsSharedStream(this ReadOnlySequence<byte> sequence, bool compatWithAsync = true)
=> sequence.IsEmpty
? Stream.Null
: compatWithAsync
? SharedReadOnlyMemoryStream.CreateAsyncLocalStream(sequence)
: SharedReadOnlyMemoryStream.CreateThreadLocalStream(sequence);

/// <summary>
/// Returns writable synchronous stream.
Expand Down

0 comments on commit 40e1dd9

Please sign in to comment.