From 3e1a037c18607231bbbae6b56179387deac7f10a Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Tue, 22 Oct 2024 21:53:46 +0200 Subject: [PATCH 1/5] feat: consolidating consistent aggregates --- pallets/ddc-verification/src/lib.rs | 92 +++++----- pallets/ddc-verification/src/tests.rs | 241 ++++++++++---------------- runtime/cere-dev/src/lib.rs | 2 +- 3 files changed, 138 insertions(+), 197 deletions(-) diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 92ffacce3..70cde5f7f 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -753,30 +753,23 @@ pub mod pallet { } #[derive(Debug, Clone, PartialEq)] - pub(crate) struct ConsistentGroup(pub ActivityHash, pub Vec); - impl ConsistentGroup { - pub fn hash(&self) -> ActivityHash { - self.0 - } - - pub fn get(&self, idx: usize) -> Option<&A> { - self.1.get(idx) - } - - pub fn len(&self) -> usize { - self.1.len() - } + pub(crate) struct ConsolidatedAggregate { + pub(crate) aggregate: A, + pub(crate) count: u16, + pub(crate) aggregators: Vec, + } - pub fn _items(&self) -> &Vec { - &self.1 + impl ConsolidatedAggregate { + pub(crate) fn new(aggregate: A, count: u16, aggregators: Vec) -> Self { + ConsolidatedAggregate { aggregate, count, aggregators } } } #[derive(Debug, Clone, PartialEq)] - pub(crate) struct ConsistentGroups { - pub in_consensus: Vec>, - pub in_quorum: Vec>, - pub in_others: Vec>, + pub(crate) struct ConsistencyGroups { + pub(crate) in_consensus: Vec>, + pub(crate) in_quorum: Vec>, + pub(crate) in_others: Vec>, } #[derive(Debug, Clone, Encode, Decode, TypeInfo, PartialEq)] @@ -1625,30 +1618,30 @@ pub mod pallet { pub(crate) fn get_total_usage( cluster_id: &ClusterId, era_id: DdcEra, - consistent_groups: ConsistentGroups, + consistency_groups: ConsistencyGroups, ) -> Result, Vec> { let mut total_usage: Vec = vec![]; // todo: run a light challenge for unanimous consensus - let in_consensus_usage = consistent_groups + let 
in_consensus_usage = consistency_groups .in_consensus .clone() .into_iter() - .map(|g| g.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned()) - .collect::, _>>()?; + .map(|a| a.aggregate.clone()) + .collect::>(); total_usage.extend(in_consensus_usage); // todo: run a light challenge for quorum, i.e. for majority - let in_quorum_usage = consistent_groups + let in_quorum_usage = consistency_groups .in_quorum .clone() .into_iter() - .map(|g| g.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned()) - .collect::, _>>()?; + .map(|a| a.aggregate.clone()) + .collect::>(); total_usage.extend(in_quorum_usage); let verified_usage = - Self::challenge_others(cluster_id, era_id, consistent_groups.in_others)?; + Self::challenge_others(cluster_id, era_id, consistency_groups.in_others)?; if !verified_usage.is_empty() { total_usage.extend(verified_usage); @@ -1660,21 +1653,20 @@ pub mod pallet { pub(crate) fn challenge_others( _cluster_id: &ClusterId, _era_id: DdcEra, - others: Vec>, + others: Vec>, ) -> Result, Vec> { let redundancy_factor = T::DAC_REDUNDANCY_FACTOR; let mut verified_usage: Vec = vec![]; for group in others { - if group.len() > redundancy_factor.into() { + if group.count > redundancy_factor { + let excessive_aggregate = group.aggregate.clone(); + log::info!( "⚠️ Number of consistent aggregates exceeds the redundancy factor {:?}", - group.hash() + excessive_aggregate.hash::() ); - let excessive_aggregate = - group.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned()?; - log::info!( "🔎‍ Challenging excessive aggregate {:?}", excessive_aggregate.hash::() @@ -1685,8 +1677,7 @@ pub mod pallet { // payouts stage verified_usage.push(excessive_aggregate); } else { - let defective_aggregate = - group.get(0).ok_or(vec![OCWError::EmptyConsistentGroup]).cloned()?; + let defective_aggregate = group.aggregate.clone(); log::info!( "🔎‍ Challenging defective aggregate {:?}", @@ -1934,7 +1925,7 @@ pub mod pallet { buckets_aggregates_by_aggregator: 
Vec<(AggregatorInfo, Vec)>, redundancy_factor: u16, quorum: Percent, - ) -> ConsistentGroups { + ) -> ConsistencyGroups { let mut buckets_sub_aggregates: Vec = Vec::new(); log::info!( @@ -2926,7 +2917,7 @@ pub mod pallet { nodes_aggregates_by_aggregator: Vec<(AggregatorInfo, Vec)>, redundancy_factor: u16, quorum: Percent, - ) -> ConsistentGroups { + ) -> ConsistencyGroups { let mut nodes_aggregates: Vec = Vec::new(); log::info!( @@ -2965,7 +2956,7 @@ pub mod pallet { aggregates: Vec, redundancy_factor: u16, quorum: Percent, - ) -> ConsistentGroups + ) -> ConsistencyGroups where A: Aggregate + Clone, { @@ -2985,17 +2976,28 @@ pub mod pallet { let max_aggregates_count = redundancy_factor; let quorum_threshold = quorum * max_aggregates_count; - for (hash, group) in consistent_aggregates { - if group.len() == usize::from(max_aggregates_count) { - in_consensus.push(ConsistentGroup(hash, group)); - } else if group.len() >= quorum_threshold.into() { - in_quorum.push(ConsistentGroup(hash, group)); + for (_hash, group) in consistent_aggregates { + let aggregate = group.first().unwrap(); + let aggregates_count = u16::try_from(group.len()).unwrap_or(u16::MAX); + let aggregators: Vec = + group.clone().into_iter().map(|a| a.get_aggregator()).collect(); + + let consolidated_aggregate = ConsolidatedAggregate::::new( + aggregate.clone(), + aggregates_count, + aggregators, + ); + + if aggregates_count == max_aggregates_count { + in_consensus.push(consolidated_aggregate); + } else if aggregates_count >= quorum_threshold { + in_quorum.push(consolidated_aggregate); } else { - in_others.push(ConsistentGroup(hash, group)); + in_others.push(consolidated_aggregate); } } - ConsistentGroups { in_consensus, in_quorum, in_others } + ConsistencyGroups { in_consensus, in_quorum, in_others } } /// Fetch cluster to validate. 
diff --git a/pallets/ddc-verification/src/tests.rs b/pallets/ddc-verification/src/tests.rs index 0396a6dee..630da8bd0 100644 --- a/pallets/ddc-verification/src/tests.rs +++ b/pallets/ddc-verification/src/tests.rs @@ -1025,24 +1025,13 @@ fn buckets_sub_aggregates_grouped_by_consistency() { assert_eq!(groups.in_quorum.len(), 0); assert_eq!(groups.in_others.len(), 0); - let agg1 = groups.in_consensus[0].get(0).unwrap(); - let agg2 = groups.in_consensus[0].get(1).unwrap(); - let agg3 = groups.in_consensus[0].get(2).unwrap(); - - assert_eq!(agg1.stored_bytes, 100); - assert_eq!(agg1.transferred_bytes, 50); - assert_eq!(agg1.number_of_puts, 10); - assert_eq!(agg1.number_of_gets, 20); - - assert_eq!(agg2.stored_bytes, 100); - assert_eq!(agg2.transferred_bytes, 50); - assert_eq!(agg2.number_of_puts, 10); - assert_eq!(agg2.number_of_gets, 20); - - assert_eq!(agg3.stored_bytes, 100); - assert_eq!(agg3.transferred_bytes, 50); - assert_eq!(agg3.number_of_puts, 10); - assert_eq!(agg3.number_of_gets, 20); + let consolidated_aggregate = groups.in_consensus[0].aggregate.clone(); + assert_eq!(consolidated_aggregate.stored_bytes, 100); + assert_eq!(consolidated_aggregate.transferred_bytes, 50); + assert_eq!(consolidated_aggregate.number_of_puts, 10); + assert_eq!(consolidated_aggregate.number_of_gets, 20); + assert_eq!(groups.in_consensus[0].count, 3); + assert_eq!(groups.in_consensus[0].aggregators.len(), 3); } #[test] @@ -1130,49 +1119,23 @@ fn buckets_sub_aggregates_grouped_by_consistency_2() { assert_eq!(groups.in_quorum.len(), 0); assert_eq!(groups.in_others.len(), 0); - let g1_agg1 = groups.in_consensus[0].get(0).unwrap(); - let g1_agg2 = groups.in_consensus[0].get(1).unwrap(); - let g1_agg3 = groups.in_consensus[0].get(2).unwrap(); - - assert_eq!(g1_agg1.bucket_id, 1); - assert_eq!(g1_agg1.stored_bytes, 100); - assert_eq!(g1_agg1.transferred_bytes, 50); - assert_eq!(g1_agg1.number_of_puts, 10); - assert_eq!(g1_agg1.number_of_gets, 20); - - assert_eq!(g1_agg2.bucket_id, 1); - 
assert_eq!(g1_agg2.stored_bytes, 100); - assert_eq!(g1_agg2.transferred_bytes, 50); - assert_eq!(g1_agg2.number_of_puts, 10); - assert_eq!(g1_agg2.number_of_gets, 20); - - assert_eq!(g1_agg3.bucket_id, 1); - assert_eq!(g1_agg3.stored_bytes, 100); - assert_eq!(g1_agg3.transferred_bytes, 50); - assert_eq!(g1_agg3.number_of_puts, 10); - assert_eq!(g1_agg3.number_of_gets, 20); - - let g2_agg1 = groups.in_consensus[1].get(0).unwrap(); - let g2_agg2 = groups.in_consensus[1].get(1).unwrap(); - let g2_agg3 = groups.in_consensus[1].get(2).unwrap(); - - assert_eq!(g2_agg1.bucket_id, 2); - assert_eq!(g2_agg1.stored_bytes, 110); - assert_eq!(g2_agg1.transferred_bytes, 50); - assert_eq!(g2_agg1.number_of_puts, 10); - assert_eq!(g2_agg1.number_of_gets, 20); - - assert_eq!(g2_agg2.bucket_id, 2); - assert_eq!(g2_agg2.stored_bytes, 110); - assert_eq!(g2_agg2.transferred_bytes, 50); - assert_eq!(g2_agg2.number_of_puts, 10); - assert_eq!(g2_agg2.number_of_gets, 20); - - assert_eq!(g2_agg3.bucket_id, 2); - assert_eq!(g2_agg3.stored_bytes, 110); - assert_eq!(g2_agg3.transferred_bytes, 50); - assert_eq!(g2_agg3.number_of_puts, 10); - assert_eq!(g2_agg3.number_of_gets, 20); + let consolidated_aggregate_in_consensus_1 = groups.in_consensus[0].aggregate.clone(); + assert_eq!(consolidated_aggregate_in_consensus_1.bucket_id, 1); + assert_eq!(consolidated_aggregate_in_consensus_1.stored_bytes, 100); + assert_eq!(consolidated_aggregate_in_consensus_1.transferred_bytes, 50); + assert_eq!(consolidated_aggregate_in_consensus_1.number_of_puts, 10); + assert_eq!(consolidated_aggregate_in_consensus_1.number_of_gets, 20); + assert_eq!(groups.in_consensus[0].count, 3); + assert_eq!(groups.in_consensus[0].aggregators.len(), 3); + + let consolidated_aggregate_in_consensus_2 = groups.in_consensus[1].aggregate.clone(); + assert_eq!(consolidated_aggregate_in_consensus_2.bucket_id, 2); + assert_eq!(consolidated_aggregate_in_consensus_2.stored_bytes, 110); + 
assert_eq!(consolidated_aggregate_in_consensus_2.transferred_bytes, 50); + assert_eq!(consolidated_aggregate_in_consensus_2.number_of_puts, 10); + assert_eq!(consolidated_aggregate_in_consensus_2.number_of_gets, 20); + assert_eq!(groups.in_consensus[1].count, 3); + assert_eq!(groups.in_consensus[1].aggregators.len(), 3); } #[test] @@ -1229,24 +1192,13 @@ fn nodes_aggregates_grouped_by_consistency() { assert_eq!(groups.in_quorum.len(), 0); assert_eq!(groups.in_others.len(), 0); - let agg1 = groups.in_consensus[0].get(0).unwrap(); - let agg2 = groups.in_consensus[0].get(1).unwrap(); - let agg3 = groups.in_consensus[0].get(2).unwrap(); - - assert_eq!(agg1.stored_bytes, 100); - assert_eq!(agg1.transferred_bytes, 50); - assert_eq!(agg1.number_of_puts, 10); - assert_eq!(agg1.number_of_gets, 20); - - assert_eq!(agg2.stored_bytes, 100); - assert_eq!(agg2.transferred_bytes, 50); - assert_eq!(agg2.number_of_puts, 10); - assert_eq!(agg2.number_of_gets, 20); - - assert_eq!(agg3.stored_bytes, 100); - assert_eq!(agg3.transferred_bytes, 50); - assert_eq!(agg3.number_of_puts, 10); - assert_eq!(agg3.number_of_gets, 20); + let consolidated_aggregate_in_consensus = groups.in_consensus[0].aggregate.clone(); + assert_eq!(consolidated_aggregate_in_consensus.stored_bytes, 100); + assert_eq!(consolidated_aggregate_in_consensus.transferred_bytes, 50); + assert_eq!(consolidated_aggregate_in_consensus.number_of_puts, 10); + assert_eq!(consolidated_aggregate_in_consensus.number_of_gets, 20); + assert_eq!(groups.in_consensus[0].count, 3); + assert_eq!(groups.in_consensus[0].aggregators.len(), 3); } #[test] @@ -1327,49 +1279,24 @@ fn nodes_aggregates_grouped_by_consistency_2() { assert_eq!(groups.in_quorum.len(), 0); assert_eq!(groups.in_others.len(), 0); - let g1_agg1 = groups.in_consensus[0].get(0).unwrap(); - let g1_agg2 = groups.in_consensus[0].get(1).unwrap(); - let g1_agg3 = groups.in_consensus[0].get(2).unwrap(); - - assert_eq!(g1_agg1.node_id, "2".to_string()); - 
assert_eq!(g1_agg1.stored_bytes, 110); - assert_eq!(g1_agg1.transferred_bytes, 50); - assert_eq!(g1_agg1.number_of_puts, 10); - assert_eq!(g1_agg1.number_of_gets, 20); - - assert_eq!(g1_agg2.node_id, "2".to_string()); - assert_eq!(g1_agg2.stored_bytes, 110); - assert_eq!(g1_agg2.transferred_bytes, 50); - assert_eq!(g1_agg2.number_of_puts, 10); - assert_eq!(g1_agg2.number_of_gets, 20); - - assert_eq!(g1_agg3.node_id, "2".to_string()); - assert_eq!(g1_agg3.stored_bytes, 110); - assert_eq!(g1_agg3.transferred_bytes, 50); - assert_eq!(g1_agg3.number_of_puts, 10); - assert_eq!(g1_agg3.number_of_gets, 20); - - let g2_agg1 = groups.in_consensus[1].get(0).unwrap(); - let g2_agg2 = groups.in_consensus[1].get(1).unwrap(); - let g2_agg3 = groups.in_consensus[1].get(2).unwrap(); - - assert_eq!(g2_agg1.node_id, "1".to_string()); - assert_eq!(g2_agg1.stored_bytes, 100); - assert_eq!(g2_agg1.transferred_bytes, 50); - assert_eq!(g2_agg1.number_of_puts, 10); - assert_eq!(g2_agg1.number_of_gets, 20); - - assert_eq!(g2_agg2.node_id, "1".to_string()); - assert_eq!(g2_agg2.stored_bytes, 100); - assert_eq!(g2_agg2.transferred_bytes, 50); - assert_eq!(g2_agg2.number_of_puts, 10); - assert_eq!(g2_agg2.number_of_gets, 20); - - assert_eq!(g2_agg3.node_id, "1".to_string()); - assert_eq!(g2_agg3.stored_bytes, 100); - assert_eq!(g2_agg3.transferred_bytes, 50); - assert_eq!(g2_agg3.number_of_puts, 10); - assert_eq!(g2_agg3.number_of_gets, 20); + let consolidated_aggregate_1 = groups.in_consensus[0].aggregate.clone(); + assert_eq!(consolidated_aggregate_1.node_id, "2".to_string()); + assert_eq!(consolidated_aggregate_1.stored_bytes, 110); + assert_eq!(consolidated_aggregate_1.transferred_bytes, 50); + assert_eq!(consolidated_aggregate_1.number_of_puts, 10); + assert_eq!(consolidated_aggregate_1.number_of_gets, 20); + assert_eq!(groups.in_consensus[0].count, 3); + assert_eq!(groups.in_consensus[0].aggregators.len(), 3); + + let consolidated_aggregate_2 = groups.in_consensus[1].aggregate.clone(); 
+ + assert_eq!(consolidated_aggregate_2.node_id, "1".to_string()); + assert_eq!(consolidated_aggregate_2.stored_bytes, 100); + assert_eq!(consolidated_aggregate_2.transferred_bytes, 50); + assert_eq!(consolidated_aggregate_2.number_of_puts, 10); + assert_eq!(consolidated_aggregate_2.number_of_gets, 20); + assert_eq!(groups.in_consensus[1].count, 3); + assert_eq!(groups.in_consensus[1].aggregators.len(), 3); } #[test] @@ -1564,23 +1491,16 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { assert_eq!( groups.in_consensus, vec![ - ConsistentGroup(bucket_sub_aggregate_in_consensus.hash::(), vec![ - bucket_sub_aggregate_in_consensus.clone(), - BucketSubAggregate { - aggregator: AggregatorInfo { - node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), - node_params: node_params2.clone(), - }, - ..bucket_sub_aggregate_in_consensus.clone() - }, - BucketSubAggregate { - aggregator: AggregatorInfo { - node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), - node_params: node_params3.clone(), - }, - ..bucket_sub_aggregate_in_consensus.clone() - }, - ]) + ConsolidatedAggregate::new(bucket_sub_aggregate_in_consensus, 3, vec![AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), + node_params: node_params1.clone(), + }, AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), + node_params: node_params2.clone(), + }, AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), + node_params: node_params3.clone(), + }]) ] ); @@ -1602,13 +1522,13 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { assert_eq!( groups.in_quorum, vec![ - ConsistentGroup(bucket_sub_aggregate_in_quorum.hash::(), vec![bucket_sub_aggregate_in_quorum.clone(), BucketSubAggregate {aggregator: AggregatorInfo { - node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), - node_params: node_params3.clone(), - }, - ..bucket_sub_aggregate_in_quorum.clone() - }, - ]), + 
ConsolidatedAggregate::new(bucket_sub_aggregate_in_quorum, 2, vec![AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), + node_params: node_params1.clone(), + }, AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), + node_params: node_params3.clone(), + }]) ] ); @@ -1644,8 +1564,14 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { assert_eq!( groups.in_others, vec![ - ConsistentGroup(bucket_sub_aggregate2_in_others.hash::(), vec![bucket_sub_aggregate2_in_others]), - ConsistentGroup(bucket_sub_aggregate1_in_others.hash::(), vec![bucket_sub_aggregate1_in_others]), + ConsolidatedAggregate::new(bucket_sub_aggregate2_in_others, 1, vec![AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([5; 32])), + node_params: node_params5.clone(), + }]), + ConsolidatedAggregate::new(bucket_sub_aggregate1_in_others, 1, vec![AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), + node_params: node_params2.clone(), + }]), ] ); }); @@ -1826,7 +1752,10 @@ fn node_aggregates_are_fetched_and_grouped() { assert_eq!( groups.in_consensus, - vec![ConsistentGroup(node_aggregate_in_consensus.hash::(), vec![node_aggregate_in_consensus.clone(), NodeAggregate { aggregator: AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), node_params: node_params2.clone(), }, ..node_aggregate_in_consensus.clone() }, NodeAggregate { aggregator: AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), node_params: node_params3.clone(), }, ..node_aggregate_in_consensus.clone() } ])] + vec![ConsolidatedAggregate::new(node_aggregate_in_consensus.clone(), 3, vec![ + AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), node_params: node_params1.clone() }, + AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), node_params: node_params2.clone() }, + AggregatorInfo { 
node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), node_params: node_params3.clone() }])] ); // Node aggregates which are in quorum @@ -1844,7 +1773,10 @@ fn node_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_quorum, vec![ConsistentGroup(node_aggregate_in_quorum.hash::(), vec![node_aggregate_in_quorum.clone(), NodeAggregate {aggregator: AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([4; 32])), node_params: node_params4.clone(), }, ..node_aggregate_in_quorum.clone() }])] + groups.in_quorum, vec![ConsolidatedAggregate::new(node_aggregate_in_quorum.clone(), 2, vec![ + AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), node_params: node_params1.clone() }, + AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([4; 32])), node_params: node_params4.clone() } + ])] ); // Others nodes aggregates @@ -1875,7 +1807,14 @@ fn node_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_others, vec![ConsistentGroup(node_aggregate2_in_others.hash::(), vec![node_aggregate2_in_others]), ConsistentGroup(node_aggregate1_in_others.hash::(), vec![node_aggregate1_in_others])] + groups.in_others, vec![ + ConsolidatedAggregate::new(node_aggregate2_in_others.clone(), 1, vec![AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([5; 32])), + node_params: node_params5.clone(), + }]), ConsolidatedAggregate::new(node_aggregate1_in_others.clone(), 1, vec![AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), + node_params: node_params2.clone(), + }])] ); }); } diff --git a/runtime/cere-dev/src/lib.rs b/runtime/cere-dev/src/lib.rs index 53a9ac6e2..0d25f2199 100644 --- a/runtime/cere-dev/src/lib.rs +++ b/runtime/cere-dev/src/lib.rs @@ -149,7 +149,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. 
If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 60000, + spec_version: 60001, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 19, From d2da38c46342d654190fab4cf9a2c60731c7c4d2 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Tue, 22 Oct 2024 22:54:38 +0200 Subject: [PATCH 2/5] fix: preventing double-spending in case of aggregates inconsistencies --- pallets/ddc-verification/src/lib.rs | 98 ++++++++++++++-------- pallets/ddc-verification/src/tests.rs | 114 +++++++++++++------------- 2 files changed, 123 insertions(+), 89 deletions(-) diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 70cde5f7f..7c8df399c 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -767,9 +767,9 @@ pub mod pallet { #[derive(Debug, Clone, PartialEq)] pub(crate) struct ConsistencyGroups { - pub(crate) in_consensus: Vec>, - pub(crate) in_quorum: Vec>, - pub(crate) in_others: Vec>, + pub(crate) consensus: Vec>, + pub(crate) quorum: Vec>, + pub(crate) others: Vec>, } #[derive(Debug, Clone, Encode, Decode, TypeInfo, PartialEq)] @@ -1624,24 +1624,23 @@ pub mod pallet { // todo: run a light challenge for unanimous consensus let in_consensus_usage = consistency_groups - .in_consensus + .consensus .clone() .into_iter() - .map(|a| a.aggregate.clone()) + .map(|ca| ca.aggregate.clone()) .collect::>(); total_usage.extend(in_consensus_usage); // todo: run a light challenge for quorum, i.e. 
for majority let in_quorum_usage = consistency_groups - .in_quorum + .quorum .clone() .into_iter() - .map(|a| a.aggregate.clone()) + .map(|ca| ca.aggregate.clone()) .collect::>(); total_usage.extend(in_quorum_usage); - let verified_usage = - Self::challenge_others(cluster_id, era_id, consistency_groups.in_others)?; + let verified_usage = Self::challenge_others(cluster_id, era_id, consistency_groups)?; if !verified_usage.is_empty() { total_usage.extend(verified_usage); @@ -1653,22 +1652,52 @@ pub mod pallet { pub(crate) fn challenge_others( _cluster_id: &ClusterId, _era_id: DdcEra, - others: Vec>, + consistency_groups: ConsistencyGroups, ) -> Result, Vec> { let redundancy_factor = T::DAC_REDUNDANCY_FACTOR; let mut verified_usage: Vec = vec![]; - for group in others { - if group.count > redundancy_factor { - let excessive_aggregate = group.aggregate.clone(); + let in_consensus_keys = consistency_groups + .consensus + .clone() + .into_iter() + .map(|ac| ac.aggregate.get_key()) + .collect::>(); - log::info!( - "⚠️ Number of consistent aggregates exceeds the redundancy factor {:?}", - excessive_aggregate.hash::() + let in_quorum_keys = consistency_groups + .quorum + .clone() + .into_iter() + .map(|ac| ac.aggregate.get_key()) + .collect::>(); + + for consolidated_aggregate in consistency_groups.others { + let aggregate_key = consolidated_aggregate.aggregate.get_key(); + + if in_consensus_keys.contains(&aggregate_key) || + in_quorum_keys.contains(&aggregate_key) + { + log::warn!( + "⚠️ The aggregate {:?} is inconsistent between aggregators.", + aggregate_key + ); + + // This prevents the double spending in case of inconsistencies between + // aggregators for the same aggregation key + continue; + } + + if consolidated_aggregate.count > redundancy_factor { + let excessive_aggregate = consolidated_aggregate.aggregate.clone(); + + log::warn!( + "⚠️ Number of consistent aggregates with key {:?} exceeds the redundancy factor", + aggregate_key ); log::info!( - "🔎‍ Challenging 
excessive aggregate {:?}", + "🔎‍ Challenging excessive aggregate with key {:?} and hash {:?}", + aggregate_key, excessive_aggregate.hash::() ); @@ -1677,10 +1706,11 @@ pub mod pallet { // payouts stage verified_usage.push(excessive_aggregate); } else { - let defective_aggregate = group.aggregate.clone(); + let defective_aggregate = consolidated_aggregate.aggregate.clone(); log::info!( - "🔎‍ Challenging defective aggregate {:?}", + "🔎‍ Challenging defective aggregate with key {:?} and hash {:?}", + aggregate_key, defective_aggregate.hash::() ); @@ -1958,9 +1988,9 @@ pub mod pallet { let buckets_sub_aggregates_groups = Self::group_by_consistency(buckets_sub_aggregates, redundancy_factor, quorum); - log::info!("🏠🌕 Bucket Sub-Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.in_consensus); - log::info!("🏠🌗 Bucket Sub-Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.in_quorum); - log::info!("🏠🌘 Bucket Sub-Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.in_others); + log::info!("🏠🌕 Bucket Sub-Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.consensus); + log::info!("🏠🌗 Bucket Sub-Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.quorum); + log::info!("🏠🌘 Bucket Sub-Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, buckets_sub_aggregates_groups.others); buckets_sub_aggregates_groups } @@ -2945,9 +2975,9 @@ pub mod pallet { let nodes_aggregates_groups = Self::group_by_consistency(nodes_aggregates, redundancy_factor, quorum); - log::info!("🏠🌕 Node Aggregates, which are in 
consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.in_consensus); - log::info!("🏠🌗 Node Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.in_quorum); - log::info!("🏠🌘 Node Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.in_others); + log::info!("🏠🌕 Node Aggregates, which are in consensus for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.consensus); + log::info!("🏠🌗 Node Aggregates, which are in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.quorum); + log::info!("🏠🌘 Node Aggregates, which are neither in consensus nor in quorum for cluster_id: {:?} for era_id: {:?}::: {:?}", cluster_id, era_id, nodes_aggregates_groups.others); nodes_aggregates_groups } @@ -2969,9 +2999,9 @@ pub mod pallet { .push(aggregate.clone()); } - let mut in_consensus = Vec::new(); - let mut in_quorum = Vec::new(); - let mut in_others = Vec::new(); + let mut consensus_group = Vec::new(); + let mut quorum_group = Vec::new(); + let mut others_group = Vec::new(); let max_aggregates_count = redundancy_factor; let quorum_threshold = quorum * max_aggregates_count; @@ -2989,15 +3019,19 @@ pub mod pallet { ); if aggregates_count == max_aggregates_count { - in_consensus.push(consolidated_aggregate); + consensus_group.push(consolidated_aggregate); } else if aggregates_count >= quorum_threshold { - in_quorum.push(consolidated_aggregate); + quorum_group.push(consolidated_aggregate); } else { - in_others.push(consolidated_aggregate); + others_group.push(consolidated_aggregate); } } - ConsistencyGroups { in_consensus, in_quorum, in_others } + ConsistencyGroups { + consensus: consensus_group, + quorum: quorum_group, + others: others_group, + } } /// Fetch cluster to validate. 
diff --git a/pallets/ddc-verification/src/tests.rs b/pallets/ddc-verification/src/tests.rs index 630da8bd0..ef9b9917f 100644 --- a/pallets/ddc-verification/src/tests.rs +++ b/pallets/ddc-verification/src/tests.rs @@ -390,9 +390,9 @@ fn buckets_sub_aggregates_in_consensus_merged() { redundancy_factor, quorum, ); - assert_eq!(groups.in_consensus.len(), 1); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + assert_eq!(groups.consensus.len(), 1); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -512,9 +512,9 @@ fn buckets_sub_aggregates_in_quorum_merged() { redundancy_factor, quorum, ); - assert_eq!(groups.in_consensus.len(), 0); - assert_eq!(groups.in_quorum.len(), 1); - assert_eq!(groups.in_others.len(), 1); + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 1); + assert_eq!(groups.others.len(), 1); let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -635,9 +635,9 @@ fn buckets_sub_aggregates_in_others_merged() { quorum, ); - assert_eq!(groups.in_consensus.len(), 0); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 2); + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 2); let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -743,9 +743,9 @@ fn nodes_aggregates_in_consensus_merged() { redundancy_factor, quorum, ); - assert_eq!(groups.in_consensus.len(), 1); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + assert_eq!(groups.consensus.len(), 1); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -844,9 +844,9 @@ fn nodes_aggregates_in_quorum_merged() { redundancy_factor, quorum, ); - assert_eq!(groups.in_consensus.len(), 0); - 
assert_eq!(groups.in_quorum.len(), 1); - assert_eq!(groups.in_others.len(), 1); + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 1); + assert_eq!(groups.others.len(), 1); let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -946,9 +946,9 @@ fn nodes_aggregates_in_others_merged() { quorum, ); - assert_eq!(groups.in_consensus.len(), 0); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 2); + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 2); let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -1021,17 +1021,17 @@ fn buckets_sub_aggregates_grouped_by_consistency() { let groups = DdcVerification::group_by_consistency(buckets_sub_aggregates, redundancy_factor, quorum); - assert_eq!(groups.in_consensus.len(), 1); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + assert_eq!(groups.consensus.len(), 1); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); - let consolidated_aggregate = groups.in_consensus[0].aggregate.clone(); + let consolidated_aggregate = groups.consensus[0].aggregate.clone(); assert_eq!(consolidated_aggregate.stored_bytes, 100); assert_eq!(consolidated_aggregate.transferred_bytes, 50); assert_eq!(consolidated_aggregate.number_of_puts, 10); assert_eq!(consolidated_aggregate.number_of_gets, 20); - assert_eq!(groups.in_consensus[0].count, 3); - assert_eq!(groups.in_consensus[0].aggregators.len(), 3); + assert_eq!(groups.consensus[0].count, 3); + assert_eq!(groups.consensus[0].aggregators.len(), 3); } #[test] @@ -1115,27 +1115,27 @@ fn buckets_sub_aggregates_grouped_by_consistency_2() { let groups = DdcVerification::group_by_consistency(buckets_sub_aggregates, redundancy_factor, quorum); - assert_eq!(groups.in_consensus.len(), 2); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + 
assert_eq!(groups.consensus.len(), 2); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); - let consolidated_aggregate_in_consensus_1 = groups.in_consensus[0].aggregate.clone(); + let consolidated_aggregate_in_consensus_1 = groups.consensus[0].aggregate.clone(); assert_eq!(consolidated_aggregate_in_consensus_1.bucket_id, 1); assert_eq!(consolidated_aggregate_in_consensus_1.stored_bytes, 100); assert_eq!(consolidated_aggregate_in_consensus_1.transferred_bytes, 50); assert_eq!(consolidated_aggregate_in_consensus_1.number_of_puts, 10); assert_eq!(consolidated_aggregate_in_consensus_1.number_of_gets, 20); - assert_eq!(groups.in_consensus[0].count, 3); - assert_eq!(groups.in_consensus[0].aggregators.len(), 3); + assert_eq!(groups.consensus[0].count, 3); + assert_eq!(groups.consensus[0].aggregators.len(), 3); - let consolidated_aggregate_in_consensus_2 = groups.in_consensus[1].aggregate.clone(); + let consolidated_aggregate_in_consensus_2 = groups.consensus[1].aggregate.clone(); assert_eq!(consolidated_aggregate_in_consensus_2.bucket_id, 2); assert_eq!(consolidated_aggregate_in_consensus_2.stored_bytes, 110); assert_eq!(consolidated_aggregate_in_consensus_2.transferred_bytes, 50); assert_eq!(consolidated_aggregate_in_consensus_2.number_of_puts, 10); assert_eq!(consolidated_aggregate_in_consensus_2.number_of_gets, 20); - assert_eq!(groups.in_consensus[1].count, 3); - assert_eq!(groups.in_consensus[1].aggregators.len(), 3); + assert_eq!(groups.consensus[1].count, 3); + assert_eq!(groups.consensus[1].aggregators.len(), 3); } #[test] @@ -1188,17 +1188,17 @@ fn nodes_aggregates_grouped_by_consistency() { let groups = DdcVerification::group_by_consistency(nodes_aggregates, redundancy_factor, quorum); - assert_eq!(groups.in_consensus.len(), 1); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + assert_eq!(groups.consensus.len(), 1); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); - let 
consolidated_aggregate_in_consensus = groups.in_consensus[0].aggregate.clone(); + let consolidated_aggregate_in_consensus = groups.consensus[0].aggregate.clone(); assert_eq!(consolidated_aggregate_in_consensus.stored_bytes, 100); assert_eq!(consolidated_aggregate_in_consensus.transferred_bytes, 50); assert_eq!(consolidated_aggregate_in_consensus.number_of_puts, 10); assert_eq!(consolidated_aggregate_in_consensus.number_of_gets, 20); - assert_eq!(groups.in_consensus[0].count, 3); - assert_eq!(groups.in_consensus[0].aggregators.len(), 3); + assert_eq!(groups.consensus[0].count, 3); + assert_eq!(groups.consensus[0].aggregators.len(), 3); } #[test] @@ -1275,28 +1275,28 @@ fn nodes_aggregates_grouped_by_consistency_2() { let groups = DdcVerification::group_by_consistency(nodes_aggregates, redundancy_factor, quorum); - assert_eq!(groups.in_consensus.len(), 2); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + assert_eq!(groups.consensus.len(), 2); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); - let consolidated_aggregate_1 = groups.in_consensus[0].aggregate.clone(); + let consolidated_aggregate_1 = groups.consensus[0].aggregate.clone(); assert_eq!(consolidated_aggregate_1.node_id, "2".to_string()); assert_eq!(consolidated_aggregate_1.stored_bytes, 110); assert_eq!(consolidated_aggregate_1.transferred_bytes, 50); assert_eq!(consolidated_aggregate_1.number_of_puts, 10); assert_eq!(consolidated_aggregate_1.number_of_gets, 20); - assert_eq!(groups.in_consensus[0].count, 3); - assert_eq!(groups.in_consensus[0].aggregators.len(), 3); + assert_eq!(groups.consensus[0].count, 3); + assert_eq!(groups.consensus[0].aggregators.len(), 3); - let consolidated_aggregate_2 = groups.in_consensus[1].aggregate.clone(); + let consolidated_aggregate_2 = groups.consensus[1].aggregate.clone(); assert_eq!(consolidated_aggregate_2.node_id, "1".to_string()); assert_eq!(consolidated_aggregate_2.stored_bytes, 100); 
assert_eq!(consolidated_aggregate_2.transferred_bytes, 50); assert_eq!(consolidated_aggregate_2.number_of_puts, 10); assert_eq!(consolidated_aggregate_2.number_of_gets, 20); - assert_eq!(groups.in_consensus[1].count, 3); - assert_eq!(groups.in_consensus[1].aggregators.len(), 3); + assert_eq!(groups.consensus[1].count, 3); + assert_eq!(groups.consensus[1].aggregators.len(), 3); } #[test] @@ -1307,9 +1307,9 @@ fn empty_bucket_sub_aggregates() { let empty = Vec::::new(); let groups = DdcVerification::group_by_consistency(empty, redundancy_factor, quorum); - assert_eq!(groups.in_consensus.len(), 0); - assert_eq!(groups.in_quorum.len(), 0); - assert_eq!(groups.in_others.len(), 0); + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 0); } #[test] @@ -1489,7 +1489,7 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_consensus, + groups.consensus, vec![ ConsolidatedAggregate::new(bucket_sub_aggregate_in_consensus, 3, vec![AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), @@ -1520,7 +1520,7 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_quorum, + groups.quorum, vec![ ConsolidatedAggregate::new(bucket_sub_aggregate_in_quorum, 2, vec![AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), @@ -1562,7 +1562,7 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_others, + groups.others, vec![ ConsolidatedAggregate::new(bucket_sub_aggregate2_in_others, 1, vec![AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([5; 32])), @@ -1751,7 +1751,7 @@ fn node_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_consensus, + groups.consensus, vec![ConsolidatedAggregate::new(node_aggregate_in_consensus.clone(), 3, vec![ AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), node_params: 
node_params1.clone() }, AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), node_params: node_params2.clone() }, @@ -1773,7 +1773,7 @@ fn node_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_quorum, vec![ConsolidatedAggregate::new(node_aggregate_in_quorum.clone(), 2, vec![ + groups.quorum, vec![ConsolidatedAggregate::new(node_aggregate_in_quorum.clone(), 2, vec![ AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), node_params: node_params1.clone() }, AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([4; 32])), node_params: node_params4.clone() } ])] @@ -1807,7 +1807,7 @@ fn node_aggregates_are_fetched_and_grouped() { }; assert_eq!( - groups.in_others, vec![ + groups.others, vec![ ConsolidatedAggregate::new(node_aggregate2_in_others.clone(), 1, vec![AggregatorInfo { node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([5; 32])), node_params: node_params5.clone(), From fa0e7d02b0e1bc291a0ca63656574f0dde8f50d0 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Wed, 23 Oct 2024 00:14:31 +0200 Subject: [PATCH 3/5] fix: checking the 'others' group for processed aggregation keys and tests --- pallets/ddc-verification/src/lib.rs | 47 +++++++++++++-------------- pallets/ddc-verification/src/tests.rs | 42 ++++++++++++------------ 2 files changed, 43 insertions(+), 46 deletions(-) diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 7c8df399c..2ff7aad92 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -1620,30 +1620,42 @@ pub mod pallet { era_id: DdcEra, consistency_groups: ConsistencyGroups, ) -> Result, Vec> { - let mut total_usage: Vec = vec![]; + let mut total_usage = vec![]; + let mut total_usage_keys = vec![]; - // todo: run a light challenge for unanimous consensus + // todo: implement 'challenge_consensus' fn and run a light challenge for unanimous + // consensus let 
in_consensus_usage = consistency_groups .consensus .clone() .into_iter() .map(|ca| ca.aggregate.clone()) .collect::>(); - total_usage.extend(in_consensus_usage); + total_usage.extend(in_consensus_usage.clone()); + total_usage_keys + .extend(in_consensus_usage.into_iter().map(|a| a.get_key()).collect::>()); - // todo: run a light challenge for quorum, i.e. for majority + // todo: implement 'challenge_quorum' fn and run a light challenge for the quorum, i.e. + // for majority let in_quorum_usage = consistency_groups .quorum .clone() .into_iter() .map(|ca| ca.aggregate.clone()) .collect::>(); - total_usage.extend(in_quorum_usage); + total_usage.extend(in_quorum_usage.clone()); + total_usage_keys + .extend(in_quorum_usage.into_iter().map(|a| a.get_key()).collect::>()); - let verified_usage = Self::challenge_others(cluster_id, era_id, consistency_groups)?; + let verified_usage = Self::challenge_others( + cluster_id, + era_id, + consistency_groups, + &mut total_usage_keys, + )?; if !verified_usage.is_empty() { - total_usage.extend(verified_usage); + total_usage.extend(verified_usage.clone()); } Ok(total_usage) @@ -1653,30 +1665,15 @@ pub mod pallet { _cluster_id: &ClusterId, _era_id: DdcEra, consistency_groups: ConsistencyGroups, + accepted_keys: &mut Vec, ) -> Result, Vec> { let redundancy_factor = T::DAC_REDUNDANCY_FACTOR; let mut verified_usage: Vec = vec![]; - let in_consensus_keys = consistency_groups - .consensus - .clone() - .into_iter() - .map(|ac| ac.aggregate.get_key()) - .collect::>(); - - let in_quorum_keys = consistency_groups - .quorum - .clone() - .into_iter() - .map(|ac| ac.aggregate.get_key()) - .collect::>(); - for consolidated_aggregate in consistency_groups.others { let aggregate_key = consolidated_aggregate.aggregate.get_key(); - if in_consensus_keys.contains(&aggregate_key) || - in_quorum_keys.contains(&aggregate_key) - { + if accepted_keys.contains(&aggregate_key) { log::warn!( "⚠️ The aggregate {:?} is inconsistent between aggregators.", 
aggregate_key @@ -1705,6 +1702,7 @@ pub mod pallet { // we assume it won't happen at the moment, so we just take the aggregate to // payouts stage verified_usage.push(excessive_aggregate); + accepted_keys.push(aggregate_key); } else { let defective_aggregate = consolidated_aggregate.aggregate.clone(); @@ -1722,6 +1720,7 @@ pub mod pallet { // we assume all aggregates are valid at the moment, so we just take the // aggregate to payouts stage verified_usage.push(defective_aggregate); + accepted_keys.push(aggregate_key); } } } diff --git a/pallets/ddc-verification/src/tests.rs b/pallets/ddc-verification/src/tests.rs index ef9b9917f..b44f1cc72 100644 --- a/pallets/ddc-verification/src/tests.rs +++ b/pallets/ddc-verification/src/tests.rs @@ -398,6 +398,8 @@ fn buckets_sub_aggregates_in_consensus_merged() { assert!(result.is_ok()); let usages = result.unwrap(); + assert_eq!(usages.len(), 1); // 1 consolidated aggregate per 1 aggregation key + let usage = usages.first().unwrap(); assert_eq!(usage.stored_bytes, 100); assert_eq!(usage.transferred_bytes, 50); @@ -520,6 +522,8 @@ fn buckets_sub_aggregates_in_quorum_merged() { assert!(result.is_ok()); let usages = result.unwrap(); + assert_eq!(usages.len(), 1); // 1 consolidated aggregate per 1 aggregation key + let usage = usages.first().unwrap(); assert_eq!(usage.stored_bytes, 100); assert_eq!(usage.transferred_bytes, 50); @@ -643,18 +647,13 @@ fn buckets_sub_aggregates_in_others_merged() { assert!(result.is_ok()); let usages = result.unwrap(); + assert_eq!(usages.len(), 1); // 1 consolidated aggregate per 1 aggregation key - let usage1 = usages.first().unwrap(); - assert_eq!(usage1.stored_bytes, 100); - assert_eq!(usage1.transferred_bytes, 50); - assert_eq!(usage1.number_of_puts, 10); - assert_eq!(usage1.number_of_gets, 20); - - let usage2 = usages.get(1).unwrap(); - assert_eq!(usage2.stored_bytes, 200); - assert_eq!(usage2.transferred_bytes, 50); - assert_eq!(usage2.number_of_puts, 10); - assert_eq!(usage2.number_of_gets, 
20); + let usage = usages.first().unwrap(); + assert_eq!(usage.stored_bytes, 100); + assert_eq!(usage.transferred_bytes, 50); + assert_eq!(usage.number_of_puts, 10); + assert_eq!(usage.number_of_gets, 20); } #[test] @@ -751,6 +750,8 @@ fn nodes_aggregates_in_consensus_merged() { assert!(result.is_ok()); let usages = result.unwrap(); + assert_eq!(usages.len(), 1); // 1 consolidated aggregate per 1 aggregation key + let usage = usages.first().unwrap(); assert_eq!(usage.stored_bytes, 100); assert_eq!(usage.transferred_bytes, 50); @@ -852,6 +853,8 @@ fn nodes_aggregates_in_quorum_merged() { assert!(result.is_ok()); let usages = result.unwrap(); + assert_eq!(usages.len(), 1); // 1 consolidated aggregate per 1 aggregation key + let usage = usages.first().unwrap(); assert_eq!(usage.stored_bytes, 100); assert_eq!(usage.transferred_bytes, 50); @@ -954,18 +957,13 @@ fn nodes_aggregates_in_others_merged() { assert!(result.is_ok()); let usages = result.unwrap(); + assert_eq!(usages.len(), 1); // 1 consolidated aggregate per 1 aggregation key - let usage1 = usages.first().unwrap(); - assert_eq!(usage1.stored_bytes, 200); - assert_eq!(usage1.transferred_bytes, 50); - assert_eq!(usage1.number_of_puts, 10); - assert_eq!(usage1.number_of_gets, 20); - - let usage2 = usages.get(1).unwrap(); - assert_eq!(usage2.stored_bytes, 100); - assert_eq!(usage2.transferred_bytes, 50); - assert_eq!(usage2.number_of_puts, 10); - assert_eq!(usage2.number_of_gets, 20); + let usage = usages.first().unwrap(); + assert_eq!(usage.stored_bytes, 200); + assert_eq!(usage.transferred_bytes, 50); + assert_eq!(usage.number_of_puts, 10); + assert_eq!(usage.number_of_gets, 20); } #[test] From c24863c25db29d4ea15f7c198b457335e2ffbf7b Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Wed, 23 Oct 2024 16:13:19 +0200 Subject: [PATCH 4/5] docs: code documentation is added --- pallets/ddc-verification/src/lib.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git 
a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 2ff7aad92..91fa00a85 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -752,10 +752,15 @@ pub mod pallet { number_of_gets: u64, } - #[derive(Debug, Clone, PartialEq)] + /// The `ConsolidatedAggregate` struct represents a merging result of multiple aggregates + /// that have reached consensus on the usage criteria. This result should be taken into + /// consideration when choosing the intensity of the challenge. pub(crate) struct ConsolidatedAggregate { + /// The representative aggregate after consolidation pub(crate) aggregate: A, + /// Number of aggregates that were consistent pub(crate) count: u16, + /// Aggregators that provided consistent aggregates pub(crate) aggregators: Vec, } @@ -778,12 +783,18 @@ pub mod pallet { BucketSubAggregateKey(BucketId, String), } + /// The `Aggregate` trait defines a set of members common to activity aggregates, which reflect + /// the usage of a node or bucket within an Era. pub(crate) trait Aggregate: Clone + Ord + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Debug { + /// Hash of the aggregate that is defined by its 'usage' values fn hash(&self) -> ActivityHash; + /// Aggregation key of this aggregate, i.e.
bucket composite key or node key fn get_key(&self) -> AggregateKey; + /// Number of activity records aggregated by this aggregate fn get_number_of_leaves(&self) -> u64; + /// Aggregator that provided this aggregate fn get_aggregator(&self) -> AggregatorInfo; } From 94ae205c8770c70cc3b1447910abb9b24de70e14 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Wed, 23 Oct 2024 17:03:46 +0200 Subject: [PATCH 5/5] fix: missed attributes and tests --- pallets/ddc-verification/src/lib.rs | 1 + pallets/ddc-verification/src/tests.rs | 249 +++++++++++++++++++++++++- 2 files changed, 246 insertions(+), 4 deletions(-) diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 91fa00a85..632f03076 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -755,6 +755,7 @@ pub mod pallet { /// The `ConsolidatedAggregate` struct represents a merging result of multiple aggregates /// that have reached consensus on the usage criteria. This result should be taken into /// consideration when choosing the intensity of the challenge.
+ #[derive(Debug, Clone, PartialEq)] pub(crate) struct ConsolidatedAggregate { /// The representative aggregate after consolidation pub(crate) aggregate: A, diff --git a/pallets/ddc-verification/src/tests.rs b/pallets/ddc-verification/src/tests.rs index b44f1cc72..f9cb21f32 100644 --- a/pallets/ddc-verification/src/tests.rs +++ b/pallets/ddc-verification/src/tests.rs @@ -515,8 +515,8 @@ fn buckets_sub_aggregates_in_quorum_merged() { quorum, ); assert_eq!(groups.consensus.len(), 0); - assert_eq!(groups.quorum.len(), 1); - assert_eq!(groups.others.len(), 1); + assert_eq!(groups.quorum.len(), 1); // 2 consistent aggregates merged into 1 in 'quorum' + assert_eq!(groups.others.len(), 1); // 1 inconsistent aggregate goes to 'others' let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -656,6 +656,137 @@ fn buckets_sub_aggregates_in_others_merged() { assert_eq!(usage.number_of_gets, 20); } +#[test] +fn buckets_sub_aggregates_in_others_merged_2() { + let redundancy_factor = 3; + let quorum = Percent::from_percent(100); + let cluster_id = ClusterId::from([1; 20]); + let era_id = 476817; + + let aggregator1 = AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), + node_params: StorageNodeParams { + ssl: false, + host: "178.251.228.236".as_bytes().to_vec(), + http_port: 8080, + mode: StorageNodeMode::DAC, + p2p_port: 5555, + grpc_port: 4444, + domain: b"example1.com".to_vec(), + }, + }; + + let aggregator2 = AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), + node_params: StorageNodeParams { + ssl: false, + host: "95.217.8.119".as_bytes().to_vec(), + http_port: 8080, + mode: StorageNodeMode::DAC, + p2p_port: 5555, + grpc_port: 4444, + domain: b"example2.com".to_vec(), + }, + }; + + let aggregator3 = AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), + node_params: StorageNodeParams { + ssl: false, + host: "178.251.228.42".as_bytes().to_vec(), + 
http_port: 8080, + mode: StorageNodeMode::DAC, + p2p_port: 5555, + grpc_port: 4444, + domain: b"example3.com".to_vec(), + }, + }; + + let resp1 = ( + aggregator1, + vec![BucketAggregateResponse { + stored_bytes: 100, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + bucket_id: 1, + sub_aggregates: vec![BucketSubAggregateResponse { + NodeID: "1".to_string(), + stored_bytes: 100, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + }], + }], + ); + + let resp2 = ( + aggregator2, + vec![BucketAggregateResponse { + stored_bytes: 200, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + bucket_id: 2, + sub_aggregates: vec![BucketSubAggregateResponse { + NodeID: "1".to_string(), + stored_bytes: 200, + transferred_bytes: 500, + number_of_puts: 30, + number_of_gets: 40, + }], + }], + ); + + let resp3 = ( + aggregator3, + vec![BucketAggregateResponse { + stored_bytes: 100, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + bucket_id: 1, + sub_aggregates: vec![BucketSubAggregateResponse { + NodeID: "1".to_string(), + stored_bytes: 100, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + }], + }], + ); + + let groups = DdcVerification::group_buckets_sub_aggregates_by_consistency( + &cluster_id, + era_id, + vec![resp1, resp2, resp3], + redundancy_factor, + quorum, + ); + + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 2); // 2 inconsistent aggregates + + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + + assert!(result.is_ok()); + let usages = result.unwrap(); + assert_eq!(usages.len(), 2); + + let usage1 = usages.first().unwrap(); + assert_eq!(usage1.stored_bytes, 100); + assert_eq!(usage1.transferred_bytes, 50); + assert_eq!(usage1.number_of_puts, 10); + assert_eq!(usage1.number_of_gets, 20); + + let usage2 = usages.get(1).unwrap(); + assert_eq!(usage2.stored_bytes, 200); + 
assert_eq!(usage2.transferred_bytes, 500); + assert_eq!(usage2.number_of_puts, 30); + assert_eq!(usage2.number_of_gets, 40); +} + #[test] fn nodes_aggregates_in_consensus_merged() { let redundancy_factor = 3; @@ -846,8 +977,8 @@ fn nodes_aggregates_in_quorum_merged() { quorum, ); assert_eq!(groups.consensus.len(), 0); - assert_eq!(groups.quorum.len(), 1); - assert_eq!(groups.others.len(), 1); + assert_eq!(groups.quorum.len(), 1); // 2 consistent aggregates merged into 1 in 'quorum' + assert_eq!(groups.others.len(), 1); // 1 inconsistent aggregate goes to 'others' let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); @@ -966,6 +1097,116 @@ fn nodes_aggregates_in_others_merged() { assert_eq!(usage.number_of_gets, 20); } +#[test] +fn nodes_aggregates_in_others_merged_2() { + let redundancy_factor = 3; + let quorum = Percent::from_percent(100); + let cluster_id = ClusterId::from([1; 20]); + let era_id = 476817; + + let aggregator1 = AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([1; 32])), + node_params: StorageNodeParams { + ssl: false, + host: "178.251.228.236".as_bytes().to_vec(), + http_port: 8080, + mode: StorageNodeMode::DAC, + p2p_port: 5555, + grpc_port: 4444, + domain: b"example1.com".to_vec(), + }, + }; + + let aggregator2 = AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([2; 32])), + node_params: StorageNodeParams { + ssl: false, + host: "95.217.8.119".as_bytes().to_vec(), + http_port: 8080, + mode: StorageNodeMode::DAC, + p2p_port: 5555, + grpc_port: 4444, + domain: b"example2.com".to_vec(), + }, + }; + + let aggregator3 = AggregatorInfo { + node_pub_key: NodePubKey::StoragePubKey(AccountId32::new([3; 32])), + node_params: StorageNodeParams { + ssl: false, + host: "178.251.228.42".as_bytes().to_vec(), + http_port: 8080, + mode: StorageNodeMode::DAC, + p2p_port: 5555, + grpc_port: 4444, + domain: b"example3.com".to_vec(), + }, + }; + + let resp1 = ( + aggregator1, + 
vec![NodeAggregateResponse { + node_id: "2".to_string(), + stored_bytes: 1000, + transferred_bytes: 500, + number_of_puts: 15, + number_of_gets: 30, + }], + ); + + let resp2 = ( + aggregator2, + vec![NodeAggregateResponse { + node_id: "1".to_string(), + stored_bytes: 200, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + }], + ); + + let resp3 = ( + aggregator3, + vec![NodeAggregateResponse { + node_id: "1".to_string(), + stored_bytes: 100, + transferred_bytes: 50, + number_of_puts: 10, + number_of_gets: 20, + }], + ); + + let groups = DdcVerification::group_nodes_aggregates_by_consistency( + &cluster_id, + era_id, + vec![resp1, resp2, resp3], + redundancy_factor, + quorum, + ); + + assert_eq!(groups.consensus.len(), 0); + assert_eq!(groups.quorum.len(), 0); + assert_eq!(groups.others.len(), 3); // 3 inconsistent aggregates + + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + + assert!(result.is_ok()); + let usages = result.unwrap(); + assert_eq!(usages.len(), 2); + + let usage1 = usages.get(1).unwrap(); + assert_eq!(usage1.stored_bytes, 200); + assert_eq!(usage1.transferred_bytes, 50); + assert_eq!(usage1.number_of_puts, 10); + assert_eq!(usage1.number_of_gets, 20); + + let usage2 = usages.first().unwrap(); + assert_eq!(usage2.stored_bytes, 1000); + assert_eq!(usage2.transferred_bytes, 500); + assert_eq!(usage2.number_of_puts, 15); + assert_eq!(usage2.number_of_gets, 30); +} + #[test] fn buckets_sub_aggregates_grouped_by_consistency() { let redundancy_factor = 3;