Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParallelUnbalancedWork for efficient unbalanced parallel loops #7787

Merged
merged 14 commits into from
Dec 2, 2024
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Benchmark.Runner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public DashboardConfig(params Job[] jobs)
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Descriptor);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Statistics);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Params);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Metrics);
AddLogger(BenchmarkDotNet.Loggers.ConsoleLogger.Default);
AddExporter(BenchmarkDotNet.Exporters.Json.JsonExporter.FullCompressed);
AddDiagnoser(BenchmarkDotNet.Diagnosers.MemoryDiagnoser.Default);
Expand Down Expand Up @@ -59,7 +60,7 @@ public static void Main(string[] args)
{
foreach (Assembly assembly in additionalJobAssemblies)
{
BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core80)), args);
BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core90)), args);
}

foreach (Assembly assembly in simpleJobAssemblies)
Expand Down
59 changes: 59 additions & 0 deletions src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using BenchmarkDotNet.Attributes;
using Nethermind.Core.Threading;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Benchmarks.Core;

[HideColumns("Job", "RatioSD")]
public class ParallelBenchmark
{
private int[] _times;

[GlobalSetup]
public void Setup()
{
_times = new int[200];

for (int i = 0; i < _times.Length; i++)
{
_times[i] = i % 100;
}
}

[Benchmark(Baseline = true)]
public void ParallelFor()
{
Parallel.For(
0,
_times.Length,
(i) => Thread.Sleep(_times[i]));
}

[Benchmark]
public void ParallelForEach()
{
Parallel.ForEach(
_times,
(time) => Thread.Sleep(time));
}

[Benchmark]
public void UnbalancedParallel()
{
ParallelUnbalancedWork.For<int[]>(
0,
_times.Length,
ParallelUnbalancedWork.DefaultOptions,
_times,
(i, value) =>
{
Thread.Sleep(value[i]);
return value;
},
(value) => { });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,21 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe
{
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (preWarmer: this, block, stateRoot),
static (i, state) =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super minor: maybe better to pass the pool directly rather than whole prewarmer as it is not needed?
And maybe build custom struct for it as those are used in 2 places?

try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot);
scope.WorldState.WarmUp(state.block.Withdrawals[i].Address);
}
finally
{
_envPool.Return(env);
state.preWarmer._envPool.Return(env);
}

return state;
});
}
}
Expand All @@ -135,24 +132,19 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp

try
{
int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
ParallelUnbalancedWork.For<BlockState>(0, block.Transactions.Length, parallelOptions, new(this, block, stateRoot, spec), static (i, state) =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
IReadOnlyTxProcessorSource env = state.PreWarmer._envPool.Get();
SystemTransaction systemTransaction = state.PreWarmer._systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
if (state.Block.TransactionProcessed > i) return state;

tx = block.Transactions[i];
tx = state.Block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);

Address senderAddress = tx.SenderAddress!;
if (!scope.WorldState.AccountExists(senderAddress))
Expand All @@ -163,7 +155,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
UInt256 nonceDelta = UInt256.Zero;
for (int prev = 0; prev < i; prev++)
{
if (senderAddress == block.Transactions[prev].SenderAddress)
if (senderAddress == state.Block.Transactions[prev].SenderAddress)
{
nonceDelta++;
}
Expand All @@ -174,26 +166,28 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
scope.WorldState.IncrementNonce(senderAddress, nonceDelta);
}

if (spec.UseTxAccessLists)
if (state.Spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.Block.Header.Clone()), NullTxTracer.Instance);
if (state.PreWarmer._logger.IsTrace) state.PreWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex) when (ex is EvmException or OverflowException)
{
// Ignore, regular tx processing exceptions
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
if (state.PreWarmer._logger.IsDebug) state.PreWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
state.PreWarmer._systemTransactionPool.Return(systemTransaction);
state.PreWarmer._envPool.Return(env);
}

return state;
});
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -273,21 +267,16 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
}

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: PreWarmer, block, StateRoot),
static (i, state) =>
{
int i = 0;
// Process addresses in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
Transaction tx = block.Transactions[i];
Transaction tx = state.block.Transactions[i];
Address? sender = tx.SenderAddress;

var env = PreWarmer._envPool.Get();
var env = state.preWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
Expand All @@ -300,8 +289,10 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
finally
{
PreWarmer._envPool.Return(env);
state.preWarmer._envPool.Return(env);
}

return state;
});
}
catch (OperationCanceledException)
Expand All @@ -316,4 +307,13 @@ private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvF
public IReadOnlyTxProcessorSource Create() => envFactory.Create();
public bool Return(IReadOnlyTxProcessorSource obj) => true;
}

private struct BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec)
{
public BlockCachePreWarmer PreWarmer = preWarmer;
public Block Block = block;
public Hash256 StateRoot = stateRoot;
public IReleaseSpec Spec = spec;
}
}

11 changes: 8 additions & 3 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
using Nethermind.Evm;
using Nethermind.Evm.Tracing;
Expand Down Expand Up @@ -333,11 +334,15 @@ protected virtual TxReceipt[] ProcessBlock(
[MethodImpl(MethodImplOptions.NoInlining)]
private static void CalculateBlooms(TxReceipt[] receipts)
{
int index = 0;
Parallel.For(0, receipts.Length, _ =>
ParallelUnbalancedWork.For(
0,
receipts.Length,
ParallelUnbalancedWork.DefaultOptions,
receipts,
static (i, receipts) =>
{
int i = Interlocked.Increment(ref index) - 1;
receipts[i].CalculateBloom();
return receipts;
});
}

Expand Down
65 changes: 40 additions & 25 deletions src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
using Nethermind.Logging;
using Nethermind.Serialization.Rlp;
Expand Down Expand Up @@ -48,13 +49,20 @@ public void RecoverData(Block block)
// so we assume the rest of txs in the block are already recovered
return;

Parallel.For(0, txs.Length, i =>
ParallelUnbalancedWork.For(
0,
txs.Length,
ParallelUnbalancedWork.DefaultOptions,
txs,
static (i, txs) =>
{
Transaction tx = txs[i];
if (!tx.IsHashCalculated)
{
tx.CalculateHashInternal();
}

return txs;
});


Expand Down Expand Up @@ -111,14 +119,21 @@ public void RecoverData(Block block)
if (recoverFromEcdsa > 3)
{
// Recover ecdsa in Parallel
Parallel.For(0, txs.Length, i =>
ParallelUnbalancedWork.For(
0,
txs.Length,
ParallelUnbalancedWork.DefaultOptions,
(recover: this, txs, releaseSpec, useSignatureChainId),
static (i, state) =>
{
Transaction tx = txs[i];
if (!ShouldRecoverSignatures(tx)) return;
Transaction tx = state.txs[i];
if (!ShouldRecoverSignatures(tx)) return state;

tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId);
RecoverAuthorities(tx);
if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}");
tx.SenderAddress ??= state.recover._ecdsa.RecoverAddress(tx, state.useSignatureChainId);
state.recover.RecoverAuthorities(tx, state.releaseSpec);
if (state.recover._logger.IsTrace) state.recover._logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}");

return state;
});
}
else
Expand All @@ -128,32 +143,32 @@ public void RecoverData(Block block)
if (!ShouldRecoverSignatures(tx)) continue;

tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId);
RecoverAuthorities(tx);
RecoverAuthorities(tx, releaseSpec);
if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}");
}
}
}

void RecoverAuthorities(Transaction tx)
private void RecoverAuthorities(Transaction tx, IReleaseSpec releaseSpec)
{
if (!releaseSpec.IsAuthorizationListEnabled
|| !tx.HasAuthorizationList)
{
if (!releaseSpec.IsAuthorizationListEnabled
|| !tx.HasAuthorizationList)
{
return;
}
return;
}

if (tx.AuthorizationList.Length > 3)
if (tx.AuthorizationList.Length > 3)
{
Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) =>
{
Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) =>
{
tuple.Authority = _ecdsa.RecoverAddress(tuple);
});
}
else
tuple.Authority = _ecdsa.RecoverAddress(tuple);
});
}
else
{
foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan())
{
foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan())
{
tuple.Authority ??= _ecdsa.RecoverAddress(tuple);
}
tuple.Authority ??= _ecdsa.RecoverAddress(tuple);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ public void Truncate(int newLength)
Count = newLength;
}

public ref T GetRef(int index)
{
GuardIndex(index);
return ref _array[index];
}

public T this[int index]
{
get
Expand Down
Loading