Skip to content

Commit

Permalink
Merge pull request #14 from chainbound/nico/chore/delegations-vec
Browse files Browse the repository at this point in the history
feat(api): save delegations/revocations in batch
  • Loading branch information
thedevbirb authored Oct 17, 2024
2 parents 5898d70 + 0112adc commit 95d8c04
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 101 deletions.
108 changes: 54 additions & 54 deletions crates/api/src/constraints/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ethereum_consensus::signing::verify_signature;
use helix_utils::signing::{verify_signed_message, COMMIT_BOOST_DOMAIN};
use tracing::{info, warn, error};
use uuid::Uuid;
use std::{sync::Arc, time::{SystemTime, UNIX_EPOCH}};
use std::{collections::HashSet, sync::Arc, time::{SystemTime, UNIX_EPOCH}};
use tokio::{sync::broadcast, time::Instant};

use crate::constraints::error::ConstraintsApiError;
Expand Down Expand Up @@ -200,42 +200,42 @@ where
let body = req.into_body();
let body_bytes = to_bytes(body, MAX_REQUEST_LENGTH).await?;

// Decode the incoming request body into a `SignedDelegation`.
let mut signed_delegation = match serde_json::from_slice::<SignedDelegation>(&body_bytes) {
Ok(delegation) => match delegation.message.action {
DELEGATION_ACTION => delegation,
other => {
warn!(request_id = %request_id, action = other, "Invalid delegation action. expected 0");
// Decode the incoming request body into a `SignedDelegation` array.
let signed_delegations = match serde_json::from_slice::<Vec<SignedDelegation>>(&body_bytes) {
Ok(delegations) => {
let action = delegations.iter().map(|d| d.message.action).collect::<HashSet<_>>();
let are_all_actions_delegations = action.len() == 1 && action.contains(&DELEGATION_ACTION);
if !are_all_actions_delegations {
warn!(request_id = %request_id, actions = ?action, "Invalid delegations action. expected 0");
return Err(ConstraintsApiError::InvalidDelegation)
},
},
}
delegations
}
Err(e) => {
warn!(err = ?e, request_id = %request_id, "Failed to decode delegation");
warn!(err = ?e, request_id = %request_id, "Failed to decode delegations");
return Err(ConstraintsApiError::InvalidDelegation)
},
};
trace.decode = get_nanos_timestamp()?;

// Verify the delegation signature
if let Err(e) = verify_signed_message(
&signed_delegation.message.digest(),
&signed_delegation.signature,
&signed_delegation.message.validator_pubkey,
COMMIT_BOOST_DOMAIN,
&api.chain_info.context,
) {
warn!(err = ?e, request_id = %request_id, "Invalid delegation signature");
return Err(ConstraintsApiError::InvalidSignature);
};

for delegation in &signed_delegations {
if let Err(e) = verify_signed_message(
&delegation.message.digest(),
&delegation.signature,
&delegation.message.validator_pubkey,
COMMIT_BOOST_DOMAIN,
&api.chain_info.context,
) {
warn!(err = ?e, request_id = %request_id, "Invalid delegation signature");
return Err(ConstraintsApiError::InvalidSignature);
};
}
trace.verify_signature = get_nanos_timestamp()?;

// Store the delegation in the database
tokio::spawn( async move {
if let Err(err) = api.auctioneer.save_validator_delegation(signed_delegation).await {
error!(
error = %err,
"Failed to save delegation",
)
tokio::spawn(async move {
if let Err(err) = api.auctioneer.save_validator_delegations(signed_delegations).await {
error!(error = %err, "Failed to save delegations");
}
});

Expand Down Expand Up @@ -275,42 +275,42 @@ where
let body = req.into_body();
let body_bytes = to_bytes(body, MAX_REQUEST_LENGTH).await?;

// Decode the incoming request body into a `SignedRevocation`.
let mut signed_revocation = match serde_json::from_slice::<SignedRevocation>(&body_bytes) {
Ok(revocation ) => match revocation.message.action {
REVOCATION_ACTION => revocation,
other => {
warn!(request_id = %request_id, action = other, "Invalid revocation action. expected 1");
// Decode the incoming request body into a `SignedRevocation` array.
let signed_revocations = match serde_json::from_slice::<Vec<SignedRevocation>>(&body_bytes) {
Ok(revocations) => {
let action = revocations.iter().map(|r| r.message.action).collect::<HashSet<_>>();
let are_all_actions_revocations = action.len() == 1 && action.contains(&REVOCATION_ACTION);
if !are_all_actions_revocations {
warn!(request_id = %request_id, actions = ?action, "Invalid revocation action. expected 1");
return Err(ConstraintsApiError::InvalidRevocation)
},
},
}
revocations
}
Err(e) => {
warn!(err = ?e, request_id = %request_id, "Failed to decode revocation");
return Err(ConstraintsApiError::InvalidRevocation)
},
};
trace.decode = get_nanos_timestamp()?;

// Verify the revocation signature
if let Err(e) = verify_signed_message(
&signed_revocation.message.digest(),
&signed_revocation.signature,
&signed_revocation.message.validator_pubkey,
COMMIT_BOOST_DOMAIN,
&api.chain_info.context,
) {
warn!(err = ?e, request_id = %request_id, "Invalid revocation signature");
return Err(ConstraintsApiError::InvalidSignature);
};

for revocation in &signed_revocations {
if let Err(e) = verify_signed_message(
&revocation.message.digest(),
&revocation.signature,
&revocation.message.validator_pubkey,
COMMIT_BOOST_DOMAIN,
&api.chain_info.context,
) {
warn!(err = ?e, request_id = %request_id, "Invalid revocation signature");
return Err(ConstraintsApiError::InvalidSignature);
};
}
trace.verify_signature = get_nanos_timestamp()?;

// Store the delegation in the database
tokio::spawn( async move {
if let Err(err) = api.auctioneer.revoke_validator_delegation(signed_revocation).await {
error!(
error = %err,
"Failed to do revocation",
)
tokio::spawn(async move {
if let Err(err) = api.auctioneer.revoke_validator_delegations(signed_revocations).await {
error!(error = %err, "Failed to do revocation");
}
});

Expand Down
8 changes: 4 additions & 4 deletions crates/datastore/src/auctioneer/mock_auctioneer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ impl Auctioneer for MockAuctioneer {
Ok(vec![])
}

async fn save_validator_delegation(
async fn save_validator_delegations(
&self,
_signed_delegation: SignedDelegation,
_signed_delegations: Vec<SignedDelegation>,
) -> Result<(), AuctioneerError> {
Ok(())
}

async fn revoke_validator_delegation(
async fn revoke_validator_delegations(
&self,
_signed_revocation: SignedRevocation,
_signed_revocations: Vec<SignedRevocation>,
) -> Result<(), AuctioneerError> {
Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions crates/datastore/src/auctioneer/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ pub trait Auctioneer: Send + Sync + Clone {
pub_key: BlsPublicKey,
) -> Result<Vec<SignedDelegation>, AuctioneerError>;

async fn save_validator_delegation(
async fn save_validator_delegations(
&self,
signed_delegation: SignedDelegation,
signed_delegations: Vec<SignedDelegation>,
) -> Result<(), AuctioneerError>;

async fn revoke_validator_delegation(
async fn revoke_validator_delegations(
&self,
signed_revocation: SignedRevocation,
signed_revocations: Vec<SignedRevocation>,
) -> Result<(), AuctioneerError>;

async fn save_constraints(
Expand Down
81 changes: 42 additions & 39 deletions crates/datastore/src/redis/redis_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,53 +543,56 @@ impl Auctioneer for RedisCache {
Ok(delegations.unwrap_or_default())
}

async fn save_validator_delegation(
async fn save_validator_delegations(
&self,
signed_delegation: SignedDelegation,
signed_delegations: Vec<SignedDelegation>,
) -> Result<(), AuctioneerError> {
let key = get_delegations_key(&signed_delegation.message.validator_pubkey);

// Attempt to get the existing delegations from the cache.
let delegations: Option<Vec<SignedDelegation>> =
self.get(&key).await.map_err(AuctioneerError::RedisError)?;

// Append the new delegation to the existing delegations or create a new Vec if none exist.
let mut all_delegations = match delegations {
Some(mut delegations) => {
delegations.push(signed_delegation);
delegations
}
None => Vec::from([signed_delegation]),
};
for signed_delegation in signed_delegations {
let key = get_delegations_key(&signed_delegation.message.validator_pubkey);

// Attempt to get the existing delegations from the cache.
let delegations: Option<Vec<SignedDelegation>> =
self.get(&key).await.map_err(AuctioneerError::RedisError)?;

// Append the new delegation to the existing delegations or create a new Vec if none exist.
let mut all_delegations = match delegations {
Some(mut delegations) => {
delegations.push(signed_delegation);
delegations
}
None => Vec::from([signed_delegation]),
};

// Save the updated delegations back to the cache.
self.set(&key, &all_delegations, None).await.map_err(AuctioneerError::RedisError)?;
}

// Save the updated delegations back to the cache.
self.set(&key, &all_delegations, None).await.map_err(AuctioneerError::RedisError)
Ok(())
}

async fn revoke_validator_delegation(
async fn revoke_validator_delegations(
&self,
signed_revocation: SignedRevocation,
signed_revocations: Vec<SignedRevocation>,
) -> Result<(), AuctioneerError> {
let key = get_delegations_key(&signed_revocation.message.validator_pubkey);

// Attempt to get the existing delegations from the cache.
let delegations: Option<Vec<SignedDelegation>> =
self.get(&key).await.map_err(AuctioneerError::RedisError)?;

// Filter out the revoked delegation.
let updated_delegations = match delegations {
Some(mut delegations) => {
delegations.retain(|delegation| {
delegation.message.delegatee_pubkey !=
signed_revocation.message.delegatee_pubkey
});
delegations
}
None => Vec::new(),
};
for signed_revocation in &signed_revocations {
let key = get_delegations_key(&signed_revocation.message.validator_pubkey);

// Attempt to get the existing delegations from the cache.
let mut delegations: Vec<SignedDelegation> =
self.get(&key).await.map_err(AuctioneerError::RedisError)?.unwrap_or_default();

// Filter out the revoked delegation.
let updated_delegations = delegations.retain(|delegation| {
signed_revocations.iter().all(|revocation| {
delegation.message.delegatee_pubkey != revocation.message.delegatee_pubkey
})
});

// Save the updated delegations back to the cache.
self.set(&key, &updated_delegations, None).await.map_err(AuctioneerError::RedisError)?;
}

// Save the updated delegations back to the cache.
self.set(&key, &updated_delegations, None).await.map_err(AuctioneerError::RedisError)
Ok(())
}

async fn save_constraints(
Expand Down

0 comments on commit 95d8c04

Please sign in to comment.