Skip to content

Commit

Permalink
Snap sync server StorageRange message limit to apply limit hash as po…
Browse files Browse the repository at this point in the history
…st check (#7399)

Signed-off-by: Jason Frame <jason.frame@consensys.net>
  • Loading branch information
jframe authored Aug 13, 2024
1 parent a55c331 commit dc336f4
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ public NavigableMap<Bytes32, Bytes> streamFlatAccounts(
}

public NavigableMap<Bytes32, Bytes> streamFlatAccounts(
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
final Bytes startKeyHash, final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return getFlatDbStrategy()
.streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, endKeyHash, takeWhile);
.streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, takeWhile);
}

public NavigableMap<Bytes32, Bytes> streamFlatStorages(
Expand All @@ -146,11 +144,9 @@ public NavigableMap<Bytes32, Bytes> streamFlatStorages(
public NavigableMap<Bytes32, Bytes> streamFlatStorages(
final Hash accountHash,
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return getFlatDbStrategy()
.streamStorageFlatDatabase(
composedWorldStateStorage, accountHash, startKeyHash, endKeyHash, takeWhile);
.streamStorageFlatDatabase(composedWorldStateStorage, accountHash, startKeyHash, takeWhile);
}

public boolean isWorldStateAvailable(final Bytes32 rootHash, final Hash blockHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@ public NavigableMap<Bytes32, Bytes> streamAccountFlatDatabase(
public NavigableMap<Bytes32, Bytes> streamAccountFlatDatabase(
final SegmentedKeyValueStorage storage,
final Bytes startKeyHash,
final Bytes32 endKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {

return toNavigableMap(
accountsToPairStream(storage, startKeyHash, endKeyHash).takeWhile(takeWhile));
return toNavigableMap(accountsToPairStream(storage, startKeyHash).takeWhile(takeWhile));
}

/** streams RLP encoded storage values using a specified stream limit. */
Expand Down Expand Up @@ -240,6 +238,34 @@ public NavigableMap<Bytes32, Bytes> streamStorageFlatDatabase(
.takeWhile(takeWhile));
}

/** streams raw storage Bytes using a specified predicate filter and value mapper. */
public NavigableMap<Bytes32, Bytes> streamStorageFlatDatabase(
final SegmentedKeyValueStorage storage,
final Hash accountHash,
final Bytes startKeyHash,
final Predicate<Pair<Bytes32, Bytes>> takeWhile) {
return toNavigableMap(
storageToPairStream(storage, accountHash, startKeyHash, RLP::encodeValue)
.takeWhile(takeWhile));
}

private static Stream<Pair<Bytes32, Bytes>> storageToPairStream(
final SegmentedKeyValueStorage storage,
final Hash accountHash,
final Bytes startKeyHash,
final Function<Bytes, Bytes> valueMapper) {

return storage
.streamFromKey(
ACCOUNT_STORAGE_STORAGE, Bytes.concatenate(accountHash, startKeyHash).toArrayUnsafe())
.takeWhile(pair -> Bytes.wrap(pair.getKey()).slice(0, Hash.SIZE).equals(accountHash))
.map(
pair ->
new Pair<>(
Bytes32.wrap(Bytes.wrap(pair.getKey()).slice(Hash.SIZE)),
valueMapper.apply(Bytes.wrap(pair.getValue()).trimLeadingZeros())));
}

private static Stream<Pair<Bytes32, Bytes>> storageToPairStream(
final SegmentedKeyValueStorage storage,
final Hash accountHash,
Expand All @@ -266,6 +292,13 @@ private static Stream<Pair<Bytes32, Bytes>> accountsToPairStream(
.map(pair -> new Pair<>(Bytes32.wrap(pair.getKey()), Bytes.wrap(pair.getValue())));
}

private static Stream<Pair<Bytes32, Bytes>> accountsToPairStream(
final SegmentedKeyValueStorage storage, final Bytes startKeyHash) {
return storage
.streamFromKey(ACCOUNT_INFO_STATE, startKeyHash.toArrayUnsafe())
.map(pair -> new Pair<>(Bytes32.wrap(pair.getKey()), Bytes.wrap(pair.getValue())));
}

private static NavigableMap<Bytes32, Bytes> toNavigableMap(
final Stream<Pair<Bytes32, Bytes>> pairStream) {
final TreeMap<Bytes32, Bytes> collected =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ MessageData constructGetAccountRangeResponse(final MessageData message) {
.map(
storage -> {
LOGGER.trace("obtained worldstate in {}", stopWatch);
StatefulPredicate shouldContinuePredicate =
new StatefulPredicate(
ResponseSizePredicate responseSizePredicate =
new ResponseSizePredicate(
"account",
stopWatch,
maxResponseBytes,
Expand All @@ -248,9 +248,13 @@ MessageData constructGetAccountRangeResponse(final MessageData message) {
return rlpOutput.encodedSize();
});

final Bytes32 endKeyBytes = range.endKeyHash();
var shouldContinuePredicate =
new ExceedingPredicate(
new EndKeyExceedsPredicate(endKeyBytes).and(responseSizePredicate));

NavigableMap<Bytes32, Bytes> accounts =
storage.streamFlatAccounts(
range.startKeyHash(), range.endKeyHash(), shouldContinuePredicate);
storage.streamFlatAccounts(range.startKeyHash(), shouldContinuePredicate);

if (accounts.isEmpty() && shouldContinuePredicate.shouldContinue.get()) {
// fetch next account after range, if it exists
Expand Down Expand Up @@ -331,8 +335,8 @@ MessageData constructGetStorageRangeResponse(final MessageData message) {
storage -> {
LOGGER.trace("obtained worldstate in {}", stopWatch);
// reusable predicate to limit by rec count and bytes:
var statefulPredicate =
new StatefulPredicate(
var responsePredicate =
new ResponseSizePredicate(
"storage",
stopWatch,
maxResponseBytes,
Expand Down Expand Up @@ -364,9 +368,12 @@ MessageData constructGetStorageRangeResponse(final MessageData message) {
new WorldStateProofProvider(new WorldStateStorageCoordinator(storage));

for (var forAccountHash : range.hashes()) {
var predicate =
new ExceedingPredicate(
new EndKeyExceedsPredicate(endKeyBytes).and(responsePredicate));
var accountStorages =
storage.streamFlatStorages(
Hash.wrap(forAccountHash), startKeyBytes, endKeyBytes, statefulPredicate);
Hash.wrap(forAccountHash), startKeyBytes, predicate);

//// address partial range queries that return empty
if (accountStorages.isEmpty() && isPartialRange) {
Expand All @@ -386,7 +393,7 @@ MessageData constructGetStorageRangeResponse(final MessageData message) {

// if a partial storage range was requested, or we interrupted storage due to
// request limits, send proofs:
if (isPartialRange || !statefulPredicate.shouldGetMore()) {
if (isPartialRange || !predicate.shouldGetMore()) {
// send a proof for the left side range origin
proofNodes.addAll(
worldStateProof.getStorageProofRelatedNodes(
Expand All @@ -403,7 +410,7 @@ MessageData constructGetStorageRangeResponse(final MessageData message) {
}
}

if (!statefulPredicate.shouldGetMore()) {
if (!predicate.shouldGetMore()) {
break;
}
}
Expand Down Expand Up @@ -462,7 +469,7 @@ MessageData constructGetBytecodesResponse(final MessageData message) {
if (optCode.isPresent()) {
if (!codeBytes.isEmpty()
&& (sumListBytes(codeBytes) + optCode.get().size() > maxResponseBytes
|| stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST)) {
|| stopWatch.getTime() > ResponseSizePredicate.MAX_MILLIS_PER_REQUEST)) {
break;
}
codeBytes.add(optCode.get());
Expand Down Expand Up @@ -521,7 +528,8 @@ MessageData constructGetTrieNodesResponse(final MessageData message) {
var trieNode = optStorage.orElse(Bytes.EMPTY);
if (!trieNodes.isEmpty()
&& (sumListBytes(trieNodes) + trieNode.size() > maxResponseBytes
|| stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST)) {
|| stopWatch.getTime()
> ResponseSizePredicate.MAX_MILLIS_PER_REQUEST)) {
break;
}
trieNodes.add(trieNode);
Expand Down Expand Up @@ -578,7 +586,39 @@ && sumListBytes(trieNodes) + trieNode.size() > maxResponseBytes) {
}
}

static class StatefulPredicate implements Predicate<Pair<Bytes32, Bytes>> {
/**
* Predicate that doesn't immediately stop when the delegate predicate returns false, but instead
* sets a flag to stop after the current element is processed.
*/
static class ExceedingPredicate implements Predicate<Pair<Bytes32, Bytes>> {
private final Predicate<Pair<Bytes32, Bytes>> delegate;
final AtomicBoolean shouldContinue = new AtomicBoolean(true);

public ExceedingPredicate(final Predicate<Pair<Bytes32, Bytes>> delegate) {
this.delegate = delegate;
}

@Override
public boolean test(final Pair<Bytes32, Bytes> pair) {
final boolean result = delegate.test(pair);
return shouldContinue.getAndSet(result);
}

public boolean shouldGetMore() {
return shouldContinue.get();
}
}

/** Predicate that stops when the end key is exceeded. */
record EndKeyExceedsPredicate(Bytes endKey) implements Predicate<Pair<Bytes32, Bytes>> {

@Override
public boolean test(final Pair<Bytes32, Bytes> pair) {
return endKey.compareTo(Bytes.wrap(pair.getFirst())) > 0;
}
}

static class ResponseSizePredicate implements Predicate<Pair<Bytes32, Bytes>> {
// default to a max of 4 seconds per request
static final long MAX_MILLIS_PER_REQUEST = 4000;

Expand All @@ -588,26 +628,19 @@ static class StatefulPredicate implements Predicate<Pair<Bytes32, Bytes>> {
final Function<Pair<Bytes32, Bytes>, Integer> encodingSizeAccumulator;
final StopWatch stopWatch;
final int maxResponseBytes;
// TODO: remove this hack, 10% is a fudge factor to account for the proof node size
final int maxResponseBytesFudgeFactor;
final String forWhat;

StatefulPredicate(
ResponseSizePredicate(
final String forWhat,
final StopWatch stopWatch,
final int maxResponseBytes,
final Function<Pair<Bytes32, Bytes>, Integer> encodingSizeAccumulator) {
this.stopWatch = stopWatch;
this.maxResponseBytes = maxResponseBytes;
this.maxResponseBytesFudgeFactor = maxResponseBytes * 9 / 10;
this.forWhat = forWhat;
this.encodingSizeAccumulator = encodingSizeAccumulator;
}

public boolean shouldGetMore() {
return shouldContinue.get();
}

@Override
public boolean test(final Pair<Bytes32, Bytes> pair) {
LOGGER
Expand All @@ -628,14 +661,11 @@ public boolean test(final Pair<Bytes32, Bytes> pair) {
return false;
}

var hasNoRecords = recordLimit.get() == 0;
var underRecordLimit = recordLimit.addAndGet(1) <= MAX_ENTRIES_PER_REQUEST;
var underByteLimit =
byteLimit.accumulateAndGet(0, (cur, __) -> cur + encodingSizeAccumulator.apply(pair))
< maxResponseBytesFudgeFactor;
// Only enforce limits when we have at least 1 record as the snapsync spec
// requires at least 1 record must be returned
if (hasNoRecords || (underRecordLimit && underByteLimit)) {
< maxResponseBytes;
if (underRecordLimit && underByteLimit) {
return true;
} else {
shouldContinue.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ public void assertAccountLimitRangeResponse() {
var rangeData =
getAndVerifyAccountRangeData(
(AccountRangeMessage) snapServer.constructGetAccountRangeResponse(tinyRangeLimit),
// TODO: after sorting out the request fudge factor, adjust this assertion to match
acctCount * 90 / 100 - 1);
acctCount);

// assert proofs are valid for the requested range
assertThat(assertIsValidAccountRangeProof(Hash.ZERO, rangeData)).isTrue();
Expand Down Expand Up @@ -315,6 +314,31 @@ public void assertPartialStorageForSingleAccountEmptyRange() {
.isTrue();
}

@Test
public void assertPartialStorageLimitHashBetweenSlots() {
Bytes accountShortHash = Bytes.fromHexStringLenient("0x40");
Hash accountFullHash = Hash.wrap(Bytes32.leftPad(accountShortHash));
SnapTestAccount testAccount = createTestContractAccount(accountFullHash, 2, inMemoryStorage);

Hash startHash = Hash.wrap(Bytes32.rightPad(Bytes.fromHexString("12"))); // slot 2
Hash endHash = Hash.wrap(Bytes32.rightPad(Bytes.fromHexString("13"))); // between slots 2 and 3
var rangeData = requestStorageRange(List.of(testAccount.addressHash), startHash, endHash);

assertThat(rangeData).isNotNull();
var slotsData = rangeData.slotsData(false);
assertThat(slotsData).isNotNull();
assertThat(slotsData.slots()).isNotNull();
assertThat(slotsData.slots().size()).isEqualTo(1);
var firstAccountStorages = slotsData.slots().first();
// expecting to see 2 slots
assertThat(firstAccountStorages.size()).isEqualTo(2);
// assert proofs are valid for the requested range
assertThat(
assertIsValidStorageProof(
testAccount, startHash, firstAccountStorages, slotsData.proofs()))
.isTrue();
}

@Test
public void assertLastEmptyPartialStorageForSingleAccount() {
// When our final range request is empty, no next account is possible,
Expand Down Expand Up @@ -343,7 +367,7 @@ public void assertLastEmptyPartialStorageForSingleAccount() {
@Test
public void assertStorageLimitRangeResponse() {
// assert we limit the range response according to bytessize
final int storageSlotSize = 70;
final int storageSlotSize = 69;
final int storageSlotCount = 16;
insertTestAccounts(acct1, acct2, acct3, acct4);

Expand Down Expand Up @@ -374,8 +398,7 @@ public void assertStorageLimitRangeResponse() {
assertThat(firstAccountStorages.size()).isEqualTo(10);
var secondAccountStorages = slotsData.slots().last();
// expecting to see only 6 since request was limited to 16 slots
// TODO: after sorting out the request fudge factor, adjust this assertion to match
assertThat(secondAccountStorages.size()).isEqualTo(6 * 90 / 100 - 1);
assertThat(secondAccountStorages.size()).isEqualTo(6);
// proofs required for interrupted storage range:
assertThat(slotsData.proofs().size()).isNotEqualTo(0);

Expand Down Expand Up @@ -556,7 +579,7 @@ public void assertStorageTriePathRequest_accountNotPresent() {
public void assertStorageTrieShortAccountHashPathRequest() {
Bytes accountShortHash = Bytes.fromHexStringLenient("0x40");
Hash accountFullHash = Hash.wrap(Bytes32.leftPad(accountShortHash));
SnapTestAccount testAccount = createTestContractAccount(accountFullHash, inMemoryStorage);
SnapTestAccount testAccount = createTestContractAccount(accountFullHash, 1, inMemoryStorage);
insertTestAccounts(testAccount);
var pathToSlot11 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0101"));
var pathToSlot12 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0102"));
Expand Down Expand Up @@ -707,11 +730,11 @@ static SnapTestAccount createTestAccount(final String hexAddr) {
static SnapTestAccount createTestContractAccount(
final String hexAddr, final BonsaiWorldStateKeyValueStorage storage) {
final Hash acctHash = Hash.wrap(Bytes32.rightPad(Bytes.fromHexString(hexAddr)));
return createTestContractAccount(acctHash, storage);
return createTestContractAccount(acctHash, 1, storage);
}

static SnapTestAccount createTestContractAccount(
final Hash acctHash, final BonsaiWorldStateKeyValueStorage storage) {
final Hash acctHash, final int slotKeyGap, final BonsaiWorldStateKeyValueStorage storage) {
MerkleTrie<Bytes32, Bytes> trie =
new StoredMerklePatriciaTrie<>(
(loc, hash) -> storage.getAccountStorageTrieNode(acctHash, loc, hash),
Expand All @@ -724,7 +747,7 @@ static SnapTestAccount createTestContractAccount(
var flatdb = storage.getFlatDbStrategy();
var updater = storage.updater();
updater.putCode(Hash.hash(mockCode), mockCode);
IntStream.range(10, 20)
IntStream.iterate(10, i -> i < 20, i -> i + slotKeyGap)
.boxed()
.forEach(
i -> {
Expand Down

0 comments on commit dc336f4

Please sign in to comment.