Skip to content

Commit

Permalink
Store pubkey cache decompressed on disk (#5897)
Browse files Browse the repository at this point in the history
* Support uncompressed keys in crypto/bls

* Use uncompressed keys in cache

* Implement DB upgrade

* Implement downgrade

* More logging on v20 upgrade

* Revert "More logging on v20 upgrade"

This reverts commit cc5789b.

* Merge remote-tracking branch 'origin/unstable' into uncompressed-pubkeys

* Add a little more logging

* Merge remote-tracking branch 'origin/unstable' into uncompressed-pubkeys
  • Loading branch information
michaelsproul authored Jul 4, 2024
1 parent d9ad1f5 commit d84e3e3
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 25 deletions.
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod migration_schema_v17;
mod migration_schema_v18;
mod migration_schema_v19;
mod migration_schema_v20;
mod migration_schema_v21;

use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
Expand Down Expand Up @@ -87,6 +88,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v20::downgrade_from_v20::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(20), SchemaVersion(21)) => {
let ops = migration_schema_v21::upgrade_to_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(20)) => {
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub fn upgrade_to_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v19 to v20");

// Load a V15 op pool and transform it to V20.
let Some(PersistedOperationPoolV15::<T::EthSpec> {
attestations_v15,
Expand Down Expand Up @@ -52,6 +54,8 @@ pub fn downgrade_from_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Downgrading from v20 to v19");

// Load a V20 op pool and transform it to V15.
let Some(PersistedOperationPoolV20::<T::EthSpec> {
attestations,
Expand Down
83 changes: 83 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::DatabasePubkey;
use slog::{info, Logger};
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{
get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem,
};
use types::{Hash256, PublicKey};

const LOG_EVERY: usize = 200_000;

pub fn upgrade_to_v21<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v20 to v21");

let mut ops = vec![];

// Iterate through all pubkeys and decompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::PubkeyCache)
.enumerate()
{
let (key, value) = res?;
let pubkey = PublicKey::from_ssz_bytes(&value)?;
let decompressed = DatabasePubkey::from_pubkey(&pubkey);
ops.push(decompressed.as_kv_store_op(key));

if i > 0 && i % LOG_EVERY == 0 {
info!(
log,
"Public key decompression in progress";
"keys_decompressed" => i
);
}
}
info!(log, "Public key decompression complete");

Ok(ops)
}

pub fn downgrade_from_v21<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Downgrading from v21 to v20");

let mut ops = vec![];

// Iterate through all pubkeys and recompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::PubkeyCache)
.enumerate()
{
let (key, value) = res?;
let decompressed = DatabasePubkey::from_ssz_bytes(&value)?;
let (_, pubkey_bytes) = decompressed.as_pubkey().map_err(|e| Error::DBError {
message: format!("{e:?}"),
})?;

let db_key = get_key_for_col(DBColumn::PubkeyCache.into(), key.as_bytes());
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
pubkey_bytes.as_ssz_bytes(),
));

if i > 0 && i % LOG_EVERY == 0 {
info!(
log,
"Public key compression in progress";
"keys_compressed" => i
);
}
}

info!(log, "Public key compression complete");

Ok(ops)
}
58 changes: 38 additions & 20 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
use smallvec::SmallVec;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
Expand Down Expand Up @@ -49,14 +52,13 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
let mut pubkey_bytes = vec![];

for validator_index in 0.. {
if let Some(DatabasePubkey(pubkey)) =
if let Some(db_pubkey) =
store.get_item(&DatabasePubkey::key_for_index(validator_index))?
{
pubkeys.push((&pubkey).try_into().map_err(|e| {
BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
})?);
pubkey_bytes.push(pubkey);
indices.insert(pubkey, validator_index);
let (pk, pk_bytes) = DatabasePubkey::as_pubkey(&db_pubkey)?;
pubkeys.push(pk);
indices.insert(pk_bytes, validator_index);
pubkey_bytes.push(pk_bytes);
} else {
break;
}
Expand Down Expand Up @@ -104,29 +106,29 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
self.indices.reserve(validator_keys.len());

let mut store_ops = Vec::with_capacity(validator_keys.len());
for pubkey in validator_keys {
for pubkey_bytes in validator_keys {
let i = self.pubkeys.len();

if self.indices.contains_key(&pubkey) {
if self.indices.contains_key(&pubkey_bytes) {
return Err(BeaconChainError::DuplicateValidatorPublicKey);
}

let pubkey = (&pubkey_bytes)
.try_into()
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;

// Stage the new validator key for writing to disk.
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(StoreOp::KeyValueOp(
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
DatabasePubkey::from_pubkey(&pubkey)
.as_kv_store_op(DatabasePubkey::key_for_index(i)),
));

self.pubkeys.push(
(&pubkey)
.try_into()
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
);
self.pubkey_bytes.push(pubkey);

self.indices.insert(pubkey, i);
self.pubkeys.push(pubkey);
self.pubkey_bytes.push(pubkey_bytes);
self.indices.insert(pubkey_bytes, i);
}

Ok(store_ops)
Expand Down Expand Up @@ -166,26 +168,42 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
/// Wrapper for a public key stored in the database.
///
/// Keyed by the validator index as `Hash256::from_low_u64_be(index)`.
struct DatabasePubkey(PublicKeyBytes);
#[derive(Encode, Decode)]
pub struct DatabasePubkey {
pubkey: SmallVec<[u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN]>,
}

impl StoreItem for DatabasePubkey {
fn db_column() -> DBColumn {
DBColumn::PubkeyCache
}

fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self(PublicKeyBytes::from_ssz_bytes(bytes)?))
Ok(Self::from_ssz_bytes(bytes)?)
}
}

impl DatabasePubkey {
fn key_for_index(index: usize) -> Hash256 {
Hash256::from_low_u64_be(index as u64)
}

pub fn from_pubkey(pubkey: &PublicKey) -> Self {
Self {
pubkey: pubkey.serialize_uncompressed().into(),
}
}

pub fn as_pubkey(&self) -> Result<(PublicKey, PublicKeyBytes), BeaconChainError> {
let pubkey = PublicKey::deserialize_uncompressed(&self.pubkey)
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
let pubkey_bytes = pubkey.compress();
Ok((pubkey, pubkey_bytes))
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot};

pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(20);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(21);

// All the keys that get stored under the `BeaconMeta` column.
//
Expand Down
24 changes: 24 additions & 0 deletions crypto/bls/src/generic_public_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use tree_hash::TreeHash;
/// The byte-length of a BLS public key when serialized in compressed form.
pub const PUBLIC_KEY_BYTES_LEN: usize = 48;

/// The byte-length of a BLS public key when serialized in uncompressed form.
pub const PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN: usize = 96;

/// Represents the public key at infinity.
pub const INFINITY_PUBLIC_KEY: [u8; PUBLIC_KEY_BYTES_LEN] = [
0xc0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
Expand All @@ -23,8 +26,17 @@ pub trait TPublicKey: Sized + Clone {
/// Serialize `self` as compressed bytes.
fn serialize(&self) -> [u8; PUBLIC_KEY_BYTES_LEN];

/// Serialize `self` as uncompressed bytes.
fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN];

/// Deserialize `self` from compressed bytes.
fn deserialize(bytes: &[u8]) -> Result<Self, Error>;

/// Deserialize `self` from uncompressed bytes.
///
/// This function *does not* perform thorough checks of the input bytes and should only be
/// used with bytes output from `Self::serialize_uncompressed`.
fn deserialize_uncompressed(bytes: &[u8]) -> Result<Self, Error>;
}

/// A BLS public key that is generic across some BLS point (`Pub`).
Expand Down Expand Up @@ -65,6 +77,11 @@ where
self.point.serialize()
}

/// Serialize `self` as uncompressed bytes.
pub fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN] {
self.point.serialize_uncompressed()
}

/// Deserialize `self` from compressed bytes.
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
if bytes == &INFINITY_PUBLIC_KEY[..] {
Expand All @@ -75,6 +92,13 @@ where
})
}
}

/// Deserialize `self` from compressed bytes.
pub fn deserialize_uncompressed(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self {
point: Pub::deserialize_uncompressed(bytes)?,
})
}
}

impl<Pub: TPublicKey> Eq for GenericPublicKey<Pub> {}
Expand Down
23 changes: 21 additions & 2 deletions crypto/bls/src/impls/blst.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{
generic_aggregate_public_key::TAggregatePublicKey,
generic_aggregate_signature::TAggregateSignature,
generic_public_key::{GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN},
generic_public_key::{
GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN, PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN,
},
generic_secret_key::TSecretKey,
generic_signature::{TSignature, SIGNATURE_BYTES_LEN},
Error, Hash256, ZeroizeHash, INFINITY_SIGNATURE,
BlstError, Error, Hash256, ZeroizeHash, INFINITY_SIGNATURE,
};
pub use blst::min_pk as blst_core;
use blst::{blst_scalar, BLST_ERROR};
Expand Down Expand Up @@ -121,6 +123,10 @@ impl TPublicKey for blst_core::PublicKey {
self.compress()
}

fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN] {
blst_core::PublicKey::serialize(self)
}

fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
// key_validate accepts uncompressed bytes too so enforce byte length here.
// It also does subgroup checks, noting infinity check is done in `generic_public_key.rs`.
Expand All @@ -132,6 +138,19 @@ impl TPublicKey for blst_core::PublicKey {
}
Self::key_validate(bytes).map_err(Into::into)
}

fn deserialize_uncompressed(bytes: &[u8]) -> Result<Self, Error> {
if bytes.len() != PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN {
return Err(Error::InvalidByteLength {
got: bytes.len(),
expected: PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN,
});
}
// Ensure we use the `blst` function rather than the one from this trait.
let result: Result<Self, BlstError> = Self::deserialize(bytes);
let key = result?;
Ok(key)
}
}

/// A wrapper that allows for `PartialEq` and `Clone` impls.
Expand Down
12 changes: 11 additions & 1 deletion crypto/bls/src/impls/fake_crypto.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{
generic_aggregate_public_key::TAggregatePublicKey,
generic_aggregate_signature::TAggregateSignature,
generic_public_key::{GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN},
generic_public_key::{
GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN, PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN,
},
generic_secret_key::{TSecretKey, SECRET_KEY_BYTES_LEN},
generic_signature::{TSignature, SIGNATURE_BYTES_LEN},
Error, Hash256, ZeroizeHash, INFINITY_PUBLIC_KEY, INFINITY_SIGNATURE,
Expand Down Expand Up @@ -46,11 +48,19 @@ impl TPublicKey for PublicKey {
self.0
}

fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN] {
panic!("fake_crypto does not support uncompressed keys")
}

fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
let mut pubkey = Self::infinity();
pubkey.0[..].copy_from_slice(&bytes[0..PUBLIC_KEY_BYTES_LEN]);
Ok(pubkey)
}

fn deserialize_uncompressed(_: &[u8]) -> Result<Self, Error> {
panic!("fake_crypto does not support uncompressed keys")
}
}

impl Eq for PublicKey {}
Expand Down
4 changes: 3 additions & 1 deletion crypto/bls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ mod zeroize_hash;

pub mod impls;

pub use generic_public_key::{INFINITY_PUBLIC_KEY, PUBLIC_KEY_BYTES_LEN};
pub use generic_public_key::{
INFINITY_PUBLIC_KEY, PUBLIC_KEY_BYTES_LEN, PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN,
};
pub use generic_secret_key::SECRET_KEY_BYTES_LEN;
pub use generic_signature::{INFINITY_SIGNATURE, SIGNATURE_BYTES_LEN};
pub use get_withdrawal_credentials::get_withdrawal_credentials;
Expand Down
Loading

0 comments on commit d84e3e3

Please sign in to comment.