From 290a5fa2555bca39a0e8b55fa9346e6e6f0f7d90 Mon Sep 17 00:00:00 2001 From: Christopher Patton Date: Thu, 7 Sep 2023 10:46:05 -0700 Subject: [PATCH] daphne_worker: Stabilize the PRF for internal use We currently use cSHAKE128 for deriving ReportsPending and ReportsProcessed shards and for computing the collection job ID for DAP-02. This comes from `prio::vdaf::prg::PrgSha3`. This API is not stable: the algorithm it uses depends on the VDAF version used, and we have not settled on which XOF to use for VDAF. (In VDAF-07 we have replaced cSHAKE128 with SHAKE128.) To avoid breaking changes for our internal use cases, use `ring::hmac::HMAC_SHA256` instead. The recommended key size for this PRF is 32 bytes. Accordingly, rotate the keys in daphne_worker_test/wrangler.toml. --- Cargo.lock | 1 + daphne_worker/Cargo.toml | 1 + daphne_worker/src/config.rs | 46 +++++++++---------- .../src/durable/leader_col_job_queue.rs | 31 +++++++------ daphne_worker_test/wrangler.toml | 6 +-- 5 files changed, 45 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c64219826..a84a1ec58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,6 +729,7 @@ dependencies = [ "prometheus", "rand", "reqwest-wasm", + "ring", "serde", "serde-wasm-bindgen", "serde_json", diff --git a/daphne_worker/Cargo.toml b/daphne_worker/Cargo.toml index 12918a2b2..fefe45cd8 100644 --- a/daphne_worker/Cargo.toml +++ b/daphne_worker/Cargo.toml @@ -30,6 +30,7 @@ prio.workspace = true prometheus.workspace = true rand.workspace = true reqwest-wasm = { version = "0.11.16", features = ["json"] } +ring.workspace = true serde-wasm-bindgen = "0.5.0" serde.workspace = true serde_json.workspace = true diff --git a/daphne_worker/src/config.rs b/daphne_worker/src/config.rs index 060127bdd..e7ceb6b4e 100644 --- a/daphne_worker/src/config.rs +++ b/daphne_worker/src/config.rs @@ -33,10 +33,7 @@ use daphne::{ }; use futures::TryFutureExt; use matchit::Router; -use prio::{ - codec::Decode, - vdaf::prg::{Prg, PrgSha3, 
Seed, SeedStream}, -}; +use prio::codec::Decode; use prometheus::{Encoder, Registry}; use serde::{Deserialize, Serialize}; use std::{ @@ -97,11 +94,11 @@ pub(crate) struct DaphneWorkerConfig { pub(crate) deployment: DaphneWorkerDeployment, /// Leader: Key used to derive collection job IDs. This field is not configured by the Helper. - pub(crate) collection_job_id_key: Option<Seed<16>>, + pub(crate) collection_job_id_key: Option<[u8; 32]>, /// Sharding key, used to compute the ReportsPending or ReportsProcessed shard to map a report /// to (based on the report ID). - report_shard_key: Seed<16>, + report_shard_key: [u8; 32], /// Shard count, the number of report storage shards. This should be a power of 2. report_shard_count: u64, @@ -134,6 +131,17 @@ pub(crate) struct DaphneWorkerConfig { impl DaphneWorkerConfig { pub(crate) fn from_worker_env(env: &Env) -> Result<Self> { + let load_key = |name| { + let key = env + .secret(name) + .map_err(|e| format!("failed to load {name}: {e}"))? + .to_string(); + let key = hex::decode(key) + .map_err(|e| format!("failed to load {name}: error while parsing hex: {e}"))?; + key.try_into() + .map_err(|_| format!("failed to load {name}: unexpected length")) + }; + let is_leader = match env.var("DAP_AGGREGATOR_ROLE")?.to_string().as_str() { "leader" => true, "helper" => false, @@ -161,24 +169,13 @@ impl DaphneWorkerConfig { None }; - const DAP_COLLECTION_JOB_ID_KEY: &str = "DAP_COLLECTION_JOB_ID_KEY"; let collection_job_id_key = if is_leader { - let collection_job_id_key_hex = env - .secret(DAP_COLLECTION_JOB_ID_KEY) - .map_err(|e| format!("failed to load {DAP_COLLECTION_JOB_ID_KEY}: {e}"))? - .to_string(); - let collection_job_id_key = - Seed::get_decoded(&hex::decode(collection_job_id_key_hex).map_err(int_err)?) - .map_err(int_err)?; - Some(collection_job_id_key) + Some(load_key("DAP_COLLECTION_JOB_ID_KEY")?) 
} else { None }; - let report_shard_key = Seed::get_decoded( - &hex::decode(env.secret("DAP_REPORT_SHARD_KEY")?.to_string()).map_err(int_err)?, - ) - .map_err(int_err)?; + let report_shard_key = load_key("DAP_REPORT_SHARD_KEY")?; let report_shard_count: u64 = env .var("DAP_REPORT_SHARD_COUNT")? @@ -348,10 +345,13 @@ impl DaphneWorkerConfig { report_id: &ReportId, report_time: Time, ) -> String { - let mut shard_seed = [0; 8]; - PrgSha3::seed_stream(&self.report_shard_key, b"report shard", report_id.as_ref()) - .fill(&mut shard_seed); - let shard = u64::from_be_bytes(shard_seed) % self.report_shard_count; + let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, &self.report_shard_key); + let tag = ring::hmac::sign(&key, report_id.as_ref()); + let shard = u64::from_be_bytes( + tag.as_ref()[..std::mem::size_of::<u64>()] + .try_into() + .unwrap(), + ) % self.report_shard_count; let epoch = report_time - (report_time % self.global.report_storage_epoch_duration); durable_name_report_store(&task_config.version, task_id_hex, epoch, shard) } diff --git a/daphne_worker/src/durable/leader_col_job_queue.rs b/daphne_worker/src/durable/leader_col_job_queue.rs index c8ba6ac04..1ba600473 100644 --- a/daphne_worker/src/durable/leader_col_job_queue.rs +++ b/daphne_worker/src/durable/leader_col_job_queue.rs @@ -15,10 +15,7 @@ use daphne::{ messages::{Collection, CollectionJobId, CollectionReq, TaskId}, DapCollectJob, DapVersion, }; -use prio::{ - codec::ParameterizedEncode, - vdaf::prg::{Prg, PrgSha3, SeedStream}, -}; +use prio::codec::ParameterizedEncode; use serde::{Deserialize, Serialize}; use tracing::Instrument; use worker::*; @@ -122,16 +119,22 @@ impl LeaderCollectionJobQueue { // enumerating collect URIs. Second, it provides a stable map from requests // to URIs, which prevents us from processing the same collect request more // than once. 
- let collect_req_bytes = collect_queue_req - .collect_req - .get_encoded_with_param(&DapVersion::Draft02); - let mut collection_job_id_bytes = [0; 16]; - PrgSha3::seed_stream( - self.config.collection_job_id_key.as_ref().unwrap(), - b"collection job id", - &collect_req_bytes, - ) - .fill(&mut collection_job_id_bytes); + let collection_job_id_bytes = { + let collect_req_bytes = collect_queue_req + .collect_req + .get_encoded_with_param(&DapVersion::Draft02); + + let mut buf = [0; 16]; + let key = ring::hmac::Key::new( + ring::hmac::HMAC_SHA256, + self.config.collection_job_id_key.as_ref().ok_or_else(|| { + Error::RustError("missing collection job ID key".into()) + })?, + ); + let tag = ring::hmac::sign(&key, &collect_req_bytes); + buf.copy_from_slice(&tag.as_ref()[..16]); + buf + }; CollectionJobId(collection_job_id_bytes) }; diff --git a/daphne_worker_test/wrangler.toml b/daphne_worker_test/wrangler.toml index 4d334f89a..1de58c420 100644 --- a/daphne_worker_test/wrangler.toml +++ b/daphne_worker_test/wrangler.toml @@ -23,8 +23,8 @@ fallthrough = false DAP_AGGREGATOR_ROLE = "leader" DAP_BASE_URL = "http://127.0.0.1:8787/" DAP_ISSUE73_DISABLE_AGG_JOB_QUEUE_GARBAGE_COLLECTION = "true" -DAP_COLLECTION_JOB_ID_KEY = "b416a85d280591d6da14e5b75a7d6e31" # SECRET -DAP_REPORT_SHARD_KEY = "61cd9685547370cfea76c2eb8d156ad9" # SECRET +DAP_COLLECTION_JOB_ID_KEY = "12da249b0e3b1a9b5936d6ff6cdd2fa09964e2e11f5450e04eef11dc5e64daf1" # SECRET +DAP_REPORT_SHARD_KEY = "b6e5736df46d9dd7dc087cad1cd4f03c1ff399160a8eb30c5e7841aa696e737f" # SECRET DAP_REPORT_SHARD_COUNT = "2" DAP_GLOBAL_CONFIG = """{ "report_storage_epoch_duration": 604800, @@ -103,7 +103,7 @@ DAP_ADMIN_BEARER_TOKEN = "administrator bearer token" # SECRET DAP_AGGREGATOR_ROLE = "helper" DAP_BASE_URL = "http://127.0.0.1:8788/" DAP_ISSUE73_DISABLE_AGG_JOB_QUEUE_GARBAGE_COLLECTION = "true" -DAP_REPORT_SHARD_KEY = "f79c352056982bae1737e34bdac24d63" # SECRET +DAP_REPORT_SHARD_KEY = 
"896eccc5e89474dd656d490408226fdd4ce5b34d2d1349147c6fcbb1741ddb2d" # SECRET DAP_REPORT_SHARD_COUNT = "2" DAP_HELPER_STATE_STORE_GARBAGE_COLLECT_AFTER_SECS = "10" DAP_GLOBAL_CONFIG = """{