Skip to content

Commit

Permalink
Enable taskprov in draft07
Browse files Browse the repository at this point in the history
Enable support for taskprov in the latest draft. The main technical
hurdle is to ensure the Leader can advertise the taskprov config to the
Helper in an HTTP Header. The simplest solution is to add the
information to `DapTaskConfig` that is needed in order to reconstruct
the serialized taskprov config, in particular the "task_info" field.

To do this, we add a new `method` field to the `DapTaskConfig` struct
that conveys the method by which the task was configured. The method has
type `enum DapTaskConfigMethod`, of which there are two variants. The
first is `Taskprov` and includes the "task_info". The second is
`Unknown` and is a catch-all for existing, manually configured tasks.

Accordingly, we deprecate the `taskprov` field of the `DapTaskConfig`
struct. We've kept the field in the struct for backwards compatibility:
there are some situations for which the indication is sufficient; in
situations where the "task_info" field is needed, we must abort.

This change also includes a number of related, but minor changes:

* Have `resolve_advertised_task_config()` return a `DapAbort` rather
  than a `DapError` (an error in this flow always leads to an abort and
  is never an internal error).

* Opt-out of the task unless (1) the "none" variant for "DpConfig" is
  indicated and (2) the indicated max batch query count is 1. Other
  values for these parameters are not currently supported by Daphne.

* Introduce a type `DapTaskParameters` that encapsulates the parameters
  used to configure a task and implement a method for generating a task
  config using the taskprov method. This change is intended to simplify
  the test code, but should be independently useful.
  • Loading branch information
cjpatton committed Nov 23, 2023
1 parent 6cd6ab7 commit a8cc3fe
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 165 deletions.
154 changes: 145 additions & 9 deletions daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ use crate::{
use constants::DapMediaType;
pub use error::DapError;
use hpke::{HpkeConfig, HpkeKemId};
use messages::encode_base64url;
use prio::{
codec::{Decode, Encode, ParameterizedDecode},
codec::{Decode, Encode, ParameterizedDecode, ParameterizedEncode},
vdaf::Aggregatable as AggregatableTrait,
};
use rand::prelude::*;
Expand All @@ -95,13 +96,14 @@ use url::Url;
use vdaf::{EarlyReportState, EarlyReportStateConsumed};

/// DAP version used for a task.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
pub enum DapVersion {
#[serde(rename = "v02")]
Draft02,

#[serde(rename = "v07")]
#[default]
Draft07,
}

Expand Down Expand Up @@ -402,10 +404,23 @@ impl Extend<(DapBatchBucket, (ReportId, Time))> for DapAggregateSpan<()> {
}
}

/// Per-task DAP parameters.
#[derive(Clone, Deserialize, Serialize)]
/// Method for configuring tasks.
#[derive(Clone, Default, Deserialize, Serialize)]
#[cfg_attr(test, derive(PartialEq, Debug))]
pub struct DapTaskConfig {
pub enum DapTaskConfigMethod {
/// draft-wang-ppm-dap-taskprov-06
Taskprov {
/// `TaskConfig.task_info`
info: Vec<u8>,
},

#[default]
Unknown,
}

/// Base parameters used to configure a DAP task.
#[derive(Clone, Deserialize, Serialize)]
pub struct DapTaskParameters {
/// The protocol version (i.e., which draft).
pub version: DapVersion,

Expand All @@ -419,8 +434,8 @@ pub struct DapTaskConfig {
/// constrain the batch interval of time=interval queries.
pub time_precision: Duration,

/// The time at which the task expires.
pub expiration: Time,
/// The amount of time before teh task should expire.
pub lifetime: Duration,

/// The smallest batch permitted for this task.
pub min_batch_size: u64,
Expand All @@ -430,16 +445,113 @@ pub struct DapTaskConfig {

/// The VDAF configuration for this task.
pub vdaf: VdafConfig,
}

impl DapTaskParameters {
/// Construct a new task config using the taskprov extension. Return the task ID, the taskprov
/// advertisement (if applicable), and the payload of the report extension.
pub fn to_config_with_taskprov(
&self,
task_info: Vec<u8>,
now: Time,
vdaf_verify_key_init: &[u8; 32],
collector_hpke_config: &HpkeConfig,
) -> Result<(DapTaskConfig, TaskId, Option<String>, Vec<u8>), DapError> {
let taskprov_config = messages::taskprov::TaskConfig {
task_info,
leader_url: messages::taskprov::UrlBytes {
bytes: self.leader_url.to_string().into_bytes(),
},
helper_url: messages::taskprov::UrlBytes {
bytes: self.helper_url.to_string().into_bytes(),
},
query_config: messages::taskprov::QueryConfig {
time_precision: self.time_precision,
max_batch_query_count: 1,
min_batch_size: self.min_batch_size.try_into().unwrap(),
var: (&self.query).try_into()?,
},
task_expiration: now + 86400 * 14, // expires in two weeks
vdaf_config: messages::taskprov::VdafConfig {
dp_config: messages::taskprov::DpConfig::None,
var: messages::taskprov::VdafTypeVar::Prio2 { dimension: 10 },
},
};

let encoded_taskprov_config = taskprov_config.get_encoded_with_param(&self.version);
let task_id = taskprov::compute_task_id(self.version, &encoded_taskprov_config);

// Compute the DAP task config.
let task_config = DapTaskConfig::try_from_taskprov(
self.version,
&task_id,
taskprov_config,
vdaf_verify_key_init,
collector_hpke_config,
)
.unwrap();

let (taskprov_advertisement, taskprov_report_extension_payload) = match self.version {
DapVersion::Draft07 => (Some(encode_base64url(&encoded_taskprov_config)), Vec::new()),
// draft02 compatibility: The taskprov config is advertised in an HTTP header in
// the latest draft. In draft02, it is carried by a report extension.
DapVersion::Draft02 => (None, encoded_taskprov_config),
};

Ok((
task_config,
task_id,
taskprov_advertisement,
taskprov_report_extension_payload,
))
}
}

#[cfg(any(test, feature = "test-utils"))]
impl Default for DapTaskParameters {
fn default() -> Self {
Self {
version: Default::default(),
leader_url: "https://leader.example.com/".parse().unwrap(),
helper_url: "https://helper.example.com/".parse().unwrap(),
time_precision: 3600, // 1 hour
lifetime: 86400 * 14, // two weeks
min_batch_size: 10,
query: DapQueryConfig::TimeInterval,
vdaf: VdafConfig::Prio2 { dimension: 10 },
}
}
}

/// Per-task DAP parameters.
#[derive(Clone, Deserialize, Serialize)]
#[cfg_attr(test, derive(PartialEq, Debug))]
pub struct DapTaskConfig {
/// Same as [`DapTaskParameters`].
pub version: DapVersion,
pub leader_url: Url,
pub helper_url: Url,
pub time_precision: Duration,
pub min_batch_size: u64,
pub query: DapQueryConfig,
pub vdaf: VdafConfig,

/// The time at which the task expires.
pub expiration: Time,

/// VDAF verification key shared by the Aggregators. Used to aggregate reports.
pub vdaf_verify_key: VdafVerifyKey,

/// The Collector's HPKE configuration for this task.
pub collector_hpke_config: HpkeConfig,

/// If true, then the taskprov extension was used to configure this task.
/// Deprecated. Indicates that the task was configured via draft-wang-ppm-taskprov.
#[serde(default, rename = "taskprov")]
pub deprecated_taskprov: bool,

/// Method by which the task was configured.
#[serde(default)]
pub taskprov: bool,
pub method: DapTaskConfigMethod,
}

#[cfg(any(test, feature = "test-utils"))]
Expand Down Expand Up @@ -574,6 +686,30 @@ impl DapTaskConfig {

Ok(report_count >= self.min_batch_size)
}

/// Leader: Resolve taskprov advertisement to send in a request to the Helper.
pub(crate) fn resolve_taskprove_advertisement(&self) -> Result<Option<String>, DapError> {
if let DapTaskConfigMethod::Taskprov { info: _ } = &self.method {
let encoded_taskprov_config = messages::taskprov::TaskConfig::try_from(self)?
.get_encoded_with_param(&self.version);
Ok(Some(encode_base64url(encoded_taskprov_config)))
} else if self.deprecated_taskprov && self.version != DapVersion::Draft02 {
// The task config indicates that the configuration method was taskprov, but we don't
// have enough information to construct the advertisement. This is not a problem for
// draft02, however, because the task config was encoded by the report extensions in
// that version.
Err(fatal_error!(
err = "not enough information to resolve taskprov advertisement"
))
} else {
Ok(None)
}
}

/// Returns true if the task configuration method is taskprov.
pub fn method_is_taskprov(&self) -> bool {
matches!(self.method, DapTaskConfigMethod::Taskprov { .. }) || self.deprecated_taskprov
}
}

impl AsRef<DapTaskConfig> for DapTaskConfig {
Expand Down
11 changes: 10 additions & 1 deletion daphne/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct LeaderHttpRequestOptions<'p> {
resource: DapResource,
req_data: Vec<u8>,
method: LeaderHttpRequestMethod,
taskprov: Option<String>,
}

enum LeaderHttpRequestMethod {
Expand All @@ -49,6 +50,7 @@ async fn leader_send_http_request<S>(
resource,
req_data,
method,
taskprov,
} = opts;

let url = task_config
Expand All @@ -67,7 +69,7 @@ async fn leader_send_http_request<S>(
.await?,
),
payload: req_data,
taskprov: None,
taskprov,
};

let resp = match method {
Expand Down Expand Up @@ -312,6 +314,8 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
) -> Result<u64, DapAbort> {
let metrics = self.metrics();

let taskprov = task_config.resolve_taskprove_advertisement()?;

// Prepare AggregationJobInitReq.
let agg_job_id = MetaAggregationJobId::gen_for_version(task_config.version);
let transition = task_config
Expand Down Expand Up @@ -368,6 +372,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
resource: agg_job_id.for_request_path(),
req_data: agg_job_init_req.get_encoded_with_param(&task_config.version),
method,
taskprov: taskprov.clone(),
},
)
.await?;
Expand Down Expand Up @@ -399,6 +404,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
resource: agg_job_id.for_request_path(),
req_data: agg_job_cont_req.get_encoded_with_param(&task_config.version),
method: LeaderHttpRequestMethod::Post,
taskprov,
},
)
.await?;
Expand Down Expand Up @@ -467,6 +473,8 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
let batch_selector = BatchSelector::try_from(collect_req.query.clone())?;
let leader_agg_share = self.get_agg_share(task_id, &batch_selector).await?;

let taskprov = task_config.resolve_taskprove_advertisement()?;

// Check the batch size. If not not ready, then return early.
//
// TODO Consider logging this error, as it should never happen.
Expand Down Expand Up @@ -513,6 +521,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
resource: DapResource::Undefined,
req_data: agg_share_req.get_encoded_with_param(&task_config.version),
method: LeaderHttpRequestMethod::Post,
taskprov,
},
)
.await?;
Expand Down
Loading

0 comments on commit a8cc3fe

Please sign in to comment.