Skip to content

Commit

Permalink
Use AggregateStore::ReportCount to check if a batch exists instead of…
Browse files Browse the repository at this point in the history
… loading the entire aggregate share
  • Loading branch information
mendess committed Oct 17, 2024
1 parent bbc8d98 commit 20241b9
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions crates/daphne-server/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use daphne::{
use daphne_service_utils::durable_requests::bindings::{
self, AggregateStoreMergeOptions, AggregateStoreMergeReq, AggregateStoreMergeResp,
};
use futures::{future::try_join_all, StreamExt, TryStreamExt};
use futures::{future::try_join_all, StreamExt, TryFutureExt, TryStreamExt};
use mappable_rc::Marc;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};

Expand Down Expand Up @@ -316,15 +316,34 @@ impl DapAggregator for crate::App {

futures::stream::iter(agg_span)
.map(|bucket| async move {
Ok::<bool, DapError>(
!self
.durable()
.request(bindings::AggregateStore::Get, (version, task_id, &bucket))
let durable = self.durable();
let params = (version, task_id, &bucket);

let get_report_count = || {
durable
.request(bindings::AggregateStore::ReportCount, params)
.send::<u64>()
};

// TODO: remove this after the worker has this feature deployed.
let backwards_compat_get_report_count = || {
durable
.request(bindings::AggregateStore::Get, params)
.send::<DapAggregateShare>()
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get an agg share"))?
.empty(),
)
.map_ok(|r| r.report_count)
};

let count = get_report_count()
.or_else(|_| backwards_compat_get_report_count())
.await
.map_err(|e| {
fatal_error!(
err = ?e,
params = ?params,
"failed fetching report count of an agg share"
)
})?;
Ok(count > 0)
})
.buffer_unordered(usize::MAX)
.collect::<Vec<_>>()
Expand Down

0 comments on commit 20241b9

Please sign in to comment.