Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Txpool state cache #7046

Merged
merged 15 commits into from
May 20, 2024
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ private ChainLevelInfo UpdateOrCreateLevel(long number, Hash256 hash, BlockInfo
/// <returns></returns>
private bool ShouldCache(long number)
{
return number == 0L || Head is null || number >= Head.Number - HeaderStore.CacheSize;
return number == 0L || Head is null || number >= Head.Number - BlockStore.CacheSize;
}

public ChainLevelInfo? FindLevel(long number)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class BlockStore : IBlockStore
{
private readonly IDb _blockDb;
private readonly BlockDecoder _blockDecoder = new();
private const int CacheSize = 128 + 32;
public const int CacheSize = 128 + 32;

private readonly LruCache<ValueHash256, Block>
_blockCache = new(CacheSize, CacheSize, "blocks");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ private void ValidateProcessedBlock(Block suggestedBlock, ProcessingOptions opti
if (_logger.IsWarn) _logger.Warn($"Suggested block TD: {suggestedBlock.TotalDifficulty}, Suggested block IsPostMerge {suggestedBlock.IsPostMerge}, Block TD: {block.TotalDifficulty}, Block IsPostMerge {block.IsPostMerge}");
throw new InvalidBlockException(suggestedBlock, error);
}

// Block is valid, copy the account changes as we use the suggested block not the processed one
suggestedBlock.AccountChanges = block.AccountChanges;
}

private bool ShouldComputeStateRoot(BlockHeader header) =>
Expand Down Expand Up @@ -251,6 +254,8 @@ protected virtual TxReceipt[] ProcessBlock(

_stateProvider.Commit(spec, commitStorageRoots: true);

// Get the accounts that have been changed
block.AccountChanges = _stateProvider.GetAccountChanges();
if (ShouldComputeStateRoot(block.Header))
{
_stateProvider.RecalculateStateRoot();
Expand Down
5 changes: 5 additions & 0 deletions src/Nethermind/Nethermind.Core/Block.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Text.Json.Serialization;

using Nethermind.Core.Crypto;
using Nethermind.Int256;

Expand Down Expand Up @@ -109,6 +111,9 @@ public Transaction[] Transactions
public Hash256? WithdrawalsRoot => Header.WithdrawalsRoot; // do not add setter here
public Hash256? ParentBeaconBlockRoot => Header.ParentBeaconBlockRoot; // do not add setter here

[JsonIgnore]
public AddressAsKey[]? AccountChanges { get; internal set; }

public override string ToString() => ToString(Format.Short);

public string ToString(Format format) => format switch
Expand Down
10 changes: 8 additions & 2 deletions src/Nethermind/Nethermind.Db/Metrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System.ComponentModel;
using System.Threading;

using Nethermind.Core.Attributes;

namespace Nethermind.Db
Expand All @@ -14,15 +16,19 @@ public static class Metrics

[CounterMetric]
[Description("Number of State Trie cache hits.")]
public static long StateTreeCache { get; set; }
public static long StateTreeCache => _stateTreeCacheHits;
private static long _stateTreeCacheHits;
public static void IncrementTreeCacheHits() => Interlocked.Increment(ref _stateTreeCacheHits);

[CounterMetric]
[Description("Number of State Trie reads.")]
public static long StateTreeReads { get; set; }

[CounterMetric]
[Description("Number of State Reader reads.")]
public static long StateReaderReads { get; set; }
public static long StateReaderReads => _stateReaderReads;
private static long _stateReaderReads;
public static void IncrementStateReaderReads() => Interlocked.Increment(ref _stateReaderReads);

[CounterMetric]
[Description("Number of Blocks Trie writes.")]
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.State/IWorldState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,5 @@ public interface IWorldState : IJournal<Snapshot>, IReadOnlyStateProvider
void Commit(IReleaseSpec releaseSpec, IWorldStateTracer? tracer, bool isGenesis = false, bool commitStorageRoots = true);

void CommitTree(long blockNumber);
AddressAsKey[] GetAccountChanges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
Expand Down
8 changes: 7 additions & 1 deletion src/Nethermind/Nethermind.State/StateProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using Nethermind.Core;
using Nethermind.Core.Caching;
Expand Down Expand Up @@ -674,7 +675,7 @@ private void ReportChanges(IStateTracer stateTracer, Dictionary<AddressAsKey, Ch
}
else
{
Metrics.StateTreeCache++;
Metrics.IncrementTreeCacheHits();
}
return account;
}
Expand Down Expand Up @@ -798,6 +799,11 @@ public Change(ChangeType type, Address address, Account? account)
public Account? Account { get; }
}

public AddressAsKey[]? ChangedAddresses()
{
return _blockCache.Count == 0 ? Array.Empty<AddressAsKey>() : _blockCache.Keys.ToArray();
benaadams marked this conversation as resolved.
Show resolved Hide resolved
}

public void Reset()
{
if (_logger.IsTrace) _logger.Trace("Clearing state provider caches");
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.State/StateReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private bool TryGetState(Hash256 stateRoot, Address address, out AccountStruct a
return false;
}

Metrics.StateReaderReads++;
Metrics.IncrementStateReaderReads();
return _state.TryGetStruct(address, out account, stateRoot);
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/Nethermind/Nethermind.State/WorldState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,10 @@ public void CreateAccountIfNotExists(Address address, in UInt256 balance, in UIn
{
_stateProvider.CreateAccountIfNotExists(address, balance, nonce);
}

AddressAsKey[] IWorldState.GetAccountChanges()
{
return _stateProvider.ChangedAddresses();
}
}
}
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public void get_next_pending_nonce()

// LatestPendingNonce=1, when the current nonce of the account=1 and no pending transactions
_stateProvider.IncrementNonce(TestItem.AddressA);
_txPool.ResetAddress(TestItem.AddressA);
latestNonce = _txPool.GetLatestPendingNonce(TestItem.AddressA);
Assert.That((UInt256)1, Is.EqualTo(latestNonce));

Expand Down Expand Up @@ -564,6 +565,7 @@ public void should_not_count_txs_with_stale_nonces_when_calculating_cumulative_c
if (i < numberOfStaleTxsInBucket)
{
_stateProvider.IncrementNonce(TestItem.AddressA);
_txPool.ResetAddress(TestItem.AddressA);
}
}

Expand Down Expand Up @@ -1041,6 +1043,7 @@ public void should_retrieve_added_persistent_transaction_correctly_even_if_was_e
retrievedTransaction.Should().BeEquivalentTo(transaction);

EnsureSenderBalance(transactionWithHigherFee);
_txPool.ResetAddress(transactionWithHigherFee.SenderAddress);
_txPool.SubmitTx(transactionWithHigherFee, TxHandlingOptions.None).Should().Be(AcceptTxResult.Accepted);
_txPool.TryGetPendingTransaction(transactionWithHigherFee.Hash, out var retrievedTransactionWithHigherFee).Should().BeTrue();
retrievedTransactionWithHigherFee.Should().BeEquivalentTo(transactionWithHigherFee);
Expand Down
91 changes: 90 additions & 1 deletion src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class TxPool : ITxPool, IDisposable

private readonly IChainHeadSpecProvider _specProvider;
private readonly IAccountStateProvider _accounts;
private readonly AccountCache _accountCache;
private readonly IEthereumEcdsa _ecdsa;
private readonly IBlobTxStorage _blobTxStorage;
private readonly IChainHeadInfoProvider _headInfo;
Expand All @@ -65,6 +67,8 @@ public class TxPool : ITxPool, IDisposable
private readonly ITimer? _timer;
private Transaction[]? _transactionSnapshot;
private Transaction[]? _blobTransactionSnapshot;
private long _lastBlockNumber = -1;
private Hash256? _lastBlockHash;

/// <summary>
/// This class stores all known pending transactions that can be used for block production
Expand Down Expand Up @@ -97,7 +101,7 @@ public TxPool(IEthereumEcdsa ecdsa,
_headInfo = chainHeadInfoProvider ?? throw new ArgumentNullException(nameof(chainHeadInfoProvider));
_txPoolConfig = txPoolConfig;
_blobReorgsSupportEnabled = txPoolConfig.BlobsSupport.SupportsReorgs();
_accounts = _headInfo.AccountStateProvider;
_accounts = _accountCache = new AccountCache(_headInfo.AccountStateProvider);
_specProvider = _headInfo.SpecProvider;

MemoryAllowance.MemPoolSize = txPoolConfig.Size;
Expand Down Expand Up @@ -206,6 +210,20 @@ private void ProcessNewHeads()
{
try
{
AddressAsKey[]? accountChanges = args.Block.AccountChanges;
if (!CanUseCache(args.Block, accountChanges))
{
// Not sequential block, reset cache
_accountCache.Reset();
}
else
{
// Sequential block, just remove changed accounts from cache
_accountCache.RemoveAccounts(accountChanges);
}
_lastBlockNumber = args.Block.Number;
_lastBlockHash = args.Block.Hash;

ReAddReorganisedTransactions(args.PreviousBlock);
RemoveProcessedTransactions(args.Block);
UpdateBuckets();
Expand All @@ -219,6 +237,11 @@ private void ProcessNewHeads()
}
}
}

bool CanUseCache(Block block, [NotNullWhen(true)] AddressAsKey[]? accountChanges)
{
return accountChanges is not null && accountChanges.Length != 0 && block.ParentHash == _lastBlockHash && _lastBlockNumber + 1 == block.Number;
}
}, TaskCreationOptions.LongRunning).ContinueWith(t =>
{
if (t.IsFaulted)
Expand Down Expand Up @@ -737,6 +760,72 @@ private void TimerOnElapsed(object? sender, EventArgs e)
_timer!.Enabled = true;
}

internal void ResetAddress(Address address)
{
_accountCache.RemoveAccounts([address]);
}

private sealed class AccountCache : IAccountStateProvider
{
private readonly IAccountStateProvider _provider;
private readonly LruCache<AddressAsKey, AccountStruct>[] _caches;
benaadams marked this conversation as resolved.
Show resolved Hide resolved

public AccountCache(IAccountStateProvider provider)
{
_provider = provider;
_caches = new LruCache<AddressAsKey, AccountStruct>[16];
for (int i = 0; i < _caches.Length; i++)
{
// Cache per nibble to reduce contention as TxPool is very parallel
_caches[i] = new LruCache<AddressAsKey, AccountStruct>(1_024, "");
}
}

public bool TryGetAccount(Address address, out AccountStruct account)
{
var cache = _caches[GetCacheIndex(address)];
if (!cache.TryGet(new AddressAsKey(address), out account))
{
if (!_provider.TryGetAccount(address, out account))
{
cache.Set(address, AccountStruct.TotallyEmpty);
return false;
}
cache.Set(address, account);
}
else
{
Db.Metrics.IncrementTreeCacheHits();
}

return true;
}

public void RemoveAccounts(AddressAsKey[] address)
{
Parallel.ForEach(address.GroupBy(a => GetCacheIndex(a.Value)),
(n) =>
{
var cache = _caches[n.Key];
foreach (Address a in n)
{
cache.Delete(a);
}
}
);
}

private static int GetCacheIndex(Address address) => address.Bytes[^1] & 0xf;

public void Reset()
{
for (int i = 0; i < _caches.Length; i++)
{
_caches[i].Clear();
}
}
}

private static void WriteTxPoolReport(in ILogger logger)
{
if (!logger.IsInfo)
Expand Down