From 6f18526eb21b87c68e3781c171c7411b9a1c16a2 Mon Sep 17 00:00:00 2001
From: Brandon Pitman
Date: Thu, 14 Nov 2024 09:58:51 -0800
Subject: [PATCH] Remove support for Poplar1. (#3482)

DAP-11 removed multi-collection and, with it, any useful way to use
Poplar1.

This commit makes the following changes:
 * Remove application-level support for Poplar1.
 * Migrate unit tests that used Poplar1 to libprio-rs' dummy::Vdaf.
 * Introduce a test-only ability to configure the fake dummy::Vdaf for
   taskprov, as required to migrate some taskprov unit tests.
---
 aggregator/src/aggregator.rs                  |  23 -
 .../src/aggregator/aggregate_init_tests.rs    |  36 +-
 .../aggregator/aggregation_job_continue.rs    |  95 +--
 .../aggregation_job_driver/tests.rs           | 623 ++++++++----------
 .../src/aggregator/aggregation_job_writer.rs  |   7 -
 .../tests/aggregation_job_continue.rs         | 455 +++++--------
 aggregator/src/aggregator/taskprov_tests.rs   |  64 +-
 aggregator_core/src/datastore/models.rs       |  17 +-
 aggregator_core/src/datastore/tests.rs        | 138 ++--
 aggregator_core/src/lib.rs                    |   5 -
 core/Cargo.toml                               |   1 +
 core/src/dp.rs                                |  19 +-
 core/src/vdaf.rs                              |  48 +-
 messages/src/taskprov.rs                      |  55 +-
 14 files changed, 609 insertions(+), 977 deletions(-)

diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs
index a2c0ac07b..40c5ff714 100644
--- a/aggregator/src/aggregator.rs
+++ b/aggregator/src/aggregator.rs
@@ -90,9 +90,7 @@ use prio::{
     topology::ping_pong::{PingPongState, PingPongTopology},
     vdaf::{
         self,
-        poplar1::Poplar1,
         prio3::{Prio3, Prio3Count, Prio3Histogram, Prio3Sum, Prio3SumVec},
-        xof::XofTurboShake128,
     },
 };
 use rand::{thread_rng, Rng};
@@ -1004,12 +1002,6 @@ impl TaskAggregator {
                 }
             },
 
-            VdafInstance::Poplar1 { bits } => {
-                let vdaf = Poplar1::new_turboshake128(*bits);
-                let verify_key = task.vdaf_verify_key()?;
-                VdafOps::Poplar1(Arc::new(vdaf), verify_key)
-            }
-
            #[cfg(feature = "test-util")]
            VdafInstance::Fake { rounds } => VdafOps::Fake(Arc::new(dummy::Vdaf::new(*rounds))),
 
@@ -1300,10 +1292,6 @@ enum VdafOps {
         VerifyKey<VERIFY_KEY_LENGTH>,
         vdaf_ops_strategies::Prio3FixedPointBoundedL2VecSum,
     ),
-    Poplar1(
-        Arc<Poplar1<XofTurboShake128, 16>>,
-        VerifyKey<VERIFY_KEY_LENGTH>,
-    ),
     #[cfg(feature = "test-util")]
     Fake(Arc<dummy::Vdaf>),
 }
@@ -1458,17 +1446,6 @@ macro_rules! vdaf_ops_dispatch {
                 }
             }
 
-            crate::aggregator::VdafOps::Poplar1(vdaf, verify_key) => {
-                let $vdaf = vdaf;
-                let $verify_key = verify_key;
-                type $Vdaf = ::prio::vdaf::poplar1::Poplar1<::prio::vdaf::xof::XofTurboShake128, 16>;
-                const $VERIFY_KEY_LENGTH: usize = ::janus_core::vdaf::VERIFY_KEY_LENGTH;
-                type $DpStrategy = janus_core::dp::NoDifferentialPrivacy;
-                let $dp_strategy = &Arc::new(janus_core::dp::NoDifferentialPrivacy);
-                let body = $body;
-                body
-            }
-
             #[cfg(feature = "test-util")]
             crate::aggregator::VdafOps::Fake(vdaf) => {
                 let $vdaf = vdaf;
diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs
index a8b76ac50..cc0f0bce0 100644
--- a/aggregator/src/aggregator/aggregate_init_tests.rs
+++ b/aggregator/src/aggregator/aggregate_init_tests.rs
@@ -30,12 +30,7 @@ use janus_messages::{
 };
 use prio::{
     codec::Encode,
-    idpf::IdpfInput,
-    vdaf::{
-        self, dummy,
-        poplar1::{Poplar1, Poplar1AggregationParam},
-        xof::XofTurboShake128,
-    },
+    vdaf::{self, dummy},
 };
 use rand::random;
 use serde_json::json;
@@ -179,16 +174,12 @@ pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase<0,
     .await
 }
 
-async fn setup_poplar1_aggregate_init_test(
-) -> AggregationJobInitTestCase<16, Poplar1<XofTurboShake128, 16>> {
-    let aggregation_param =
-        Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])]))
-            .unwrap();
+async fn setup_multi_step_aggregate_init_test() -> AggregationJobInitTestCase<0, dummy::Vdaf> {
     setup_aggregate_init_test_for_vdaf(
-        Poplar1::new_turboshake128(1),
-        VdafInstance::Poplar1 { bits: 1 },
-        aggregation_param,
-        IdpfInput::from_bools(&[true]),
+        dummy::Vdaf::new(2),
+        VdafInstance::Fake { rounds: 2 },
+        dummy::AggregationParam(7),
+        13,
     )
     .await
 }
@@ -501,8 +492,9 @@ async fn aggregation_job_mutation_report_shares() {
 
 #[tokio::test]
 async fn aggregation_job_mutation_report_aggregations() {
-    // We must run Poplar1 in this test so that the aggregation job won't finish on the first step
-    let test_case = setup_poplar1_aggregate_init_test().await;
+    // We set up a multi-step VDAF in this test so that the aggregation job won't finish on the
+    // first step.
+    let test_case = setup_multi_step_aggregate_init_test().await;
 
     // Generate some new reports using the existing reports' metadata, but varying the measurement
    // values such that the prepare state computed during aggregation initialization won't match the
@@ -514,10 +506,7 @@
         .map(|s| {
             test_case
                 .prepare_init_generator
-                .next_with_metadata(
-                    s.report_share().metadata().clone(),
-                    &IdpfInput::from_bools(&[false]),
-                )
+                .next_with_metadata(s.report_share().metadata().clone(), &13)
                 .0
         })
         .collect();
@@ -614,8 +603,9 @@ async fn aggregation_job_intolerable_clock_skew() {
 
 #[tokio::test]
 async fn aggregation_job_init_two_step_vdaf_idempotence() {
-    // We must run Poplar1 in this test so that the aggregation job won't finish on the first step
-    let test_case = setup_poplar1_aggregate_init_test().await;
+    // We set up a multi-step VDAF in this test so that the aggregation job won't finish on the
+    // first step.
+    let test_case = setup_multi_step_aggregate_init_test().await;
 
     // Send the aggregation job init request again. We should get an identical response back.
     let mut response = put_aggregation_job(
diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs
index 437374010..963ba0861 100644
--- a/aggregator/src/aggregator/aggregation_job_continue.rs
+++ b/aggregator/src/aggregator/aggregation_job_continue.rs
@@ -422,7 +422,7 @@ mod tests {
     use janus_core::{
         test_util::{install_test_trace_subscriber, runtime::TestRuntime},
         time::{IntervalExt, MockClock},
-        vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
+        vdaf::VdafInstance,
     };
     use janus_messages::{
         batch_mode::TimeInterval, AggregationJobContinueReq, AggregationJobId,
@@ -430,13 +430,8 @@ mod tests {
         PartialBatchSelector, PrepareContinue, PrepareResp, PrepareStepResult, Role,
     };
     use prio::{
-        codec::Encode,
-        idpf::IdpfInput,
-        vdaf::{
-            poplar1::{Poplar1, Poplar1AggregationParam},
-            xof::XofTurboShake128,
-            Aggregator,
-        },
+        codec::Encode as _,
+        vdaf::{dummy, Aggregator},
     };
     use rand::random;
     use serde_json::json;
@@ -461,14 +456,14 @@ mod tests {
     /// Set up a helper with an aggregation job in step 0
     #[allow(clippy::unit_arg)]
-    async fn setup_aggregation_job_continue_test(
-    ) -> AggregationJobContinueTestCase<VERIFY_KEY_LENGTH, Poplar1<XofTurboShake128, 16>> {
+    async fn setup_aggregation_job_continue_test() -> AggregationJobContinueTestCase<0, dummy::Vdaf>
+    {
         // Prepare datastore & request.
         install_test_trace_subscriber();
 
         let aggregation_job_id = random();
         let task =
-            TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build();
+            TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build();
         let helper_task = task.helper_view().unwrap();
         let clock = MockClock::default();
         let ephemeral_datastore = ephemeral_datastore().await;
@@ -476,26 +471,22 @@ mod tests {
         let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
         let keypair = datastore.put_global_hpke_key().await.unwrap();
 
-        let aggregation_parameter = Poplar1AggregationParam::try_from_prefixes(Vec::from([
-            IdpfInput::from_bools(&[false]),
-        ]))
-        .unwrap();
+        let aggregation_parameter = dummy::AggregationParam(7);
 
         let prepare_init_generator = PrepareInitGenerator::new(
             clock.clone(),
             helper_task.clone(),
             keypair.config().clone(),
-            Poplar1::new_turboshake128(1),
-            aggregation_parameter.clone(),
+            dummy::Vdaf::new(2),
+            aggregation_parameter,
         );
 
-        let (prepare_init, transcript) =
-            prepare_init_generator.next(&IdpfInput::from_bools(&[true]));
+        let (prepare_init, transcript) = prepare_init_generator.next(&13);
 
         datastore
             .run_unnamed_tx(|tx| {
                 let (task, aggregation_param, prepare_init, transcript) = (
                     helper_task.clone(),
-                    aggregation_parameter.clone(),
+                    aggregation_parameter,
                     prepare_init.clone(),
                     transcript.clone(),
                 );
@@ -506,11 +497,7 @@ mod tests {
                    .await
                    .unwrap();
 
-                    tx.put_aggregation_job(&AggregationJob::<
-                        VERIFY_KEY_LENGTH,
-                        TimeInterval,
-                        Poplar1<XofTurboShake128, 16>,
-                    >::new(
+                    tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new(
                         *task.id(),
                         aggregation_job_id,
                         aggregation_param,
                         (),
                         Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1))
                             .unwrap(),
@@ -522,21 +509,18 @@ mod tests {
                     ))
                     .await
                     .unwrap();
 
-                    tx.put_report_aggregation::<VERIFY_KEY_LENGTH, Poplar1<XofTurboShake128, 16>>(
-                        &ReportAggregation::new(
-                            *task.id(),
-                            aggregation_job_id,
-                            *prepare_init.report_share().metadata().id(),
-                            *prepare_init.report_share().metadata().time(),
-                            0,
-                            None,
-                            ReportAggregationState::WaitingHelper {
-                                prepare_state: transcript.helper_prepare_transitions[0]
-                                    .prepare_state()
-                                    .clone(),
-                            },
-                        ),
-                    )
+                    tx.put_report_aggregation::<0, dummy::Vdaf>(&ReportAggregation::new(
+                        *task.id(),
+                        aggregation_job_id,
+                        *prepare_init.report_share().metadata().id(),
+                        *prepare_init.report_share().metadata().time(),
+                        0,
+                        None,
+                        ReportAggregationState::WaitingHelper {
+                            prepare_state: *transcript.helper_prepare_transitions[0]
+                                .prepare_state(),
+                        },
+                    ))
                     .await
                     .unwrap();
@@ -583,7 +567,7 @@ mod tests {
     /// Set up a helper with an aggregation job in step 1.
     #[allow(clippy::unit_arg)]
     async fn setup_aggregation_job_continue_step_recovery_test(
-    ) -> AggregationJobContinueTestCase<VERIFY_KEY_LENGTH, Poplar1<XofTurboShake128, 16>> {
+    ) -> AggregationJobContinueTestCase<0, dummy::Vdaf> {
         let mut test_case = setup_aggregation_job_continue_test().await;
 
         let first_continue_response = post_aggregation_job_and_decode(
@@ -728,9 +712,8 @@ mod tests {
     async fn aggregation_job_continue_step_recovery_mutate_continue_request() {
         let test_case = setup_aggregation_job_continue_step_recovery_test().await;
 
-        let (unrelated_prepare_init, unrelated_transcript) = test_case
-            .prepare_init_generator
-            .next(&IdpfInput::from_bools(&[false]));
+        let (unrelated_prepare_init, unrelated_transcript) =
+            test_case.prepare_init_generator.next(&13);
 
         let (before_aggregation_job, before_report_aggregations) = test_case
             .datastore
@@ -746,7 +729,7 @@ mod tests {
                     .unwrap();
 
                 let aggregation_job = tx
-                    .get_aggregation_job::<VERIFY_KEY_LENGTH, TimeInterval, Poplar1<XofTurboShake128, 16>>(
+                    .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>(
                         &task_id,
                         &aggregation_job_id,
                     )
@@ -754,8 +737,8 @@ mod tests {
                    .unwrap();
 
                 let report_aggregations = tx
-                    .get_report_aggregations_for_aggregation_job::<VERIFY_KEY_LENGTH, Poplar1<XofTurboShake128, 16>>(
-                        &Poplar1::new_turboshake128(1),
+                    .get_report_aggregations_for_aggregation_job::<0, dummy::Vdaf>(
+                        &dummy::Vdaf::new(2),
                         &Role::Helper,
                         &task_id,
                         &aggregation_job_id,
@@ -798,7 +781,7 @@ mod tests {
                     (*test_case.task.id(), test_case.aggregation_job_id);
                 Box::pin(async move {
                     let aggregation_job = tx
-                        .get_aggregation_job::<VERIFY_KEY_LENGTH, TimeInterval, Poplar1<XofTurboShake128, 16>>(
+                        .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>(
                             &task_id,
                             &aggregation_job_id,
                         )
@@ -806,8 +789,8 @@ mod tests {
                        .unwrap();
 
                     let report_aggregations = tx
-                        .get_report_aggregations_for_aggregation_job::<VERIFY_KEY_LENGTH, Poplar1<XofTurboShake128, 16>>(
-                            &Poplar1::new_turboshake128(1),
+                        .get_report_aggregations_for_aggregation_job::<0, dummy::Vdaf>(
+                            &dummy::Vdaf::new(2),
                             &Role::Helper,
                             &task_id,
                             &aggregation_job_id,
@@ -835,12 +818,8 @@ mod tests {
             let (task_id, aggregation_job_id) = (*test_case.task.id(), test_case.aggregation_job_id);
             Box::pin(async move {
-                // This is a cheat: dummy_vdaf only has a single step, so we artificially force
-                // this job into step 2 so that we can send a request for step 1 and force a
-                // step mismatch error instead of tripping the check for a request to continue
-                // to step 0.
                 let aggregation_job = tx
-                    .get_aggregation_job::<VERIFY_KEY_LENGTH, TimeInterval, Poplar1<XofTurboShake128, 16>>(
+                    .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>(
                         &task_id,
                         &aggregation_job_id,
                     )
@@ -931,7 +910,7 @@ mod tests {
                 (*test_case.task.id(), test_case.aggregation_job_id);
             Box::pin(async move {
                 let aggregation_job = tx
-                    .get_aggregation_job::<VERIFY_KEY_LENGTH, TimeInterval, Poplar1<XofTurboShake128, 16>>(
+                    .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>(
                         &task_id,
                         &aggregation_job_id,
                    )
@@ -948,9 +927,7 @@ mod tests {
        .unwrap();
 
        // Subsequent attempts to initialize the job should fail.
- let (prep_init, _) = test_case - .prepare_init_generator - .next(&IdpfInput::from_bools(&[true])); + let (prep_init, _) = test_case.prepare_init_generator.next(&13); let init_req = AggregationJobInitializeReq::new( test_case.aggregation_parameter.get_encoded().unwrap(), PartialBatchSelector::new_time_interval(), diff --git a/aggregator/src/aggregator/aggregation_job_driver/tests.rs b/aggregator/src/aggregator/aggregation_job_driver/tests.rs index be73d29a5..70c9c5590 100644 --- a/aggregator/src/aggregator/aggregation_job_driver/tests.rs +++ b/aggregator/src/aggregator/aggregation_job_driver/tests.rs @@ -44,11 +44,9 @@ use janus_messages::{ use mockito::ServerGuard; use prio::{ codec::Encode, - idpf::IdpfInput, vdaf::{ - poplar1::{Poplar1, Poplar1AggregationParam}, + dummy, prio3::{Prio3, Prio3Count}, - xof::XofTurboShake128, Aggregator, }, }; @@ -72,8 +70,8 @@ async fn aggregation_job_driver() { let mut runtime_manager = TestRuntimeManager::new(); let ephemeral_datastore = ephemeral_datastore().await; let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); - let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }) + let vdaf = Arc::new(dummy::Vdaf::new(2)); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }) .with_helper_aggregator_endpoint(server.url().parse().unwrap()) .build(); @@ -85,11 +83,9 @@ async fn aggregation_job_driver() { .unwrap(); let batch_identifier = TimeInterval::to_batch_identifier(&leader_task, &(), &time).unwrap(); let report_metadata = ReportMetadata::new(random(), time); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let measurement = IdpfInput::from_bools(&[true]); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[true])])) - .unwrap(); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); + let measurement = 13; + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( vdaf.as_ref(), @@ -112,11 +108,8 @@ async fn aggregation_job_driver() { let aggregation_job_id = random(); ds.run_unnamed_tx(|tx| { - let (task, report, aggregation_param) = ( - leader_task.clone(), - report.clone(), - aggregation_param.clone(), - ); + let (task, report, aggregation_param) = + (leader_task.clone(), report.clone(), aggregation_param); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); tx.put_client_report(&report).await.unwrap(); @@ -127,14 +120,10 @@ async fn aggregation_job_driver() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -149,14 +138,10 @@ async fn aggregation_job_driver() { .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_batch_aggregation(&BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, Interval::from_time(&time).unwrap(), BatchAggregationState::Aggregating { @@ -272,44 +257,40 @@ async fn aggregation_job_driver() { mocked_aggregate.assert_async().await; } - let want_aggregation_job = - AggregationJob::>::new( + let 
want_aggregation_job: AggregationJob<0, TimeInterval, _> = + AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, AggregationJobStep::from(2), ); - let want_report_aggregation = - ReportAggregation::>::new( - *task.id(), - aggregation_job_id, - *report.metadata().id(), - *report.metadata().time(), - 0, - None, - ReportAggregationState::Finished, - ); - let want_batch_aggregations = Vec::from([BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + let want_report_aggregation = ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), - batch_identifier, - aggregation_param.clone(), + aggregation_job_id, + *report.metadata().id(), + *report.metadata().time(), 0, - Interval::from_time(&time).unwrap(), - BatchAggregationState::Aggregating { - aggregate_share: Some(transcript.leader_output_share.clone()), - report_count: 1, - checksum: ReportIdChecksum::for_report_id(report.metadata().id()), - aggregation_jobs_created: 1, - aggregation_jobs_terminated: 1, - }, - )]); + None, + ReportAggregationState::Finished, + ); + let want_batch_aggregations = + Vec::from([BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( + *task.id(), + batch_identifier, + aggregation_param, + 0, + Interval::from_time(&time).unwrap(), + BatchAggregationState::Aggregating { + aggregate_share: Some(transcript.leader_aggregate_share), + report_count: 1, + checksum: ReportIdChecksum::for_report_id(report.metadata().id()), + aggregation_jobs_created: 1, + aggregation_jobs_terminated: 1, + }, + )]); let (got_aggregation_job, got_report_aggregation, got_batch_aggregations) = ds .run_unnamed_tx(|tx| { @@ -319,11 +300,12 @@ async fn aggregation_job_driver() { Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::>( + .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>( task.id(), &aggregation_job_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let report_aggregation = tx .get_report_aggregation_by_report_id( @@ -333,12 +315,16 @@ async fn aggregation_job_driver() { &aggregation_job_id, &report_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let batch_aggregations = merge_batch_aggregations_by_batch( - tx.get_batch_aggregations_for_task::>(&vdaf, task.id()) - .await - .unwrap(), + tx.get_batch_aggregations_for_task::<0, TimeInterval, dummy::Vdaf>( + &vdaf, + task.id(), + ) + .await + .unwrap(), ); Ok((aggregation_job, report_aggregation, batch_aggregations)) }) @@ -691,9 +677,9 @@ async fn step_time_interval_aggregation_job_init_two_steps() { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); + let vdaf = Arc::new(dummy::Vdaf::new(2)); - let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }) + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }) .with_helper_aggregator_endpoint(server.url().parse().unwrap()) .build(); @@ -705,11 +691,9 @@ async fn step_time_interval_aggregation_job_init_two_steps() { .unwrap(); let batch_identifier = TimeInterval::to_batch_identifier(&leader_task, &(), &time).unwrap(); let report_metadata = ReportMetadata::new(random(), time); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let measurement = 
IdpfInput::from_bools(&[true]); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[true])])) - .unwrap(); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); + let measurement = 13; + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( vdaf.as_ref(), @@ -732,11 +716,8 @@ async fn step_time_interval_aggregation_job_init_two_steps() { let lease = ds .run_unnamed_tx(|tx| { - let (task, report, aggregation_param) = ( - leader_task.clone(), - report.clone(), - aggregation_param.clone(), - ); + let (task, report, aggregation_param) = + (leader_task.clone(), report.clone(), aggregation_param); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); tx.put_client_report(&report).await.unwrap(); @@ -744,14 +725,10 @@ async fn step_time_interval_aggregation_job_init_two_steps() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -767,11 +744,7 @@ async fn step_time_interval_aggregation_job_init_two_steps() { .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_batch_aggregation(&BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), batch_identifier, aggregation_param, @@ -858,49 +831,44 @@ async fn step_time_interval_aggregation_job_init_two_steps() { // Verify. mocked_aggregate_success.assert_async().await; - let want_aggregation_job = - AggregationJob::>::new( - *task.id(), - aggregation_job_id, - aggregation_param.clone(), - (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), - AggregationJobState::InProgress, - AggregationJobStep::from(1), - ); - let want_report_aggregation = - ReportAggregation::>::new( - *task.id(), - aggregation_job_id, - *report.metadata().id(), - *report.metadata().time(), - 0, - None, - ReportAggregationState::WaitingLeader { - transition: transcript.leader_prepare_transitions[1] - .transition - .clone() - .unwrap(), - }, - ); - let want_batch_aggregations = Vec::from([BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + let want_aggregation_job = AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), - batch_identifier, + aggregation_job_id, aggregation_param, + (), + Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), + AggregationJobState::InProgress, + AggregationJobStep::from(1), + ); + let want_report_aggregation = ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report.metadata().id(), + *report.metadata().time(), 0, - Interval::from_time(&time).unwrap(), - BatchAggregationState::Aggregating { - aggregate_share: None, - report_count: 0, - checksum: ReportIdChecksum::default(), - aggregation_jobs_created: 1, - aggregation_jobs_terminated: 0, + None, + ReportAggregationState::WaitingLeader { + transition: transcript.leader_prepare_transitions[1] + .transition + .clone() + .unwrap(), }, - )]); + ); + let want_batch_aggregations = + Vec::from([BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( + *task.id(), + batch_identifier, + aggregation_param, + 0, + Interval::from_time(&time).unwrap(), + 
BatchAggregationState::Aggregating { + aggregate_share: None, + report_count: 0, + checksum: ReportIdChecksum::default(), + aggregation_jobs_created: 1, + aggregation_jobs_terminated: 0, + }, + )]); let (got_aggregation_job, got_report_aggregation, got_batch_aggregations) = ds .run_unnamed_tx(|tx| { @@ -908,11 +876,12 @@ async fn step_time_interval_aggregation_job_init_two_steps() { (Arc::clone(&vdaf), task.clone(), *report.metadata().id()); Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::>( + .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>( task.id(), &aggregation_job_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let report_aggregation = tx .get_report_aggregation_by_report_id( @@ -922,12 +891,16 @@ async fn step_time_interval_aggregation_job_init_two_steps() { &aggregation_job_id, &report_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let batch_aggregations = merge_batch_aggregations_by_batch( - tx.get_batch_aggregations_for_task::>(&vdaf, task.id()) - .await - .unwrap(), + tx.get_batch_aggregations_for_task::<0, TimeInterval, dummy::Vdaf>( + &vdaf, + task.id(), + ) + .await + .unwrap(), ); Ok((aggregation_job, report_aggregation, batch_aggregations)) }) @@ -1603,14 +1576,14 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); + let vdaf = Arc::new(dummy::Vdaf::new(2)); let task = TaskBuilder::new( BatchMode::LeaderSelected { max_batch_size: Some(10), batch_time_window_size: None, }, - VdafInstance::Poplar1 { bits: 1 }, + VdafInstance::Fake { rounds: 2 }, ) .with_helper_aggregator_endpoint(server.url().parse().unwrap()) .build(); @@ -1624,11 +1597,9 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { .to_batch_interval_start(task.time_precision()) .unwrap(), ); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let measurement = IdpfInput::from_bools(&[true]); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[true])])) - .unwrap(); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); + let measurement = 13; + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( vdaf.as_ref(), @@ -1652,11 +1623,8 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { let lease = ds .run_unnamed_tx(|tx| { - let (task, report, aggregation_param) = ( - leader_task.clone(), - report.clone(), - aggregation_param.clone(), - ); + let (task, report, aggregation_param) = + (leader_task.clone(), report.clone(), aggregation_param); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); tx.put_client_report(&report).await.unwrap(); @@ -1664,14 +1632,10 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - LeaderSelected, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, batch_id, Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -1687,14 +1651,10 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - LeaderSelected, - Poplar1, - 
>::new( + tx.put_batch_aggregation(&BatchAggregation::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), batch_id, - aggregation_param.clone(), + aggregation_param, 0, Interval::from_time(report.metadata().time()).unwrap(), BatchAggregationState::Aggregating { @@ -1778,49 +1738,44 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { // Verify. mocked_aggregate_success.assert_async().await; - let want_aggregation_job = - AggregationJob::>::new( - *task.id(), - aggregation_job_id, - aggregation_param.clone(), - batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), - AggregationJobState::InProgress, - AggregationJobStep::from(1), - ); - let want_report_aggregation = - ReportAggregation::>::new( - *task.id(), - aggregation_job_id, - *report.metadata().id(), - *report.metadata().time(), - 0, - None, - ReportAggregationState::WaitingLeader { - transition: transcript.leader_prepare_transitions[1] - .transition - .clone() - .unwrap(), - }, - ); - let want_batch_aggregations = Vec::from([BatchAggregation::< - VERIFY_KEY_LENGTH, - LeaderSelected, - Poplar1, - >::new( + let want_aggregation_job = AggregationJob::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), + aggregation_job_id, + aggregation_param, batch_id, - aggregation_param.clone(), + Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), + AggregationJobState::InProgress, + AggregationJobStep::from(1), + ); + let want_report_aggregation = ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report.metadata().id(), + *report.metadata().time(), 0, - Interval::from_time(report.metadata().time()).unwrap(), - BatchAggregationState::Aggregating { - aggregate_share: None, - report_count: 0, - checksum: ReportIdChecksum::default(), - aggregation_jobs_created: 1, - aggregation_jobs_terminated: 0, + None, + ReportAggregationState::WaitingLeader { + transition: transcript.leader_prepare_transitions[1] + .transition + .clone() + .unwrap(), }, - )]); + ); + let want_batch_aggregations = + Vec::from([BatchAggregation::<0, LeaderSelected, dummy::Vdaf>::new( + *task.id(), + batch_id, + aggregation_param, + 0, + Interval::from_time(report.metadata().time()).unwrap(), + BatchAggregationState::Aggregating { + aggregate_share: None, + report_count: 0, + checksum: ReportIdChecksum::default(), + aggregation_jobs_created: 1, + aggregation_jobs_terminated: 0, + }, + )]); let (got_aggregation_job, got_report_aggregation, got_batch_aggregations) = ds .run_unnamed_tx(|tx| { @@ -1828,11 +1783,12 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { (Arc::clone(&vdaf), task.clone(), *report.metadata().id()); Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::>( + .get_aggregation_job::<0, LeaderSelected, dummy::Vdaf>( task.id(), &aggregation_job_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let report_aggregation = tx .get_report_aggregation_by_report_id( @@ -1842,12 +1798,16 @@ async fn step_leader_selected_aggregation_job_init_two_steps() { &aggregation_job_id, &report_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let batch_aggregations = merge_batch_aggregations_by_batch( - tx.get_batch_aggregations_for_task::>(&vdaf, task.id()) - .await - .unwrap(), + tx.get_batch_aggregations_for_task::<0, LeaderSelected, dummy::Vdaf>( + &vdaf, + task.id(), + ) + .await + .unwrap(), ); Ok((aggregation_job, report_aggregation, batch_aggregations)) }) @@ -1872,9 +1832,9 @@ async fn 
step_time_interval_aggregation_job_continue() { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); + let vdaf = Arc::new(dummy::Vdaf::new(2)); - let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }) + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }) .with_helper_aggregator_endpoint(server.url().parse().unwrap()) .build(); let leader_task = task.leader_view().unwrap(); @@ -1893,17 +1853,15 @@ async fn step_time_interval_aggregation_job_continue() { ) .unwrap(); let report_metadata = ReportMetadata::new(random(), time); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( vdaf.as_ref(), verify_key.as_bytes(), &aggregation_param, report_metadata.id(), - &IdpfInput::from_bools(&[true]), + &13, ); let agg_auth_token = task.aggregator_auth_token(); @@ -1921,7 +1879,7 @@ async fn step_time_interval_aggregation_job_continue() { .run_unnamed_tx(|tx| { let (task, aggregation_param, report, transcript) = ( leader_task.clone(), - aggregation_param.clone(), + aggregation_param, report.clone(), transcript.clone(), ); @@ -1935,14 +1893,10 @@ async fn step_time_interval_aggregation_job_continue() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -1952,10 +1906,7 @@ async fn step_time_interval_aggregation_job_continue() { .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id, *report.metadata().id(), @@ -1972,14 +1923,10 @@ async fn step_time_interval_aggregation_job_continue() { .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_batch_aggregation(&BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), active_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, Interval::from_time(report.metadata().time()).unwrap(), BatchAggregationState::Aggregating { @@ -1992,14 +1939,10 @@ async fn step_time_interval_aggregation_job_continue() { )) .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_batch_aggregation(&BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), other_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, Interval::EMPTY, BatchAggregationState::Aggregating { @@ -2099,46 +2042,44 @@ async fn step_time_interval_aggregation_job_continue() { mocked_aggregate_failure.assert_async().await; mocked_aggregate_success.assert_async().await; - let want_aggregation_job = - AggregationJob::>::new( - *task.id(), - aggregation_job_id, - aggregation_param.clone(), - (), - 
Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), - AggregationJobState::Finished, - AggregationJobStep::from(2), - ); - let want_report_aggregation = - ReportAggregation::>::new( - *task.id(), - aggregation_job_id, - *report.metadata().id(), - *report.metadata().time(), - 0, - None, - ReportAggregationState::Finished, - ); + let want_aggregation_job = AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + aggregation_param, + (), + Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), + AggregationJobState::Finished, + AggregationJobStep::from(2), + ); + let want_report_aggregation = ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report.metadata().id(), + *report.metadata().time(), + 0, + None, + ReportAggregationState::Finished, + ); let want_batch_aggregations = Vec::from([ - BatchAggregation::>::new( + BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), active_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, Interval::from_time(report.metadata().time()).unwrap(), BatchAggregationState::Aggregating { - aggregate_share: Some(transcript.leader_output_share.clone()), + aggregate_share: Some(transcript.leader_aggregate_share), report_count: 1, checksum: ReportIdChecksum::for_report_id(report.metadata().id()), aggregation_jobs_created: 1, aggregation_jobs_terminated: 1, }, ), - BatchAggregation::>::new( + BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), other_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, Interval::EMPTY, BatchAggregationState::Aggregating { @@ -2151,11 +2092,7 @@ async fn step_time_interval_aggregation_job_continue() { ), ]); - let ( - got_aggregation_job, - got_report_aggregation, - got_batch_aggregations, - ) = ds + let (got_aggregation_job, got_report_aggregation, got_batch_aggregations) = ds .run_unnamed_tx(|tx| { let vdaf = Arc::clone(&vdaf); let task = leader_task.clone(); @@ -2163,11 +2100,12 @@ async fn step_time_interval_aggregation_job_continue() { Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::>( + .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>( task.id(), &aggregation_job_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let report_aggregation = tx .get_report_aggregation_by_report_id( @@ -2177,19 +2115,19 @@ async fn step_time_interval_aggregation_job_continue() { &aggregation_job_id, report_metadata.id(), ) - .await.unwrap() + .await + .unwrap() .unwrap(); let batch_aggregations = merge_batch_aggregations_by_batch( - tx.get_batch_aggregations_for_task::>(&vdaf, task.id()) - .await - .unwrap(), + tx.get_batch_aggregations_for_task::<0, TimeInterval, dummy::Vdaf>( + &vdaf, + task.id(), + ) + .await + .unwrap(), ); - Ok(( - aggregation_job, - report_aggregation, - batch_aggregations, - )) + Ok((aggregation_job, report_aggregation, batch_aggregations)) }) }) .await @@ -2212,14 +2150,14 @@ async fn step_leader_selected_aggregation_job_continue() { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); + let vdaf = Arc::new(dummy::Vdaf::new(2)); let task = TaskBuilder::new( BatchMode::LeaderSelected { max_batch_size: Some(10), batch_time_window_size: None, }, - VdafInstance::Poplar1 { bits: 1 }, + VdafInstance::Fake { rounds: 2 }, ) 
.with_helper_aggregator_endpoint(server.url().parse().unwrap()) .build(); @@ -2232,17 +2170,15 @@ async fn step_leader_selected_aggregation_job_continue() { .to_batch_interval_start(task.time_precision()) .unwrap(), ); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( vdaf.as_ref(), verify_key.as_bytes(), &aggregation_param, report_metadata.id(), - &IdpfInput::from_bools(&[true]), + &13, ); let agg_auth_token = task.aggregator_auth_token(); @@ -2257,7 +2193,7 @@ async fn step_leader_selected_aggregation_job_continue() { let batch_id = random(); let aggregation_job_id = random(); let leader_aggregate_share = vdaf - .aggregate(&aggregation_param, [transcript.leader_output_share.clone()]) + .aggregate(&aggregation_param, [transcript.leader_output_share]) .unwrap(); let lease = ds @@ -2265,7 +2201,7 @@ async fn step_leader_selected_aggregation_job_continue() { let (task, report, aggregation_param, transcript) = ( leader_task.clone(), report.clone(), - aggregation_param.clone(), + aggregation_param, transcript.clone(), ); Box::pin(async move { @@ -2275,14 +2211,10 @@ async fn step_leader_selected_aggregation_job_continue() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - LeaderSelected, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, batch_id, Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -2292,10 +2224,7 @@ async fn step_leader_selected_aggregation_job_continue() { .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id, *report.metadata().id(), @@ -2312,14 +2241,10 @@ async fn step_leader_selected_aggregation_job_continue() { .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - LeaderSelected, - Poplar1, - >::new( + tx.put_batch_aggregation(&BatchAggregation::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), batch_id, - aggregation_param.clone(), + aggregation_param, 0, Interval::from_time(report.metadata().time()).unwrap(), BatchAggregationState::Aggregating { @@ -2408,64 +2333,56 @@ async fn step_leader_selected_aggregation_job_continue() { mocked_aggregate_failure.assert_async().await; mocked_aggregate_success.assert_async().await; - let want_aggregation_job = - AggregationJob::>::new( - *task.id(), - aggregation_job_id, - aggregation_param.clone(), - batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), - AggregationJobState::Finished, - AggregationJobStep::from(2), - ); - let want_report_aggregation = - ReportAggregation::>::new( - *task.id(), - aggregation_job_id, - *report.metadata().id(), - *report.metadata().time(), - 0, - None, - ReportAggregationState::Finished, - ); - let want_batch_aggregations = Vec::from([BatchAggregation::< - VERIFY_KEY_LENGTH, - LeaderSelected, - Poplar1, - >::new( + let want_aggregation_job = AggregationJob::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), + aggregation_job_id, + aggregation_param, batch_id, - 
aggregation_param.clone(), + Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), + AggregationJobState::Finished, + AggregationJobStep::from(2), + ); + let want_report_aggregation = ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report.metadata().id(), + *report.metadata().time(), 0, - Interval::from_time(report.metadata().time()).unwrap(), - BatchAggregationState::Aggregating { - aggregate_share: Some(leader_aggregate_share), - report_count: 1, - checksum: ReportIdChecksum::for_report_id(report.metadata().id()), - aggregation_jobs_created: 1, - aggregation_jobs_terminated: 1, - }, - )]); + None, + ReportAggregationState::Finished, + ); + let want_batch_aggregations = + Vec::from([BatchAggregation::<0, LeaderSelected, dummy::Vdaf>::new( + *task.id(), + batch_id, + aggregation_param, + 0, + Interval::from_time(report.metadata().time()).unwrap(), + BatchAggregationState::Aggregating { + aggregate_share: Some(leader_aggregate_share), + report_count: 1, + checksum: ReportIdChecksum::for_report_id(report.metadata().id()), + aggregation_jobs_created: 1, + aggregation_jobs_terminated: 1, + }, + )]); - let ( - got_aggregation_job, - got_report_aggregation, - got_batch_aggregations, - ) = ds + let (got_aggregation_job, got_report_aggregation, got_batch_aggregations) = ds .run_unnamed_tx(|tx| { let (vdaf, task, report_metadata, aggregation_param) = ( Arc::clone(&vdaf), leader_task.clone(), report.metadata().clone(), - aggregation_param.clone(), + aggregation_param, ); Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::>( + .get_aggregation_job::<0, LeaderSelected, dummy::Vdaf>( task.id(), &aggregation_job_id, ) - .await.unwrap() + .await + .unwrap() .unwrap(); let report_aggregation = tx .get_report_aggregation_by_report_id( @@ -2475,20 +2392,26 @@ async fn step_leader_selected_aggregation_job_continue() { &aggregation_job_id, report_metadata.id(), ) - .await.unwrap() + .await + .unwrap() .unwrap(); - let batch_aggregations = - merge_batch_aggregations_by_batch(LeaderSelected::get_batch_aggregations_for_collection_identifier::< - VERIFY_KEY_LENGTH, - Poplar1, + let batch_aggregations = merge_batch_aggregations_by_batch( + LeaderSelected::get_batch_aggregations_for_collection_identifier::< + 0, + dummy::Vdaf, _, - >(tx, task.id(), task.time_precision(), &vdaf, &batch_id, &aggregation_param) - .await.unwrap()); - Ok(( - aggregation_job, - report_aggregation, - batch_aggregations, - )) + >( + tx, + task.id(), + task.time_precision(), + &vdaf, + &batch_id, + &aggregation_param, + ) + .await + .unwrap(), + ); + Ok((aggregation_job, report_aggregation, batch_aggregations)) }) }) .await diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index f6f0253a6..f53198cfc 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -795,13 +795,6 @@ where )], ), - Poplar1 { bits } => metrics - .aggregated_report_share_dimension_histogram - .record( - u64::try_from(*bits).unwrap_or(u64::MAX), - &[KeyValue::new("type", "Poplar1")], - ), - #[cfg(feature = "test-util")] Fake { rounds: _ } | FakeFailsPrepInit diff --git a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs index 342c27af1..d772e5991 100644 --- a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs +++ 
b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs @@ -22,7 +22,7 @@ use janus_core::{ report_id::ReportIdChecksumExt, test_util::run_vdaf, time::{Clock, IntervalExt, MockClock, TimeExt}, - vdaf::{VdafInstance, VERIFY_KEY_LENGTH}, + vdaf::VdafInstance, }; use janus_messages::{ batch_mode::TimeInterval, AggregationJobContinueReq, AggregationJobResp, AggregationJobStep, @@ -31,14 +31,8 @@ use janus_messages::{ Time, }; use prio::{ - idpf::IdpfInput, topology::ping_pong::PingPongMessage, - vdaf::{ - dummy, - poplar1::{Poplar1, Poplar1AggregationParam}, - xof::XofTurboShake128, - Aggregator, - }, + vdaf::{dummy, Aggregator}, }; use rand::random; use std::sync::Arc; @@ -56,14 +50,13 @@ async fn aggregate_continue() { } = HttpHandlerTest::new().await; let aggregation_job_id = random(); - let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build(); let helper_task = task.helper_view().unwrap(); - let vdaf = Arc::new(Poplar1::::new(1)); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let measurement = IdpfInput::from_bools(&[true]); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(vec![measurement.clone()]).unwrap(); + let vdaf = Arc::new(dummy::Vdaf::new(2)); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); + let measurement = 13; + let aggregation_param = dummy::AggregationParam(7); // report_share_0 is a "happy path" report. let report_metadata_0 = ReportMetadata::new( @@ -82,7 +75,7 @@ async fn aggregate_continue() { ); let helper_prep_state_0 = transcript_0.helper_prepare_transitions[0].prepare_state(); let leader_prep_message_0 = &transcript_0.leader_prepare_transitions[1].message; - let report_share_0 = generate_helper_report_share::>( + let report_share_0 = generate_helper_report_share::( *task.id(), report_metadata_0.clone(), hpke_key.config(), @@ -108,7 +101,7 @@ async fn aggregate_continue() { ); let helper_prep_state_1 = transcript_1.helper_prepare_transitions[0].prepare_state(); - let report_share_1 = generate_helper_report_share::>( + let report_share_1 = generate_helper_report_share::( *task.id(), report_metadata_1.clone(), hpke_key.config(), @@ -137,7 +130,7 @@ async fn aggregate_continue() { ); let helper_prep_state_2 = transcript_2.helper_prepare_transitions[0].prepare_state(); let leader_prep_message_2 = &transcript_2.leader_prepare_transitions[1].message; - let report_share_2 = generate_helper_report_share::>( + let report_share_2 = generate_helper_report_share::( *task.id(), report_metadata_2.clone(), hpke_key.config(), @@ -155,16 +148,15 @@ async fn aggregate_continue() { report_share_2.clone(), ); let (helper_prep_state_0, helper_prep_state_1, helper_prep_state_2) = ( - helper_prep_state_0.clone(), - helper_prep_state_1.clone(), - helper_prep_state_2.clone(), + *helper_prep_state_0, + *helper_prep_state_1, + *helper_prep_state_2, ); let (report_metadata_0, report_metadata_1, report_metadata_2) = ( report_metadata_0.clone(), report_metadata_1.clone(), report_metadata_2.clone(), ); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); @@ -179,14 +171,10 @@ async fn aggregate_continue() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), 
aggregation_job_id, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -196,60 +184,50 @@ async fn aggregate_continue() { .await .unwrap(); - tx.put_report_aggregation::>( - &ReportAggregation::new( - *task.id(), - aggregation_job_id, - *report_metadata_0.id(), - *report_metadata_0.time(), - 0, - None, - ReportAggregationState::WaitingHelper { - prepare_state: helper_prep_state_0, - }, - ), - ) + tx.put_report_aggregation::<0, dummy::Vdaf>(&ReportAggregation::new( + *task.id(), + aggregation_job_id, + *report_metadata_0.id(), + *report_metadata_0.time(), + 0, + None, + ReportAggregationState::WaitingHelper { + prepare_state: helper_prep_state_0, + }, + )) .await .unwrap(); - tx.put_report_aggregation::>( - &ReportAggregation::new( - *task.id(), - aggregation_job_id, - *report_metadata_1.id(), - *report_metadata_1.time(), - 1, - None, - ReportAggregationState::WaitingHelper { - prepare_state: helper_prep_state_1, - }, - ), - ) + tx.put_report_aggregation::<0, dummy::Vdaf>(&ReportAggregation::new( + *task.id(), + aggregation_job_id, + *report_metadata_1.id(), + *report_metadata_1.time(), + 1, + None, + ReportAggregationState::WaitingHelper { + prepare_state: helper_prep_state_1, + }, + )) .await .unwrap(); - tx.put_report_aggregation::>( - &ReportAggregation::new( - *task.id(), - aggregation_job_id, - *report_metadata_2.id(), - *report_metadata_2.time(), - 2, - None, - ReportAggregationState::WaitingHelper { - prepare_state: helper_prep_state_2, - }, - ), - ) + tx.put_report_aggregation::<0, dummy::Vdaf>(&ReportAggregation::new( + *task.id(), + aggregation_job_id, + *report_metadata_2.id(), + *report_metadata_2.time(), + 2, + None, + ReportAggregationState::WaitingHelper { + prepare_state: helper_prep_state_2, + }, + )) .await .unwrap(); // Write collected batch aggregations for the interval that report_share_2 falls // into, which will cause it to fail to prepare. try_join_all( - empty_batch_aggregations::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >( + empty_batch_aggregations::<0, TimeInterval, dummy::Vdaf>( &task, BATCH_AGGREGATION_SHARD_COUNT, &Interval::new(Time::from_seconds_since_epoch(0), *task.time_precision()) @@ -299,7 +277,7 @@ async fn aggregate_continue() { let (vdaf, task) = (Arc::clone(&vdaf), task.clone()); Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::>( + .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>( task.id(), &aggregation_job_id, ) @@ -395,7 +373,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .. } = HttpHandlerTest::new().await; - let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build(); let helper_task = task.helper_view().unwrap(); let aggregation_job_id_0 = random(); let aggregation_job_id_1 = random(); @@ -407,11 +385,10 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .unwrap(), ); - let vdaf = Poplar1::new(1); - let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let measurement = IdpfInput::from_bools(&[true]); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([measurement.clone()])).unwrap(); + let vdaf = dummy::Vdaf::new(2); + let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); + let measurement = 13; + let aggregation_param = dummy::AggregationParam(7); // report_share_0 is a "happy path" report. 
let report_time_0 = first_batch_interval_clock @@ -428,7 +405,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { ); let helper_prep_state_0 = transcript_0.helper_prepare_transitions[0].prepare_state(); let ping_pong_leader_message_0 = &transcript_0.leader_prepare_transitions[1].message; - let report_share_0 = generate_helper_report_share::>( + let report_share_0 = generate_helper_report_share::( *task.id(), report_metadata_0.clone(), hpke_key.config(), @@ -453,7 +430,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { ); let helper_prep_state_1 = transcript_1.helper_prepare_transitions[0].prepare_state(); let ping_pong_leader_message_1 = &transcript_1.leader_prepare_transitions[1].message; - let report_share_1 = generate_helper_report_share::>( + let report_share_1 = generate_helper_report_share::( *task.id(), report_metadata_1.clone(), hpke_key.config(), @@ -480,7 +457,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { ); let helper_prep_state_2 = transcript_2.helper_prepare_transitions[0].prepare_state(); let ping_pong_leader_message_2 = &transcript_2.leader_prepare_transitions[1].message; - let report_share_2 = generate_helper_report_share::>( + let report_share_2 = generate_helper_report_share::( *task.id(), report_metadata_2.clone(), hpke_key.config(), @@ -512,7 +489,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .unwrap(); let second_batch_want_batch_aggregations: Vec<_> = - empty_batch_aggregations::>( + empty_batch_aggregations::<0, TimeInterval, dummy::Vdaf>( &helper_task, BATCH_AGGREGATION_SHARD_COUNT, &second_batch_identifier, @@ -524,7 +501,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { BatchAggregation::new( *ba.task_id(), *ba.batch_identifier(), - ba.aggregation_parameter().clone(), + *ba.aggregation_parameter(), ba.ord(), second_batch_identifier, ba.state().clone(), @@ -541,16 +518,15 @@ async fn aggregate_continue_accumulate_batch_aggregation() { report_share_2.clone(), ); let (helper_prep_state_0, helper_prep_state_1, helper_prep_state_2) = ( - helper_prep_state_0.clone(), - helper_prep_state_1.clone(), - helper_prep_state_2.clone(), + *helper_prep_state_0, + *helper_prep_state_1, + *helper_prep_state_2, ); let (report_metadata_0, report_metadata_1, report_metadata_2) = ( report_metadata_0.clone(), report_metadata_1.clone(), report_metadata_2.clone(), ); - let aggregation_param = aggregation_param.clone(); let second_batch_want_batch_aggregations = second_batch_want_batch_aggregations.clone(); Box::pin(async move { @@ -566,14 +542,10 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id_0, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -583,10 +555,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id_0, *report_metadata_0.id(), @@ -599,10 +568,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { )) .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + 
tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id_0, *report_metadata_1.id(), @@ -615,10 +581,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { )) .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id_0, *report_metadata_2.id(), @@ -632,14 +595,10 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .await .unwrap(); - tx.put_batch_aggregation(&BatchAggregation::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_batch_aggregation(&BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), first_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, first_batch_identifier, BatchAggregationState::Aggregating { @@ -684,13 +643,12 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .run_unnamed_tx(|tx| { let task = helper_task.clone(); let vdaf = vdaf.clone(); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { Ok(merge_batch_aggregations_by_batch( TimeInterval::get_batch_aggregations_for_collection_identifier::< - VERIFY_KEY_LENGTH, - Poplar1, + 0, + dummy::Vdaf, _, >( tx, @@ -712,8 +670,8 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .aggregate( &aggregation_param, [ - transcript_0.helper_output_share.clone(), - transcript_1.helper_output_share.clone(), + transcript_0.helper_output_share, + transcript_1.helper_output_share, ], ) .unwrap(); @@ -725,7 +683,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { Vec::from([BatchAggregation::new( *task.id(), first_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, first_batch_interval, BatchAggregationState::Aggregating { @@ -742,13 +700,12 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .run_unnamed_tx(|tx| { let task = helper_task.clone(); let vdaf = vdaf.clone(); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { let mut got_batch_aggregations = TimeInterval::get_batch_aggregations_for_collection_identifier::< - VERIFY_KEY_LENGTH, - Poplar1, + 0, + dummy::Vdaf, _, >( tx, @@ -795,7 +752,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { ); let helper_prep_state_3 = transcript_3.helper_prepare_transitions[0].prepare_state(); let ping_pong_leader_message_3 = &transcript_3.leader_prepare_transitions[1].message; - let report_share_3 = generate_helper_report_share::>( + let report_share_3 = generate_helper_report_share::( *task.id(), report_metadata_3.clone(), hpke_key.config(), @@ -822,7 +779,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { ); let helper_prep_state_4 = transcript_4.helper_prepare_transitions[0].prepare_state(); let ping_pong_leader_message_4 = &transcript_4.leader_prepare_transitions[1].message; - let report_share_4 = generate_helper_report_share::>( + let report_share_4 = generate_helper_report_share::( *task.id(), report_metadata_4.clone(), hpke_key.config(), @@ -849,7 +806,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { ); let helper_prep_state_5 = transcript_5.helper_prepare_transitions[0].prepare_state(); let ping_pong_leader_message_5 = &transcript_5.leader_prepare_transitions[1].message; - let report_share_5 = generate_helper_report_share::>( + let report_share_5 = generate_helper_report_share::( *task.id(), report_metadata_5.clone(), hpke_key.config(), @@ -867,16 +824,15 @@ async 
fn aggregate_continue_accumulate_batch_aggregation() { report_share_5.clone(), ); let (helper_prep_state_3, helper_prep_state_4, helper_prep_state_5) = ( - helper_prep_state_3.clone(), - helper_prep_state_4.clone(), - helper_prep_state_5.clone(), + *helper_prep_state_3, + *helper_prep_state_4, + *helper_prep_state_5, ); let (report_metadata_3, report_metadata_4, report_metadata_5) = ( report_metadata_3.clone(), report_metadata_4.clone(), report_metadata_5.clone(), ); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { tx.put_scrubbed_report(task.id(), &report_share_3) @@ -889,11 +845,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id_1, aggregation_param, @@ -906,10 +858,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id_1, *report_metadata_3.id(), @@ -922,10 +871,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { )) .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id_1, *report_metadata_4.id(), @@ -938,10 +884,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { )) .await .unwrap(); - tx.put_report_aggregation(&ReportAggregation::< - VERIFY_KEY_LENGTH, - Poplar1, - >::new( + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( *task.id(), aggregation_job_id_1, *report_metadata_5.id(), @@ -979,13 +922,12 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .run_unnamed_tx(|tx| { let task = helper_task.clone(); let vdaf = vdaf.clone(); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { Ok(merge_batch_aggregations_by_batch( TimeInterval::get_batch_aggregations_for_collection_identifier::< - VERIFY_KEY_LENGTH, - Poplar1, + 0, + dummy::Vdaf, _, >( tx, @@ -1025,7 +967,7 @@ async fn aggregate_continue_accumulate_batch_aggregation() { Vec::from([BatchAggregation::new( *task.id(), first_batch_identifier, - aggregation_param.clone(), + aggregation_param, 0, first_batch_interval, BatchAggregationState::Aggregating { @@ -1042,13 +984,12 @@ async fn aggregate_continue_accumulate_batch_aggregation() { .run_unnamed_tx(|tx| { let task = helper_task.clone(); let vdaf = vdaf.clone(); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { let mut got_batch_aggregations = TimeInterval::get_batch_aggregations_for_collection_identifier::< - VERIFY_KEY_LENGTH, - Poplar1, + 0, + dummy::Vdaf, _, >( tx, @@ -1089,18 +1030,16 @@ async fn aggregate_continue_leader_sends_non_continue_or_finish_transition() { } = HttpHandlerTest::new().await; // Prepare parameters. 
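The migrated tests above all follow the same recipe: replace Poplar1 with libprio-rs' dummy VDAF configured for two rounds, so the aggregation job still needs a second step before it can finish. A minimal sketch of that setup, assuming only the `dummy` module API used throughout this patch (`Vdaf::new`, `AggregationParam`, a small-integer measurement) and the `Client<16>::shard` signature from the same prio release; this is illustrative, not code from the patch:

    use prio::vdaf::{dummy, Client};

    fn dummy_vdaf_setup() {
        // Two rounds keeps the job in a waiting state after the first step,
        // mirroring VdafInstance::Fake { rounds: 2 }.
        let vdaf = dummy::Vdaf::new(2);
        // Unlike Prio3, the dummy VDAF takes a nontrivial aggregation
        // parameter, so the aggregation-parameter code paths stay exercised.
        let aggregation_param = dummy::AggregationParam(7);
        // Shard a measurement under a 16-byte nonce, as Client<16> requires.
        let (public_share, input_shares) = vdaf.shard(&13, &[0u8; 16]).unwrap();
        let _ = (aggregation_param, public_share, input_shares);
    }
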
- let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build(); let helper_task = task.helper_view().unwrap(); let report_id = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( - &Poplar1::new_turboshake128(1), + &dummy::Vdaf::new(2), task.vdaf_verify_key().unwrap().as_bytes(), &aggregation_param, &report_id, - &IdpfInput::from_bools(&[false]), + &13, ); let aggregation_job_id = random(); let report_metadata = ReportMetadata::new( @@ -1113,7 +1052,7 @@ async fn aggregate_continue_leader_sends_non_continue_or_finish_transition() { .run_unnamed_tx(|tx| { let (task, aggregation_param, report_metadata, transcript) = ( helper_task.clone(), - aggregation_param.clone(), + aggregation_param, report_metadata.clone(), transcript.clone(), ); @@ -1134,11 +1073,7 @@ async fn aggregate_continue_leader_sends_non_continue_or_finish_transition() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - 16, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, aggregation_param, @@ -1150,21 +1085,17 @@ async fn aggregate_continue_leader_sends_non_continue_or_finish_transition() { )) .await .unwrap(); - tx.put_report_aggregation( - &ReportAggregation::<16, Poplar1>::new( - *task.id(), - aggregation_job_id, - *report_metadata.id(), - *report_metadata.time(), - 0, - None, - ReportAggregationState::WaitingHelper { - prepare_state: transcript.helper_prepare_transitions[0] - .prepare_state() - .clone(), - }, - ), - ) + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report_metadata.id(), + *report_metadata.time(), + 0, + None, + ReportAggregationState::WaitingHelper { + prepare_state: *transcript.helper_prepare_transitions[0].prepare_state(), + }, + )) .await }) }) @@ -1213,23 +1144,21 @@ async fn aggregate_continue_prep_step_fails() { } = HttpHandlerTest::new().await; // Prepare parameters. 
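One detail worth noting in the hunks above: the dummy VDAF's helper prepare state is a `Copy` type, so the rewritten tests lift it out of the transcript with a plain dereference where Poplar1's prepare state, which owned heap data, needed `.clone()`. A sketch, assuming janus_core's test-util `run_vdaf` helper as it is invoked elsewhere in this patch:

    use janus_core::test_util::run_vdaf;
    use prio::vdaf::dummy;
    use rand::random;

    fn copied_prepare_state() {
        let vdaf = dummy::Vdaf::new(2);
        // The dummy VDAF's verify key is empty, hence the `&[]`.
        let transcript = run_vdaf(&vdaf, &[], &dummy::AggregationParam(7), &random(), &13);
        // Previously: ...prepare_state().clone(). The dummy VDAF's prepare
        // state is Copy, so `*` suffices.
        let _prepare_state = *transcript.helper_prepare_transitions[0].prepare_state();
    }
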
- let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build(); let helper_task = task.helper_view().unwrap(); - let vdaf = Poplar1::new_turboshake128(1); + let vdaf = dummy::Vdaf::new(2); let report_id = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( &vdaf, task.vdaf_verify_key().unwrap().as_bytes(), &aggregation_param, &report_id, - &IdpfInput::from_bools(&[false]), + &13, ); let aggregation_job_id = random(); let report_metadata = ReportMetadata::new(report_id, Time::from_seconds_since_epoch(54321)); - let helper_report_share = generate_helper_report_share::>( + let helper_report_share = generate_helper_report_share::( *task.id(), report_metadata.clone(), hpke_key.config(), @@ -1243,7 +1172,7 @@ async fn aggregate_continue_prep_step_fails() { .run_unnamed_tx(|tx| { let (task, aggregation_param, report_metadata, transcript, helper_report_share) = ( helper_task.clone(), - aggregation_param.clone(), + aggregation_param, report_metadata.clone(), transcript.clone(), helper_report_share.clone(), @@ -1254,11 +1183,7 @@ async fn aggregate_continue_prep_step_fails() { tx.put_scrubbed_report(task.id(), &helper_report_share) .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - 16, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, aggregation_param, @@ -1270,21 +1195,17 @@ async fn aggregate_continue_prep_step_fails() { )) .await .unwrap(); - tx.put_report_aggregation( - &ReportAggregation::<16, Poplar1>::new( - *task.id(), - aggregation_job_id, - *report_metadata.id(), - *report_metadata.time(), - 0, - None, - ReportAggregationState::WaitingHelper { - prepare_state: transcript.helper_prepare_transitions[0] - .prepare_state() - .clone(), - }, - ), - ) + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report_metadata.id(), + *report_metadata.time(), + 0, + None, + ReportAggregationState::WaitingHelper { + prepare_state: *transcript.helper_prepare_transitions[0].prepare_state(), + }, + )) .await }) }) @@ -1320,7 +1241,7 @@ async fn aggregate_continue_prep_step_fails() { (vdaf.clone(), task.clone(), report_metadata.clone()); Box::pin(async move { let aggregation_job = tx - .get_aggregation_job::<16, TimeInterval, Poplar1>( + .get_aggregation_job::<0, TimeInterval, dummy::Vdaf>( task.id(), &aggregation_job_id, ) @@ -1393,18 +1314,16 @@ async fn aggregate_continue_unexpected_transition() { } = HttpHandlerTest::new().await; // Prepare parameters. 
- let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build(); let helper_task = task.helper_view().unwrap(); let report_id = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let aggregation_param = dummy::AggregationParam(7); let transcript = run_vdaf( - &Poplar1::new_turboshake128(1), + &dummy::Vdaf::new(2), task.vdaf_verify_key().unwrap().as_bytes(), &aggregation_param, &report_id, - &IdpfInput::from_bools(&[false]), + &13, ); let aggregation_job_id = random(); let report_metadata = ReportMetadata::new(report_id, Time::from_seconds_since_epoch(54321)); @@ -1414,7 +1333,7 @@ async fn aggregate_continue_unexpected_transition() { .run_unnamed_tx(|tx| { let (task, aggregation_param, report_metadata, transcript) = ( helper_task.clone(), - aggregation_param.clone(), + aggregation_param, report_metadata.clone(), transcript.clone(), ); @@ -1435,11 +1354,7 @@ async fn aggregate_continue_unexpected_transition() { ) .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - 16, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, aggregation_param, @@ -1451,21 +1366,17 @@ async fn aggregate_continue_unexpected_transition() { )) .await .unwrap(); - tx.put_report_aggregation( - &ReportAggregation::<16, Poplar1>::new( - *task.id(), - aggregation_job_id, - *report_metadata.id(), - *report_metadata.time(), - 0, - None, - ReportAggregationState::WaitingHelper { - prepare_state: transcript.helper_prepare_transitions[0] - .prepare_state() - .clone(), - }, - ), - ) + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report_metadata.id(), + *report_metadata.time(), + 0, + None, + ReportAggregationState::WaitingHelper { + prepare_state: *transcript.helper_prepare_transitions[0].prepare_state(), + }, + )) .await }) }) @@ -1516,27 +1427,25 @@ async fn aggregate_continue_out_of_order_transition() { } = HttpHandlerTest::new().await; // Prepare parameters. 
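The other constant change in these hunks is the leading const generic on `AggregationJob`, `ReportAggregation`, and friends: it drops from 16 (`VERIFY_KEY_LENGTH`, Poplar1's seed size) to 0, because the dummy VDAF's verify key is zero-length. Illustrative only:

    // Poplar1 implemented prio::vdaf::Aggregator<16, 16>; dummy::Vdaf
    // implements Aggregator<0, 16>, so its verify key is an empty array.
    fn dummy_verify_key() -> [u8; 0] {
        []
    }

    // Hence the instantiations seen throughout this patch, e.g.:
    //   AggregationJob::<0, TimeInterval, dummy::Vdaf>::new(...)
    //   ReportAggregation::<0, dummy::Vdaf>::new(...)
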
- let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 2 }).build(); let helper_task = task.helper_view().unwrap(); let report_id_0 = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let aggregation_param = dummy::AggregationParam(7); let transcript_0 = run_vdaf( - &Poplar1::new_turboshake128(1), + &dummy::Vdaf::new(2), task.vdaf_verify_key().unwrap().as_bytes(), &aggregation_param, &report_id_0, - &IdpfInput::from_bools(&[false]), + &13, ); let report_metadata_0 = ReportMetadata::new(report_id_0, Time::from_seconds_since_epoch(54321)); let report_id_1 = random(); let transcript_1 = run_vdaf( - &Poplar1::new_turboshake128(1), + &dummy::Vdaf::new(2), task.vdaf_verify_key().unwrap().as_bytes(), &aggregation_param, &report_id_1, - &IdpfInput::from_bools(&[false]), + &13, ); let report_metadata_1 = ReportMetadata::new(report_id_1, Time::from_seconds_since_epoch(54321)); let aggregation_job_id = random(); @@ -1553,7 +1462,7 @@ async fn aggregate_continue_out_of_order_transition() { transcript_1, ) = ( helper_task.clone(), - aggregation_param.clone(), + aggregation_param, report_metadata_0.clone(), report_metadata_1.clone(), transcript_0.clone(), @@ -1592,14 +1501,10 @@ async fn aggregate_continue_out_of_order_transition() { .await .unwrap(); - tx.put_aggregation_job(&AggregationJob::< - 16, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -1609,38 +1514,30 @@ async fn aggregate_continue_out_of_order_transition() { .await .unwrap(); - tx.put_report_aggregation( - &ReportAggregation::<16, Poplar1>::new( - *task.id(), - aggregation_job_id, - *report_metadata_0.id(), - *report_metadata_0.time(), - 0, - None, - ReportAggregationState::WaitingHelper { - prepare_state: transcript_0.helper_prepare_transitions[0] - .prepare_state() - .clone(), - }, - ), - ) + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report_metadata_0.id(), + *report_metadata_0.time(), + 0, + None, + ReportAggregationState::WaitingHelper { + prepare_state: *transcript_0.helper_prepare_transitions[0].prepare_state(), + }, + )) .await .unwrap(); - tx.put_report_aggregation( - &ReportAggregation::<16, Poplar1>::new( - *task.id(), - aggregation_job_id, - *report_metadata_1.id(), - *report_metadata_1.time(), - 1, - None, - ReportAggregationState::WaitingHelper { - prepare_state: transcript_1.helper_prepare_transitions[0] - .prepare_state() - .clone(), - }, - ), - ) + tx.put_report_aggregation(&ReportAggregation::<0, dummy::Vdaf>::new( + *task.id(), + aggregation_job_id, + *report_metadata_1.id(), + *report_metadata_1.time(), + 1, + None, + ReportAggregationState::WaitingHelper { + prepare_state: *transcript_1.helper_prepare_transitions[0].prepare_state(), + }, + )) .await }) }) diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index a98bdfbec..2a9cf6062 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -30,7 +30,7 @@ use janus_core::{ taskprov::TASKPROV_HEADER, test_util::{install_test_trace_subscriber, 
runtime::TestRuntime, VdafTranscript}, time::{Clock, DurationExt, MockClock, TimeExt}, - vdaf::{new_prio3_sum_vec_field64_multiproof_hmacsha256_aes128, VERIFY_KEY_LENGTH}, + vdaf::new_prio3_sum_vec_field64_multiproof_hmacsha256_aes128, }; use janus_messages::{ batch_mode::LeaderSelected, @@ -47,12 +47,7 @@ use janus_messages::{ }; use prio::{ flp::gadgets::ParallelSumMultithreaded, - idpf::IdpfInput, - vdaf::{ - poplar1::{Poplar1, Poplar1AggregationParam}, - xof::XofTurboShake128, - Aggregator, Client, Vdaf, - }, + vdaf::{dummy, Aggregator, Client, Vdaf}, }; use rand::random; use ring::digest::{digest, SHA256}; @@ -67,9 +62,7 @@ use url::Url; use super::http_handlers::AggregatorHandlerBuilder; -type TestVdaf = Poplar1; - -pub struct TaskprovTestCase { +pub struct TaskprovTestCase { _ephemeral_datastore: EphemeralDatastore, clock: MockClock, collector_hpke_keypair: HpkeKeypair, @@ -85,26 +78,21 @@ pub struct TaskprovTestCase { global_hpke_key: HpkeKeypair, } -impl TaskprovTestCase { +impl TaskprovTestCase<0, dummy::Vdaf> { async fn new() -> Self { - // We use a real VDAF since taskprov doesn't have any allowance for a test VDAF, and we use - // Poplar1 so that the VDAF wil take more than one step, so we can exercise aggregation - // continuation. - let vdaf = Poplar1::new(1); + let vdaf = dummy::Vdaf::new(2); let vdaf_config = VdafConfig::new( DpConfig::new(DpMechanism::None), - VdafType::Poplar1 { bits: 1 }, + VdafType::Fake { rounds: 2 }, ) .unwrap(); - let measurement = IdpfInput::from_bools(&[true]); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[true])])) - .unwrap(); + let measurement = 13; + let aggregation_param = dummy::AggregationParam(7); Self::with_vdaf(vdaf_config, vdaf, measurement, aggregation_param).await } } -impl TaskprovTestCase +impl TaskprovTestCase where V: Vdaf + Client<16> + Aggregator + Clone, { @@ -374,7 +362,7 @@ async fn taskprov_aggregate_init() { let task_id = test.task_id; Box::pin(async move { Ok(( - tx.get_aggregation_jobs_for_task::<16, LeaderSelected, TestVdaf>(&task_id) + tx.get_aggregation_jobs_for_task::<0, LeaderSelected, dummy::Vdaf>(&task_id) .await .unwrap(), tx.get_aggregator_task(&task_id).await.unwrap(), @@ -471,7 +459,7 @@ async fn taskprov_aggregate_init_missing_extension() { let task_id = test.task_id; Box::pin(async move { Ok(( - tx.get_aggregation_jobs_for_task::<16, LeaderSelected, TestVdaf>(&task_id) + tx.get_aggregation_jobs_for_task::<0, LeaderSelected, dummy::Vdaf>(&task_id) .await .unwrap(), tx.get_aggregator_task(&task_id).await.unwrap(), @@ -554,7 +542,7 @@ async fn taskprov_aggregate_init_malformed_extension() { let task_id = test.task_id; Box::pin(async move { Ok(( - tx.get_aggregation_jobs_for_task::<16, LeaderSelected, TestVdaf>(&task_id) + tx.get_aggregation_jobs_for_task::<0, LeaderSelected, dummy::Vdaf>(&task_id) .await .unwrap(), tx.get_aggregator_task(&task_id).await.unwrap(), @@ -668,7 +656,7 @@ async fn taskprov_opt_out_mismatched_task_id() { task_expiration, VdafConfig::new( DpConfig::new(DpMechanism::None), - VdafType::Poplar1 { bits: 1 }, + VdafType::Fake { rounds: 2 }, ) .unwrap(), ) @@ -747,7 +735,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() { task_expiration, VdafConfig::new( DpConfig::new(DpMechanism::None), - VdafType::Poplar1 { bits: 1 }, + VdafType::Fake { rounds: 2 }, ) .unwrap(), ) @@ -827,7 +815,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() { task_expiration, VdafConfig::new( DpConfig::new(DpMechanism::None), - 
VdafType::Poplar1 { bits: 1 }, + VdafType::Fake { rounds: 2 }, ) .unwrap(), ) @@ -883,7 +871,6 @@ async fn taskprov_aggregate_continue() { let task = test.task.clone(); let report_share = report_share.clone(); let transcript = transcript.clone(); - let aggregation_param = aggregation_param.clone(); Box::pin(async move { // Aggregate continue is only possible if the task has already been inserted. @@ -892,14 +879,10 @@ async fn taskprov_aggregate_continue() { tx.put_scrubbed_report(task.id(), &report_share).await?; - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - LeaderSelected, - TestVdaf, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), aggregation_job_id, - aggregation_param.clone(), + aggregation_param, batch_id, Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), @@ -908,7 +891,7 @@ async fn taskprov_aggregate_continue() { )) .await?; - tx.put_report_aggregation::(&ReportAggregation::new( + tx.put_report_aggregation::<0, dummy::Vdaf>(&ReportAggregation::new( *task.id(), aggregation_job_id, *report_share.metadata().id(), @@ -916,14 +899,12 @@ async fn taskprov_aggregate_continue() { 0, None, ReportAggregationState::WaitingHelper { - prepare_state: transcript.helper_prepare_transitions[0] - .prepare_state() - .clone(), + prepare_state: *transcript.helper_prepare_transitions[0].prepare_state(), }, )) .await?; - tx.put_aggregate_share_job::( + tx.put_aggregate_share_job::<0, LeaderSelected, dummy::Vdaf>( &AggregateShareJob::new( *task.id(), batch_id, @@ -1028,14 +1009,13 @@ async fn taskprov_aggregate_share() { let interval = Interval::new(Time::from_seconds_since_epoch(6000), *task.time_precision()) .unwrap(); - let aggregation_param = aggregation_param.clone(); let transcript = transcript.clone(); Box::pin(async move { tx.put_aggregator_task(&task.taskprov_helper_view().unwrap()) .await?; - tx.put_batch_aggregation(&BatchAggregation::<16, LeaderSelected, TestVdaf>::new( + tx.put_batch_aggregation(&BatchAggregation::<0, LeaderSelected, dummy::Vdaf>::new( *task.id(), batch_id, aggregation_param, @@ -1133,7 +1113,7 @@ async fn taskprov_aggregate_share() { /// This runs aggregation job init, aggregation job continue, and aggregate share requests against a /// taskprov-enabled helper, and confirms that correct results are returned. #[tokio::test] -async fn end_to_end_poplar1() { +async fn end_to_end() { let test = TaskprovTestCase::new().await; let (auth_header_name, auth_header_value) = test .peer_aggregator diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index df05aefac..d97b6a341 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -1609,18 +1609,21 @@ pub fn merge_batch_aggregations_by_batch< mut batch_aggregations: Vec>, ) -> Vec> where - A::AggregationParam: PartialEq, + A::AggregationParam: Ord, { use itertools::Itertools as _; - // We sort using encoded aggregation param, removing requirement for AggregationParam: Ord, - // which is not satisfied by all VDAFs (e.g. Poplar1). 
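With Poplar1's aggregation parameter (which had no `Ord` impl) gone, the bound on this function can be strengthened from `PartialEq` to `Ord`, and the replacement just below can sort `BatchAggregation`s by a tuple of borrowed keys instead of by their encoded form. A toy version of the pattern, with a hypothetical stand-in struct for `BatchAggregation`:

    // Hypothetical stand-in for BatchAggregation, for illustration only.
    struct Ba {
        task_id: u32,
        batch_identifier: String,
        aggregation_parameter: u8,
    }

    fn sort_for_merge(bas: &mut [Ba]) {
        // Tuples of Ord references are themselves Ord, so sort_by can compare
        // borrowed keys directly: no cloning, no encoding. sort_by_key does
        // not work here, since its key type cannot borrow from the element
        // being sorted.
        bas.sort_by(|l, r| {
            (&l.task_id, &l.batch_identifier, &l.aggregation_parameter)
                .cmp(&(&r.task_id, &r.batch_identifier, &r.aggregation_parameter))
        });
    }
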
- batch_aggregations.sort_by_key(|ba| { + batch_aggregations.sort_by(|ba_l, ba_r| { ( - *ba.task_id(), - ba.batch_identifier().clone(), - ba.aggregation_parameter().get_encoded().unwrap(), + ba_l.task_id(), + ba_l.batch_identifier(), + ba_l.aggregation_parameter(), ) + .cmp(&( + ba_r.task_id(), + ba_r.batch_identifier(), + ba_r.aggregation_parameter(), + )) }); batch_aggregations .into_iter() diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index d3fbee723..916159635 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -42,14 +42,8 @@ use prio::{ dp::{ distributions::PureDpDiscreteLaplace, DifferentialPrivacyStrategy, PureDpBudget, Rational, }, - idpf::IdpfInput, topology::ping_pong::PingPongMessage, - vdaf::{ - dummy, - poplar1::{Poplar1, Poplar1AggregationParam}, - prio3::Prio3Count, - xof::XofTurboShake128, - }, + vdaf::{dummy, prio3::Prio3Count}, }; use rand::{distributions::Standard, random, thread_rng, Rng}; use std::{ @@ -200,8 +194,6 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { }, Role::Leader, ), - (VdafInstance::Poplar1 { bits: 8 }, Role::Helper), - (VdafInstance::Poplar1 { bits: 64 }, Role::Helper), ] { let task = TaskBuilder::new(task::BatchMode::TimeInterval, vdaf) .with_report_expiry_age(Some(Duration::from_seconds(3600))) @@ -2267,29 +2259,20 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { install_test_trace_subscriber(); let report_id = random(); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); - let verify_key: [u8; VERIFY_KEY_LENGTH] = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); - let vdaf_transcript = run_vdaf( - vdaf.as_ref(), - &verify_key, - &aggregation_param, - &report_id, - &IdpfInput::from_bools(&[false]), - ); + let vdaf = Arc::new(dummy::Vdaf::new(2)); + let aggregation_param = dummy::AggregationParam(5); + let vdaf_transcript = run_vdaf(vdaf.as_ref(), &[], &aggregation_param, &report_id, &13); for (ord, (role, state)) in [ ( Role::Leader, ReportAggregationState::StartLeader { - public_share: vdaf_transcript.public_share.clone(), + public_share: vdaf_transcript.public_share, leader_extensions: Vec::from([ Extension::new(ExtensionType::Tbd, Vec::from("extension_data_0")), Extension::new(ExtensionType::Tbd, Vec::from("extension_data_1")), ]), - leader_input_share: vdaf_transcript.leader_input_share.clone(), + leader_input_share: vdaf_transcript.leader_input_share, helper_encrypted_input_share: HpkeCiphertext::new( HpkeConfigId::from(13), Vec::from("encapsulated_context"), @@ -2309,9 +2292,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { ( Role::Helper, ReportAggregationState::WaitingHelper { - prepare_state: vdaf_transcript.helper_prepare_transitions[0] - .prepare_state() - .clone(), + prepare_state: *vdaf_transcript.helper_prepare_transitions[0].prepare_state(), }, ), (Role::Leader, ReportAggregationState::Finished), @@ -2337,7 +2318,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { let task = TaskBuilder::new( task::BatchMode::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, + VdafInstance::Fake { rounds: 2 }, ) .with_report_expiry_age(Some(REPORT_EXPIRY_AGE)) .build() @@ -2349,14 +2330,10 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { let want_report_aggregation = ds 
.run_tx("test-put-report-aggregations", |tx| { let (task, state, aggregation_param) = - (task.clone(), state.clone(), aggregation_param.clone()); + (task.clone(), state.clone(), aggregation_param); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, aggregation_param, @@ -2598,23 +2575,14 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme let ds = ephemeral_datastore.datastore(clock.clone()).await; let report_id = random(); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); - let verify_key: [u8; VERIFY_KEY_LENGTH] = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let vdaf = Arc::new(dummy::Vdaf::new(2)); + let aggregation_param = dummy::AggregationParam(7); - let vdaf_transcript = run_vdaf( - vdaf.as_ref(), - &verify_key, - &aggregation_param, - &report_id, - &IdpfInput::from_bools(&[false]), - ); + let vdaf_transcript = run_vdaf(vdaf.as_ref(), &[], &aggregation_param, &report_id, &13); let task = TaskBuilder::new( task::BatchMode::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, + VdafInstance::Fake { rounds: 2 }, ) .with_report_expiry_age(Some(REPORT_EXPIRY_AGE)) .build() @@ -2624,19 +2592,12 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme let want_report_aggregations = ds .run_unnamed_tx(|tx| { - let (task, vdaf_transcript, aggregation_param) = ( - task.clone(), - vdaf_transcript.clone(), - aggregation_param.clone(), - ); + let (task, vdaf_transcript, aggregation_param) = + (task.clone(), vdaf_transcript.clone(), aggregation_param); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, aggregation_param, @@ -2652,9 +2613,9 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme let mut want_report_aggregations = Vec::new(); for (ord, state) in [ ReportAggregationState::StartLeader { - public_share: vdaf_transcript.public_share.clone(), + public_share: vdaf_transcript.public_share, leader_extensions: Vec::new(), - leader_input_share: vdaf_transcript.leader_input_share.clone(), + leader_input_share: vdaf_transcript.leader_input_share, helper_encrypted_input_share: HpkeCiphertext::new( HpkeConfigId::from(13), Vec::from("encapsulated_context"), @@ -2662,9 +2623,8 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme ), }, ReportAggregationState::WaitingHelper { - prepare_state: vdaf_transcript.helper_prepare_transitions[0] - .prepare_state() - .clone(), + prepare_state: *vdaf_transcript.helper_prepare_transitions[0] + .prepare_state(), }, ReportAggregationState::Finished, ReportAggregationState::Failed { @@ -2762,23 +2722,14 @@ async fn create_report_aggregation_from_client_reports_table( let ds = ephemeral_datastore.datastore(clock.clone()).await; let report_id = random(); - let vdaf = Arc::new(Poplar1::new_turboshake128(1)); - let verify_key: [u8; VERIFY_KEY_LENGTH] = random(); - let aggregation_param = - Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])])) - .unwrap(); + let vdaf = 
Arc::new(dummy::Vdaf::new(2)); + let aggregation_param = dummy::AggregationParam(7); - let vdaf_transcript = run_vdaf( - vdaf.as_ref(), - &verify_key, - &aggregation_param, - &report_id, - &IdpfInput::from_bools(&[false]), - ); + let vdaf_transcript = run_vdaf(vdaf.as_ref(), &[], &aggregation_param, &report_id, &13); let task = TaskBuilder::new( task::BatchMode::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, + VdafInstance::Fake { rounds: 2 }, ) .with_report_expiry_age(Some(REPORT_EXPIRY_AGE)) .build() @@ -2790,14 +2741,10 @@ async fn create_report_aggregation_from_client_reports_table( let clock = clock.clone(); let task = task.clone(); let vdaf_transcript = vdaf_transcript.clone(); - let aggregation_param = aggregation_param.clone(); + Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH, - TimeInterval, - Poplar1, - >::new( + tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new( *task.id(), aggregation_job_id, aggregation_param, @@ -2812,19 +2759,18 @@ async fn create_report_aggregation_from_client_reports_table( let report_id = random(); let timestamp = clock.now(); - let leader_stored_report = - LeaderStoredReport::<16, Poplar1>::new( - *task.id(), - ReportMetadata::new(report_id, timestamp), - vdaf_transcript.public_share, - Vec::new(), - vdaf_transcript.leader_input_share, - HpkeCiphertext::new( - HpkeConfigId::from(9), - Vec::from(b"encapsulated"), - Vec::from(b"encrypted helper share"), - ), - ); + let leader_stored_report = LeaderStoredReport::<0, dummy::Vdaf>::new( + *task.id(), + ReportMetadata::new(report_id, timestamp), + (), + Vec::new(), + vdaf_transcript.leader_input_share, + HpkeCiphertext::new( + HpkeConfigId::from(9), + Vec::from(b"encapsulated"), + Vec::from(b"encrypted helper share"), + ), + ); tx.put_client_report(&leader_stored_report).await.unwrap(); let report_aggregation_metadata = ReportAggregationMetadata::new( @@ -2846,10 +2792,10 @@ async fn create_report_aggregation_from_client_reports_table( timestamp, 0, None, - ReportAggregationState::<16, Poplar1>::StartLeader { - public_share: leader_stored_report.public_share().clone(), + ReportAggregationState::<0, dummy::Vdaf>::StartLeader { + public_share: *leader_stored_report.public_share(), leader_extensions: leader_stored_report.leader_extensions().to_owned(), - leader_input_share: leader_stored_report.leader_input_share().clone(), + leader_input_share: *leader_stored_report.leader_input_share(), helper_encrypted_input_share: leader_stored_report .helper_encrypted_input_share() .clone(), diff --git a/aggregator_core/src/lib.rs b/aggregator_core/src/lib.rs index 767334493..7e96ef227 100644 --- a/aggregator_core/src/lib.rs +++ b/aggregator_core/src/lib.rs @@ -37,11 +37,6 @@ impl AsRef<[u8]> for SecretBytes { /// A marker trait for VDAFs that have an aggregation parameter other than the unit type. 
pub trait VdafHasAggregationParameter {} -impl VdafHasAggregationParameter - for prio::vdaf::poplar1::Poplar1 -{ -} - #[cfg(feature = "test-util")] impl VdafHasAggregationParameter for prio::vdaf::dummy::Vdaf {} diff --git a/core/Cargo.toml b/core/Cargo.toml index 31d4470fe..bac14c6c4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -26,6 +26,7 @@ test-util = [ "dep:tokio-stream", "dep:tracing-log", "dep:tracing-subscriber", + "janus_messages/test-util", "kube/ws", "prio/test-util", "tokio/macros", diff --git a/core/src/dp.rs b/core/src/dp.rs index 5c18aa7fd..8a179d343 100644 --- a/core/src/dp.rs +++ b/core/src/dp.rs @@ -7,7 +7,7 @@ use prio::flp::{ types::fixedpoint_l2::{compatible_float::CompatibleFloat, FixedPointBoundedL2VecSum}, }; #[cfg(feature = "test-util")] -use prio::vdaf::dummy::Vdaf; +use prio::vdaf::{dummy, AggregatorWithNoise}; use prio::{ dp::{ DifferentialPrivacyBudget, DifferentialPrivacyDistribution, DifferentialPrivacyStrategy, @@ -18,7 +18,6 @@ use prio::{ gadgets::{Mul, ParallelSumGadget}, TypeWithNoise, }, - vdaf::{xof::XofTurboShake128, AggregatorWithNoise}, }; use serde::{Deserialize, Serialize}; @@ -50,7 +49,7 @@ impl DifferentialPrivacyStrategy for NoDifferentialPrivacy { // identity strategy implementations for vdafs from janus #[cfg(feature = "test-util")] -impl AggregatorWithNoise<0, 16, NoDifferentialPrivacy> for Vdaf { +impl AggregatorWithNoise<0, 16, NoDifferentialPrivacy> for dummy::Vdaf { fn add_noise_to_agg_share( &self, _dp_strategy: &NoDifferentialPrivacy, @@ -144,17 +143,3 @@ where Ok(()) } } - -impl AggregatorWithNoise<16, 16, NoDifferentialPrivacy> - for prio::vdaf::poplar1::Poplar1 -{ - fn add_noise_to_agg_share( - &self, - _dp_strategy: &NoDifferentialPrivacy, - _agg_param: &Self::AggregationParam, - _agg_share: &mut Self::AggregateShare, - _num_measurements: usize, - ) -> Result<(), prio::vdaf::VdafError> { - Ok(()) - } -} diff --git a/core/src/vdaf.rs b/core/src/vdaf.rs index 6ee9dc376..7fdae75d2 100644 --- a/core/src/vdaf.rs +++ b/core/src/vdaf.rs @@ -11,7 +11,7 @@ use prio::{ use serde::{Deserialize, Serialize}; use std::str; -/// The length of the verify key parameter for Prio3 and Poplar1 VDAF instantiations using +/// The length of the verify key parameter for Prio3 VDAF instantiations using /// [`XofTurboShake128`][prio::vdaf::xof::XofTurboShake128]. pub const VERIFY_KEY_LENGTH: usize = 16; @@ -131,8 +131,6 @@ pub enum VdafInstance { dp_strategy: vdaf_dp_strategies::Prio3FixedPointBoundedL2VecSum, length: usize, }, - /// The `poplar1` VDAF. Support for this VDAF is experimental. - Poplar1 { bits: usize }, /// A fake, no-op VDAF, which uses an aggregation parameter and a variable number of rounds. #[cfg(feature = "test-util")] @@ -161,7 +159,7 @@ impl VdafInstance { VERIFY_KEY_LENGTH_HMACSHA256_AES128 } - // All other VDAFs (Prio3 as-specified and Poplar1) have the same verify key length. + // All other VDAFs (Prio3 as-specified) have the same verify key length. _ => VERIFY_KEY_LENGTH, } } @@ -206,9 +204,10 @@ impl TryFrom<&taskprov::VdafType> for VdafInstance { chunk_length: *chunk_length as usize, dp_strategy: vdaf_dp_strategies::Prio3Histogram::NoDifferentialPrivacy, }), - taskprov::VdafType::Poplar1 { bits } => Ok(Self::Poplar1 { - bits: *bits as usize, - }), + + #[cfg(feature = "test-util")] + taskprov::VdafType::Fake { rounds } => Ok(Self::Fake { rounds: *rounds }), + _ => Err("unknown VdafType"), } } @@ -349,16 +348,6 @@ macro_rules! 
vdaf_dispatch_impl_base { } } - ::janus_core::vdaf::VdafInstance::Poplar1 { bits } => { - let $vdaf = ::prio::vdaf::poplar1::Poplar1::new_turboshake128(*bits); - type $Vdaf = - ::prio::vdaf::poplar1::Poplar1<::prio::vdaf::xof::XofTurboShake128, 16>; - const $VERIFY_KEY_LEN: usize = ::janus_core::vdaf::VERIFY_KEY_LENGTH; - type $DpStrategy = janus_core::dp::NoDifferentialPrivacy; - let $dp_strategy = janus_core::dp::NoDifferentialPrivacy; - $body - } - _ => unreachable!(), } }; @@ -480,8 +469,7 @@ macro_rules! vdaf_dispatch_impl { | ::janus_core::vdaf::VdafInstance::Prio3Sum { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVec { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVecField64MultiproofHmacSha256Aes128 { .. } - | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } - | ::janus_core::vdaf::VdafInstance::Poplar1 { .. } => { + | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } => { ::janus_core::vdaf_dispatch_impl_base!(impl match base $vdaf_instance, ($vdaf, $Vdaf, $VERIFY_KEY_LEN, $dp_strategy, $DpStrategy) => $body) } @@ -510,8 +498,7 @@ macro_rules! vdaf_dispatch_impl { | ::janus_core::vdaf::VdafInstance::Prio3Sum { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVec { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVecField64MultiproofHmacSha256Aes128 { .. } - | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } - | ::janus_core::vdaf::VdafInstance::Poplar1 { .. } => { + | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } => { ::janus_core::vdaf_dispatch_impl_base!(impl match base $vdaf_instance, ($vdaf, $Vdaf, $VERIFY_KEY_LEN, $dp_strategy, $DpStrategy) => $body) } @@ -534,8 +521,7 @@ macro_rules! vdaf_dispatch_impl { | ::janus_core::vdaf::VdafInstance::Prio3Sum { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVec { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVecField64MultiproofHmacSha256Aes128 { .. } - | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } - | ::janus_core::vdaf::VdafInstance::Poplar1 { .. } => { + | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } => { ::janus_core::vdaf_dispatch_impl_base!(impl match base $vdaf_instance, ($vdaf, $Vdaf, $VERIFY_KEY_LEN, $dp_strategy, $DpStrategy) => $body) } @@ -560,8 +546,7 @@ macro_rules! vdaf_dispatch_impl { | ::janus_core::vdaf::VdafInstance::Prio3Sum { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVec { .. } | ::janus_core::vdaf::VdafInstance::Prio3SumVecField64MultiproofHmacSha256Aes128 { .. } - | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } - | ::janus_core::vdaf::VdafInstance::Poplar1 { .. } => { + | ::janus_core::vdaf::VdafInstance::Prio3Histogram { .. } => { ::janus_core::vdaf_dispatch_impl_base!(impl match base $vdaf_instance, ($vdaf, $Vdaf, $VERIFY_KEY_LEN, $dp_strategy, $DpStrategy) => $body) } @@ -791,19 +776,6 @@ mod tests { Token::StructVariantEnd, ], ); - assert_tokens( - &VdafInstance::Poplar1 { bits: 64 }, - &[ - Token::StructVariant { - name: "VdafInstance", - variant: "Poplar1", - len: 1, - }, - Token::Str("bits"), - Token::U64(64), - Token::StructVariantEnd, - ], - ); assert_tokens( &VdafInstance::Fake { rounds: 17 }, &[ diff --git a/messages/src/taskprov.rs b/messages/src/taskprov.rs index d655c17a9..29b60e666 100644 --- a/messages/src/taskprov.rs +++ b/messages/src/taskprov.rs @@ -331,9 +331,12 @@ pub enum VdafType { /// Size of each proof chunk. chunk_length: u32, }, - Poplar1 { - /// Bit length of the input string. - bits: u16, + + /// A fake, no-op VDAF, which uses an aggregation parameter and a variable number of rounds. 
+ #[cfg(feature = "test-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] + Fake { + rounds: u32, }, } @@ -343,7 +346,9 @@ impl VdafType { const PRIO3SUMVEC: u32 = 0x00000002; const PRIO3HISTOGRAM: u32 = 0x00000003; const PRIO3SUMVECFIELD64MULTIPROOFHMACSHA256AES128: u32 = 0xFFFF1003; - const POPLAR1: u32 = 0x00001000; + + #[cfg(feature = "test-util")] + const FAKE: u32 = 0xFFFF0000; // Chosen from the "reserved for private use" space. fn vdaf_type_code(&self) -> u32 { match self { @@ -354,7 +359,9 @@ impl VdafType { Self::PRIO3SUMVECFIELD64MULTIPROOFHMACSHA256AES128 } Self::Prio3Histogram { .. } => Self::PRIO3HISTOGRAM, - Self::Poplar1 { .. } => Self::POPLAR1, + + #[cfg(feature = "test-util")] + Self::Fake { .. } => Self::FAKE, } } } @@ -394,8 +401,10 @@ impl Encode for VdafType { length.encode(bytes)?; chunk_length.encode(bytes)?; } - Self::Poplar1 { bits } => { - bits.encode(bytes)?; + + #[cfg(feature = "test-util")] + Self::Fake { rounds } => { + rounds.encode(bytes)?; } } Ok(()) @@ -409,7 +418,9 @@ impl Encode for VdafType { Self::Prio3SumVec { .. } => 9, Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { .. } => 10, Self::Prio3Histogram { .. } => 8, - Self::Poplar1 { .. } => 2, + + #[cfg(feature = "test-util")] + Self::Fake { .. } => 4, }, ) } @@ -440,9 +451,12 @@ impl Decode for VdafType { length: u32::decode(bytes)?, chunk_length: u32::decode(bytes)?, }, - Self::POPLAR1 => Self::Poplar1 { - bits: u16::decode(bytes)?, + + #[cfg(feature = "test-util")] + Self::FAKE => Self::Fake { + rounds: u32::decode(bytes)?, }, + val => { return Err(CodecError::Other( anyhow!("unexpected VDAF type code value {}", val).into(), @@ -655,27 +669,6 @@ mod tests { "00000012", // chunk_length ), ), - ( - VdafType::Poplar1 { bits: u16::MIN }, - concat!( - "00001000", // vdaf_type_code - "0000", // bits - ), - ), - ( - VdafType::Poplar1 { bits: 0xABAB }, - concat!( - "00001000", // vdaf_type_code - "ABAB", // bits - ), - ), - ( - VdafType::Poplar1 { bits: u16::MAX }, - concat!( - "00001000", // vdaf_type_code - "FFFF" // bits - ), - ), ]) }
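The new test-only code point uses the same wire format the removed Poplar1 vectors above exercised: a 4-byte type code followed by the variant's fields, here a big-endian u32 round count. A round-trip sketch, not a vector from the actual test suite, assuming a `hex` dev-dependency and `PartialEq`/`Debug` derives on `VdafType`:

    use prio::codec::{Decode, Encode};

    fn fake_vdaf_type_roundtrip() {
        let vdaf_type = VdafType::Fake { rounds: 2 };
        let encoded = vdaf_type.get_encoded().unwrap();
        assert_eq!(
            hex::encode(&encoded),
            concat!(
                "ffff0000", // vdaf_type_code, from the private-use space
                "00000002", // rounds
            )
        );
        assert_eq!(VdafType::get_decoded(&encoded).unwrap(), vdaf_type);
    }
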