Skip to content

Commit

Permalink
[Experimental] Remove time-interval query type.
Browse files Browse the repository at this point in the history
  • Loading branch information
branlwyd committed Jul 20, 2023
1 parent d46f047 commit 96bae78
Show file tree
Hide file tree
Showing 31 changed files with 1,229 additions and 6,452 deletions.
158 changes: 10 additions & 148 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ use janus_core::{
time::{Clock, DurationExt, IntervalExt, TimeExt},
};
use janus_messages::{
problem_type::DapProblemType,
query_type::{FixedSize, TimeInterval},
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobRound,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeCiphertext,
HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare,
PrepareStep, PrepareStepResult, Report, ReportIdChecksum, ReportShare, ReportShareError, Role,
TaskId,
problem_type::DapProblemType, query_type::FixedSize, AggregateShare, AggregateShareAad,
AggregateShareReq, AggregationJobContinueReq, AggregationJobId, AggregationJobInitializeReq,
AggregationJobResp, AggregationJobRound, BatchSelector, Collection, CollectionJobId,
CollectionReq, Duration, HpkeCiphertext, HpkeConfigList, InputShareAad, Interval,
PartialBatchSelector, PlaintextInputShare, PrepareStep, PrepareStepResult, Report,
ReportIdChecksum, ReportShare, ReportShareError, Role, TaskId,
};
use opentelemetry::{
metrics::{Counter, Histogram, Meter},
Expand Down Expand Up @@ -836,20 +834,6 @@ impl VdafOps {
report: Report,
) -> Result<(), Arc<Error>> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_upload_generic::<VERIFY_KEY_LENGTH, TimeInterval, VdafType, _>(
Arc::clone(vdaf),
clock,
upload_decrypt_failure_counter,
upload_decode_failure_counter,
task,
report_writer,
report,
)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_upload_generic::<VERIFY_KEY_LENGTH, FixedSize, VdafType, _>(
Expand Down Expand Up @@ -883,20 +867,6 @@ impl VdafOps {
req_bytes: &[u8],
) -> Result<AggregationJobResp, Error> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, verify_key, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_init_generic::<VERIFY_KEY_LENGTH, TimeInterval, VdafType, _>(
datastore,
vdaf,
aggregate_step_failure_counter,
task,
aggregation_job_id,
verify_key,
req_bytes,
)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, verify_key, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_init_generic::<VERIFY_KEY_LENGTH, FixedSize, VdafType, _>(
Expand Down Expand Up @@ -930,21 +900,6 @@ impl VdafOps {
request_hash: [u8; 32],
) -> Result<AggregationJobResp, Error> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_continue_generic::<VERIFY_KEY_LENGTH, TimeInterval, VdafType, _>(
datastore,
Arc::clone(vdaf),
aggregate_step_failure_counter,
task,
batch_aggregation_shard_count,
aggregation_job_id,
req,
request_hash,
)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_continue_generic::<VERIFY_KEY_LENGTH, FixedSize, VdafType, _>(
Expand Down Expand Up @@ -1714,17 +1669,6 @@ impl VdafOps {
collection_req_bytes: &[u8],
) -> Result<(), Error> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_create_collection_job_generic::<
VERIFY_KEY_LENGTH,
TimeInterval,
VdafType,
_,
>(datastore, task, Arc::clone(vdaf), collection_job_id, collection_req_bytes)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_create_collection_job_generic::<
Expand Down Expand Up @@ -2015,17 +1959,6 @@ impl VdafOps {
collection_job_id: &CollectionJobId,
) -> Result<Option<Vec<u8>>, Error> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_get_collection_job_generic::<
VERIFY_KEY_LENGTH,
TimeInterval,
VdafType,
_,
>(datastore, task, Arc::clone(vdaf), collection_job_id)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_get_collection_job_generic::<
Expand Down Expand Up @@ -2189,17 +2122,6 @@ impl VdafOps {
collection_job_id: &CollectionJobId,
) -> Result<(), Error> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_delete_collection_job_generic::<
VERIFY_KEY_LENGTH,
TimeInterval,
VdafType,
_,
>(datastore, task, Arc::clone(vdaf), collection_job_id)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_delete_collection_job_generic::<
Expand Down Expand Up @@ -2272,17 +2194,6 @@ impl VdafOps {
req_bytes: &[u8],
) -> Result<AggregateShare, Error> {
match task.query_type() {
task::QueryType::TimeInterval => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_share_generic::<
VERIFY_KEY_LENGTH,
TimeInterval,
VdafType,
_,
>(datastore, clock, task, Arc::clone(vdaf), req_bytes, batch_aggregation_shard_count)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_ops_dispatch!(self, (vdaf, _, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_share_generic::<
Expand Down Expand Up @@ -2636,7 +2547,6 @@ mod tests {
use futures::future::try_join_all;
use janus_aggregator_core::{
datastore::{
models::{CollectionJob, CollectionJobState},
test_util::{ephemeral_datastore, EphemeralDatastore},
Datastore,
},
Expand All @@ -2645,14 +2555,13 @@ mod tests {
};
use janus_core::{
hpke::{self, HpkeApplicationInfo, Label},
task::{VdafInstance, PRIO3_VERIFY_KEY_LENGTH},
task::VdafInstance,
test_util::{dummy_vdaf, install_test_trace_subscriber},
time::{Clock, MockClock, TimeExt},
};
use janus_messages::{
query_type::TimeInterval, Duration, Extension, HpkeCiphertext, HpkeConfig, HpkeConfigId,
InputShareAad, Interval, PlaintextInputShare, Report, ReportId, ReportMetadata,
ReportShare, Role, TaskId, Time,
Duration, Extension, HpkeCiphertext, HpkeConfig, HpkeConfigId, InputShareAad,
PlaintextInputShare, Report, ReportId, ReportMetadata, ReportShare, Role, TaskId, Time,
};
use prio::{
codec::Encode,
Expand Down Expand Up @@ -2733,7 +2642,7 @@ mod tests {
let clock = MockClock::default();
let vdaf = Prio3Count::new_count(2).unwrap();
let task = TaskBuilder::new(
QueryType::TimeInterval,
QueryType::FixedSize { max_batch_size: 10 },
VdafInstance::Prio3Count,
Role::Leader,
)
Expand Down Expand Up @@ -2959,53 +2868,6 @@ mod tests {
});
}

#[tokio::test]
async fn upload_report_for_collected_batch() {
install_test_trace_subscriber();

let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(default_aggregator_config()).await;
let report = create_report(&task, clock.now());

// Insert a collection job for the batch interval including our report.
let batch_interval = Interval::new(
report
.metadata()
.time()
.to_batch_interval_start(task.time_precision())
.unwrap(),
*task.time_precision(),
)
.unwrap();
datastore
.run_tx(|tx| {
let task = task.clone();
Box::pin(async move {
tx.put_collection_job(&CollectionJob::<
PRIO3_VERIFY_KEY_LENGTH,
TimeInterval,
Prio3Count,
>::new(
*task.id(),
random(),
batch_interval,
(),
CollectionJobState::Start,
))
.await
})
})
.await
.unwrap();

// Try to upload the report, verify that we get the expected error.
assert_matches!(aggregator.handle_upload(task.id(), &report.get_encoded()).await.unwrap_err().as_ref(), Error::ReportRejected(err_task_id, err_report_id, err_time) => {
assert_eq!(task.id(), err_task_id);
assert_eq!(report.metadata().id(), err_report_id);
assert_eq!(report.metadata().time(), err_time);
});
}

pub(crate) fn generate_helper_report_share<V: vdaf::Client<16>>(
task_id: TaskId,
report_metadata: ReportMetadata,
Expand Down
32 changes: 20 additions & 12 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use janus_core::{
time::{Clock, MockClock, TimeExt as _},
};
use janus_messages::{
query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, PartialBatchSelector,
ReportMetadata, ReportShare, Role,
query_type::FixedSize, AggregationJobId, AggregationJobInitializeReq, BatchId,
PartialBatchSelector, ReportMetadata, ReportShare, Role,
};
use prio::codec::Encode;
use rand::random;
Expand Down Expand Up @@ -90,7 +90,8 @@ pub(super) struct AggregationJobInitTestCase {
pub(super) report_share_generator: ReportShareGenerator,
pub(super) report_shares: Vec<ReportShare>,
pub(super) aggregation_job_id: AggregationJobId,
aggregation_job_init_req: AggregationJobInitializeReq<TimeInterval>,
aggregation_job_init_req: AggregationJobInitializeReq<FixedSize>,
pub(super) batch_id: BatchId,
pub(super) aggregation_param: dummy_vdaf::AggregationParam,
pub(super) handler: Box<dyn Handler>,
pub(super) datastore: Arc<Datastore<MockClock>>,
Expand All @@ -115,7 +116,12 @@ pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase {
async fn setup_aggregate_init_test_without_sending_request() -> AggregationJobInitTestCase {
install_test_trace_subscriber();

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build();
let task = TaskBuilder::new(
QueryType::FixedSize { max_batch_size: 10 },
VdafInstance::Fake,
Role::Helper,
)
.build();
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
Expand All @@ -141,9 +147,10 @@ async fn setup_aggregate_init_test_without_sending_request() -> AggregationJobIn
]);

let aggregation_job_id = random();
let batch_id = random();
let aggregation_job_init_req = AggregationJobInitializeReq::new(
aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
PartialBatchSelector::new_fixed_size(batch_id),
report_shares.clone(),
);

Expand All @@ -154,6 +161,7 @@ async fn setup_aggregate_init_test_without_sending_request() -> AggregationJobIn
report_share_generator,
aggregation_job_id,
aggregation_job_init_req,
batch_id,
aggregation_param,
handler: Box::new(handler),
datastore,
Expand All @@ -164,7 +172,7 @@ async fn setup_aggregate_init_test_without_sending_request() -> AggregationJobIn
pub(crate) async fn put_aggregation_job(
task: &Task,
aggregation_job_id: &AggregationJobId,
aggregation_job: &AggregationJobInitializeReq<TimeInterval>,
aggregation_job: &AggregationJobInitializeReq<FixedSize>,
handler: &impl Handler,
) -> TestConn {
put(task.aggregation_job_uri(aggregation_job_id).unwrap().path())
Expand All @@ -174,7 +182,7 @@ pub(crate) async fn put_aggregation_job(
)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
AggregationJobInitializeReq::<FixedSize>::MEDIA_TYPE,
)
.with_request_body(aggregation_job.get_encoded())
.run_async(handler)
Expand All @@ -201,7 +209,7 @@ async fn aggregation_job_init_authorization_dap_auth_token() {
.with_request_header(auth_header, auth_value)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
AggregationJobInitializeReq::<FixedSize>::MEDIA_TYPE,
)
.with_request_body(test_case.aggregation_job_init_req.get_encoded())
.run_async(&test_case.handler)
Expand Down Expand Up @@ -236,7 +244,7 @@ async fn aggregation_job_init_malformed_authorization_header(#[case] header_valu
)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
AggregationJobInitializeReq::<FixedSize>::MEDIA_TYPE,
)
.with_request_body(test_case.aggregation_job_init_req.get_encoded())
.run_async(&test_case.handler)
Expand All @@ -252,7 +260,7 @@ async fn aggregation_job_mutation_aggregation_job() {
// Put the aggregation job again, but with a different aggregation parameter.
let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
dummy_vdaf::AggregationParam(1).get_encoded(),
PartialBatchSelector::new_time_interval(),
PartialBatchSelector::new_fixed_size(test_case.batch_id),
test_case.report_shares,
);

Expand Down Expand Up @@ -292,7 +300,7 @@ async fn aggregation_job_mutation_report_shares() {
] {
let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
PartialBatchSelector::new_fixed_size(test_case.batch_id),
mutated_report_shares,
);
let response = put_aggregation_job(
Expand Down Expand Up @@ -328,7 +336,7 @@ async fn aggregation_job_mutation_report_aggregations() {

let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
PartialBatchSelector::new_fixed_size(test_case.batch_id),
mutated_report_shares,
);
let response = put_aggregation_job(
Expand Down
Loading

0 comments on commit 96bae78

Please sign in to comment.