-
Notifications
You must be signed in to change notification settings - Fork 250
/
consensus_manager.nim
474 lines (403 loc) · 19 KB
/
consensus_manager.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# beacon_chain
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
chronicles, chronos, web3/[ethtypes, engine_api_types],
../spec/datatypes/base,
../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
../el/el_manager,
../beacon_clock
from ../spec/beaconstate import
get_expected_withdrawals, has_eth1_withdrawal_credential
from ../spec/datatypes/capella import Withdrawal
from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getPerValidatorDefaultFeeRecipient, getSuggestedFeeRecipient,
getSuggestedGasLimit
from ../validators/action_tracker import ActionTracker, getNextProposalSlot
type
ConsensusManager* = object
expectedSlot: Slot
expectedBlockReceived: Future[bool]
# Validated & Verified
# ----------------------------------------------------------------
dag*: ChainDAGRef
attestationPool*: ref AttestationPool
# Missing info
# ----------------------------------------------------------------
quarantine*: ref Quarantine
# Execution layer integration
# ----------------------------------------------------------------
elManager*: ELManager
# Allow determination of whether there's an upcoming proposal
# ----------------------------------------------------------------
actionTracker*: ActionTracker
# Allow determination of preferred fee recipient during proposals
# ----------------------------------------------------------------
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore
validatorsDir: string
defaultFeeRecipient: Opt[Eth1Address]
defaultGasLimit: uint64
# Tracking last proposal forkchoiceUpdated payload information
# ----------------------------------------------------------------
optimisticHead: tuple[bid: BlockId, execution_block_hash: Eth2Digest]
# Initialization
# ------------------------------------------------------------------------------
func new*(T: type ConsensusManager,
dag: ChainDAGRef,
attestationPool: ref AttestationPool,
quarantine: ref Quarantine,
elManager: ELManager,
actionTracker: ActionTracker,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
validatorsDir: string,
defaultFeeRecipient: Opt[Eth1Address],
defaultGasLimit: uint64
): ref ConsensusManager =
(ref ConsensusManager)(
dag: dag,
attestationPool: attestationPool,
quarantine: quarantine,
elManager: elManager,
actionTracker: actionTracker,
dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
validatorsDir: validatorsDir,
defaultFeeRecipient: defaultFeeRecipient,
defaultGasLimit: defaultGasLimit
)
# Consensus Management
# -----------------------------------------------------------------------------------
proc checkExpectedBlock(self: var ConsensusManager) =
if self.expectedBlockReceived == nil:
return
if self.dag.head.slot < self.expectedSlot:
return
self.expectedBlockReceived.complete(true)
self.expectedBlockReceived = nil # Don't keep completed futures around!
proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] =
## Return a future that will complete when a head is selected whose slot is
## equal or greater than the given slot, or a new expectation is created
if self.expectedBlockReceived != nil:
# Reset the old future to not leave it hanging.. an alternative would be to
# cancel it, but it doesn't make any practical difference for now
self.expectedBlockReceived.complete(false)
let fut = newFuture[bool]("ConsensusManager.expectBlock")
self.expectedSlot = expectedSlot
self.expectedBlockReceived = fut
# It might happen that by the time we're expecting a block, it might have
# already been processed!
self.checkExpectedBlock()
return fut
func `$`(h: BlockHash): string = $h.asEth2Digest
func shouldSyncOptimistically*(
optimisticSlot, dagSlot, wallSlot: Slot): bool =
## Determine whether an optimistic execution block hash should be reported
## to the EL client instead of the current head as determined by fork choice.
# Check whether optimistic head is sufficiently ahead of DAG
const minProgress = 8 * SLOTS_PER_EPOCH # Set arbitrarily
if optimisticSlot < dagSlot or optimisticSlot - dagSlot < minProgress:
return false
# Check whether optimistic head has synced sufficiently close to wall slot
const maxAge = 2 * SLOTS_PER_EPOCH # Set arbitrarily
if optimisticSlot < max(wallSlot, maxAge.Slot) - maxAge:
return false
true
func shouldSyncOptimistically*(self: ConsensusManager, wallSlot: Slot): bool =
if self.optimisticHead.execution_block_hash.isZero:
return false
shouldSyncOptimistically(
optimisticSlot = self.optimisticHead.bid.slot,
dagSlot = getStateField(self.dag.headState, slot),
wallSlot = wallSlot)
func optimisticHead*(self: ConsensusManager): BlockId =
self.optimisticHead.bid
func optimisticExecutionPayloadHash*(self: ConsensusManager): Eth2Digest =
self.optimisticHead.execution_block_hash
func setOptimisticHead*(
self: var ConsensusManager,
bid: BlockId, execution_block_hash: Eth2Digest) =
self.optimisticHead = (bid: bid, execution_block_hash: execution_block_hash)
proc updateExecutionClientHead(self: ref ConsensusManager,
newHead: BeaconHead): Future[Opt[void]] {.async.} =
let headExecutionPayloadHash = self.dag.loadExecutionBlockHash(newHead.blck)
if headExecutionPayloadHash.isZero:
# Blocks without execution payloads can't be optimistic.
self.dag.markBlockVerified(newHead.blck)
return Opt[void].ok()
template callForkchoiceUpdated(attributes: untyped): auto =
await self.elManager.forkchoiceUpdated(
headBlockHash = headExecutionPayloadHash,
safeBlockHash = newHead.safeExecutionPayloadHash,
finalizedBlockHash = newHead.finalizedExecutionPayloadHash,
payloadAttributes = none attributes)
# Can't use dag.head here because it hasn't been updated yet
let (payloadExecutionStatus, latestValidHash) =
case self.dag.cfg.consensusForkAtEpoch(newHead.blck.bid.slot.epoch)
of ConsensusFork.Capella, ConsensusFork.Deneb:
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#specification-1
# Consensus layer client MUST call this method instead of
# `engine_forkchoiceUpdatedV1` under any of the following conditions:
# `headBlockHash` references a block which `timestamp` is greater or
# equal to the Shanghai timestamp
callForkchoiceUpdated(PayloadAttributesV2)
of ConsensusFork.Phase0, ConsensusFork.Altair, ConsensusFork.Bellatrix:
callForkchoiceUpdated(PayloadAttributesV1)
case payloadExecutionStatus
of PayloadExecutionStatus.valid:
self.dag.markBlockVerified(newHead.blck)
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
self.attestationPool[].forkChoice.mark_root_invalid(newHead.blck.root)
self.quarantine[].addUnviable(newHead.blck.root)
return Opt.none(void)
of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing:
# Don't do anything. Either newHead.blck.executionValid was already false,
# in which case it'd be superfluous to set it to false again, or the block
# was marked as `VALID` in the `newPayload` path already, in which case it
# is fine to keep it as valid here. Conceptually, were this to be lines of
# code, it'd be something like
# if newHead.blck.executionValid:
# do nothing because of latter case
# else:
# do nothing because it's a no-op
# So, either way, do nothing.
discard
return Opt[void].ok()
func getKnownValidatorsForBlsChangeTracking(
self: ConsensusManager, newHead: BlockRef): seq[ValidatorIndex] =
# Ensure that large nodes won't be overloaded by a nice-to-have, but
# inessential cosmetic feature.
const MAX_CHECKED_INDICES = 64
if newHead.bid.slot.epoch >= self.dag.cfg.CAPELLA_FORK_EPOCH:
var res = newSeqOfCap[ValidatorIndex](min(
len(self.actionTracker.knownValidators), MAX_CHECKED_INDICES))
for vi in self.actionTracker.knownValidators.keys():
res.add vi
if res.len >= MAX_CHECKED_INDICES:
break
res
else:
# It is not possible for any BLS to execution changes, for any validator,
# to have been yet processed.
# https://github.com/nim-lang/Nim/issues/19802
(static(@[]))
proc updateHead*(self: var ConsensusManager, newHead: BlockRef) =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
self.dag.updateHead(
newHead, self.quarantine[],
self.getKnownValidatorsForBlsChangeTracking(newHead))
self.checkExpectedBlock()
proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.
# Grab the new head according to our latest attestation data
let newHead = self.attestationPool[].selectOptimisticHead(
wallSlot.start_beacon_time).valueOr:
warn "Head selection failed, using previous head",
head = shortLog(self.dag.head), wallSlot
return
if self.dag.loadExecutionBlockHash(newHead.blck).isZero:
# Blocks without execution payloads can't be optimistic.
self.dag.markBlockVerified(newHead.blck)
self.updateHead(newHead.blck)
func isSynced(dag: ChainDAGRef, wallSlot: Slot): bool =
# This is a tweaked version of the validator_duties isSynced. TODO, refactor
# that one so this becomes the default version, with the same information to
# work with. For the head slot, use the DAG head regardless of what head the
# proposer forkchoiceUpdated is using, because by the validator_duties might
# be ready to actually propose, it's going to do so from the DAG head. Given
# the defaultSyncHorizon, it will start triggering in time so that potential
# discrepancies between the head here, and the head the DAG has (which might
# not yet be updated) won't be visible.
const defaultSyncHorizon = 50
if dag.head.slot + defaultSyncHorizon < wallSlot:
false
else:
dag.head.executionValid
proc checkNextProposer(
dag: ChainDAGRef, actionTracker: ActionTracker,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
wallSlot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
let nextWallSlot = wallSlot + 1
# Avoid long rewinds during syncing, when it's not going to propose. Though
# this is preparing for a proposal on `nextWallSlot`, it can't possibly yet
# be on said slot, so still check just `wallSlot`.
if not dag.isSynced(wallSlot):
return Opt.none((ValidatorIndex, ValidatorPubKey))
let proposer = ? dag.getProposer(dag.head, nextWallSlot)
if actionTracker.getNextProposalSlot(wallSlot) != nextWallSlot and
dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
proposer, nextWallSlot.epoch).isNone:
return Opt.none((ValidatorIndex, ValidatorPubKey))
let proposerKey = dag.validatorKey(proposer).get().toPubKey
Opt.some((proposer, proposerKey))
proc checkNextProposer*(self: ref ConsensusManager, wallSlot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
self.dag.checkNextProposer(
self.actionTracker, self.dynamicFeeRecipientsStore, wallSlot)
proc getFeeRecipient*(
self: ConsensusManager, pubkey: ValidatorPubKey,
validatorIdx: Opt[ValidatorIndex], epoch: Epoch): Eth1Address =
let dynFeeRecipient = if validatorIdx.isSome:
self.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
validatorIdx.get(), epoch)
else:
Opt.none(Eth1Address)
dynFeeRecipient.valueOr:
let
withdrawalAddress =
if validatorIdx.isSome:
withState(self.dag.headState):
if validatorIdx.get < forkyState.data.validators.lenu64:
let validator = forkyState.data.validators.item(validatorIdx.get)
if has_eth1_withdrawal_credential(validator):
var address: distinctBase(Eth1Address)
address[0..^1] = validator.withdrawal_credentials.data[12..^1]
Opt.some Eth1Address address
else:
Opt.none Eth1Address
else:
Opt.none Eth1Address
else:
Opt.none Eth1Address
defaultFeeRecipient = getPerValidatorDefaultFeeRecipient(
self.defaultFeeRecipient, withdrawalAddress)
self.validatorsDir.getSuggestedFeeRecipient(
pubkey, defaultFeeRecipient).valueOr:
# Ignore errors and use default - errors are logged in gsfr
defaultFeeRecipient
proc getGasLimit*(
self: ConsensusManager, pubkey: ValidatorPubKey): uint64 =
self.validatorsDir.getSuggestedGasLimit(
pubkey, self.defaultGasLimit).valueOr:
self.defaultGasLimit
from ../spec/datatypes/bellatrix import PayloadID
proc runProposalForkchoiceUpdated*(
self: ref ConsensusManager, wallSlot: Slot) {.async.} =
let
nextWallSlot = wallSlot + 1
(validatorIndex, nextProposer) = self.checkNextProposer(wallSlot).valueOr:
return
debug "runProposalForkchoiceUpdated: expected to be proposing next slot",
nextWallSlot, validatorIndex, nextProposer
# In Capella and later, computing correct withdrawals would mean creating a
# proposal state. Instead, only do that at proposal time.
if nextWallSlot.is_epoch:
debug "runProposalForkchoiceUpdated: not running early fcU for epoch-aligned proposal slot",
nextWallSlot, validatorIndex, nextProposer
return
# Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = withState(self.dag.headState):
compute_timestamp_at_slot(forkyState.data, nextWallSlot)
# If the current head block still forms the basis of the eventual proposal
# state, then its `get_randao_mix` will remain unchanged as well, as it is
# constant until the next block.
randomData = withState(self.dag.headState):
get_randao_mix(forkyState.data, get_current_epoch(forkyState.data)).data
feeRecipient = self[].getFeeRecipient(
nextProposer, Opt.some(validatorIndex), nextWallSlot.epoch)
withdrawals =
if self.dag.headState.kind >= ConsensusFork.Capella:
# Head state is not eventual proposal state, but withdrawals will be
# identical within an epoch.
withState(self.dag.headState):
when consensusFork >= ConsensusFork.Capella:
Opt.some get_expected_withdrawals(forkyState.data)
else:
Opt.none(seq[Withdrawal])
else:
Opt.none(seq[Withdrawal])
beaconHead = self.attestationPool[].getBeaconHead(self.dag.head)
headBlockHash = self.dag.loadExecutionBlockHash(beaconHead.blck)
if headBlockHash.isZero:
return
try:
let safeBlockHash = beaconHead.safeExecutionPayloadHash
withState(self.dag.headState):
template callForkchoiceUpdated(fcPayloadAttributes: auto) =
let (status, _) = await self.elManager.forkchoiceUpdated(
headBlockHash, safeBlockHash,
beaconHead.finalizedExecutionPayloadHash,
payloadAttributes = some fcPayloadAttributes)
debug "Fork-choice updated for proposal", status
static: doAssert high(ConsensusFork) == ConsensusFork.Deneb
when consensusFork >= ConsensusFork.Capella:
callForkchoiceUpdated(PayloadAttributesV2(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient,
withdrawals: toEngineWithdrawals get_expected_withdrawals(forkyState.data)))
else:
callForkchoiceUpdated(PayloadAttributesV1(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient))
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg
proc updateHeadWithExecution*(
self: ref ConsensusManager, initialNewHead: BeaconHead,
getBeaconTimeFn: GetBeaconTimeFn) {.async.} =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.
# Grab the new head according to our latest attestation data
try:
# Ensure dag.updateHead has most current information
var
attempts = 0
newHead = initialNewHead
while (await self.updateExecutionClientHead(newHead)).isErr:
# This proc is called on every new block; guarantee timely return
inc attempts
const maxAttempts = 5
if attempts >= maxAttempts:
warn "updateHeadWithExecution: too many attempts to recover from invalid payload",
attempts, maxAttempts, newHead, initialNewHead
break
# Select new head for next attempt
let
wallTime = getBeaconTimeFn()
nextHead = self.attestationPool[].selectOptimisticHead(wallTime).valueOr:
warn "Head selection failed after invalid block, using previous head",
newHead, wallSlot = wallTime.slotOrZero
break
warn "updateHeadWithExecution: attempting to recover from invalid payload",
attempts, maxAttempts, newHead, initialNewHead, nextHead
newHead = nextHead
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
self.dag.updateHead(
newHead.blck, self.quarantine[],
self[].getKnownValidatorsForBlsChangeTracking(newHead.blck))
# If this node should propose next slot, start preparing payload. Both
# fcUs are useful: the updateExecutionClientHead(newHead) call updates
# the head state (including optimistic status) that self.dagUpdateHead
# needs while runProposalForkchoiceUpdated requires RANDAO information
# from the head state corresponding to the `newHead` block, which only
# self.dag.updateHead(...) sets up.
await self.runProposalForkchoiceUpdated(getBeaconTimeFn().slotOrZero)
self[].checkExpectedBlock()
except CatchableError as exc:
debug "updateHeadWithExecution error",
error = exc.msg
proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) =
## Prune unneeded and invalidated data after finalization
## - the DAG state checkpoints
## - the DAG EpochRef
## - the attestation pool/fork choice
# Cleanup DAG & fork choice if we have a finalized head
if self.dag.needStateCachesAndForkChoicePruning():
self.dag.pruneStateCachesDAG()
self.attestationPool[].prune()