From cb769a36a967b7847a7a5db882faea682444f34b Mon Sep 17 00:00:00 2001 From: Ratan Rai Sur Date: Wed, 7 Aug 2019 12:24:02 -0400 Subject: [PATCH] [PAN-2819] [PAN-2820] Mark Sweep Pruner * Mark Sweep Pruner * add `unload` method on Node interface, which is a noop everywhere but on `StoredNode` --- .../ethereum/storage/StorageProvider.java | 3 + .../keyvalue/KeyValueStorageProvider.java | 1 + .../ethereum/worldstate/MarkSweepPruner.java | 181 ++++++++++++++++++ .../pantheon/ethereum/worldstate/Pruner.java | 127 ++++++++++++ .../core/InMemoryStorageProvider.java | 6 + .../worldstate/MarkSweepPrunerTest.java | 70 +++++++ .../ethereum/worldstate/PrunerTest.java | 173 +++++++++++++++++ .../eth/sync/DefaultSynchronizer.java | 8 + .../manager/DeterministicEthScheduler.java | 2 + .../eth/manager/EthSchedulerTest.java | 2 + .../eth/manager/MockScheduledExecutor.java | 2 + .../worldstate/WorldStateDownloaderTest.java | 2 +- .../ethereum/trie/AllNodesVisitor.java | 51 +++++ .../ethereum/trie/MerklePatriciaTrie.java | 3 + .../pegasys/pantheon/ethereum/trie/Node.java | 3 + .../trie/SimpleMerklePatriciaTrie.java | 6 + .../trie/StoredMerklePatriciaTrie.java | 6 + .../pantheon/ethereum/trie/StoredNode.java | 5 + .../metrics/PantheonMetricCategory.java | 1 + .../controller/PantheonControllerBuilder.java | 29 +++ .../kvstore/AbstractKeyValueStorageTest.java | 15 ++ testutil/build.gradle | 1 + .../testutil}/MockExecutorService.java | 2 +- 23 files changed, 697 insertions(+), 2 deletions(-) create mode 100644 ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPruner.java create mode 100644 ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/Pruner.java create mode 100644 ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPrunerTest.java create mode 100644 ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/PrunerTest.java create mode 100644 ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/AllNodesVisitor.java rename {ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager => testutil/src/main/java/tech/pegasys/pantheon/testutil}/MockExecutorService.java (98%) diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/StorageProvider.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/StorageProvider.java index 393b29e26b..5c66bdc75b 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/StorageProvider.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/StorageProvider.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.privacy.PrivateTransactionStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStatePreimageStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.services.kvstore.KeyValueStorage; import java.io.Closeable; @@ -32,4 +33,6 @@ public interface StorageProvider extends Closeable { PrivateTransactionStorage createPrivateTransactionStorage(); PrivateStateStorage createPrivateStateStorage(); + + KeyValueStorage createPruningStorage(); } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageProvider.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageProvider.java index 7f65ab9686..da8d0f19e8 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageProvider.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageProvider.java @@ -76,6 +76,7 @@ public PrivateStateStorage createPrivateStateStorage() { return new PrivateStateKeyValueStorage(privateStateStorage); } + @Override public KeyValueStorage createPruningStorage() { return pruningStorage; } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPruner.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPruner.java new file mode 100644 index 0000000000..57cd35f34b --- /dev/null +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPruner.java @@ -0,0 +1,181 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.worldstate; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.rlp.RLP; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; +import tech.pegasys.pantheon.ethereum.trie.StoredMerklePatriciaTrie; +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.PantheonMetricCategory; +import tech.pegasys.pantheon.services.kvstore.KeyValueStorage; +import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction; +import tech.pegasys.pantheon.util.bytes.Bytes32; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class MarkSweepPruner { + private static final Logger LOG = LogManager.getLogger(); + private static final BytesValue IN_USE = BytesValue.of(1); + private static final int MARKS_PER_TRANSACTION = 1000; + private final WorldStateStorage worldStateStorage; + private final KeyValueStorage markStorage; + private final Counter markedNodesCounter; + private final Counter markOperationCounter; + private final Counter sweepOperationCounter; + private final Counter sweptNodesCounter; + private volatile long nodeAddedListenerId; + private final ReentrantLock markLock = new ReentrantLock(true); + private final Set pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public MarkSweepPruner( + final WorldStateStorage worldStateStorage, + final KeyValueStorage markStorage, + final MetricsSystem metricsSystem) { + this.worldStateStorage = worldStateStorage; + this.markStorage = markStorage; + + markedNodesCounter = + metricsSystem.createCounter( + PantheonMetricCategory.PRUNER, + "marked_nodes_total", + "Total number of nodes marked as in use"); + markOperationCounter = + metricsSystem.createCounter( + PantheonMetricCategory.PRUNER, + "mark_operations_total", + "Total number of mark operations performed"); + + sweptNodesCounter = + metricsSystem.createCounter( + PantheonMetricCategory.PRUNER, + "swept_nodes_total", + "Total number of unused nodes removed"); + sweepOperationCounter = + metricsSystem.createCounter( + PantheonMetricCategory.PRUNER, + "sweep_operations_total", + "Total number of sweep operations performed"); + } + + public void prepare() { + worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case. + nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes); + } + + public void cleanup() { + worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); + } + + public void mark(final Hash rootHash) { + markOperationCounter.inc(); + markStorage.clear(); + createStateTrie(rootHash) + .visitAll( + node -> { + if (Thread.interrupted()) { + // Since we don't expect to abort marking ourselves, + // our abort process consists only of handling interrupts + throw new RuntimeException("Interrupted while marking"); + } + markNode(node.getHash()); + node.getValue().ifPresent(this::processAccountState); + }); + LOG.info("Completed marking used nodes for pruning"); + } + + public void sweep() { + flushPendingMarks(); + sweepOperationCounter.inc(); + LOG.info("Sweeping unused nodes"); + final long prunedNodeCount = worldStateStorage.prune(markStorage::containsKey); + sweptNodesCounter.inc(prunedNodeCount); + worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); + markStorage.clear(); + LOG.info("Completed sweeping unused nodes"); + } + + private MerklePatriciaTrie createStateTrie(final Bytes32 rootHash) { + return new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStateTrieNode, + rootHash, + Function.identity(), + Function.identity()); + } + + private MerklePatriciaTrie createStorageTrie(final Bytes32 rootHash) { + return new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStorageTrieNode, + rootHash, + Function.identity(), + Function.identity()); + } + + private void processAccountState(final BytesValue value) { + final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value)); + markNode(accountValue.getCodeHash()); + + createStorageTrie(accountValue.getStorageRoot()) + .visitAll(storageNode -> markNode(storageNode.getHash())); + } + + private void markNode(final Bytes32 hash) { + markedNodesCounter.inc(); + markLock.lock(); + try { + pendingMarks.add(hash); + maybeFlushPendingMarks(); + } finally { + markLock.unlock(); + } + } + + private void maybeFlushPendingMarks() { + if (pendingMarks.size() > MARKS_PER_TRANSACTION) { + flushPendingMarks(); + } + } + + void flushPendingMarks() { + markLock.lock(); + try { + final Transaction transaction = markStorage.startTransaction(); + pendingMarks.forEach(node -> transaction.put(node, IN_USE)); + transaction.commit(); + pendingMarks.clear(); + } finally { + markLock.unlock(); + } + } + + private void markNewNodes(final Collection nodeHashes) { + markedNodesCounter.inc(nodeHashes.size()); + markLock.lock(); + try { + pendingMarks.addAll(nodeHashes); + maybeFlushPendingMarks(); + } finally { + markLock.unlock(); + } + } +} diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/Pruner.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/Pruner.java new file mode 100644 index 0000000000..285527f08f --- /dev/null +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/Pruner.java @@ -0,0 +1,127 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.worldstate; + +import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent; +import tech.pegasys.pantheon.ethereum.chain.Blockchain; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class Pruner { + private static final Logger LOG = LogManager.getLogger(); + + private final MarkSweepPruner pruningStrategy; + private final Blockchain blockchain; + private final ExecutorService executorService; + private final long retentionPeriodInBlocks; + private final AtomicReference state = new AtomicReference<>(State.IDLE); + private volatile long markBlockNumber = 0; + private volatile BlockHeader markedBlockHeader; + private long transientForkThreshold; + + public Pruner( + final MarkSweepPruner pruningStrategy, + final Blockchain blockchain, + final ExecutorService executorService, + final long transientForkThreshold, + final long retentionPeriodInBlocks) { + this.pruningStrategy = pruningStrategy; + this.executorService = executorService; + this.blockchain = blockchain; + if (transientForkThreshold < 0 || retentionPeriodInBlocks < 0) { + throw new IllegalArgumentException( + String.format( + "TransientForkThreshold and RetentionPeriodInBlocks must be non-negative. transientForkThreshold=%d, retentionPeriodInBlocks=%d", + transientForkThreshold, retentionPeriodInBlocks)); + } + this.retentionPeriodInBlocks = retentionPeriodInBlocks; + this.transientForkThreshold = transientForkThreshold; + } + + public void start() { + blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); + } + + public void stop() throws InterruptedException { + pruningStrategy.cleanup(); + executorService.awaitTermination(10, TimeUnit.SECONDS); + } + + private void handleNewBlock(final BlockAddedEvent event) { + if (!event.isNewCanonicalHead()) { + return; + } + + final long blockNumber = event.getBlock().getHeader().getNumber(); + if (state.compareAndSet(State.IDLE, State.TRANSIENT_FORK_OUTLIVING)) { + pruningStrategy.prepare(); + markBlockNumber = blockNumber; + } else if (blockNumber >= markBlockNumber + transientForkThreshold + && state.compareAndSet(State.TRANSIENT_FORK_OUTLIVING, State.MARKING)) { + markedBlockHeader = blockchain.getBlockHeader(markBlockNumber).get(); + mark(markedBlockHeader); + } else if (blockNumber >= markBlockNumber + retentionPeriodInBlocks + && blockchain.blockIsOnCanonicalChain(markedBlockHeader.getHash()) + && state.compareAndSet(State.MARKING_COMPLETE, State.SWEEPING)) { + sweep(); + } + } + + private void mark(final BlockHeader header) { + markBlockNumber = header.getNumber(); + final Hash stateRoot = header.getStateRoot(); + LOG.info( + "Begin marking used nodes for pruning. Block number: {} State root: {}", + markBlockNumber, + stateRoot); + execute( + () -> { + pruningStrategy.mark(stateRoot); + state.compareAndSet(State.MARKING, State.MARKING_COMPLETE); + }); + } + + private void sweep() { + LOG.info( + "Begin sweeping unused nodes for pruning. Retention period: {}", retentionPeriodInBlocks); + execute( + () -> { + pruningStrategy.sweep(); + state.compareAndSet(State.SWEEPING, State.IDLE); + }); + } + + private void execute(final Runnable action) { + try { + executorService.execute(action); + } catch (final Throwable t) { + LOG.error("Pruning failed", t); + state.set(State.IDLE); + } + } + + private enum State { + IDLE, + TRANSIENT_FORK_OUTLIVING, + MARKING, + MARKING_COMPLETE, + SWEEPING; + } +} diff --git a/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/InMemoryStorageProvider.java b/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/InMemoryStorageProvider.java index 483ed9057e..5f6b763b1f 100644 --- a/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/InMemoryStorageProvider.java +++ b/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/InMemoryStorageProvider.java @@ -32,6 +32,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; +import tech.pegasys.pantheon.services.kvstore.KeyValueStorage; public class InMemoryStorageProvider implements StorageProvider { @@ -86,6 +87,11 @@ public PrivateStateStorage createPrivateStateStorage() { return new PrivateStateKeyValueStorage(new InMemoryKeyValueStorage()); } + @Override + public KeyValueStorage createPruningStorage() { + return new InMemoryKeyValueStorage(); + } + @Override public void close() {} } diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPrunerTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPrunerTest.java new file mode 100644 index 0000000000..e509dbf631 --- /dev/null +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/MarkSweepPrunerTest.java @@ -0,0 +1,70 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.worldstate; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.MutableWorldState; +import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStateKeyValueStorage; +import tech.pegasys.pantheon.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Test; + +public class MarkSweepPrunerTest { + + private final BlockDataGenerator gen = new BlockDataGenerator(); + private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem(); + + @Test + public void shouldMarkAllNodesInCurrentWorldState() { + + // Setup "remote" state + final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage(); + final InMemoryKeyValueStorage stateStorage = new InMemoryKeyValueStorage(); + final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage); + final WorldStateArchive worldStateArchive = + new WorldStateArchive( + worldStateStorage, + new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage())); + final MutableWorldState worldState = worldStateArchive.getMutable(); + + // Generate accounts and save corresponding state root + gen.createRandomContractAccountsWithNonEmptyStorage(worldState, 20); + final Hash stateRoot = worldState.rootHash(); + + final MarkSweepPruner pruner = + new MarkSweepPruner(worldStateStorage, markStorage, metricsSystem); + pruner.mark(stateRoot); + pruner.flushPendingMarks(); + + final Set keysToKeep = new HashSet<>(stateStorage.keySet()); + assertThat(markStorage.keySet()).containsExactlyInAnyOrderElementsOf(keysToKeep); + + // Generate some more nodes from a world state we didn't mark + gen.createRandomContractAccountsWithNonEmptyStorage(worldStateArchive.getMutable(), 10); + assertThat(stateStorage.keySet()).hasSizeGreaterThan(keysToKeep.size()); + + // All those new nodes should be removed when we sweep + pruner.sweep(); + assertThat(stateStorage.keySet()).containsExactlyInAnyOrderElementsOf(keysToKeep); + assertThat(markStorage.keySet()).isEmpty(); + } +} diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/PrunerTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/PrunerTest.java new file mode 100644 index 0000000000..bdd9d87dc6 --- /dev/null +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/PrunerTest.java @@ -0,0 +1,173 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.worldstate; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import tech.pegasys.pantheon.ethereum.chain.Blockchain; +import tech.pegasys.pantheon.ethereum.chain.BlockchainStorage; +import tech.pegasys.pantheon.ethereum.chain.DefaultMutableBlockchain; +import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; +import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHeaderFunctions; +import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; +import tech.pegasys.pantheon.testutil.MockExecutorService; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PrunerTest { + + private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem(); + + private final BlockDataGenerator gen = new BlockDataGenerator(); + + @Mock private MarkSweepPruner markSweepPruner; + private final ExecutorService mockExecutorService = new MockExecutorService(); + + private final Block genesisBlock = gen.genesisBlock(); + + @Test + public void shouldMarkCorrectBlockAndSweep() throws InterruptedException { + final BlockchainStorage blockchainStorage = + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); + final DefaultMutableBlockchain blockchain = + new DefaultMutableBlockchain(genesisBlock, blockchainStorage, metricsSystem); + + final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 0, 0); + pruner.start(); + + final Block block1 = appendBlockWithParent(blockchain, genesisBlock); + appendBlockWithParent(blockchain, block1); + appendBlockWithParent(blockchain, blockchain.getChainHeadBlock()); + + verify(markSweepPruner).mark(block1.getHeader().getStateRoot()); + verify(markSweepPruner).sweep(); + pruner.stop(); + } + + @Test + public void shouldOnlySweepAfterTransientForkPeriodAndRetentionPeriodEnds() + throws InterruptedException { + final BlockchainStorage blockchainStorage = + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); + final DefaultMutableBlockchain blockchain = + new DefaultMutableBlockchain(genesisBlock, blockchainStorage, metricsSystem); + + final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 1, 2); + pruner.start(); + + final Hash markBlockStateRootHash = + appendBlockWithParent(blockchain, genesisBlock).getHeader().getStateRoot(); + verify(markSweepPruner, never()).mark(markBlockStateRootHash); + verify(markSweepPruner, never()).sweep(); + + appendBlockWithParent(blockchain, blockchain.getChainHeadBlock()); + verify(markSweepPruner).mark(markBlockStateRootHash); + verify(markSweepPruner, never()).sweep(); + + appendBlockWithParent(blockchain, blockchain.getChainHeadBlock()); + verify(markSweepPruner).sweep(); + pruner.stop(); + } + + @Test + public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain() + throws InterruptedException { + final BlockchainStorage blockchainStorage = + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); + final DefaultMutableBlockchain blockchain = + new DefaultMutableBlockchain(genesisBlock, blockchainStorage, metricsSystem); + + // start pruner so it can start handling block added events + final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 0, 1); + pruner.start(); + + /* + Set up pre-marking state: + O <---- marking of the this block's parent will begin when this block is added + | + | O <- this is a fork as of now (non-canonical) + O | <- this is the initially canonical block that will be marked + \/ + O <--- the common ancestor when the reorg happens + */ + final Block initiallyCanonicalBlock = appendBlockWithParent(blockchain, genesisBlock); + appendBlockWithParent(blockchain, initiallyCanonicalBlock); + final Block forkBlock = appendBlockWithParent(blockchain, genesisBlock); + /* + Cause reorg: + Set up pre-marking state: + O + | O <---- this block causes a reorg; this branch becomes canonical + | O <---- which means that state here is referring to nodes from the common ancestor block, + O | <- because this was block at which marking began + \/ + O + */ + appendBlockWithParent(blockchain, forkBlock); + verify(markSweepPruner).mark(initiallyCanonicalBlock.getHeader().getStateRoot()); + verify(markSweepPruner, never()).sweep(); + pruner.stop(); + } + + @Test + public void shouldRejectInvalidArguments() { + final Blockchain mockchain = mock(Blockchain.class); + assertThatThrownBy(() -> new Pruner(markSweepPruner, mockchain, mockExecutorService, -1, -2)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void shouldCleanUpPruningStrategyOnShutdown() throws InterruptedException { + final BlockchainStorage blockchainStorage = + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); + final DefaultMutableBlockchain blockchain = + new DefaultMutableBlockchain(genesisBlock, blockchainStorage, metricsSystem); + + final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 0, 0); + pruner.start(); + pruner.stop(); + verify(markSweepPruner).cleanup(); + } + + private Block appendBlockWithParent( + final DefaultMutableBlockchain blockchain, final Block parent) { + BlockOptions options = + new BlockOptions() + .setBlockNumber(parent.getHeader().getNumber() + 1) + .setParentHash(parent.getHash()); + final Block newBlock = gen.block(options); + final List receipts = gen.receipts(newBlock); + blockchain.appendBlock(newBlock, receipts); + return newBlock; + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index a6f8d06888..e53105d931 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -26,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.ethereum.worldstate.Pruner; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.PantheonMetricCategory; @@ -44,6 +45,8 @@ public class DefaultSynchronizer implements Synchronizer { private static final Logger LOG = LogManager.getLogger(); + private static final boolean PRUNING_ENABLED = false; + private final Pruner pruner; private final SyncState syncState; private final AtomicBoolean running = new AtomicBoolean(false); private final Subscribers syncStatusListeners = Subscribers.create(); @@ -57,11 +60,13 @@ public DefaultSynchronizer( final ProtocolContext protocolContext, final WorldStateStorage worldStateStorage, final BlockBroadcaster blockBroadcaster, + final Pruner pruner, final EthContext ethContext, final SyncState syncState, final Path dataDirectory, final Clock clock, final MetricsSystem metricsSystem) { + this.pruner = pruner; this.syncState = syncState; ChainHeadTracker.trackChainHeadForPeers( @@ -164,6 +169,9 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er private void startFullSync() { fullSyncDownloader.start(); + if (PRUNING_ENABLED) { + pruner.start(); + } } @Override diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java index 6073bb7ee1..eac8b55102 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import tech.pegasys.pantheon.testutil.MockExecutorService; + import java.time.Duration; import java.util.Arrays; import java.util.List; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java index dfcbc08131..6eb273a8aa 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java @@ -19,6 +19,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import tech.pegasys.pantheon.testutil.MockExecutorService; + import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockScheduledExecutor.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockScheduledExecutor.java index 48172114c8..650476e050 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockScheduledExecutor.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockScheduledExecutor.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import tech.pegasys.pantheon.testutil.MockExecutorService; + import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 3e6b1630a5..46833c25d9 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -40,7 +40,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; -import tech.pegasys.pantheon.ethereum.eth.manager.MockExecutorService; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; @@ -64,6 +63,7 @@ import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue; +import tech.pegasys.pantheon.testutil.MockExecutorService; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/AllNodesVisitor.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/AllNodesVisitor.java new file mode 100644 index 0000000000..3775e5f705 --- /dev/null +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/AllNodesVisitor.java @@ -0,0 +1,51 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.trie; + +import java.util.function.Consumer; + +public class AllNodesVisitor implements NodeVisitor { + + private final Consumer> handler; + + AllNodesVisitor(final Consumer> handler) { + this.handler = handler; + } + + @Override + public void visit(final ExtensionNode extensionNode) { + handler.accept(extensionNode); + acceptAndUnload(extensionNode.getChild()); + } + + @Override + public void visit(final BranchNode branchNode) { + handler.accept(branchNode); + for (byte i = 0; i < BranchNode.RADIX; i++) { + acceptAndUnload(branchNode.child(i)); + } + } + + @Override + public void visit(final LeafNode leafNode) { + handler.accept(leafNode); + } + + @Override + public void visit(final NullNode nullNode) {} + + private void acceptAndUnload(final Node storedNode) { + storedNode.accept(this); + storedNode.unload(); + } +} diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java index 1b869ada5e..1cac37b452 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; /** An Merkle Patricial Trie. */ public interface MerklePatriciaTrie { @@ -74,4 +75,6 @@ public interface MerklePatriciaTrie { * @return the requested storage entries as a map of key hash to value. */ Map entriesFrom(Bytes32 startKeyHash, int limit); + + void visitAll(Consumer> visitor); } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java index e24eaaa4b5..47741f298a 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java @@ -56,4 +56,7 @@ default boolean isReferencedByHash() { boolean isDirty(); String print(); + + /** Unloads the node if it is, for example, a StoredNode. */ + default void unload() {} } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/SimpleMerklePatriciaTrie.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/SimpleMerklePatriciaTrie.java index dde962259b..25fda9395c 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/SimpleMerklePatriciaTrie.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/SimpleMerklePatriciaTrie.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -82,4 +83,9 @@ public void commit(final NodeUpdater nodeUpdater) { public Map entriesFrom(final Bytes32 startKeyHash, final int limit) { return StorageEntriesCollector.collectEntries(root, startKeyHash, limit); } + + @Override + public void visitAll(final Consumer> visitor) { + root.accept(new AllNodesVisitor<>(visitor)); + } } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java index 63f6f3648c..78438cff60 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -109,6 +110,11 @@ public Map entriesFrom(final Bytes32 startKeyHash, final int limit) return StorageEntriesCollector.collectEntries(root, startKeyHash, limit); } + @Override + public void visitAll(final Consumer> visitor) { + root.accept(new AllNodesVisitor<>(visitor)); + } + @Override public Bytes32 getRootHash() { return root.getHash(); diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java index 6b409aba23..09c512418c 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java @@ -108,6 +108,11 @@ private Node load() { return loaded; } + @Override + public void unload() { + loaded = null; + } + @Override public String print() { if (loaded == null) { diff --git a/metrics/core/src/main/java/tech/pegasys/pantheon/metrics/PantheonMetricCategory.java b/metrics/core/src/main/java/tech/pegasys/pantheon/metrics/PantheonMetricCategory.java index 6b54cd1b7b..18d439ef18 100644 --- a/metrics/core/src/main/java/tech/pegasys/pantheon/metrics/PantheonMetricCategory.java +++ b/metrics/core/src/main/java/tech/pegasys/pantheon/metrics/PantheonMetricCategory.java @@ -27,6 +27,7 @@ public enum PantheonMetricCategory implements MetricCategory { PERMISSIONING("permissioning"), KVSTORE_ROCKSDB("rocksdb"), KVSTORE_ROCKSDB_STATS("rocksdb", false), + PRUNER("pruner"), RPC("rpc"), SYNCHRONIZER("synchronizer"), TRANSACTION_POOL("transaction_pool"); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java index 6359bcef87..7bbb1ea843 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java @@ -44,6 +44,8 @@ import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration; import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.storage.keyvalue.RocksDbStorageProvider; +import tech.pegasys.pantheon.ethereum.worldstate.MarkSweepPruner; +import tech.pegasys.pantheon.ethereum.worldstate.Pruner; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration; @@ -56,7 +58,9 @@ import java.util.Collections; import java.util.List; import java.util.OptionalLong; +import java.util.concurrent.Executors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -199,6 +203,30 @@ public PantheonController build() throws IOException { final MutableBlockchain blockchain = protocolContext.getBlockchain(); + final Pruner pruner = + new Pruner( + new MarkSweepPruner( + protocolContext.getWorldStateArchive().getWorldStateStorage(), + storageProvider.createPruningStorage(), + metricsSystem), + protocolContext.getBlockchain(), + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MIN_PRIORITY) + .setNameFormat("StatePruning-%d") + .build()), + 10, + 1000); + addShutdownAction( + () -> { + try { + pruner.stop(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + }); + final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST); ethProtocolManager = createEthProtocolManager(protocolContext, fastSyncEnabled); final SyncState syncState = @@ -210,6 +238,7 @@ public PantheonController build() throws IOException { protocolContext, protocolContext.getWorldStateArchive().getWorldStateStorage(), ethProtocolManager.getBlockBroadcaster(), + pruner, ethProtocolManager.ethContext(), syncState, dataDirectory, diff --git a/services/kvstore/src/test/java/tech/pegasys/pantheon/services/kvstore/AbstractKeyValueStorageTest.java b/services/kvstore/src/test/java/tech/pegasys/pantheon/services/kvstore/AbstractKeyValueStorageTest.java index 4ad98f657b..e5a94ff93e 100644 --- a/services/kvstore/src/test/java/tech/pegasys/pantheon/services/kvstore/AbstractKeyValueStorageTest.java +++ b/services/kvstore/src/test/java/tech/pegasys/pantheon/services/kvstore/AbstractKeyValueStorageTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction; @@ -62,6 +63,20 @@ public void put() throws Exception { Optional.of(BytesValue.fromHexString("0DEF")), store.get(BytesValue.fromHexString("0F"))); } + @Test + public void containsKey() throws Exception { + final KeyValueStorage store = createStore(); + final BytesValue key = BytesValue.fromHexString("ABCD"); + + assertFalse(store.containsKey(key)); + + final Transaction transaction = store.startTransaction(); + transaction.put(key, BytesValue.fromHexString("DEFF")); + transaction.commit(); + + assertTrue(store.containsKey(key)); + } + @Test public void removeExisting() throws Exception { final KeyValueStorage store = createStore(); diff --git a/testutil/build.gradle b/testutil/build.gradle index 7fcad75c79..b72215da56 100644 --- a/testutil/build.gradle +++ b/testutil/build.gradle @@ -30,4 +30,5 @@ dependencies { implementation 'com.google.guava:guava' implementation 'com.squareup.okhttp3:okhttp' implementation 'net.consensys:orion' + implementation 'org.mockito:mockito-core' } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java b/testutil/src/main/java/tech/pegasys/pantheon/testutil/MockExecutorService.java similarity index 98% rename from ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java rename to testutil/src/main/java/tech/pegasys/pantheon/testutil/MockExecutorService.java index f8f62b481c..f6d0758557 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java +++ b/testutil/src/main/java/tech/pegasys/pantheon/testutil/MockExecutorService.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.ethereum.eth.manager; +package tech.pegasys.pantheon.testutil; import static org.mockito.Mockito.spy;