Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache last blocks data (block headers, block bodies, transactions' receipts and total difficulty) #6009

Merged
merged 14 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Next Release
## Next release
- Cache last n blocks by using a new Besu flag --cache-last-blocks=n

### Breaking Changes

Expand Down
8 changes: 7 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,11 @@ static class PermissionsOptionGroup {
"Specifies the maximum number of blocks to retrieve logs from via RPC. Must be >=0. 0 specifies no limit (default: ${DEFAULT-VALUE})")
private final Long rpcMaxLogsRange = 5000L;

@CommandLine.Option(
names = {"--cache-last-blocks"},
description = "Specifies the number of last blocks to cache (default: ${DEFAULT-VALUE})")
private final Integer numberOfblocksToCache = 0;

@Mixin private P2PTLSConfigOptions p2pTLSConfigOptions;

@Mixin private PkiBlockCreationOptions pkiBlockCreationOptions;
Expand Down Expand Up @@ -2290,7 +2295,8 @@ public BesuControllerBuilder getControllerBuilder() {
.lowerBoundPeers(peersLowerBound)
.maxRemotelyInitiatedPeers(maxRemoteInitiatedPeers)
.randomPeerPriority(p2PDiscoveryOptionGroup.randomPeerPriority)
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject());
.chainPruningConfiguration(unstableChainPruningOptions.toDomainObject())
.cacheLastBlocks(numberOfblocksToCache);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides

private PluginTransactionValidatorFactory pluginTransactionValidatorFactory;

private int numberOfBlocksToCache = 0;

/**
* Provide a BesuComponent which can be used to get other dependencies
*
Expand Down Expand Up @@ -505,6 +507,17 @@ public BesuControllerBuilder chainPruningConfiguration(
return this;
}

/**
* Chain pruning configuration besu controller builder.
*
* @param numberOfBlocksToCache the number of blocks to cache
* @return the besu controller builder
*/
public BesuControllerBuilder cacheLastBlocks(final Integer numberOfBlocksToCache) {
this.numberOfBlocksToCache = numberOfBlocksToCache;
return this;
}

/**
* sets the networkConfiguration in the builder
*
Expand Down Expand Up @@ -592,7 +605,8 @@ public BesuController build() {
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory.toString());
dataDirectory.toString(),
numberOfBlocksToCache);

final CachedMerkleTrieLoader cachedMerkleTrieLoader =
besuComponent
Expand Down
12 changes: 12 additions & 0 deletions besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5585,4 +5585,16 @@ public void snapsyncForHealingFeaturesShouldFailWhenHealingIsNotEnabled() {
.contains(
"--Xsnapsync-synchronizer-flat option can only be used when -Xsnapsync-synchronizer-flat-db-healing-enabled is true");
}

@Test
public void cacheLastBlocksOptionShouldWork() {
int numberOfBlocksToCache = 512;
parseCommand("--cache-last-blocks", String.valueOf(numberOfBlocksToCache));
verify(mockControllerBuilder).cacheLastBlocks(intArgumentCaptor.capture());
verify(mockControllerBuilder).build();

assertThat(intArgumentCaptor.getValue()).isEqualTo(numberOfBlocksToCache);
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public void initMocks() throws Exception {
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.besuComponent(any(BesuComponent.class)))
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.cacheLastBlocks(any())).thenReturn(mockControllerBuilder);

// doReturn used because of generic BesuController
doReturn(mockController).when(mockControllerBuilder).build();
lenient().when(mockController.getProtocolManager()).thenReturn(mockEthProtocolManager);
Expand Down
3 changes: 2 additions & 1 deletion besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ rpc-http-max-batch-size=1
rpc-http-max-request-content-length = 5242880
rpc-max-logs-range=100
json-pretty-print-enabled=false
cache-last-blocks=512

# PRIVACY TLS
privacy-tls-enabled=false
Expand Down Expand Up @@ -226,4 +227,4 @@ Xp2p-tls-crl-file="none.file"
Xp2p-tls-clienthello-sni=false

#contracts
Xevm-jumpdest-cache-weight-kb=32000
Xevm-jumpdest-cache-weight-kb=32000
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.InvalidConfigurationException;
import org.hyperledger.besu.util.Subscribers;
Expand All @@ -49,8 +50,11 @@
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import io.prometheus.client.guava.cache.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -73,20 +77,27 @@ public class DefaultBlockchain implements MutableBlockchain {

private Comparator<BlockHeader> blockChoiceRule;

private final int numberOfBlocksToCache;
private final Optional<Cache<Hash, BlockHeader>> blockHeadersCache;
private final Optional<Cache<Hash, BlockBody>> blockBodiesCache;
private final Optional<Cache<Hash, List<TransactionReceipt>>> transactionReceiptsCache;
private final Optional<Cache<Hash, Difficulty>> totalDifficultyCache;

private DefaultBlockchain(
final Optional<Block> genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold) {
this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null);
this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null, 0);
}

private DefaultBlockchain(
final Optional<Block> genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold,
final String dataDirectory) {
final String dataDirectory,
final int numberOfBlocksToCache) {
checkNotNull(genesisBlock);
checkNotNull(blockchainStorage);
checkNotNull(metricsSystem);
Expand Down Expand Up @@ -144,6 +155,34 @@ private DefaultBlockchain(

this.reorgLoggingThreshold = reorgLoggingThreshold;
this.blockChoiceRule = heaviestChainBlockChoiceRule;
this.numberOfBlocksToCache = numberOfBlocksToCache;

if (numberOfBlocksToCache != 0) {
blockHeadersCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
blockBodiesCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
transactionReceiptsCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
totalDifficultyCache =
Optional.of(
CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
CacheMetricsCollector cacheMetrics = new CacheMetricsCollector();
cacheMetrics.addCache("blockHeaders", blockHeadersCache.get());
cacheMetrics.addCache("blockBodies", blockBodiesCache.get());
cacheMetrics.addCache("transactionReceipts", transactionReceiptsCache.get());
cacheMetrics.addCache("totalDifficulty", totalDifficultyCache.get());
if (metricsSystem instanceof PrometheusMetricsSystem prometheusMetricsSystem)
prometheusMetricsSystem.addCollector(BesuMetricCategory.BLOCKCHAIN, () -> cacheMetrics);
} else {
blockHeadersCache = Optional.empty();
blockBodiesCache = Optional.empty();
transactionReceiptsCache = Optional.empty();
totalDifficultyCache = Optional.empty();
}
}

public static MutableBlockchain createMutable(
Expand All @@ -153,7 +192,12 @@ public static MutableBlockchain createMutable(
final long reorgLoggingThreshold) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock), blockchainStorage, metricsSystem, reorgLoggingThreshold);
Optional.of(genesisBlock),
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
null,
0);
}

public static MutableBlockchain createMutable(
Expand All @@ -168,7 +212,25 @@ public static MutableBlockchain createMutable(
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory);
dataDirectory,
0);
}

public static MutableBlockchain createMutable(
final Block genesisBlock,
final BlockchainStorage blockchainStorage,
final MetricsSystem metricsSystem,
final long reorgLoggingThreshold,
final String dataDirectory,
final int numberOfBlocksToCache) {
checkNotNull(genesisBlock);
return new DefaultBlockchain(
Optional.of(genesisBlock),
blockchainStorage,
metricsSystem,
reorgLoggingThreshold,
dataDirectory,
numberOfBlocksToCache);
}

public static Blockchain create(
Expand Down Expand Up @@ -227,22 +289,37 @@ public Block getChainHeadBlock() {

@Override
public Optional<BlockHeader> getBlockHeader(final long blockNumber) {
return blockchainStorage.getBlockHash(blockNumber).flatMap(blockchainStorage::getBlockHeader);
return blockchainStorage.getBlockHash(blockNumber).flatMap(this::getBlockHeader);
}

@Override
public Optional<BlockHeader> getBlockHeader(final Hash blockHeaderHash) {
return blockchainStorage.getBlockHeader(blockHeaderHash);
return blockHeadersCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockHeader(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getBlockHeader(blockHeaderHash));
}

@Override
public Optional<BlockBody> getBlockBody(final Hash blockHeaderHash) {
return blockchainStorage.getBlockBody(blockHeaderHash);
return blockBodiesCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getBlockBody(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getBlockBody(blockHeaderHash));
}

@Override
public Optional<List<TransactionReceipt>> getTxReceipts(final Hash blockHeaderHash) {
return blockchainStorage.getTransactionReceipts(blockHeaderHash);
return transactionReceiptsCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash));
}

@Override
Expand All @@ -252,7 +329,12 @@ public Optional<Hash> getBlockHashByNumber(final long number) {

@Override
public Optional<Difficulty> getTotalDifficultyByHash(final Hash blockHeaderHash) {
return blockchainStorage.getTotalDifficulty(blockHeaderHash);
return totalDifficultyCache
.map(
cache ->
Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
.or(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash)))
.orElseGet(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash));
}

@Override
Expand Down Expand Up @@ -283,14 +365,24 @@ public void setBlockChoiceRule(final Comparator<BlockHeader> blockChoiceRule) {

@Override
public synchronized void appendBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), false);
}

@Override
public synchronized void storeBlock(final Block block, final List<TransactionReceipt> receipts) {
if (numberOfBlocksToCache != 0) cacheBlockData(block, receipts);
appendBlockHelper(new BlockWithReceipts(block, receipts), true);
}

private void cacheBlockData(final Block block, final List<TransactionReceipt> receipts) {
blockHeadersCache.ifPresent(cache -> cache.put(block.getHash(), block.getHeader()));
blockBodiesCache.ifPresent(cache -> cache.put(block.getHash(), block.getBody()));
transactionReceiptsCache.ifPresent(cache -> cache.put(block.getHash(), receipts));
totalDifficultyCache.ifPresent(
cache -> cache.put(block.getHash(), block.getHeader().getDifficulty()));
}

private boolean blockShouldBeProcessed(
final Block block, final List<TransactionReceipt> receipts) {
checkArgument(
Expand Down Expand Up @@ -768,4 +860,20 @@ int observerCount() {
private void notifyChainReorgBlockAdded(final BlockWithReceipts blockWithReceipts) {
blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockWithReceipts, this));
}

public Optional<Cache<Hash, BlockHeader>> getBlockHeadersCache() {
return blockHeadersCache;
}

public Optional<Cache<Hash, BlockBody>> getBlockBodiesCache() {
return blockBodiesCache;
}

public Optional<Cache<Hash, List<TransactionReceipt>>> getTransactionReceiptsCache() {
return transactionReceiptsCache;
}

public Optional<Cache<Hash, Difficulty>> getTotalDifficultyCache() {
return totalDifficultyCache;
}
}
Loading