Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Allow per validator fee recipient via flag or file in validator client (similar to graffiti / graffiti-file) #2924

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
fa66c7b
implemented per validator fee-recipient (fee-recipient / fee-recipien…
pk910 Jan 17, 2022
e30b328
run cargo fmt
pk910 Jan 17, 2022
d35ded6
fix clippy issues
pk910 Jan 17, 2022
51a1af6
fixed fee_recipient parameter formatting
pk910 Jan 17, 2022
49e27f6
reverted fee_recipient api parameter, added proposer preparation serv…
pk910 Jan 18, 2022
2feef90
Added prepare_beacon_proposer api endpoint
pk910 Jan 18, 2022
815b4de
cache proposer preparations
pk910 Jan 18, 2022
4330306
Fixed VC panic when fee-recipient-file is unreadable & reload cached …
pk910 Jan 18, 2022
fff7c2b
Merge remote-tracking branch 'origin/unstable' into fee-recipient-per…
pk910 Jan 19, 2022
34d8ada
cleanup implementation
pk910 Jan 19, 2022
62539f9
Add suggested fee recip. to val defs yaml
paulhauner Jan 24, 2022
eef70d2
Use fee recip from val defs
paulhauner Jan 25, 2022
ec1d253
Add incomplete changes
paulhauner Jan 25, 2022
128fb2c
Add incomplete docs
paulhauner Jan 25, 2022
1b7ccd9
Tidy, publish each slot
paulhauner Jan 25, 2022
bdade7f
Add "suggested-" prefix globally
paulhauner Jan 25, 2022
9576eff
Add comment
paulhauner Jan 25, 2022
15665f6
Tidy docs
paulhauner Jan 25, 2022
381176f
Clarify docs
paulhauner Jan 25, 2022
02847e4
Tidy logs
paulhauner Jan 25, 2022
f112f4f
Publish on startup
paulhauner Jan 25, 2022
56511d5
Add val defs tests
paulhauner Jan 25, 2022
f62912b
Fix test compilation failures
paulhauner Jan 25, 2022
abd12fe
Fix web3signer test failures
paulhauner Jan 25, 2022
77941b2
Fix account manager tests
paulhauner Jan 25, 2022
87a0e14
Merge pull request #1 from paulhauner/2924-paul
pk910 Jan 25, 2022
f7f0053
Added client IP to "Received proposer preparation data" log entry
pk910 Jan 25, 2022
c269bd5
Added optional 'suggested_fee_recipient' field in 'POST /lighthouse/v…
pk910 Jan 25, 2022
07a6a43
Add default fee recip to BN
paulhauner Jan 31, 2022
96d5cf9
Update beacon_node/http_api/src/lib.rs
pk910 Jan 31, 2022
4d9dd45
Merge remote-tracking branch 'origin/unstable' into fee-recipient-per…
pk910 Jan 31, 2022
e09873c
fix merge issue
pk910 Jan 31, 2022
e3a4166
fixed result handling in validator_client/src/preparation_service.rs
pk910 Jan 31, 2022
a16792b
fixed result handling in beacon_node/execution_layer/src/lib.rs
pk910 Jan 31, 2022
f27c3ef
cargo fmt
pk910 Jan 31, 2022
d59ea86
fixed another merge from unstable issue
pk910 Jan 31, 2022
44e303f
implemented preparation data cleanup routine
pk910 Feb 1, 2022
d4804e2
dummy commit to trigger test-suite again
pk910 Feb 1, 2022
3072e5a
shorten "Successfully published proposer preparation" log entry (it's…
pk910 Feb 2, 2022
a645948
Apply suggestions from code review
pk910 Feb 7, 2022
fe7ffdc
pass proposer_index as value to prepare_execution_payload
pk910 Feb 7, 2022
4bc27b2
Apply suggestions from code review
pk910 Feb 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions account_manager/src/validator/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,15 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin
eprintln!("Successfully imported keystore.");
num_imported_keystores += 1;

let validator_def =
ValidatorDefinition::new_keystore_with_password(&dest_keystore, password_opt, None)
.map_err(|e| format!("Unable to create new validator definition: {:?}", e))?;
let graffiti = None;
let suggested_fee_recipient = None;
let validator_def = ValidatorDefinition::new_keystore_with_password(
&dest_keystore,
password_opt,
graffiti,
suggested_fee_recipient,
)
.map_err(|e| format!("Unable to create new validator definition: {:?}", e))?;

defs.push(validator_def);

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3101,7 +3101,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
BeaconState::Merge(_) => {
let sync_aggregate = get_sync_aggregate()?;
let execution_payload = get_execution_payload(self, &state)?;
let execution_payload = get_execution_payload(self, &state, proposer_index)?;
BeaconBlock::Merge(BeaconBlockMerge {
slot,
proposer_index,
Expand Down
10 changes: 8 additions & 2 deletions beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,26 @@ pub fn validate_execution_payload_for_gossip<T: BeaconChainTypes>(
pub fn get_execution_payload<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
proposer_index: u64,
) -> Result<ExecutionPayload<T::EthSpec>, BlockProductionError> {
Ok(prepare_execution_payload_blocking(chain, state)?.unwrap_or_default())
Ok(prepare_execution_payload_blocking(chain, state, proposer_index)?.unwrap_or_default())
}

/// Wraps the async `prepare_execution_payload` function as a blocking task.
pub fn prepare_execution_payload_blocking<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
proposer_index: u64,
) -> Result<Option<ExecutionPayload<T::EthSpec>>, BlockProductionError> {
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BlockProductionError::ExecutionLayerMissing)?;

execution_layer
.block_on_generic(|_| async { prepare_execution_payload(chain, state).await })
.block_on_generic(|_| async {
prepare_execution_payload(chain, state, proposer_index).await
})
.map_err(BlockProductionError::BlockingFailed)?
}

Expand All @@ -240,6 +244,7 @@ pub fn prepare_execution_payload_blocking<T: BeaconChainTypes>(
pub async fn prepare_execution_payload<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
proposer_index: u64,
) -> Result<Option<ExecutionPayload<T::EthSpec>>, BlockProductionError> {
let spec = &chain.spec;
let execution_layer = chain
Expand Down Expand Up @@ -300,6 +305,7 @@ pub async fn prepare_execution_payload<T: BeaconChainTypes>(
timestamp,
random,
finalized_block_hash.unwrap_or_else(Hash256::zero),
proposer_index,
)
.await
.map_err(BlockProductionError::GetPayloadFailed)?;
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@ where

// Spawn a routine that tracks the status of the execution engines.
execution_layer.spawn_watchdog_routine(beacon_chain.slot_clock.clone());

// Spawn a routine that removes expired proposer preparations.
execution_layer.spawn_clean_proposer_preparation_routine::<TSlotClock, TEthSpec>(
beacon_chain.slot_clock.clone(),
);
}
}

Expand Down
151 changes: 143 additions & 8 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, debug, error, info, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,7 +19,7 @@ use tokio::{
sync::{Mutex, MutexGuard},
time::{sleep, sleep_until, Instant},
};
use types::ChainSpec;
use types::{ChainSpec, Epoch, ProposerPreparationData};

pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};

Expand All @@ -30,6 +31,16 @@ pub mod test_utils;
/// in an LRU cache to avoid redundant lookups. This is the size of that cache.
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128;

/// A fee recipient address for use during block production. Only used as a very last resort if
/// there is no address provided by the user.
///
/// ## Note
///
/// This is *not* the zero-address, since Geth has been known to return errors for a coinbase of
/// 0x00..00.
const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] =
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];

#[derive(Debug)]
pub enum Error {
NoEngines,
Expand All @@ -46,9 +57,16 @@ impl From<ApiError> for Error {
}
}

#[derive(Clone)]
pub struct ProposerPreparationDataEntry {
update_epoch: Epoch,
preparation_data: ProposerPreparationData,
}

struct Inner {
engines: Engines<HttpJsonRpc>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
executor: TaskExecutor,
log: Logger,
Expand Down Expand Up @@ -96,6 +114,7 @@ impl ExecutionLayer {
log: log.clone(),
},
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
log,
Expand All @@ -116,17 +135,18 @@ impl ExecutionLayer {
&self.inner.executor
}

fn suggested_fee_recipient(&self) -> Result<Address, Error> {
self.inner
.suggested_fee_recipient
.ok_or(Error::FeeRecipientUnspecified)
}

/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn execution_blocks(&self) -> MutexGuard<'_, LruCache<Hash256, ExecutionBlock>> {
self.inner.execution_blocks.lock().await
}

/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn proposer_preparation_data(
&self,
) -> MutexGuard<'_, HashMap<u64, ProposerPreparationDataEntry>> {
self.inner.proposer_preparation_data.lock().await
}

fn log(&self) -> &Logger {
&self.inner.log
}
Expand Down Expand Up @@ -234,11 +254,124 @@ impl ExecutionLayer {
self.engines().upcheck_not_synced(Logging::Disabled).await;
}

/// Spawns a routine which cleans the cached proposer preparations periodically.
pub fn spawn_clean_proposer_preparation_routine<S: SlotClock + 'static, T: EthSpec>(
&self,
slot_clock: S,
) {
let preparation_cleaner = |el: ExecutionLayer| async move {
// Start the loop to periodically clean proposer preparation cache.
loop {
if let Some(duration_to_next_epoch) =
slot_clock.duration_to_next_epoch(T::slots_per_epoch())
{
// Wait for next epoch
sleep(duration_to_next_epoch).await;

match slot_clock
.now()
.map(|slot| slot.epoch(T::slots_per_epoch()))
{
Some(current_epoch) => el
.clean_proposer_preparation(current_epoch)
.await
.map_err(|e| {
error!(
el.log(),
"Failed to clean proposer preparation cache";
"error" => format!("{:?}", e)
)
})
.unwrap_or(()),
None => error!(el.log(), "Failed to get current epoch from slot clock"),
}
} else {
error!(el.log(), "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot and retry.
sleep(slot_clock.slot_duration()).await;
}
}
};

self.spawn(preparation_cleaner, "exec_preparation_cleanup");
}

/// Returns `true` if there is at least one synced and reachable engine.
pub async fn is_synced(&self) -> bool {
self.engines().any_synced().await
}

/// Updates the proposer preparation data provided by validators
pub fn update_proposer_preparation_blocking(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
self.block_on_generic(|_| async move {
self.update_proposer_preparation(update_epoch, preparation_data)
.await
})?
}

/// Updates the proposer preparation data provided by validators
async fn update_proposer_preparation(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;
for preparation_entry in preparation_data {
proposer_preparation_data.insert(
preparation_entry.validator_index,
ProposerPreparationDataEntry {
update_epoch,
preparation_data: preparation_entry.clone(),
},
);
}

Ok(())
}

/// Removes expired entries from cached proposer preparations
async fn clean_proposer_preparation(&self, current_epoch: Epoch) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;

// Keep all entries that have been updated in the last 2 epochs
let retain_epoch = current_epoch.saturating_sub(Epoch::new(2));
proposer_preparation_data.retain(|_validator_index, preparation_entry| {
preparation_entry.update_epoch >= retain_epoch
});

Ok(())
}

/// Returns the fee-recipient address that should be used to build a block
async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
if let Some(preparation_data_entry) =
self.proposer_preparation_data().await.get(&proposer_index)
{
// The values provided via the API have first priority.
preparation_data_entry.preparation_data.fee_recipient
} else if let Some(address) = self.inner.suggested_fee_recipient {
// If there has been no fee recipient provided via the API, but the BN has been provided
// with a global default address, use that.
address
} else {
// If there is no user-provided fee recipient, use a junk value and complain loudly.
crit!(
self.log(),
"Fee recipient unknown";
"msg" => "the suggested_fee_recipient was unknown during block production. \
a junk address was used, rewards were lost! \
check the --suggested-fee-recipient flag and VC configuration.",
"proposer_index" => ?proposer_index
);

Address::from_slice(&DEFAULT_SUGGESTED_FEE_RECIPIENT)
}
}

/// Maps to the `engine_getPayload` JSON-RPC call.
///
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
Expand All @@ -254,8 +387,10 @@ impl ExecutionLayer {
timestamp: u64,
random: Hash256,
finalized_block_hash: Hash256,
proposer_index: u64,
) -> Result<ExecutionPayload<T>, Error> {
let suggested_fee_recipient = self.suggested_fee_recipient()?;
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;

debug!(
self.log(),
"Issuing engine_getPayload";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,16 @@ impl<T: EthSpec> MockExecutionLayer<T> {
.await
.unwrap();

let validator_index = 0;
let payload = self
.el
.get_payload::<T>(parent_hash, timestamp, random, finalized_block_hash)
.get_payload::<T>(
parent_hash,
timestamp,
random,
finalized_block_hash,
validator_index,
)
.await
.unwrap();
let block_hash = payload.block_hash;
Expand Down
54 changes: 51 additions & 3 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch,
EthSpec, ForkName, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
EthSpec, ForkName, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
Slot, SyncCommitteeMessage, SyncContributionData,
};
use version::{
add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection,
Expand Down Expand Up @@ -2186,6 +2186,53 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST validator/prepare_beacon_proposer
let post_validator_prepare_beacon_proposer = eth1_v1
.and(warp::path("validator"))
.and(warp::path("prepare_beacon_proposer"))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(warp::addr::remote())
.and(log_filter.clone())
.and(warp::body::json())
.and_then(
|chain: Arc<BeaconChain<T>>,
client_addr: Option<SocketAddr>,
log: Logger,
preparation_data: Vec<ProposerPreparationData>| {
blocking_json_task(move || {
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_epoch = chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?;

debug!(
log,
"Received proposer preparation data";
"count" => preparation_data.len(),
"client" => client_addr
.map(|a| a.to_string())
.unwrap_or_else(|| "unknown".to_string()),
);

execution_layer
.update_proposer_preparation_blocking(current_epoch, &preparation_data)
.map_err(|_e| {
warp_utils::reject::custom_bad_request(
"error processing proposer preparations".to_string(),
)
})?;

Ok(())
})
},
);

// POST validator/sync_committee_subscriptions
let post_validator_sync_committee_subscriptions = eth1_v1
.and(warp::path("validator"))
Expand Down Expand Up @@ -2710,6 +2757,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_validator_contribution_and_proofs.boxed())
.or(post_validator_beacon_committee_subscriptions.boxed())
.or(post_validator_sync_committee_subscriptions.boxed())
.or(post_validator_prepare_beacon_proposer.boxed())
.or(post_lighthouse_liveness.boxed())
.or(post_lighthouse_database_reconstruct.boxed())
.or(post_lighthouse_database_historical_blocks.boxed()),
Expand Down
Loading