From 03ddd2b6e8c5c719c017a9b4a8d83e106db3b5b6 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 20 Nov 2024 10:36:59 +0000 Subject: [PATCH 01/12] Create method for processing efficient unbalanced parallel loops - Introduced the `ParallelUnbalancedWork` class to efficiently execute parallel loops over a range of integers, handling unbalanced workloads. - Added static `For` methods to support parallel execution with and without thread-local data, including initialization and finalization functions. - Utilized thread pooling and a shared counter (`SharedCounter`) to distribute iterations among threads dynamically. - Implemented internal classes (`BaseData`, `Data`, and `InitProcessor`) to manage shared state and thread synchronization. - Aimed to optimize performance in scenarios where the workload per iteration is uneven, ensuring better resource utilization and reduced execution time. --- .../Processing/BlockCachePreWarmer.cs | 24 +-- .../Processing/BlockProcessor.cs | 5 +- .../Processing/RecoverSignature.cs | 5 +- .../Threading/ParallelUnbalancedWork.cs | 192 ++++++++++++++++++ .../Nethermind.JsonRpc/JsonRpcService.cs | 5 +- .../PersistentStorageProvider.cs | 10 +- .../FastBlocks/SyncStatusList.cs | 4 +- .../Nethermind.Trie/TrieNode.Decoder.cs | 10 +- 8 files changed, 218 insertions(+), 37 deletions(-) create mode 100644 src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs index 1426aca1430..39406fb333b 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs @@ -98,18 +98,13 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe { if (spec.WithdrawalsEnabled && block.Withdrawals is not null) { - int progress = 0; - Parallel.For(0, block.Withdrawals.Length, 
parallelOptions, - _ => + ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, + i => { IReadOnlyTxProcessorSource env = _envPool.Get(); - int i = 0; try { using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); - // Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - i = Interlocked.Increment(ref progress) - 1; scope.WorldState.WarmUp(block.Withdrawals[i].Address); } finally @@ -135,8 +130,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp try { - int progress = 0; - Parallel.For(0, block.Transactions.Length, parallelOptions, _ => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, i => { using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority(); IReadOnlyTxProcessorSource env = _envPool.Get(); @@ -144,9 +138,6 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp Transaction? tx = null; try { - // Process transactions in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - int i = Interlocked.Increment(ref progress) - 1; // If the transaction has already been processed or being processed, exit early if (block.TransactionProcessed > i) return; @@ -273,14 +264,9 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block) } } - int progress = 0; - Parallel.For(0, block.Transactions.Length, parallelOptions, - _ => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, + i => { - int i = 0; - // Process addresses in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - i = Interlocked.Increment(ref progress) - 1; Transaction tx = block.Transactions[i]; Address? 
sender = tx.SenderAddress; diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs index 154733f9903..d10fb0aa60e 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs @@ -19,6 +19,7 @@ using Nethermind.Core; using Nethermind.Core.Crypto; using Nethermind.Core.Specs; +using Nethermind.Core.Threading; using Nethermind.Crypto; using Nethermind.Evm; using Nethermind.Evm.Tracing; @@ -333,10 +334,8 @@ protected virtual TxReceipt[] ProcessBlock( [MethodImpl(MethodImplOptions.NoInlining)] private static void CalculateBlooms(TxReceipt[] receipts) { - int index = 0; - Parallel.For(0, receipts.Length, _ => + ParallelUnbalancedWork.For(0, receipts.Length, i => { - int i = Interlocked.Increment(ref index) - 1; receipts[i].CalculateBloom(); }); } diff --git a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs index c748ecae850..32278552f8a 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Specs; +using Nethermind.Core.Threading; using Nethermind.Crypto; using Nethermind.Logging; using Nethermind.Serialization.Rlp; @@ -48,7 +49,7 @@ public void RecoverData(Block block) // so we assume the rest of txs in the block are already recovered return; - Parallel.For(0, txs.Length, i => + ParallelUnbalancedWork.For(0, txs.Length, i => { Transaction tx = txs[i]; if (!tx.IsHashCalculated) @@ -111,7 +112,7 @@ public void RecoverData(Block block) if (recoverFromEcdsa > 3) { // Recover ecdsa in Parallel - Parallel.For(0, txs.Length, i => + ParallelUnbalancedWork.For(0, txs.Length, i => { Transaction tx = txs[i]; if 
(!ShouldRecoverSignatures(tx)) return; diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs new file mode 100644 index 00000000000..424a9940d91 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -0,0 +1,192 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Nethermind.Core.Threading; + +public class ParallelUnbalancedWork : IThreadPoolWorkItem +{ + private static readonly ParallelOptions s_parallelOptions = new() + { + // default to the number of processors + MaxDegreeOfParallelism = Environment.ProcessorCount + }; + + private readonly Data _data; + + public static void For(int fromInclusive, int toExclusive, Action action) + => For(fromInclusive, toExclusive, s_parallelOptions, action); + + public static void For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action action) + { + int threads = parallelOptions.MaxDegreeOfParallelism > 0 ? 
parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount; + + Data data = new(threads, fromInclusive, toExclusive, action); + + for (int i = 0; i < threads - 1; i++) + { + ThreadPool.UnsafeQueueUserWorkItem(new ParallelUnbalancedWork(data), preferLocal: false); + } + + new ParallelUnbalancedWork(data).Execute(); + + if (data.ActiveThreads > 0) + { + lock (data) + { + if (data.ActiveThreads > 0) + { + // Wait for remaining to complete + Monitor.Wait(data); + } + } + } + } + + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + Func init, + Func action, + Action @finally) + => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, init, default, action, @finally); + + public static void For(int fromInclusive, int toExclusive, TLocal state, Func action) + => For(fromInclusive, toExclusive, s_parallelOptions, state, action); + + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + TLocal state, + Func action) + => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, null, state, action); + + private ParallelUnbalancedWork(Data data) + { + _data = data; + } + + public void Execute() + { + int i = _data.Index.GetNext(); + while (i < _data.ToExclusive) + { + _data.Action(i); + i = _data.Index.GetNext(); + } + + _data.MarkThreadCompleted(); + } + + private class SharedCounter(int fromInclusive) + { + private int _index = fromInclusive; + public int GetNext() => Interlocked.Increment(ref _index) - 1; + } + + private class BaseData(int threads, int fromInclusive, int toExclusive) + { + public SharedCounter Index { get; } = new SharedCounter(fromInclusive); + public int ToExclusive => toExclusive; + public int ActiveThreads => Volatile.Read(ref threads); + + public int MarkThreadCompleted() + { + var remaining = Interlocked.Decrement(ref threads); + + if (remaining == 0) + { + lock (this) + { + Monitor.Pulse(this); + } + } + + return remaining; + } + } 
+ + private class Data(int threads, int fromInclusive, int toExclusive, Action action) : + BaseData(threads, fromInclusive, toExclusive) + { + public Action Action => action; + } + + private class InitProcessor : IThreadPoolWorkItem + { + private readonly Data _data; + + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + Func? init, + TLocal? initValue, + Func action, + Action? @finally = null) + { + var threads = parallelOptions.MaxDegreeOfParallelism > 0 ? parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount; + + var data = new Data(threads, fromInclusive, toExclusive, action, init, initValue, @finally); + + for (int i = 0; i < threads - 1; i++) + { + ThreadPool.UnsafeQueueUserWorkItem(new InitProcessor(data), preferLocal: false); + } + + new InitProcessor(data).Execute(); + + if (data.ActiveThreads > 0) + { + lock (data) + { + if (data.ActiveThreads > 0) + { + // Wait for remaining to complete + Monitor.Wait(data); + } + } + } + } + + private InitProcessor(Data data) => _data = data; + + public void Execute() + { + TLocal? value = _data.Init(); + int i = _data.Index.GetNext(); + while (i < _data.ToExclusive) + { + value = _data.Action(i, value); + i = _data.Index.GetNext(); + } + + _data.Finally(value); + + _data.MarkThreadCompleted(); + } + + private class Data(int threads, + int fromInclusive, + int toExclusive, + Func action, + Func? init = null, + TValue? initValue = default, + Action? @finally = null) : BaseData(threads, fromInclusive, toExclusive) + { + public Func Action => action; + + public TValue Init() => initValue ?? (init is not null ? 
init.Invoke() : default)!; + + public void Finally(TValue value) + { + @finally?.Invoke(value); + } + } + } +} diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs index 74f027bb80e..cc67f658b88 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs @@ -9,14 +9,13 @@ using System.Text.Json; using System.Text.Json.Serialization; using System.Threading.Tasks; - using Nethermind.Core; +using Nethermind.Core.Threading; using Nethermind.JsonRpc.Exceptions; using Nethermind.JsonRpc.Modules; using Nethermind.Logging; using Nethermind.Serialization.Json; using Nethermind.State; - using static Nethermind.JsonRpc.Modules.RpcModuleProvider; using static Nethermind.JsonRpc.Modules.RpcModuleProvider.ResolvedMethodInfo; @@ -357,7 +356,7 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec } else if (providedParametersLength > parallelThreshold) { - Parallel.For(0, providedParametersLength, (int i) => + ParallelUnbalancedWork.For(0, providedParametersLength, (int i) => { JsonElement providedParameter = providedParameters[i]; ExpectedParameter expectedParameter = expectedParameters[i]; diff --git a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs index 9bbbf99282d..58693fe964f 100644 --- a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs +++ b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs @@ -14,6 +14,7 @@ using Nethermind.Core.Collections; using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; +using Nethermind.Core.Threading; using Nethermind.Int256; using Nethermind.Logging; using Nethermind.State.Tracing; @@ -22,6 +23,7 @@ namespace Nethermind.State; using Nethermind.Core.Cpu; + /// /// Manages persistent storage allowing for snapshotting and restoring /// Persists data to ITrieStore @@ -266,8 +268,10 @@ 
void UpdateRootHashesSingleThread() void UpdateRootHashesMultiThread() { // We can recalculate the roots in parallel as they are all independent tries - Parallel.ForEach(_storages, RuntimeInformation.ParallelOptionsLogicalCores, kvp => + var storages = _storages.ToArray(); + ParallelUnbalancedWork.For(0, storages.Length, RuntimeInformation.ParallelOptionsLogicalCores, i => { + ref var kvp = ref storages[i]; if (!_toUpdateRoots.Contains(kvp.Key)) { // Wasn't updated don't recalculate @@ -278,8 +282,9 @@ void UpdateRootHashesMultiThread() }); // Update the storage roots in the main thread non in parallel - foreach (KeyValuePair kvp in _storages) + for (int i = 0; i < storages.Length; i++) { + ref var kvp = ref storages[i]; if (!_toUpdateRoots.Contains(kvp.Key)) { continue; @@ -288,7 +293,6 @@ void UpdateRootHashesMultiThread() // Update the storage root for the Account _stateProvider.UpdateStorageRoot(address: kvp.Key, kvp.Value.RootHash); } - } } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs index 208d355a844..115fe36dfdb 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs @@ -3,11 +3,11 @@ using System; using System.Threading; -using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Core; using Nethermind.Core.Caching; using Nethermind.Core.Collections; +using Nethermind.Core.Threading; namespace Nethermind.Synchronization.FastBlocks { @@ -123,7 +123,7 @@ public bool TryGetInfosForBatch(int batchSize, Func blockExist, { bool hasNonNull = false; bool hasInserted = false; - Parallel.For(0, workingArray.Count, (i) => + ParallelUnbalancedWork.For(0, workingArray.Count, (i) => { if (workingArray[i] is not null) { diff --git a/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs b/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs index 
b32c4f95a59..6ff1cf3a203 100644 --- a/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs +++ b/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs @@ -7,10 +7,10 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Tasks; using Nethermind.Core.Buffers; using Nethermind.Core.Cpu; using Nethermind.Core.Crypto; +using Nethermind.Core.Threading; using Nethermind.Serialization.Rlp; using Nethermind.Trie.Pruning; @@ -175,9 +175,9 @@ private static int GetChildrenRlpLengthForBranchParallel(ITrieNodeResolver tree, private static int GetChildrenRlpLengthForBranchNonRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool bufferPool) { int totalLength = 0; - Parallel.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, + ParallelUnbalancedWork.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, () => 0, - (i, _, local) => + (i, local) => { object? data = item._data[i]; if (ReferenceEquals(data, _nullNode) || data is null) @@ -236,9 +236,9 @@ private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, r private static int GetChildrenRlpLengthForBranchRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool? 
bufferPool) { int totalLength = 0; - Parallel.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, + ParallelUnbalancedWork.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, () => 0, - (i, _, local) => + (i, local) => { ValueRlpStream rlpStream = item.RlpStream; item.SeekChild(ref rlpStream, i); From 2b691f18d4effd2d7c95f0a54a37ceba284dba2a Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 20 Nov 2024 10:37:50 +0000 Subject: [PATCH 02/12] Add doc comments --- .../Threading/ParallelUnbalancedWork.cs | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index 424a9940d91..f362c662853 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -7,6 +7,9 @@ namespace Nethermind.Core.Threading; +/// +/// Provides methods to execute parallel loops efficiently for unbalanced workloads. +/// public class ParallelUnbalancedWork : IThreadPoolWorkItem { private static readonly ParallelOptions s_parallelOptions = new() @@ -17,9 +20,22 @@ public class ParallelUnbalancedWork : IThreadPoolWorkItem private readonly Data _data; + /// + /// Executes a parallel for loop over a range of integers. + /// + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// The delegate that is invoked once per iteration. public static void For(int fromInclusive, int toExclusive, Action action) => For(fromInclusive, toExclusive, s_parallelOptions, action); + /// + /// Executes a parallel for loop over a range of integers, with the specified options. + /// + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The delegate that is invoked once per iteration. 
public static void For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action action) { int threads = parallelOptions.MaxDegreeOfParallelism > 0 ? parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount; @@ -46,6 +62,16 @@ public static void For(int fromInclusive, int toExclusive, ParallelOptions paral } } + /// + /// Executes a parallel for loop over a range of integers, with thread-local data, initialization, and finalization functions. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The function to initialize the local data for each thread. + /// The delegate that is invoked once per iteration. + /// The function to finalize the local data for each thread. public static void For( int fromInclusive, int toExclusive, @@ -55,9 +81,26 @@ public static void For( Action @finally) => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, init, default, action, @finally); + /// + /// Executes a parallel for loop over a range of integers, with thread-local data. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// The initial state of the thread-local data. + /// The delegate that is invoked once per iteration. public static void For(int fromInclusive, int toExclusive, TLocal state, Func action) => For(fromInclusive, toExclusive, s_parallelOptions, state, action); + /// + /// Executes a parallel for loop over a range of integers, with thread-local data and specified options. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The initial state of the thread-local data. + /// The delegate that is invoked once per iteration. 
public static void For( int fromInclusive, int toExclusive, @@ -66,11 +109,18 @@ public static void For( Func action) => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, null, state, action); + /// + /// Initializes a new instance of the class. + /// + /// The shared data for the parallel work. private ParallelUnbalancedWork(Data data) { _data = data; } + /// + /// Executes the parallel work item. + /// public void Execute() { int i = _data.Index.GetNext(); @@ -83,18 +133,44 @@ public void Execute() _data.MarkThreadCompleted(); } + /// + /// Provides a thread-safe counter for sharing indices among threads. + /// private class SharedCounter(int fromInclusive) { private int _index = fromInclusive; + + /// + /// Gets the next index in a thread-safe manner. + /// + /// The next index. public int GetNext() => Interlocked.Increment(ref _index) - 1; } + /// + /// Represents the base data shared among threads during parallel execution. + /// private class BaseData(int threads, int fromInclusive, int toExclusive) { + /// + /// Gets the shared counter for indices. + /// public SharedCounter Index { get; } = new SharedCounter(fromInclusive); + + /// + /// Gets the exclusive upper bound of the range. + /// public int ToExclusive => toExclusive; + + /// + /// Gets the number of active threads. + /// public int ActiveThreads => Volatile.Read(ref threads); + /// + /// Marks a thread as completed. + /// + /// The number of remaining active threads. public int MarkThreadCompleted() { var remaining = Interlocked.Decrement(ref threads); @@ -111,16 +187,36 @@ public int MarkThreadCompleted() } } + /// + /// Represents the data shared among threads for the parallel action. + /// private class Data(int threads, int fromInclusive, int toExclusive, Action action) : BaseData(threads, fromInclusive, toExclusive) { + /// + /// Gets the action to be executed for each iteration. 
+ /// public Action Action => action; } + /// + /// Provides methods to execute parallel loops with thread-local data initialization and finalization. + /// + /// The type of the thread-local data. private class InitProcessor : IThreadPoolWorkItem { private readonly Data _data; + /// + /// Executes a parallel for loop over a range of integers, with thread-local data initialization and finalization. + /// + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The function to initialize the local data for each thread. + /// The initial value of the local data. + /// The delegate that is invoked once per iteration. + /// The function to finalize the local data for each thread. public static void For( int fromInclusive, int toExclusive, @@ -154,8 +250,15 @@ public static void For( } } + /// + /// Initializes a new instance of the class. + /// + /// The shared data for the parallel work. private InitProcessor(Data data) => _data = data; + /// + /// Executes the parallel work item with thread-local data. + /// public void Execute() { TLocal? value = _data.Init(); @@ -171,6 +274,10 @@ public void Execute() _data.MarkThreadCompleted(); } + /// + /// Represents the data shared among threads for the parallel action with thread-local data. + /// + /// The type of the thread-local data. private class Data(int threads, int fromInclusive, int toExclusive, @@ -179,10 +286,21 @@ private class Data(int threads, TValue? initValue = default, Action? @finally = null) : BaseData(threads, fromInclusive, toExclusive) { + /// + /// Gets the action to be executed for each iteration. + /// public Func Action => action; + /// + /// Initializes the thread-local data. + /// + /// The initialized thread-local data. public TValue Init() => initValue ?? (init is not null ? init.Invoke() : default)!; + /// + /// Finalizes the thread-local data. + /// + /// The thread-local data to finalize. 
public void Finally(TValue value) { @finally?.Invoke(value); From 942a2a5aae8a75a01fbbe71d335eea99576f00ab Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 20 Nov 2024 10:49:00 +0000 Subject: [PATCH 03/12] Comments --- .../Threading/ParallelUnbalancedWork.cs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index f362c662853..4cab8a8498d 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -127,9 +127,11 @@ public void Execute() while (i < _data.ToExclusive) { _data.Action(i); + // Get the next index i = _data.Index.GetNext(); } + // Signal that this thread has completed its work _data.MarkThreadCompleted(); } @@ -179,6 +181,7 @@ public int MarkThreadCompleted() { lock (this) { + // Notify any waiting threads that all work is complete Monitor.Pulse(this); } } @@ -226,24 +229,31 @@ public static void For( Func action, Action? @finally = null) { - var threads = parallelOptions.MaxDegreeOfParallelism > 0 ? parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount; + // Determine the number of threads to use + var threads = parallelOptions.MaxDegreeOfParallelism > 0 + ? 
parallelOptions.MaxDegreeOfParallelism + : Environment.ProcessorCount; + // Create shared data with thread-local initializers and finalizers var data = new Data(threads, fromInclusive, toExclusive, action, init, initValue, @finally); + // Queue work items to the thread pool for all threads except the current one for (int i = 0; i < threads - 1; i++) { ThreadPool.UnsafeQueueUserWorkItem(new InitProcessor(data), preferLocal: false); } + // Execute work on the current thread new InitProcessor(data).Execute(); + // If there are still active threads, wait for them to complete if (data.ActiveThreads > 0) { lock (data) { if (data.ActiveThreads > 0) { - // Wait for remaining to complete + // Wait for the remaining threads to complete Monitor.Wait(data); } } From c7f169a72c680bc40a186e0df5bd44b8069dc4d5 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 20 Nov 2024 11:11:25 +0000 Subject: [PATCH 04/12] Use ManualResetEventSlim --- .../Threading/ParallelUnbalancedWork.cs | 32 +++---------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index 4cab8a8498d..94d4eafc7fe 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -49,17 +49,8 @@ public static void For(int fromInclusive, int toExclusive, ParallelOptions paral new ParallelUnbalancedWork(data).Execute(); - if (data.ActiveThreads > 0) - { - lock (data) - { - if (data.ActiveThreads > 0) - { - // Wait for remaining to complete - Monitor.Wait(data); - } - } - } + // If there are still active threads, wait for them to complete + data.Event.Wait(); } /// @@ -158,6 +149,7 @@ private class BaseData(int threads, int fromInclusive, int toExclusive) /// Gets the shared counter for indices. 
/// public SharedCounter Index { get; } = new SharedCounter(fromInclusive); + public ManualResetEventSlim Event { get; } = new(); /// /// Gets the exclusive upper bound of the range. @@ -179,11 +171,7 @@ public int MarkThreadCompleted() if (remaining == 0) { - lock (this) - { - // Notify any waiting threads that all work is complete - Monitor.Pulse(this); - } + Event.Set(); } return remaining; @@ -247,17 +235,7 @@ public static void For( new InitProcessor(data).Execute(); // If there are still active threads, wait for them to complete - if (data.ActiveThreads > 0) - { - lock (data) - { - if (data.ActiveThreads > 0) - { - // Wait for the remaining threads to complete - Monitor.Wait(data); - } - } - } + data.Event.Wait(); } /// From 58fbcf689f6cf72a54d0e37c8109a41e72afe13b Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 20 Nov 2024 13:05:33 +0000 Subject: [PATCH 05/12] Add padding --- .../Threading/ParallelUnbalancedWork.cs | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index 94d4eafc7fe..1d2e6069a70 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -50,7 +51,10 @@ public static void For(int fromInclusive, int toExclusive, ParallelOptions paral new ParallelUnbalancedWork(data).Execute(); // If there are still active threads, wait for them to complete - data.Event.Wait(); + if (data.ActiveThreads > 0) + { + data.Event.Wait(); + } } /// @@ -131,13 +135,20 @@ public void Execute() /// private class SharedCounter(int fromInclusive) { - private int _index = fromInclusive; + private PaddedValue _index = new(fromInclusive); /// /// 
Gets the next index in a thread-safe manner. /// /// The next index. - public int GetNext() => Interlocked.Increment(ref _index) - 1; + public int GetNext() => Interlocked.Increment(ref _index.Value) - 1; + + [StructLayout(LayoutKind.Explicit, Size = 128)] + private struct PaddedValue(int value) + { + [FieldOffset(64)] + public int Value = value; + } } /// @@ -149,7 +160,7 @@ private class BaseData(int threads, int fromInclusive, int toExclusive) /// Gets the shared counter for indices. /// public SharedCounter Index { get; } = new SharedCounter(fromInclusive); - public ManualResetEventSlim Event { get; } = new(); + public SemaphoreSlim Event { get; } = new(initialCount: 0); /// /// Gets the exclusive upper bound of the range. @@ -171,7 +182,7 @@ public int MarkThreadCompleted() if (remaining == 0) { - Event.Set(); + Event.Release(); } return remaining; @@ -235,7 +246,10 @@ public static void For( new InitProcessor(data).Execute(); // If there are still active threads, wait for them to complete - data.Event.Wait(); + if (data.ActiveThreads > 0) + { + data.Event.Wait(); + } } /// From 6224dc9ccc06ae3b5880cdf07a2adbc09b606469 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Fri, 22 Nov 2024 03:28:19 +0000 Subject: [PATCH 06/12] Less allocations --- .../Processing/BlockCachePreWarmer.cs | 57 ++++++++++--------- .../TransactionProcessor.cs | 2 +- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs index 39406fb333b..e803fa8c98b 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs @@ -98,19 +98,21 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe { if (spec.WithdrawalsEnabled && block.Withdrawals is not null) { - ParallelUnbalancedWork.For(0, block.Withdrawals.Length, 
parallelOptions, - i => + ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (preWarmer: this, block, stateRoot), + static (i, state) => { - IReadOnlyTxProcessorSource env = _envPool.Get(); + IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get(); try { - using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); - scope.WorldState.WarmUp(block.Withdrawals[i].Address); + using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot); + scope.WorldState.WarmUp(state.block.Withdrawals[i].Address); } finally { - _envPool.Return(env); + state.preWarmer._envPool.Return(env); } + + return state; }); } } @@ -130,20 +132,19 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp try { - ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, i => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: this, block, stateRoot, spec), static (i, state) => { - using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority(); - IReadOnlyTxProcessorSource env = _envPool.Get(); - SystemTransaction systemTransaction = _systemTransactionPool.Get(); + IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get(); + SystemTransaction systemTransaction = state.preWarmer._systemTransactionPool.Get(); Transaction? 
tx = null; try { // If the transaction has already been processed or being processed, exit early - if (block.TransactionProcessed > i) return; + if (state.block.TransactionProcessed > i) return state; - tx = block.Transactions[i]; + tx = state.block.Transactions[i]; tx.CopyTo(systemTransaction); - using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); + using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot); Address senderAddress = tx.SenderAddress!; if (!scope.WorldState.AccountExists(senderAddress)) @@ -154,7 +155,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp UInt256 nonceDelta = UInt256.Zero; for (int prev = 0; prev < i; prev++) { - if (senderAddress == block.Transactions[prev].SenderAddress) + if (senderAddress == state.block.Transactions[prev].SenderAddress) { nonceDelta++; } @@ -165,12 +166,12 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp scope.WorldState.IncrementNonce(senderAddress, nonceDelta); } - if (spec.UseTxAccessLists) + if (state.spec.UseTxAccessLists) { scope.WorldState.WarmUp(tx.AccessList); // eip-2930 } - TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance); - if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); + TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.block.Header.Clone()), NullTxTracer.Instance); + if (state.preWarmer._logger.IsTrace) state.preWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); } catch (Exception ex) when (ex is EvmException or OverflowException) { @@ -178,13 +179,15 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp } catch (Exception ex) { - if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex); + if 
(state.preWarmer._logger.IsDebug) state.preWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex); } finally { - _systemTransactionPool.Return(systemTransaction); - _envPool.Return(env); + state.preWarmer._systemTransactionPool.Return(systemTransaction); + state.preWarmer._envPool.Return(env); } + + return state; }); } catch (OperationCanceledException) @@ -264,16 +267,16 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block) } } - ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, - i => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: PreWarmer, block, StateRoot), + static (i, state) => { - Transaction tx = block.Transactions[i]; + Transaction tx = state.block.Transactions[i]; Address? sender = tx.SenderAddress; - var env = PreWarmer._envPool.Get(); + var env = state.preWarmer._envPool.Get(); try { - using IReadOnlyTxProcessingScope scope = env.Build(StateRoot); + using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot); if (sender is not null) { scope.WorldState.WarmUp(sender); @@ -286,8 +289,10 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block) } finally { - PreWarmer._envPool.Return(env); + state.preWarmer._envPool.Return(env); } + + return state; }); } catch (OperationCanceledException) diff --git a/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs b/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs index e3ff782f422..de88ea69482 100644 --- a/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs +++ b/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs @@ -141,7 +141,7 @@ protected virtual TransactionResult Execute(Transaction tx, in BlockExecutionCon // commit - is for standard execute, we will commit thee state after execution // !commit - is for build up during block production, we won't commit state after each transaction to 
support rollbacks // we commit only after all block is constructed - bool commit = opts.HasFlag(ExecutionOptions.Commit) || !spec.IsEip658Enabled; + bool commit = opts.HasFlag(ExecutionOptions.Commit) || (!opts.HasFlag(ExecutionOptions.Warmup) && !spec.IsEip658Enabled); TransactionResult result; if (!(result = ValidateStatic(tx, header, spec, opts, out long intrinsicGas))) return result; From 8f5f3283490a33ea8554a6f2c5b98b092963e483 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Fri, 22 Nov 2024 03:56:32 +0000 Subject: [PATCH 07/12] Less allocations --- .../Processing/BlockProcessor.cs | 8 ++- .../Processing/RecoverSignature.cs | 64 +++++++++++-------- .../Threading/ParallelUnbalancedWork.cs | 6 +- .../Nethermind.JsonRpc/JsonRpcService.cs | 19 ++++-- .../PersistentStorageProvider.cs | 14 ++-- 5 files changed, 72 insertions(+), 39 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs index d10fb0aa60e..063988403c3 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs @@ -334,9 +334,15 @@ protected virtual TxReceipt[] ProcessBlock( [MethodImpl(MethodImplOptions.NoInlining)] private static void CalculateBlooms(TxReceipt[] receipts) { - ParallelUnbalancedWork.For(0, receipts.Length, i => + ParallelUnbalancedWork.For( + 0, + receipts.Length, + ParallelUnbalancedWork.DefaultOptions, + receipts, + static (i, receipts) => { receipts[i].CalculateBloom(); + return receipts; }); } diff --git a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs index 32278552f8a..ac780699cb9 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs @@ -49,13 +49,20 @@ public void RecoverData(Block block) // so we assume 
the rest of txs in the block are already recovered return; - ParallelUnbalancedWork.For(0, txs.Length, i => + ParallelUnbalancedWork.For( + 0, + txs.Length, + ParallelUnbalancedWork.DefaultOptions, + txs, + static (i, txs) => { Transaction tx = txs[i]; if (!tx.IsHashCalculated) { tx.CalculateHashInternal(); } + + return txs; }); @@ -112,14 +119,21 @@ public void RecoverData(Block block) if (recoverFromEcdsa > 3) { // Recover ecdsa in Parallel - ParallelUnbalancedWork.For(0, txs.Length, i => + ParallelUnbalancedWork.For( + 0, + txs.Length, + ParallelUnbalancedWork.DefaultOptions, + (recover: this, txs, releaseSpec, useSignatureChainId), + static (i, state) => { - Transaction tx = txs[i]; - if (!ShouldRecoverSignatures(tx)) return; + Transaction tx = state.txs[i]; + if (!ShouldRecoverSignatures(tx)) return state; - tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId); - RecoverAuthorities(tx); - if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}"); + tx.SenderAddress ??= state.recover._ecdsa.RecoverAddress(tx, state.useSignatureChainId); + state.recover.RecoverAuthorities(tx, state.releaseSpec); + if (state.recover._logger.IsTrace) state.recover._logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}"); + + return state; }); } else @@ -129,32 +143,32 @@ public void RecoverData(Block block) if (!ShouldRecoverSignatures(tx)) continue; tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId); - RecoverAuthorities(tx); + RecoverAuthorities(tx, releaseSpec); if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}"); } } + } - void RecoverAuthorities(Transaction tx) + private void RecoverAuthorities(Transaction tx, IReleaseSpec releaseSpec) + { + if (!releaseSpec.IsAuthorizationListEnabled + || !tx.HasAuthorizationList) { - if (!releaseSpec.IsAuthorizationListEnabled - || !tx.HasAuthorizationList) - { - return; - } + return; + } - if (tx.AuthorizationList.Length > 3) + if 
(tx.AuthorizationList.Length > 3) + { + Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) => { - Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) => - { - tuple.Authority = _ecdsa.RecoverAddress(tuple); - }); - } - else + tuple.Authority = _ecdsa.RecoverAddress(tuple); + }); + } + else + { + foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan()) { - foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan()) - { - tuple.Authority ??= _ecdsa.RecoverAddress(tuple); - } + tuple.Authority ??= _ecdsa.RecoverAddress(tuple); } } } diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index 1d2e6069a70..f18430bf913 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -13,7 +13,7 @@ namespace Nethermind.Core.Threading; /// public class ParallelUnbalancedWork : IThreadPoolWorkItem { - private static readonly ParallelOptions s_parallelOptions = new() + public static readonly ParallelOptions DefaultOptions = new() { // default to the number of processors MaxDegreeOfParallelism = Environment.ProcessorCount @@ -28,7 +28,7 @@ public class ParallelUnbalancedWork : IThreadPoolWorkItem /// The exclusive upper bound of the range. /// The delegate that is invoked once per iteration. public static void For(int fromInclusive, int toExclusive, Action action) - => For(fromInclusive, toExclusive, s_parallelOptions, action); + => For(fromInclusive, toExclusive, DefaultOptions, action); /// /// Executes a parallel for loop over a range of integers, with the specified options. @@ -85,7 +85,7 @@ public static void For( /// The initial state of the thread-local data. /// The delegate that is invoked once per iteration. 
public static void For(int fromInclusive, int toExclusive, TLocal state, Func action) - => For(fromInclusive, toExclusive, s_parallelOptions, state, action); + => For(fromInclusive, toExclusive, DefaultOptions, state, action); /// /// Executes a parallel for loop over a range of integers, with thread-local data and specified options. diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs index cc67f658b88..1c48629b742 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs @@ -356,17 +356,24 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec } else if (providedParametersLength > parallelThreshold) { - ParallelUnbalancedWork.For(0, providedParametersLength, (int i) => + ParallelUnbalancedWork.For( + 0, + providedParametersLength, + ParallelUnbalancedWork.DefaultOptions, + (providedParameters, expectedParameters, executionParameters, hasMissing), + static (i, state) => { - JsonElement providedParameter = providedParameters[i]; - ExpectedParameter expectedParameter = expectedParameters[i]; + JsonElement providedParameter = state.providedParameters[i]; + ExpectedParameter expectedParameter = state.expectedParameters[i]; object? 
parameter = DeserializeParameter(providedParameter, expectedParameter); - executionParameters[i] = parameter; - if (!hasMissing && ReferenceEquals(parameter, Type.Missing)) + state.executionParameters[i] = parameter; + if (!state.hasMissing && ReferenceEquals(parameter, Type.Missing)) { - hasMissing = true; + state.hasMissing = true; } + + return state; }); } diff --git a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs index 58693fe964f..fb6b5474ac0 100644 --- a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs +++ b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs @@ -269,16 +269,22 @@ void UpdateRootHashesMultiThread() { // We can recalculate the roots in parallel as they are all independent tries var storages = _storages.ToArray(); - ParallelUnbalancedWork.For(0, storages.Length, RuntimeInformation.ParallelOptionsLogicalCores, i => + ParallelUnbalancedWork.For( + 0, + storages.Length, + RuntimeInformation.ParallelOptionsLogicalCores, + (storages, toUpdateRoots: _toUpdateRoots), + static (i, state) => { - ref var kvp = ref storages[i]; - if (!_toUpdateRoots.Contains(kvp.Key)) + ref var kvp = ref state.storages[i]; + if (!state.toUpdateRoots.Contains(kvp.Key)) { // Wasn't updated don't recalculate - return; + return state; } StorageTree storageTree = kvp.Value; storageTree.UpdateRootHash(canBeParallel: false); + return state; }); // Update the storage roots in the main thread non in parallel From 061956abf020d9abfa3309df6e02b1b5cd18b9e8 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Fri, 22 Nov 2024 04:07:29 +0000 Subject: [PATCH 08/12] Less allocations --- .../Threading/ParallelUnbalancedWork.cs | 19 +++++++ .../Nethermind.Trie/TrieNode.Decoder.cs | 50 +++++++++---------- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs 
b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index f18430bf913..3844007d69e 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -76,6 +76,25 @@ public static void For( Action @finally) => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, init, default, action, @finally); + /// + /// Executes a parallel for loop over a range of integers, with thread-local data seeded from an initial value and a finalization function. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The initial value of the local data for each thread. + /// The delegate that is invoked once per iteration. + /// The function to finalize the local data for each thread. + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + TLocal value, + Func action, + Action @finally) + => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, null, value, action, @finally); + /// /// Executes a parallel for loop over a range of integers, with thread-local data. /// diff --git a/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs b/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs index 6ff1cf3a203..0223672333f 100644 --- a/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs +++ b/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs @@ -176,32 +176,32 @@ private static int GetChildrenRlpLengthForBranchNonRlpParallel(ITrieNodeResolver { int totalLength = 0; ParallelUnbalancedWork.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, - () => 0, - (i, local) => + (local: 0, item, tree, bufferPool, rootPath), + static (i, state) => { - object? data = item._data[i]; + object? 
data = state.item._data[i]; if (ReferenceEquals(data, _nullNode) || data is null) { - local++; + state.local++; } else if (data is Hash256) { - local += Rlp.LengthOfKeccakRlp; + state.local += Rlp.LengthOfKeccakRlp; } else { - TreePath path = rootPath; + TreePath path = state.rootPath; path.AppendMut(i); TrieNode childNode = Unsafe.As(data); - childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool); - local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; + childNode.ResolveKey(state.tree, ref path, isRoot: false, bufferPool: state.bufferPool); + state.local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; } - return local; + return state; }, - local => + state => { - Interlocked.Add(ref totalLength, local); + Interlocked.Add(ref totalLength, state.local); }); return totalLength; @@ -237,39 +237,39 @@ private static int GetChildrenRlpLengthForBranchRlpParallel(ITrieNodeResolver tr { int totalLength = 0; ParallelUnbalancedWork.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, - () => 0, - (i, local) => + (local: 0, item, tree, bufferPool, rootPath), + static (i, state) => { - ValueRlpStream rlpStream = item.RlpStream; - item.SeekChild(ref rlpStream, i); - object? data = item._data[i]; + ValueRlpStream rlpStream = state.item.RlpStream; + state.item.SeekChild(ref rlpStream, i); + object? 
data = state.item._data[i]; if (data is null) { - local += rlpStream.PeekNextRlpLength(); + state.local += rlpStream.PeekNextRlpLength(); } else if (ReferenceEquals(data, _nullNode)) { - local++; + state.local++; } else if (data is Hash256) { - local += Rlp.LengthOfKeccakRlp; + state.local += Rlp.LengthOfKeccakRlp; } else { - TreePath path = rootPath; + TreePath path = state.rootPath; path.AppendMut(i); Debug.Assert(data is TrieNode, "Data is not TrieNode"); TrieNode childNode = Unsafe.As(data); - childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool); - local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; + childNode.ResolveKey(state.tree, ref path, isRoot: false, bufferPool: state.bufferPool); + state.local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; } - return local; + return state; }, - local => + state => { - Interlocked.Add(ref totalLength, local); + Interlocked.Add(ref totalLength, state.local); }); return totalLength; From 0a3b349320a84c145f5f4b7d31a4bbf09db07f91 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 2 Dec 2024 11:43:26 +0000 Subject: [PATCH 09/12] Use pooledlist --- .../Nethermind.Core/Collections/ArrayPoolList.cs | 6 ++++++ .../Nethermind.State/PersistentStorageProvider.cs | 9 ++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs b/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs index 574e574992b..7b734e34236 100644 --- a/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs +++ b/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs @@ -281,6 +281,12 @@ public void Truncate(int newLength) Count = newLength; } + public ref T GetRef(int index) + { + GuardIndex(index); + return ref _array[index]; + } + public T this[int index] { get diff --git a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs 
b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs index fb6b5474ac0..9c0a2961637 100644 --- a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs +++ b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs @@ -268,15 +268,15 @@ void UpdateRootHashesSingleThread() void UpdateRootHashesMultiThread() { // We can recalculate the roots in parallel as they are all independent tries - var storages = _storages.ToArray(); + using var storages = _storages.ToPooledList(); ParallelUnbalancedWork.For( 0, - storages.Length, + storages.Count, RuntimeInformation.ParallelOptionsLogicalCores, (storages, toUpdateRoots: _toUpdateRoots), static (i, state) => { - ref var kvp = ref state.storages[i]; + ref var kvp = ref state.storages.GetRef(i); if (!state.toUpdateRoots.Contains(kvp.Key)) { // Wasn't updated don't recalculate @@ -288,9 +288,8 @@ void UpdateRootHashesMultiThread() }); // Update the storage roots in the main thread non in parallel - for (int i = 0; i < storages.Length; i++) + foreach (ref var kvp in storages.AsSpan()) { - ref var kvp = ref storages[i]; if (!_toUpdateRoots.Contains(kvp.Key)) { continue; From 84aec6b916df5a7dfbc6c6495b8a80e6a97180cd Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 2 Dec 2024 11:51:57 +0000 Subject: [PATCH 10/12] Introduce struct for state --- .../Processing/BlockCachePreWarmer.cs | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs index e803fa8c98b..7ba634d102e 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs @@ -132,19 +132,19 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp try { - ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: this, block, stateRoot, 
spec), static (i, state) => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, new(this, block, stateRoot, spec), static (i, state) => { - IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get(); - SystemTransaction systemTransaction = state.preWarmer._systemTransactionPool.Get(); + IReadOnlyTxProcessorSource env = state.PreWarmer._envPool.Get(); + SystemTransaction systemTransaction = state.PreWarmer._systemTransactionPool.Get(); Transaction? tx = null; try { // If the transaction has already been processed or being processed, exit early - if (state.block.TransactionProcessed > i) return state; + if (state.Block.TransactionProcessed > i) return state; - tx = state.block.Transactions[i]; + tx = state.Block.Transactions[i]; tx.CopyTo(systemTransaction); - using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot); + using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot); Address senderAddress = tx.SenderAddress!; if (!scope.WorldState.AccountExists(senderAddress)) @@ -155,7 +155,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp UInt256 nonceDelta = UInt256.Zero; for (int prev = 0; prev < i; prev++) { - if (senderAddress == state.block.Transactions[prev].SenderAddress) + if (senderAddress == state.Block.Transactions[prev].SenderAddress) { nonceDelta++; } @@ -166,12 +166,12 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp scope.WorldState.IncrementNonce(senderAddress, nonceDelta); } - if (state.spec.UseTxAccessLists) + if (state.Spec.UseTxAccessLists) { scope.WorldState.WarmUp(tx.AccessList); // eip-2930 } - TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.block.Header.Clone()), NullTxTracer.Instance); - if (state.preWarmer._logger.IsTrace) state.preWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); + TransactionResult result = 
scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.Block.Header.Clone()), NullTxTracer.Instance); + if (state.PreWarmer._logger.IsTrace) state.PreWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); } catch (Exception ex) when (ex is EvmException or OverflowException) { @@ -179,12 +179,12 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp } catch (Exception ex) { - if (state.preWarmer._logger.IsDebug) state.preWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex); + if (state.PreWarmer._logger.IsDebug) state.PreWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex); } finally { - state.preWarmer._systemTransactionPool.Return(systemTransaction); - state.preWarmer._envPool.Return(env); + state.PreWarmer._systemTransactionPool.Return(systemTransaction); + state.PreWarmer._envPool.Return(env); } return state; @@ -307,4 +307,13 @@ private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvF public IReadOnlyTxProcessorSource Create() => envFactory.Create(); public bool Return(IReadOnlyTxProcessorSource obj) => true; } + + private struct BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec) + { + public BlockCachePreWarmer PreWarmer = preWarmer; + public Block Block = block; + public Hash256 StateRoot = stateRoot; + public IReleaseSpec Spec = spec; + } } + From f18e10bfe3d6d247155ec97fc826c8a2433e89fc Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 2 Dec 2024 11:56:14 +0000 Subject: [PATCH 11/12] Capture threads --- .../Nethermind.Core/Threading/ParallelUnbalancedWork.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs index 3844007d69e..c54d39cc255 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs +++ 
b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -180,6 +180,7 @@ private class BaseData(int threads, int fromInclusive, int toExclusive) /// public SharedCounter Index { get; } = new SharedCounter(fromInclusive); public SemaphoreSlim Event { get; } = new(initialCount: 0); + private int _activeThreads = threads; /// /// Gets the exclusive upper bound of the range. @@ -189,7 +190,7 @@ private class BaseData(int threads, int fromInclusive, int toExclusive) /// /// Gets the number of active threads. /// - public int ActiveThreads => Volatile.Read(ref threads); + public int ActiveThreads => Volatile.Read(ref _activeThreads); /// /// Marks a thread as completed. @@ -197,7 +198,7 @@ private class BaseData(int threads, int fromInclusive, int toExclusive) /// The number of remaining active threads. public int MarkThreadCompleted() { - var remaining = Interlocked.Decrement(ref threads); + var remaining = Interlocked.Decrement(ref _activeThreads); if (remaining == 0) { From b84d6e72d04447d235c37d082afdf3ec7b57fe62 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 2 Dec 2024 13:28:37 +0000 Subject: [PATCH 12/12] Add benchmark --- .../Nethermind.Benchmark.Runner/Program.cs | 3 +- .../Core/ParallelBenchmark.cs | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs diff --git a/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs b/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs index d4afb51ab17..1b6a025ef45 100644 --- a/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs +++ b/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs @@ -26,6 +26,7 @@ public DashboardConfig(params Job[] jobs) AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Descriptor); AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Statistics); AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Params); + 
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Metrics); AddLogger(BenchmarkDotNet.Loggers.ConsoleLogger.Default); AddExporter(BenchmarkDotNet.Exporters.Json.JsonExporter.FullCompressed); AddDiagnoser(BenchmarkDotNet.Diagnosers.MemoryDiagnoser.Default); @@ -59,7 +60,7 @@ public static void Main(string[] args) { foreach (Assembly assembly in additionalJobAssemblies) { - BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core80)), args); + BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core90)), args); } foreach (Assembly assembly in simpleJobAssemblies) diff --git a/src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs b/src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs new file mode 100644 index 00000000000..b5a46721e6c --- /dev/null +++ b/src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs @@ -0,0 +1,59 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using BenchmarkDotNet.Attributes; +using Nethermind.Core.Threading; +using System.Threading; +using System.Threading.Tasks; + +namespace Nethermind.Benchmarks.Core; + +[HideColumns("Job", "RatioSD")] +public class ParallelBenchmark +{ + private int[] _times; + + [GlobalSetup] + public void Setup() + { + _times = new int[200]; + + for (int i = 0; i < _times.Length; i++) + { + _times[i] = i % 100; + } + } + + [Benchmark(Baseline = true)] + public void ParallelFor() + { + Parallel.For( + 0, + _times.Length, + (i) => Thread.Sleep(_times[i])); + } + + [Benchmark] + public void ParallelForEach() + { + Parallel.ForEach( + _times, + (time) => Thread.Sleep(time)); + } + + [Benchmark] + public void UnbalancedParallel() + { + ParallelUnbalancedWork.For( + 0, + _times.Length, + ParallelUnbalancedWork.DefaultOptions, + _times, + (i, value) => + { + Thread.Sleep(value[i]); + return value; + }, + (value) => { }); + } +}