Skip to content

Commit

Permalink
Replace draft-ietf-ppm-dap-03 with draft-ietf-ppm-dap-04
Browse files Browse the repository at this point in the history
Add support for draft04, drop support for draft03, and continue support
for draft02. No wire-breaking changes are expected, but deployments will
impacted by the following breaking changes:

* In draft02, we now use a 16-byte value for the collection job ID
  rather than 32. This matches the type specified in draft04. This does
  not break interop, since the structure of the collect URI is not
  specified in draft02; however, compatibility with older versions of
  LeaderCollectionJobQueue.

* LeaderCollectionJobQueue: The schema for lookup keys for collect jobs
  to durable storage has been changed.

* PrgAea128 is deprecated as of prio version 0.12.0; we now use PrgSha3
  for internal purposes. As a result, collection job IDs computed in
  draft02 will have changed, as well as the mapping of reports to
  ReportsPending and ReportsProcessed shards has changed.

* The interfaces for ReportsPending and LeaderCollectionJobQueue have
  changed.

Co-authored-by: Christopher Patton <cpatton@cloudflare.com>
  • Loading branch information
bhalleycf and cjpatton committed Apr 7, 2023
1 parent 6f1aa96 commit d893117
Show file tree
Hide file tree
Showing 35 changed files with 3,308 additions and 1,787 deletions.
739 changes: 439 additions & 300 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion daphne/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ hpke-rs-rust-crypto = { version = "0.1.1"}
lazy_static = "1.4.0"
matchit = "0.7.0"
paste = "1.0.12"
prio = { version = "0.10.0", features = ["prio2"] }
prio = { version = "0.12.0", features = ["prio2"] }
prometheus = "0.13.3"
rand = "0.8.5"
ring = "0.16.20"
Expand Down
2 changes: 1 addition & 1 deletion daphne/dapf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ license = "BSD-3-Clause"
daphne = { path = ".." }
assert_matches = "1.5.0"
base64 = "0.21.0"
prio = "0.10.0"
prio = "0.12.0"
serde = { version = "1.0.154", features = ["derive"] }
serde_json = "1.0.94"
url = { version = "2.3.1", features = ["serde"] }
Expand Down
23 changes: 13 additions & 10 deletions daphne/dapf/src/bin/dapf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use clap::{Parser, Subcommand};
use daphne::{
constants,
hpke::HpkeReceiverConfig,
messages::{decode_base64url, BatchSelector, CollectReq, CollectResp, HpkeConfig, Id, Query},
messages::{BatchSelector, Collection, CollectionReq, HpkeConfig, Query, TaskId},
DapMeasurement, DapVersion, ProblemDetails, VdafConfig,
};
use prio::codec::{Decode, ParameterizedEncode};
use prio::codec::{Decode, ParameterizedDecode, ParameterizedEncode};
use reqwest::blocking::{Client, ClientBuilder};
use std::{
io::{stdin, Read},
Expand Down Expand Up @@ -151,8 +151,12 @@ async fn main() -> Result<()> {
serde_json::from_str(&buf).with_context(|| "failed to parse JSON from stdin")?;

// Construct collect request.
let collect_req = CollectReq {
task_id,
let collect_req = CollectionReq {
draft02_task_id: if version == DapVersion::Draft02 {
Some(task_id)
} else {
None
},
query,
agg_param: Vec::default(),
};
Expand Down Expand Up @@ -212,7 +216,7 @@ async fn main() -> Result<()> {
let receiver = cli.hpke_receiver.as_ref().ok_or_else(|| {
anyhow!("received response, but cannot decrypt without HPKE receiver config")
})?;
let collect_resp = CollectResp::get_decoded(&resp.bytes()?)?;
let collect_resp = Collection::get_decoded_with_param(&version, &resp.bytes()?)?;
let agg_res = vdaf
.consume_encrypted_agg_shares(
receiver,
Expand All @@ -230,15 +234,14 @@ async fn main() -> Result<()> {
}
}

fn parse_id(id_str: &str) -> Result<Id> {
let id_bytes = decode_base64url(id_str.as_bytes())
fn parse_id(id_str: &str) -> Result<TaskId> {
TaskId::try_from_base64url(id_str)
.ok_or_else(|| anyhow!("failed to decode ID"))
.with_context(|| "expected URL-safe, base64 string")?;
Ok(Id(id_bytes))
.with_context(|| "expected URL-safe, base64 string")
}

// TODO(cjpatton) Refactor integration tests to use this method.
fn get_hpke_config(http_client: &Client, task_id: &Id, base_url: &str) -> Result<HpkeConfig> {
fn get_hpke_config(http_client: &Client, task_id: &TaskId, base_url: &str) -> Result<HpkeConfig> {
let url = Url::parse(base_url)
.with_context(|| "failed to parse base URL")?
.join("hpke_config")?;
Expand Down
8 changes: 4 additions & 4 deletions daphne/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{
constants::sender_for_media_type,
messages::{constant_time_eq, Id},
messages::{constant_time_eq, TaskId},
DapError, DapRequest, DapSender,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -58,13 +58,13 @@ pub trait BearerTokenProvider<'a> {
/// Fetch the Leader's bearer token for the given task, if the task is recognized.
async fn get_leader_bearer_token_for(
&'a self,
task_id: &'a Id,
task_id: &'a TaskId,
) -> Result<Option<Self::WrappedBearerToken>, DapError>;

/// Fetch the Collector's bearer token for the given task, if the task is recognized.
async fn get_collector_bearer_token_for(
&'a self,
task_id: &'a Id,
task_id: &'a TaskId,
) -> Result<Option<Self::WrappedBearerToken>, DapError>;

/// Returns true if the given bearer token matches the leader token configured for the "taskprov" extension.
Expand All @@ -77,7 +77,7 @@ pub trait BearerTokenProvider<'a> {
/// media type.
async fn authorize_with_bearer_token(
&'a self,
task_id: &'a Id,
task_id: &'a TaskId,
media_type: &'static str,
) -> Result<Self::WrappedBearerToken, DapError> {
if matches!(sender_for_media_type(media_type), Some(DapSender::Leader)) {
Expand Down
48 changes: 38 additions & 10 deletions daphne/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,79 @@

//! Constants used in the DAP protocol.

use crate::DapSender;
use crate::{DapSender, DapVersion};

// Media types for HTTP requests.
//
// TODO spec: Decide if media type should be enforced. (We currently don't.) In any case, it may be
// useful to enforce this for testing purposes.
pub const DRAFT02_MEDIA_TYPE_HPKE_CONFIG: &str = "application/dap-hpke-config";
pub const DRAFT02_MEDIA_TYPE_AGG_INIT_REQ: &str = "application/dap-aggregate-initialize-req";
pub const DRAFT02_MEDIA_TYPE_AGG_INIT_RESP: &str = "application/dap-aggregate-initialize-resp";
pub const DRAFT02_MEDIA_TYPE_AGG_CONT_REQ: &str = "application/dap-aggregate-continue-req";
pub const DRAFT02_MEDIA_TYPE_AGG_CONT_RESP: &str = "application/dap-aggregate-continue-resp";
pub const DRAFT02_MEDIA_TYPE_AGG_SHARE_RESP: &str = "application/dap-aggregate-share-resp";
pub const DRAFT02_MEDIA_TYPE_COLLECT_RESP: &str = "application/dap-collect-resp";
pub const MEDIA_TYPE_HPKE_CONFIG_LIST: &str = "application/dap-hpke-config-list";
pub const MEDIA_TYPE_REPORT: &str = "application/dap-report";
pub const MEDIA_TYPE_AGG_INIT_REQ: &str = "application/dap-aggregate-initialize-req";
pub const MEDIA_TYPE_AGG_INIT_RESP: &str = "application/dap-aggregate-initialize-resp";
pub const MEDIA_TYPE_AGG_CONT_REQ: &str = "application/dap-aggregate-continue-req";
pub const MEDIA_TYPE_AGG_CONT_RESP: &str = "application/dap-aggregate-continue-resp";
pub const MEDIA_TYPE_AGG_INIT_REQ: &str = "application/dap-aggregation-job-init-req";
pub const MEDIA_TYPE_AGG_INIT_RESP: &str = "application/dap-aggregation-job-resp";
pub const MEDIA_TYPE_AGG_CONT_REQ: &str = "application/dap-aggregation-job-continue-req";
pub const MEDIA_TYPE_AGG_CONT_RESP: &str = "application/dap-aggregation-job-continue-resp";
pub const MEDIA_TYPE_AGG_SHARE_REQ: &str = "application/dap-aggregate-share-req";
pub const MEDIA_TYPE_AGG_SHARE_RESP: &str = "application/dap-aggregate-share-resp";
pub const MEDIA_TYPE_AGG_SHARE_RESP: &str = "application/dap-aggregate-share";
pub const MEDIA_TYPE_COLLECT_REQ: &str = "application/dap-collect-req";
pub const MEDIA_TYPE_COLLECT_RESP: &str = "application/dap-collect-resp";
pub const MEDIA_TYPE_COLLECT_RESP: &str = "application/dap-collection";

/// Check if the provided value for the HTTP Content-Type is valid media type for DAP. If so, then
/// return a static reference to the media type.
pub fn media_type_for(content_type: &str) -> Option<&'static str> {
match content_type {
DRAFT02_MEDIA_TYPE_HPKE_CONFIG => Some(DRAFT02_MEDIA_TYPE_HPKE_CONFIG),
MEDIA_TYPE_REPORT => Some(MEDIA_TYPE_REPORT),
DRAFT02_MEDIA_TYPE_AGG_INIT_REQ => Some(DRAFT02_MEDIA_TYPE_AGG_INIT_REQ),
MEDIA_TYPE_AGG_INIT_REQ => Some(MEDIA_TYPE_AGG_INIT_REQ),
DRAFT02_MEDIA_TYPE_AGG_INIT_RESP => Some(DRAFT02_MEDIA_TYPE_AGG_INIT_RESP),
MEDIA_TYPE_AGG_INIT_RESP => Some(MEDIA_TYPE_AGG_INIT_RESP),
DRAFT02_MEDIA_TYPE_AGG_CONT_REQ => Some(DRAFT02_MEDIA_TYPE_AGG_CONT_REQ),
MEDIA_TYPE_AGG_CONT_REQ => Some(MEDIA_TYPE_AGG_CONT_REQ),
DRAFT02_MEDIA_TYPE_AGG_CONT_RESP => Some(DRAFT02_MEDIA_TYPE_AGG_CONT_RESP),
MEDIA_TYPE_AGG_CONT_RESP => Some(MEDIA_TYPE_AGG_CONT_RESP),
MEDIA_TYPE_AGG_SHARE_REQ => Some(MEDIA_TYPE_AGG_SHARE_REQ),
DRAFT02_MEDIA_TYPE_AGG_SHARE_RESP => Some(DRAFT02_MEDIA_TYPE_AGG_SHARE_RESP),
MEDIA_TYPE_AGG_SHARE_RESP => Some(MEDIA_TYPE_AGG_SHARE_RESP),
MEDIA_TYPE_COLLECT_REQ => Some(MEDIA_TYPE_COLLECT_REQ),
DRAFT02_MEDIA_TYPE_COLLECT_RESP => Some(DRAFT02_MEDIA_TYPE_COLLECT_RESP),
MEDIA_TYPE_COLLECT_RESP => Some(MEDIA_TYPE_COLLECT_RESP),
_ => None,
}
}

/// draft02 compatibility: Substitute the content type with the corresponding media type for the
/// older version of the protocol if necessary.
pub fn versioned_media_type_for(version: &DapVersion, content_type: &str) -> Option<&'static str> {
match (version, content_type) {
(DapVersion::Draft02, MEDIA_TYPE_AGG_INIT_REQ) => Some(DRAFT02_MEDIA_TYPE_AGG_INIT_REQ),
(DapVersion::Draft02, MEDIA_TYPE_AGG_INIT_RESP) => Some(DRAFT02_MEDIA_TYPE_AGG_INIT_RESP),
(DapVersion::Draft02, MEDIA_TYPE_AGG_CONT_REQ) => Some(DRAFT02_MEDIA_TYPE_AGG_CONT_REQ),
(DapVersion::Draft02, MEDIA_TYPE_AGG_CONT_RESP) => Some(DRAFT02_MEDIA_TYPE_AGG_CONT_RESP),
(DapVersion::Draft02, MEDIA_TYPE_AGG_SHARE_RESP) => Some(DRAFT02_MEDIA_TYPE_AGG_SHARE_RESP),
(DapVersion::Draft02, MEDIA_TYPE_COLLECT_RESP) => Some(DRAFT02_MEDIA_TYPE_COLLECT_RESP),
_ => media_type_for(content_type),
}
}

/// Return the sender that would send a message with the given media type (or none if the sender
/// can't be determined).
pub fn sender_for_media_type(media_type: &'static str) -> Option<DapSender> {
match media_type {
DRAFT02_MEDIA_TYPE_HPKE_CONFIG | MEDIA_TYPE_REPORT => Some(DapSender::Client),
MEDIA_TYPE_COLLECT_REQ => Some(DapSender::Collector),
MEDIA_TYPE_AGG_INIT_REQ | MEDIA_TYPE_AGG_CONT_REQ | MEDIA_TYPE_AGG_SHARE_REQ => {
Some(DapSender::Leader)
}
MEDIA_TYPE_AGG_INIT_REQ
| MEDIA_TYPE_AGG_CONT_REQ
| MEDIA_TYPE_AGG_SHARE_REQ
| DRAFT02_MEDIA_TYPE_AGG_INIT_REQ
| DRAFT02_MEDIA_TYPE_AGG_CONT_REQ => Some(DapSender::Leader),
_ => None,
}
}
14 changes: 7 additions & 7 deletions daphne/src/hpke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hpke_rs_rust_crypto::HpkeRustCrypto as ImplHpkeCrypto;
use crate::{
messages::{
decode_u16_bytes, encode_u16_bytes, HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeKdfId,
HpkeKemId, Id, TransitionFailure,
HpkeKemId, TaskId, TransitionFailure,
},
DapError, DapVersion,
};
Expand Down Expand Up @@ -99,16 +99,16 @@ pub trait HpkeDecrypter<'a> {
async fn get_hpke_config_for(
&'a self,
version: DapVersion,
task_id: Option<&Id>,
task_id: Option<&TaskId>,
) -> Result<Self::WrappedHpkeConfig, DapError>;

/// Returns `true` if a ciphertext with the HPKE config ID can be consumed in the current task.
async fn can_hpke_decrypt(&self, task_id: &Id, config_id: u8) -> Result<bool, DapError>;
async fn can_hpke_decrypt(&self, task_id: &TaskId, config_id: u8) -> Result<bool, DapError>;

/// Decrypt the given HPKE ciphertext using the given info and AAD string.
async fn hpke_decrypt(
&self,
task_id: &Id,
task_id: &TaskId,
info: &[u8],
aad: &[u8],
ciphertext: &HpkeCiphertext,
Expand Down Expand Up @@ -208,18 +208,18 @@ impl<'a> HpkeDecrypter<'a> for HpkeReceiverConfig {
async fn get_hpke_config_for(
&'a self,
_version: DapVersion,
_task_id: Option<&Id>,
_task_id: Option<&TaskId>,
) -> Result<Self::WrappedHpkeConfig, DapError> {
unreachable!("not implemented");
}

async fn can_hpke_decrypt(&self, _task_id: &Id, config_id: u8) -> Result<bool, DapError> {
async fn can_hpke_decrypt(&self, _task_id: &TaskId, config_id: u8) -> Result<bool, DapError> {
Ok(config_id == self.config.id)
}

async fn hpke_decrypt(
&self,
_task_id: &Id,
_task_id: &TaskId,
info: &[u8],
aad: &[u8],
ciphertext: &HpkeCiphertext,
Expand Down
Loading

0 comments on commit d893117

Please sign in to comment.