Skip to content

Commit

Permalink
custody subnet count decoding during discovery (#6777)
Browse files Browse the repository at this point in the history
* renamed eip7594_helpers to peerdas_helpers, added csc subnet decoding during discovery

* added link
  • Loading branch information
agnxsh authored Dec 20, 2024
1 parent e2e2266 commit c728d87
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 38 deletions.
14 changes: 7 additions & 7 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import
# Internals
../spec/[
beaconstate, state_transition_block, forks,
helpers, network, signatures, eip7594_helpers],
helpers, network, signatures, peerdas_helpers],
../consensus_object_pools/[
attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool,
Expand Down Expand Up @@ -496,7 +496,7 @@ proc validateBlobSidecar*(
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#data_column_sidecar_subnet_id
proc validateDataColumnSidecar*(
dag: ChainDAGRef, quarantine: ref Quarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
data_column_sidecar: DataColumnSidecar,
wallTime: BeaconTime, subnet_id: uint64):
Result[void, ValidationError] =
Expand All @@ -508,14 +508,14 @@ proc validateDataColumnSidecar*(
if not (data_column_sidecar.index < NUMBER_OF_COLUMNS):
return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS")

# [REJECT] The sidecar is for the correct subnet
# [REJECT] The sidecar is for the correct subnet
# -- i.e. `compute_subnet_for_data_column_sidecar(blob_sidecar.index) == subnet_id`.
if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")

# [IGNORE] The sidecar is not from a future slot
# (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
# `block_header.slot <= current_slot`(a client MAY queue future sidecars for
# [IGNORE] The sidecar is not from a future slot
# (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
# `block_header.slot <= current_slot`(a client MAY queue future sidecars for
# processing at the appropriate slot).
if not (block_header.slot <=
(wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
Expand Down Expand Up @@ -608,7 +608,7 @@ proc validateDataColumnSidecar*(
data_column_sidecar.signed_block_header.signature):
return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")

# [REJECT] The sidecar's column data is valid as
# [REJECT] The sidecar's column data is valid as
# verified by `verify_data_column_kzg_proofs(sidecar)`
block:
let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
Expand Down
18 changes: 16 additions & 2 deletions beacon_chain/networking/eth2_discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import
std/[algorithm, sequtils],
chronos, chronicles,
eth/p2p/discoveryv5/[enr, protocol, node, random2],
../spec/datatypes/altair,
../spec/datatypes/[altair, fulu],
../spec/eth2_ssz_serialization,
".."/[conf, conf_light_client]

Expand Down Expand Up @@ -127,6 +127,7 @@ proc queryRandom*(
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits,
wantedCscnets: CscBits,
minScore: int): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.
Expand All @@ -151,13 +152,26 @@ proc queryRandom*(
if not forkId.isCompatibleForkId(peerForkId):
continue

let cscCountBytes = n.record.get(enrCustodySubnetCountField, seq[byte])
if cscCountBytes.isOk():
let cscCountNode =
try:
SSZ.decode(cscCountBytes.get(), uint8)
except SerializationError as e:
debug "Could not decode the csc ENR field of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue

if wantedCscnets.countOnes().uint8 == cscCountNode:
score += 1

let attnetsBytes = n.record.get(enrAttestationSubnetsField, seq[byte])
if attnetsBytes.isOk():
let attnetsNode =
try:
SSZ.decode(attnetsBytes.get(), AttnetBits)
except SerializationError as e:
debug "Could not decode the attnets ERN bitfield of peer",
debug "Could not decode the attnets ENR bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue

Expand Down
23 changes: 17 additions & 6 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,8 @@ proc trimConnections(node: Eth2Node, count: int) =
inc(nbc_cycling_kicked_peers)
if toKick <= 0: return

proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
proc getLowSubnets(node: Eth2Node, epoch: Epoch):
(AttnetBits, SyncnetBits, CscBits) =
# Returns the subnets required to have a healthy mesh
# The subnets are computed, to, in order:
# - Have 0 subnet with < `dLow` peers from topic subscription
Expand Down Expand Up @@ -1570,7 +1571,11 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
if epoch + 1 >= node.cfg.ALTAIR_FORK_EPOCH:
findLowSubnets(getSyncCommitteeTopic, SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT)
else:
default(SyncnetBits)
default(SyncnetBits),
if epoch >= node.cfg.FULU_FORK_EPOCH:
findLowSubnets(getDataColumnSidecarTopic, uint64, (DATA_COLUMN_SIDECAR_SUBNET_COUNT).int)
else:
default(CscBits)
)

proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
Expand All @@ -1579,23 +1584,29 @@ proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
while true:
let
currentEpoch = node.getBeaconTime().slotOrZero.epoch
(wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch)
(wantedAttnets, wantedSyncnets, wantedCscnets) = node.getLowSubnets(currentEpoch)
wantedAttnetsCount = wantedAttnets.countOnes()
wantedSyncnetsCount = wantedSyncnets.countOnes()
wantedCscnetsCount = wantedCscnets.countOnes()
outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing})
targetOutgoingPeers = max(node.wantedPeers div 10, 3)

if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
outgoingPeers < targetOutgoingPeers:
wantedCscnetsCount > 0 or outgoingPeers < targetOutgoingPeers:

let
minScore =
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
wantedCscnetsCount > 0:
1
else:
0
discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore)
node.discoveryForkId,
wantedAttnets,
wantedSyncnets,
wantedCscnets,
minScore)

let newPeers = block:
var np = newSeq[PeerAddr]()
Expand Down
4 changes: 2 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import
./spec/datatypes/[altair, bellatrix, phase0],
./spec/[
deposit_snapshots, engine_authentication, weak_subjectivity,
eip7594_helpers],
peerdas_helpers],
./sync/[sync_protocol, light_client_protocol, sync_overseer],
./validators/[keystore_management, beacon_validators],
"."/[
Expand Down Expand Up @@ -535,7 +535,7 @@ proc initFullNode(
processor: processor,
network: node.network)
requestManager = RequestManager.init(
node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
getBeaconTime, (proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
Expand Down
10 changes: 10 additions & 0 deletions beacon_chain/spec/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,13 @@ func getSyncSubnets*(
iterator blobSidecarTopics*(forkDigest: ForkDigest): string =
for subnet_id in BlobId:
yield getBlobSidecarTopic(forkDigest, subnet_id)

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
func getDataColumnSidecarTopic*(forkDigest: ForkDigest,
subnet_id: uint64): string =
eth2Prefix(forkDigest) & "data_column_sidecar_" & $subnet_id & "/ssz_snappy"

iterator dataColumnSidecarTopics*(forkDigest: ForkDigest,
targetSubnetCount: uint64): string =
for subnet_id in 0'u64..<targetSubnetCount:
yield getDataColumnSidecarTopic(forkDigest, subnet_id)
File renamed without changes.
28 changes: 14 additions & 14 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0, deneb, fulu],
../spec/[forks, network, eip7594_helpers],
../spec/[forks, network, peerdas_helpers],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
../consensus_object_pools/blob_quarantine,
Expand Down Expand Up @@ -299,23 +299,23 @@ proc checkPeerCustody*(rman: RequestManager,
peer: Peer):
bool =
# Returns true if the peer custodies atleast
# ONE of the common custody columns, straight
# ONE of the common custody columns, straight
# away returns true if the peer is a supernode.
if rman.supernode:
# For a supernode, it is always best/optimistic
# to filter other supernodes, rather than filter
# too many full nodes that have a subset of the custody
# columns
if peer.lookupCscFromPeer() ==
if peer.lookupCscFromPeer() ==
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
return true

else:
if peer.lookupCscFromPeer() ==
if peer.lookupCscFromPeer() ==
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
return true

elif peer.lookupCscFromPeer() ==
elif peer.lookupCscFromPeer() ==
CUSTODY_REQUIREMENT.uint64:

# Fetch the remote custody count
Expand All @@ -333,9 +333,9 @@ proc checkPeerCustody*(rman: RequestManager,
for local_column in rman.custody_columns_set:
if local_column notin remoteCustodyColumns:
return false

return true

else:
return false

Expand Down Expand Up @@ -551,7 +551,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
wallTime = rman.getBeaconTime()
wallSlot = wallTime.slotOrZero()
delay = wallTime - wallSlot.start_beacon_time()

const waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)

var
Expand All @@ -574,7 +574,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
for idx in missing.indices:
let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
if id.index in rman.custody_columns_set and id notin fetches and
if id.index in rman.custody_columns_set and id notin fetches and
len(forkyBlck.message.body.blob_kzg_commitments) != 0:
fetches.incl(id)
else:
Expand All @@ -583,7 +583,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
blk = columnless.root,
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
ready.add(columnless.root)

for root in ready:
let columnless = rman.quarantine[].popColumnless(root).valueOr:
continue
Expand All @@ -593,7 +593,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
proc requestManagerDataColumnLoop(
rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:

await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue
Expand Down Expand Up @@ -623,7 +623,7 @@ proc requestManagerDataColumnLoop(
debug "Loaded orphaned data columns from storage", columnId
rman.dataColumnQuarantine[].put(data_column_sidecar)
var verifiers = newSeqOfCap[
Future[Result[void, VerifierError]]
Future[Result[void, VerifierError]]
.Raising([CancelledError])](blockRoots.len)
for blockRoot in blockRoots:
let blck = rman.quarantine[].popColumnless(blockRoot).valueOr:
Expand All @@ -644,7 +644,7 @@ proc requestManagerDataColumnLoop(
array[PARALLEL_REQUESTS_DATA_COLUMNS, Future[void].Raising([CancelledError])]
for i in 0..<PARALLEL_REQUESTS_DATA_COLUMNS:
workers[i] = rman.fetchDataColumnsFromNetwork(columnIds)

await allFutures(workers)
let finish = SyncMoment.now(uint64(len(columnIds)))

Expand Down
2 changes: 1 addition & 1 deletion tests/consensus_spec/test_fixture_networking.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
kzg4844/[kzg, kzg_abi],
stint,
eth/p2p/discoveryv5/[node],
../../beacon_chain/spec/eip7594_helpers,
../../beacon_chain/spec/peerdas_helpers,
../testutil,
./fixtures_utils, ./os_ops

Expand Down
13 changes: 9 additions & 4 deletions tests/test_discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ proc generateNode(rng: ref HmacDrbgContext, port: Port,

# TODO: Add tests with a syncnets preference
const noSyncnetsPreference = SyncnetBits()
const noCscnetsPreference = CscBits()

procSuite "Eth2 specific discovery tests":
let
Expand Down Expand Up @@ -67,7 +68,8 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(34)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
Expand Down Expand Up @@ -105,7 +107,8 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(42)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
Expand Down Expand Up @@ -133,7 +136,8 @@ procSuite "Eth2 specific discovery tests":

block:
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 0

block:
Expand All @@ -148,7 +152,8 @@ procSuite "Eth2 specific discovery tests":
discard node1.addNode(nodes[][0])

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
Expand Down
4 changes: 2 additions & 2 deletions tests/test_eip7594_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
results,
kzg4844/[kzg_abi, kzg],
./consensus_spec/[os_ops, fixtures_utils],
../beacon_chain/spec/[helpers, eip7594_helpers],
../beacon_chain/spec/[helpers, peerdas_helpers],
../beacon_chain/spec/datatypes/[fulu, deneb]

from std/strutils import rsplit
Expand Down Expand Up @@ -79,7 +79,7 @@ suite "EIP-7594 Unit Tests":
blob_count = rng.rand(1..(NUMBER_OF_COLUMNS.int))
blobs = createSampleKzgBlobs(blob_count, rng.rand(int))
extended_matrix = compute_matrix(blobs)

# Construct a matrix with some entries missing
var partial_matrix: seq[MatrixEntry]
for blob_entries in chunks(extended_matrix.get, kzg_abi.CELLS_PER_EXT_BLOB):
Expand Down

0 comments on commit c728d87

Please sign in to comment.