Skip to content

Commit

Permalink
Implement API for sync committee indices
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Aug 3, 2021
1 parent 7b3a0a1 commit 84830a8
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 3 deletions.
23 changes: 23 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(pubkey_cache.get_index(pubkey))
}

/// Return the validator indices of all public keys fetched from an iterator.
///
/// If any public key doesn't belong to a known validator then an error will be returned.
/// We could consider relaxing this by returning `Vec<Option<usize>>` in future.
pub fn validator_indices<'a>(
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

validator_pubkeys
.map(|pubkey| {
pubkey_cache
.get_index(pubkey)
.map(|id| id as u64)
.ok_or(Error::ValidatorPubkeyUnknown(*pubkey))
})
.collect()
}

/// Returns the validator pubkey (if any) for the given validator index.
///
/// ## Notes
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub enum BeaconChainError {
DuplicateValidatorPublicKey,
ValidatorPubkeyCacheFileError(String),
ValidatorIndexUnknown(usize),
ValidatorPubkeyUnknown(PublicKeyBytes),
OpPoolError(OpPoolError),
NaiveAggregationError(NaiveAggregationError),
ObservedAttestationsError(ObservedAttestationsError),
Expand Down
60 changes: 58 additions & 2 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttesterSlashing, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock,
Attestation, AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch,
EthSpec, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
};
Expand Down Expand Up @@ -685,6 +685,61 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// GET beacon/states/{state_id}/sync_committees?epoch
let get_beacon_state_sync_committees = beacon_states_path
.clone()
.and(warp::path("sync_committees"))
.and(warp::query::<api_types::SyncCommitteesQuery>())
.and(warp::path::end())
.and_then(
|state_id: StateId,
chain: Arc<BeaconChain<T>>,
query: api_types::SyncCommitteesQuery| {
blocking_json_task(move || {
let sync_committee = state_id.map_state(&chain, |state| {
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
state
.get_built_sync_committee(epoch, &chain.spec)
.map(|committee| committee.clone())
.map_err(|e| match e {
BeaconStateError::SyncCommitteeNotKnown { .. } => {
warp_utils::reject::custom_bad_request(format!(
"state at epoch {} has no sync committee for epoch {}",
current_epoch, epoch
))
}
BeaconStateError::IncorrectStateVariant => {
warp_utils::reject::custom_bad_request(format!(
"state at epoch {} is not activated for Altair",
current_epoch,
))
}
e => warp_utils::reject::beacon_state_error(e),
})
})?;

let validators = chain
.validator_indices(sync_committee.pubkeys.iter())
.map_err(warp_utils::reject::beacon_chain_error)?;

let validator_aggregates = validators
.chunks_exact(T::EthSpec::sync_subcommittee_size())
.map(|indices| api_types::SyncSubcommittee {
indices: indices.to_vec(),
})
.collect();

let response = api_types::SyncCommitteeByValidatorIndices {
validators,
validator_aggregates,
};

Ok(api_types::GenericResponse::from(response))
})
},
);

// GET beacon/headers
//
// Note: this endpoint only returns information about blocks in the canonical chain. Given that
Expand Down Expand Up @@ -2414,6 +2469,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_beacon_state_validators.boxed())
.or(get_beacon_state_validators_id.boxed())
.or(get_beacon_state_committees.boxed())
.or(get_beacon_state_sync_committees.boxed())
.or(get_beacon_headers.boxed())
.or(get_beacon_headers_block_id.boxed())
.or(get_beacon_block.boxed())
Expand Down
113 changes: 112 additions & 1 deletion beacon_node/http_api/tests/fork_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Tests for API behaviour across fork boundaries.
use crate::common::*;
use beacon_chain::{test_utils::RelativeSyncCommittee, StateSkipConfig};
use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec};
use eth2::types::{StateId, SyncSubcommittee};
use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec, Slot};

type E = MinimalEthSpec;

Expand Down Expand Up @@ -192,3 +193,113 @@ async fn sync_contributions_across_fork_with_skip_slots() {
.await
.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sync_committee_indices_across_fork() {
let validator_count = E::sync_committee_size();
let fork_epoch = Epoch::new(8);
let spec = altair_spec(fork_epoch);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count);
let harness = &tester.harness;
let client = &tester.client;

let all_validators = harness.get_all_validators();

// Flatten subcommittees into a single vec.
let flatten = |subcommittees: &[SyncSubcommittee]| -> Vec<u64> {
subcommittees
.iter()
.flat_map(|sub| sub.indices.iter().copied())
.collect()
};

// Prior to the fork the `sync_committees` endpoint should return a 400 error.
assert_eq!(
client
.get_beacon_states_sync_committees(StateId::Slot(Slot::new(0)), None)
.await
.unwrap_err()
.status()
.unwrap(),
400
);
assert_eq!(
client
.get_beacon_states_sync_committees(StateId::Head, Some(Epoch::new(0)))
.await
.unwrap_err()
.status()
.unwrap(),
400
);

// If there's a skip slot at the fork slot, the endpoint will return a 400 until a block is
// applied.
let fork_slot = fork_epoch.start_slot(E::slots_per_epoch());
let (genesis_state, genesis_state_root) = harness.get_current_state_and_root();
let (_, state) = harness
.add_attested_block_at_slot(
fork_slot - 1,
genesis_state,
genesis_state_root,
&all_validators,
)
.unwrap();

harness.advance_slot();
assert_eq!(harness.get_current_slot(), fork_slot);

// Using the head state must fail.
assert_eq!(
client
.get_beacon_states_sync_committees(StateId::Head, Some(fork_epoch))
.await
.unwrap_err()
.status()
.unwrap(),
400
);

// In theory we could do a state advance and make this work, but to keep things simple I've
// avoided doing that for now.
assert_eq!(
client
.get_beacon_states_sync_committees(StateId::Slot(fork_slot), None)
.await
.unwrap_err()
.status()
.unwrap(),
400
);

// Once the head is updated it should be useable for requests, including in the next sync
// committee period.
let state_root = state.canonical_root();
harness
.add_attested_block_at_slot(fork_slot + 1, state, state_root, &all_validators)
.unwrap();

let current_period = fork_epoch.sync_committee_period(&spec).unwrap();
let next_period_epoch = spec.epochs_per_sync_committee_period * (current_period + 1);
assert!(next_period_epoch > fork_epoch);

for epoch in [
None,
Some(fork_epoch),
Some(fork_epoch + 1),
Some(next_period_epoch),
Some(next_period_epoch + 1),
] {
let committee = client
.get_beacon_states_sync_committees(StateId::Head, epoch)
.await
.unwrap()
.data;
assert_eq!(committee.validators.len(), E::sync_committee_size());

assert_eq!(
committee.validators,
flatten(&committee.validator_aggregates)
);
}
}
23 changes: 23 additions & 0 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,29 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}

/// `GET beacon/states/{state_id}/sync_committees?epoch`
pub async fn get_beacon_states_sync_committees(
&self,
state_id: StateId,
epoch: Option<Epoch>,
) -> Result<GenericResponse<SyncCommitteeByValidatorIndices>, Error> {
let mut path = self.eth_path(V1)?;

path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("states")
.push(&state_id.to_string())
.push("sync_committees");

if let Some(epoch) = epoch {
path.query_pairs_mut()
.append_pair("epoch", &epoch.to_string());
}

self.get(path).await
}

/// `GET beacon/states/{state_id}/validators/{validator_id}`
///
/// Returns `Ok(None)` on a 404 error.
Expand Down
19 changes: 19 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ pub struct CommitteesQuery {
pub epoch: Option<Epoch>,
}

#[derive(Serialize, Deserialize)]
pub struct SyncCommitteesQuery {
pub epoch: Option<Epoch>,
}

#[derive(Serialize, Deserialize)]
pub struct AttestationPoolQuery {
pub slot: Option<Slot>,
Expand All @@ -439,6 +444,20 @@ pub struct CommitteeData {
pub validators: Vec<u64>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SyncCommitteeByValidatorIndices {
#[serde(with = "serde_utils::quoted_u64_vec")]
pub validators: Vec<u64>,
pub validator_aggregates: Vec<SyncSubcommittee>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SyncSubcommittee {
#[serde(with = "serde_utils::quoted_u64_vec")]
pub indices: Vec<u64>,
}

#[derive(Serialize, Deserialize)]
pub struct HeadersQuery {
pub slot: Option<Slot>,
Expand Down

0 comments on commit 84830a8

Please sign in to comment.