Skip to content

Commit

Permalink
Merge pull request #453 from Cerebellum-Network/fix/dac-aggregates-do…
Browse files Browse the repository at this point in the history
…uble-spending

FIX: Preventing double spending in case of inconsistent aggregates for the same aggregation key
  • Loading branch information
yahortsaryk authored Oct 23, 2024
2 parents 8f25ae2 + 94ae205 commit 659d2a8
Show file tree
Hide file tree
Showing 3 changed files with 495 additions and 270 deletions.
183 changes: 115 additions & 68 deletions pallets/ddc-verification/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,31 +752,30 @@ pub mod pallet {
number_of_gets: u64,
}

/// The `ConsolidatedAggregate` struct represents a merging result of multiple aggregates
/// that have reached consensus on the usage criteria. This result should be taken into
/// consideration when choosing the intensity of the challenge.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ConsistentGroup<A: Aggregate>(pub ActivityHash, pub Vec<A>);
impl<A: Aggregate> ConsistentGroup<A> {
pub fn hash(&self) -> ActivityHash {
self.0
}

pub fn get(&self, idx: usize) -> Option<&A> {
self.1.get(idx)
}

pub fn len(&self) -> usize {
self.1.len()
}
pub(crate) struct ConsolidatedAggregate<A: Aggregate> {
/// The representative aggregate after consolidation
pub(crate) aggregate: A,
/// Number of aggregates that were consistent
pub(crate) count: u16,
/// Aggregators that provided consistent aggregates
pub(crate) aggregators: Vec<AggregatorInfo>,
}

pub fn _items(&self) -> &Vec<A> {
&self.1
impl<A: Aggregate> ConsolidatedAggregate<A> {
pub(crate) fn new(aggregate: A, count: u16, aggregators: Vec<AggregatorInfo>) -> Self {
ConsolidatedAggregate { aggregate, count, aggregators }
}
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ConsistentGroups<A: Aggregate> {
pub in_consensus: Vec<ConsistentGroup<A>>,
pub in_quorum: Vec<ConsistentGroup<A>>,
pub in_others: Vec<ConsistentGroup<A>>,
pub(crate) struct ConsistencyGroups<A: Aggregate> {
pub(crate) consensus: Vec<ConsolidatedAggregate<A>>,
pub(crate) quorum: Vec<ConsolidatedAggregate<A>>,
pub(crate) others: Vec<ConsolidatedAggregate<A>>,
}

#[derive(Debug, Clone, Encode, Decode, TypeInfo, PartialEq)]
Expand All @@ -785,12 +784,18 @@ pub mod pallet {
BucketSubAggregateKey(BucketId, String),
}

/// The 'Aggregate' trait defines a set of members common to activity aggregates, which reflect
/// the usage of a node or bucket within an Era..
pub(crate) trait Aggregate:
Clone + Ord + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Debug
{
/// Hash of the aggregate that is defined by it 'usage' values
fn hash<T: Config>(&self) -> ActivityHash;
/// Aggregation key of this aggregate, i.e. bucket composite key or node key
fn get_key(&self) -> AggregateKey;
/// Number of activity records this aggregated by this aggregate
fn get_number_of_leaves(&self) -> u64;
/// Aggregator provided this aggregate
fn get_aggregator(&self) -> AggregatorInfo;
}

Expand Down Expand Up @@ -1625,33 +1630,44 @@ pub mod pallet {
pub(crate) fn get_total_usage<A: Aggregate>(
cluster_id: &ClusterId,
era_id: DdcEra,
consistent_groups: ConsistentGroups<A>,
consistency_groups: ConsistencyGroups<A>,
) -> Result<Vec<A>, Vec<OCWError>> {
let mut total_usage: Vec<A> = vec![];
let mut total_usage = vec![];
let mut total_usage_keys = vec![];

// todo: run a light challenge for unanimous consensus
let in_consensus_usage = consistent_groups
.in_consensus
// todo: implement 'challenge_consensus' fn and run a light challenge for unanimous
// consensus
let in_consensus_usage = consistency_groups
.consensus
.clone()
.into_iter()
.map(|g| g.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned())
.collect::<Result<Vec<_>, _>>()?;
total_usage.extend(in_consensus_usage);

// todo: run a light challenge for quorum, i.e. for majority
let in_quorum_usage = consistent_groups
.in_quorum
.map(|ca| ca.aggregate.clone())
.collect::<Vec<_>>();
total_usage.extend(in_consensus_usage.clone());
total_usage_keys
.extend(in_consensus_usage.into_iter().map(|a| a.get_key()).collect::<Vec<_>>());

// todo: implement 'challenge_quorum' fn and run a light challenge for the quorum, i.e.
// for majority
let in_quorum_usage = consistency_groups
.quorum
.clone()
.into_iter()
.map(|g| g.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned())
.collect::<Result<Vec<_>, _>>()?;
total_usage.extend(in_quorum_usage);
.map(|ca| ca.aggregate.clone())
.collect::<Vec<_>>();
total_usage.extend(in_quorum_usage.clone());
total_usage_keys
.extend(in_quorum_usage.into_iter().map(|a| a.get_key()).collect::<Vec<_>>());

let verified_usage =
Self::challenge_others(cluster_id, era_id, consistent_groups.in_others)?;
let verified_usage = Self::challenge_others(
cluster_id,
era_id,
consistency_groups,
&mut total_usage_keys,
)?;

if !verified_usage.is_empty() {
total_usage.extend(verified_usage);
total_usage.extend(verified_usage.clone());
}

Ok(total_usage)
Expand All @@ -1660,36 +1676,51 @@ pub mod pallet {
pub(crate) fn challenge_others<A: Aggregate>(
_cluster_id: &ClusterId,
_era_id: DdcEra,
others: Vec<ConsistentGroup<A>>,
consistency_groups: ConsistencyGroups<A>,
accepted_keys: &mut Vec<AggregateKey>,
) -> Result<Vec<A>, Vec<OCWError>> {
let redundancy_factor = T::DAC_REDUNDANCY_FACTOR;
let mut verified_usage: Vec<A> = vec![];

for group in others {
if group.len() > redundancy_factor.into() {
log::info!(
"⚠️ Number of consistent aggregates exceeds the redundancy factor {:?}",
group.hash()
for consolidated_aggregate in consistency_groups.others {
let aggregate_key = consolidated_aggregate.aggregate.get_key();

if accepted_keys.contains(&aggregate_key) {
log::warn!(
"⚠️ The aggregate {:?} is inconsistent between aggregators.",
aggregate_key
);

let excessive_aggregate =
group.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned()?;
// This prevents the double spending in case of inconsistencies between
// aggregators for the same aggregation key
continue;
}

if consolidated_aggregate.count > redundancy_factor {
let excessive_aggregate = consolidated_aggregate.aggregate.clone();

log::warn!(
"⚠️ Number of consistent aggregates with key {:?} exceeds the redundancy factor",
aggregate_key
);

log::info!(
"🔎‍ Challenging excessive aggregate {:?}",
"🔎‍ Challenging excessive aggregate with key {:?} and hash {:?}",
aggregate_key,
excessive_aggregate.hash::<T>()
);

// todo: run a challenge dedicated to the excessive number of aggregates.
// we assume it won't happen at the moment, so we just take the aggregate to
// payouts stage
verified_usage.push(excessive_aggregate);
accepted_keys.push(aggregate_key);
} else {
let defective_aggregate =
group.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned()?;
let defective_aggregate = consolidated_aggregate.aggregate.clone();

log::info!(
"🔎‍ Challenging defective aggregate {:?}",
"🔎‍ Challenging defective aggregate with key {:?} and hash {:?}",
aggregate_key,
defective_aggregate.hash::<T>()
);

Expand All @@ -1701,6 +1732,7 @@ pub mod pallet {
// we assume all aggregates are valid at the moment, so we just take the
// aggregate to payouts stage
verified_usage.push(defective_aggregate);
accepted_keys.push(aggregate_key);
}
}
}
Expand Down Expand Up @@ -1934,7 +1966,7 @@ pub mod pallet {
buckets_aggregates_by_aggregator: Vec<(AggregatorInfo, Vec<BucketAggregateResponse>)>,
redundancy_factor: u16,
quorum: Percent,
) -> ConsistentGroups<BucketSubAggregate> {
) -> ConsistencyGroups<BucketSubAggregate> {
let mut buckets_sub_aggregates: Vec<BucketSubAggregate> = Vec::new();

log::info!(
Expand Down Expand Up @@ -1967,9 +1999,9 @@ pub mod pallet {
let buckets_sub_aggregates_groups =
Self::group_by_consistency(buckets_sub_aggregates, redundancy_factor, quorum);

log::info!("🏠🌕 Bucket Sub-Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.in_consensus);
log::info!("🏠🌗 Bucket Sub-Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.in_quorum);
log::info!("🏠🌘 Bucket Sub-Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.in_others);
log::info!("🏠🌕 Bucket Sub-Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.consensus);
log::info!("🏠🌗 Bucket Sub-Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.quorum);
log::info!("🏠🌘 Bucket Sub-Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.others);

buckets_sub_aggregates_groups
}
Expand Down Expand Up @@ -2926,7 +2958,7 @@ pub mod pallet {
nodes_aggregates_by_aggregator: Vec<(AggregatorInfo, Vec<NodeAggregateResponse>)>,
redundancy_factor: u16,
quorum: Percent,
) -> ConsistentGroups<NodeAggregate> {
) -> ConsistencyGroups<NodeAggregate> {
let mut nodes_aggregates: Vec<NodeAggregate> = Vec::new();

log::info!(
Expand Down Expand Up @@ -2954,9 +2986,9 @@ pub mod pallet {
let nodes_aggregates_groups =
Self::group_by_consistency(nodes_aggregates, redundancy_factor, quorum);

log::info!("🏠🌕 Node Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.in_consensus);
log::info!("🏠🌗 Node Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.in_quorum);
log::info!("🏠🌘 Node Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.in_others);
log::info!("🏠🌕 Node Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.consensus);
log::info!("🏠🌗 Node Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.quorum);
log::info!("🏠🌘 Node Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.others);

nodes_aggregates_groups
}
Expand All @@ -2965,7 +2997,7 @@ pub mod pallet {
aggregates: Vec<A>,
redundancy_factor: u16,
quorum: Percent,
) -> ConsistentGroups<A>
) -> ConsistencyGroups<A>
where
A: Aggregate + Clone,
{
Expand All @@ -2978,24 +3010,39 @@ pub mod pallet {
.push(aggregate.clone());
}

let mut in_consensus = Vec::new();
let mut in_quorum = Vec::new();
let mut in_others = Vec::new();
let mut consensus_group = Vec::new();
let mut quorum_group = Vec::new();
let mut others_group = Vec::new();

let max_aggregates_count = redundancy_factor;
let quorum_threshold = quorum * max_aggregates_count;

for (hash, group) in consistent_aggregates {
if group.len() == usize::from(max_aggregates_count) {
in_consensus.push(ConsistentGroup(hash, group));
} else if group.len() >= quorum_threshold.into() {
in_quorum.push(ConsistentGroup(hash, group));
for (_hash, group) in consistent_aggregates {
let aggregate = group.first().unwrap();
let aggregates_count = u16::try_from(group.len()).unwrap_or(u16::MAX);
let aggregators: Vec<AggregatorInfo> =
group.clone().into_iter().map(|a| a.get_aggregator()).collect();

let consolidated_aggregate = ConsolidatedAggregate::<A>::new(
aggregate.clone(),
aggregates_count,
aggregators,
);

if aggregates_count == max_aggregates_count {
consensus_group.push(consolidated_aggregate);
} else if aggregates_count >= quorum_threshold {
quorum_group.push(consolidated_aggregate);
} else {
in_others.push(ConsistentGroup(hash, group));
others_group.push(consolidated_aggregate);
}
}

ConsistentGroups { in_consensus, in_quorum, in_others }
ConsistencyGroups {
consensus: consensus_group,
quorum: quorum_group,
others: others_group,
}
}

/// Fetch cluster to validate.
Expand Down
Loading

0 comments on commit 659d2a8

Please sign in to comment.