Skip to content

Commit

Permalink
Merge branch 'develop' into gh-pages
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 9, 2024
2 parents 14a5451 + f4dfc0b commit 5288779
Show file tree
Hide file tree
Showing 25 changed files with 304 additions and 193 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Release Notes
====

# 07-09-2024
<a href="https://www.nuget.org/packages/dotnext.io/5.9.0">DotNext.IO 5.7.1</a>
* Improved performance of `FileWriter` in some corner cases

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.3">DotNext.Net.Cluster 5.7.3</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.7.3">DotNext.AspNetCore.Cluster 5.7.3</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

# 07-01-2024
<a href="https://www.nuget.org/packages/dotnext.threading/5.9.0">DotNext.Threading 5.9.0</a>
* Added `WaitAnyAsync` overload method to wait on a group of cancellation tokens that supports interruption
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ All these things are implemented in 100% managed code on top of existing .NET AP
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)

# What's new
Release Date: 07-01-2024
Release Date: 07-09-2024

<a href="https://www.nuget.org/packages/dotnext.threading/5.9.0">DotNext.Threading 5.9.0</a>
* Added `WaitAnyAsync` overload method to wait on a group of cancellation tokens that supports interruption
<a href="https://www.nuget.org/packages/dotnext.io/5.9.0">DotNext.IO 5.7.1</a>
* Improved performance of `FileWriter` in some corner cases

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.2">DotNext.Net.Cluster 5.7.2</a>
<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.7.3">DotNext.Net.Cluster 5.7.3</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.7.2">DotNext.AspNetCore.Cluster 5.7.2</a>
<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.7.3">DotNext.AspNetCore.Cluster 5.7.3</a>
* Fixed [244](https://github.com/dotnet/dotNext/issues/244)

Changelog for previous versions located [here](./CHANGELOG.md).
Expand Down
3 changes: 1 addition & 2 deletions src/DotNext.IO/DotNext.IO.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<Authors>.NET Foundation and Contributors</Authors>
<Company />
<Product>.NEXT Family of Libraries</Product>
<VersionPrefix>5.7.0</VersionPrefix>
<VersionPrefix>5.7.1</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyName>DotNext.IO</AssemblyName>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down Expand Up @@ -59,7 +59,6 @@
</ItemGroup>

<ItemGroup>
<AdditionalFiles Include="../stylecop.json" />
<SourceRoot Include="$(MSBuildProjectDirectory)\..\..\" />
</ItemGroup>

Expand Down
7 changes: 7 additions & 0 deletions src/DotNext.IO/IO/BlittableTransferObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ static BlittableTransferObject<T> IBinaryFormattable<BlittableTransferObject<T>>
source.CopyTo(destination);
return result;
}

/// <inheritdoc cref="IDataTransferObject.TryGetMemory"/>
bool IDataTransferObject.TryGetMemory(out ReadOnlyMemory<byte> memory)
{
memory = Memory;
return true;
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
Expand Down
7 changes: 2 additions & 5 deletions src/DotNext.IO/IO/FileWriter.Binary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,9 @@ private async ValueTask WriteBufferedAsync<T>(T arg, SpanAction<byte, T> writer,

private async ValueTask WriteDirectAsync<T>(T arg, SpanAction<byte, T> writer, int length, CancellationToken token)
{
await FlushCoreAsync(token).ConfigureAwait(false);

using var buffer = allocator.AllocateExactly(length);
writer(buffer.Span, arg);
await RandomAccess.WriteAsync(handle, buffer.Memory, fileOffset, token).ConfigureAwait(false);
fileOffset += buffer.Length;
await WriteDirectAsync(buffer.Memory, token).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -136,7 +133,7 @@ private int WriteLength(int length, LengthFormat lengthFormat)

/// <summary>
/// Encodes a block of memory, optionally prefixed with the length encoded as a sequence of bytes
/// according with the specified format.
/// according to the specified format.
/// </summary>
/// <param name="input">A block of memory.</param>
/// <param name="lengthFormat">Indicates how the length of the BLOB must be encoded.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
</ItemGroup>

<ItemGroup>
<AdditionalFiles Include="../stylecop.json" />
<SourceRoot Include="$(MSBuildProjectDirectory)\..\..\" />
</ItemGroup>
</Project>
1 change: 0 additions & 1 deletion src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
</ItemGroup>

<ItemGroup>
<AdditionalFiles Include="../stylecop.json" />
<SourceRoot Include="$(MSBuildProjectDirectory)\..\..\" />
</ItemGroup>
</Project>
32 changes: 32 additions & 0 deletions src/DotNext.Tests/IO/FileWriterTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System.Buffers;
using System.Runtime.CompilerServices;

namespace DotNext.IO;

using Buffers.Binary;

public sealed class FileWriterTests : Test
{
[Fact]
Expand Down Expand Up @@ -168,4 +171,33 @@ public static async Task FlushWithOffsetAsync()
Equal(2, actual[101]);
Equal(1, actual[100]);
}

[Fact]
public static async Task WriteDirect()
{
var path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using var handle = File.OpenHandle(path, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, FileOptions.Asynchronous);
using var writer = new FileWriter(handle, fileOffset: 0L, bufferSize: 64);
await writer.WriteAsync(new Blittable<Buffer512> { Value = default });
False(writer.HasBufferedData);
Equal(writer.FilePosition, Unsafe.SizeOf<Buffer512>());
}

[Fact]
public static async Task BufferOverflow()
{
var path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using var handle = File.OpenHandle(path, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, FileOptions.Asynchronous);
using var writer = new FileWriter(handle, fileOffset: 0L, bufferSize: 64);
await writer.WriteAsync(new byte[2]);
await writer.WriteAsync(new Blittable<Buffer512> { Value = default });
False(writer.HasBufferedData);
Equal(writer.FilePosition, Unsafe.SizeOf<Buffer512>() + 2);
}

[InlineArray(512)]
private struct Buffer512
{
private byte element0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public override ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToke
=> writer.WriteAsync(snapshot, token);
}

internal PersistentStateWithSnapshot(string path, bool useCaching, CompactionMode compactionMode = default)
internal PersistentStateWithSnapshot(string path, bool useCaching, CompactionMode compactionMode = CompactionMode.Sequential)
: base(path, RecordsPerPartition, new Options { UseCaching = useCaching, CompactionMode = compactionMode, IntegrityCheck = true, WriteMode = WriteMode.AutoFlush })
{
}
Expand Down Expand Up @@ -625,16 +625,20 @@ public static async Task ClearLog()
}

[Theory]
[InlineData(MemoryBasedStateMachine.CompactionMode.Background)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Foreground)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Sequential)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Incremental)]
public static async Task AppendAndCommitAsync(MemoryBasedStateMachine.CompactionMode compaction)
[InlineData(MemoryBasedStateMachine.CompactionMode.Background, false)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Foreground, false)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Sequential, false)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Incremental, false)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Background, true)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Foreground, true)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Sequential, true)]
[InlineData(MemoryBasedStateMachine.CompactionMode.Incremental, true)]
public static async Task AppendAndCommitAsync(MemoryBasedStateMachine.CompactionMode compaction, bool caching)
{
var entries = new Int64LogEntry[RecordsPerPartition * 2 + 1];
entries.AsSpan().ForEach((ref Int64LogEntry entry, int index) => entry = new Int64LogEntry { Content = 42L + index, Term = index });
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using var state = new PersistentStateWithSnapshot(dir, true, compaction);
using var state = new PersistentStateWithSnapshot(dir, caching, compaction);
Equal(0L, await state.As<IRaftLog>().AppendAndCommitAsync(new LogEntryList(entries), 1L, false, 0L));
Equal(0L, state.LastCommittedEntryIndex);
Equal(9L, state.LastEntryIndex);
Expand Down Expand Up @@ -666,12 +670,20 @@ public static async Task SequentialCompaction(bool useCaching)
Equal(entries.Length + 41L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Single(readResult);
Equal(9, snapshotIndex);
True(readResult[0].IsSnapshot);
Equal(7, snapshotIndex);
True(Single(readResult).IsSnapshot);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, 6, CancellationToken.None);

checker = static (readResult, snapshotIndex, token) =>
{
NotEmpty(readResult);
Equal(7, snapshotIndex);
True(readResult[0].IsSnapshot);
False(readResult[1].IsSnapshot);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None);
}

Expand All @@ -688,8 +700,8 @@ public static async Task SequentialCompaction(bool useCaching)
Equal(0L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Single(readResult);
Equal(9, snapshotIndex);
NotEmpty(readResult);
Equal(7, snapshotIndex);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None);
Expand All @@ -716,8 +728,8 @@ public static async Task BackgroundCompaction(bool useCaching)
await state.ForceCompactionAsync(1L, CancellationToken.None);
checker = static (readResult, snapshotIndex, token) =>
{
Equal(3, readResult.Count);
Equal(4, snapshotIndex);
Equal(4, readResult.Count);
Equal(3, snapshotIndex);
True(readResult[0].IsSnapshot);
False(readResult[1].IsSnapshot);
False(readResult[2].IsSnapshot);
Expand All @@ -726,8 +738,8 @@ public static async Task BackgroundCompaction(bool useCaching)
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, 6, CancellationToken.None);
checker = static (readResult, snapshotIndex, token) =>
{
Equal(6, readResult.Count);
Equal(4, snapshotIndex);
Equal(7, readResult.Count);
Equal(3, snapshotIndex);
True(readResult[0].IsSnapshot);
False(readResult[1].IsSnapshot);
False(readResult[2].IsSnapshot);
Expand All @@ -741,16 +753,16 @@ public static async Task BackgroundCompaction(bool useCaching)
{
checker = static (readResult, snapshotIndex, token) =>
{
Equal(3, readResult.Count);
Equal(4, readResult.Count);
NotNull(snapshotIndex);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, 6, CancellationToken.None);
Equal(0L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Equal(6, readResult.Count);
Equal(4, snapshotIndex);
Equal(7, readResult.Count);
Equal(3, snapshotIndex);
True(readResult[0].IsSnapshot);
False(readResult[1].IsSnapshot);
False(readResult[2].IsSnapshot);
Expand Down Expand Up @@ -821,13 +833,13 @@ public static async Task IncrementalCompaction(bool useCaching)
{
False(state.IsBackgroundCompaction);
await state.AppendAsync(new LogEntryList(entries));
await state.CommitAsync(4, CancellationToken.None);
await state.CommitAsync(5, CancellationToken.None);
await state.CommitAsync(CancellationToken.None);
Equal(entries.Length + 41L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Equal(3, readResult.Count);
Equal(4, snapshotIndex);
Equal(2, readResult.Count);
Equal(5, snapshotIndex);
True(readResult[0].IsSnapshot);
return default;
};
Expand All @@ -839,16 +851,16 @@ public static async Task IncrementalCompaction(bool useCaching)
{
checker = static (readResult, snapshotIndex, token) =>
{
Equal(3, readResult.Count);
Equal(2, readResult.Count);
NotNull(snapshotIndex);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, 6, CancellationToken.None);
Equal(0L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Equal(6, readResult.Count);
Equal(4, snapshotIndex);
Equal(5, readResult.Count);
Equal(5, snapshotIndex);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None);
Expand Down Expand Up @@ -1047,13 +1059,17 @@ public static async Task JsonSerialization(bool cached)
}

[Fact]
public static async Task RegressionIssue244()
public static async Task EnsureMetadataPersistence()
{
var path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithoutSnapshot(path, RecordsPerPartition, new() { UseCaching = true }))
{
var entries = RandomEntries();
foreach (var entry in entries[..2])
IReadOnlyList<Int64LogEntry> entries =
[
new() { Term = 1L, Content = 10L },
new() { Term = 1L, Content = 11L }
];
foreach (var entry in entries)
{
await state.AppendAsync(entry);
}
Expand All @@ -1074,16 +1090,48 @@ public static async Task RegressionIssue244()

await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1L);
}
}

[Fact]
public static async Task RegressionIssue244()
{
Int64LogEntry snapshot;
long snapshotIndex;

var entries = new Int64LogEntry[RecordsPerPartition * 2 + 1];
entries.AsSpan().ForEach((ref Int64LogEntry entry, int index) => entry = new Int64LogEntry { Content = 42L + index, Term = index });
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithSnapshot(dir, true))
{
Equal(5L, await state.As<IRaftLog>().AppendAndCommitAsync(new LogEntryList(entries), 1L, false, 5L));
Equal(5L, state.LastCommittedEntryIndex);

// take snapshot
Func<IReadOnlyList<IRaftLogEntry>, long?, CancellationToken, ValueTask<(Int64LogEntry, long)>> snapshotReader =
static async (entries, snapshotIndex, token) =>
{
NotNull(snapshotIndex);
Equal(snapshotIndex, 3L);

var entry = entries[0];
var snapshot = await entry.ToByteArrayAsync(token: token);
return (new Int64LogEntry() { Term = entry.Term, IsSnapshot = true, Content = BitConverter.ToInt64(snapshot) },
snapshotIndex.Value);
};
(snapshot, snapshotIndex) = await state
.As<IRaftLog>()
.ReadAsync(new IO.Log.LogEntryConsumer<IRaftLogEntry, (Int64LogEntry, long)>(snapshotReader), 1L);
}

Directory.Delete(dir, recursive: true);

static Int64LogEntry[] RandomEntries()
using (var state = new PersistentStateWithSnapshot(dir, true))
{
var entries = new Int64LogEntry[RecordsPerPartition];
for (var i = 0; i < entries.Length; i++)
{
entries[i] = new Int64LogEntry() { Term = 1L, Content = i + 10L };
}
// install snapshot
await state.AppendAsync(snapshot, snapshotIndex);
Equal(3L, state.LastCommittedEntryIndex);

return entries;
await state.AppendAsync(new Int64LogEntry { Content = 10L, Term = 20L }, 4L);
}
}
}
4 changes: 3 additions & 1 deletion src/DotNext.Tests/Threading/AsyncBarrierTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using static System.Threading.Timeout;

namespace DotNext.Threading;

public sealed class AsyncBarrierTests : Test
Expand Down Expand Up @@ -51,7 +53,7 @@ public static async Task PhaseCompletion()
ICollection<Task> tasks = new LinkedList<Task>();
Equal(0, barrier.CurrentPhaseNumber);
tasks.Add(barrier.SignalAndWaitAsync().AsTask());
tasks.Add(barrier.SignalAndWaitAsync().AsTask());
tasks.Add(barrier.SignalAndWaitAsync(InfiniteTimeSpan).AsTask());
tasks.Add(barrier.SignalAndWaitAsync().AsTask());
await Task.WhenAll(tasks);
Equal(1, barrier.CurrentPhaseNumber);
Expand Down
Loading

0 comments on commit 5288779

Please sign in to comment.