Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

daphne_worker: Stabilize the PRF for internal use #390

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daphne_worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ prio.workspace = true
prometheus.workspace = true
rand.workspace = true
reqwest-wasm = { version = "0.11.16", features = ["json"] }
ring.workspace = true
serde-wasm-bindgen = "0.5.0"
serde.workspace = true
serde_json.workspace = true
Expand Down
46 changes: 23 additions & 23 deletions daphne_worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ use daphne::{
};
use futures::TryFutureExt;
use matchit::Router;
use prio::{
codec::Decode,
vdaf::prg::{Prg, PrgSha3, Seed, SeedStream},
};
use prio::codec::Decode;
use prometheus::{Encoder, Registry};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -97,11 +94,11 @@ pub(crate) struct DaphneWorkerConfig {
pub(crate) deployment: DaphneWorkerDeployment,

/// Leader: Key used to derive collection job IDs. This field is not configured by the Helper.
pub(crate) collection_job_id_key: Option<Seed<16>>,
pub(crate) collection_job_id_key: Option<[u8; 32]>,

/// Sharding key, used to compute the ReportsPending or ReportsProcessed shard to map a report
/// to (based on the report ID).
report_shard_key: Seed<16>,
report_shard_key: [u8; 32],

/// Shard count, the number of report storage shards. This should be a power of 2.
report_shard_count: u64,
Expand Down Expand Up @@ -134,6 +131,17 @@ pub(crate) struct DaphneWorkerConfig {

impl DaphneWorkerConfig {
pub(crate) fn from_worker_env(env: &Env) -> Result<Self> {
let load_key = |name| {
let key = env
.secret(name)
.map_err(|e| format!("failed to load {name}: {e}"))?
.to_string();
let key = hex::decode(key)
.map_err(|e| format!("failed to load {name}: error while parsing hex: {e}"))?;
key.try_into()
.map_err(|_| format!("failed to load {name}: unexpected length"))
};

let is_leader = match env.var("DAP_AGGREGATOR_ROLE")?.to_string().as_str() {
"leader" => true,
"helper" => false,
Expand Down Expand Up @@ -161,24 +169,13 @@ impl DaphneWorkerConfig {
None
};

const DAP_COLLECTION_JOB_ID_KEY: &str = "DAP_COLLECTION_JOB_ID_KEY";
let collection_job_id_key = if is_leader {
let collection_job_id_key_hex = env
.secret(DAP_COLLECTION_JOB_ID_KEY)
.map_err(|e| format!("failed to load {DAP_COLLECTION_JOB_ID_KEY}: {e}"))?
.to_string();
let collection_job_id_key =
Seed::get_decoded(&hex::decode(collection_job_id_key_hex).map_err(int_err)?)
.map_err(int_err)?;
Some(collection_job_id_key)
Some(load_key("DAP_COLLECTION_JOB_ID_KEY")?)
} else {
None
};

let report_shard_key = Seed::get_decoded(
&hex::decode(env.secret("DAP_REPORT_SHARD_KEY")?.to_string()).map_err(int_err)?,
)
.map_err(int_err)?;
let report_shard_key = load_key("DAP_REPORT_SHARD_KEY")?;

let report_shard_count: u64 = env
.var("DAP_REPORT_SHARD_COUNT")?
Expand Down Expand Up @@ -348,10 +345,13 @@ impl DaphneWorkerConfig {
report_id: &ReportId,
report_time: Time,
) -> String {
let mut shard_seed = [0; 8];
PrgSha3::seed_stream(&self.report_shard_key, b"report shard", report_id.as_ref())
.fill(&mut shard_seed);
let shard = u64::from_be_bytes(shard_seed) % self.report_shard_count;
let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, &self.report_shard_key);
let tag = ring::hmac::sign(&key, report_id.as_ref());
let shard = u64::from_be_bytes(
tag.as_ref()[..std::mem::size_of::<u64>()]
.try_into()
.unwrap(),
) % self.report_shard_count;
let epoch = report_time - (report_time % self.global.report_storage_epoch_duration);
durable_name_report_store(&task_config.version, task_id_hex, epoch, shard)
}
Expand Down
31 changes: 17 additions & 14 deletions daphne_worker/src/durable/leader_col_job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use daphne::{
messages::{Collection, CollectionJobId, CollectionReq, TaskId},
DapCollectJob, DapVersion,
};
use prio::{
codec::ParameterizedEncode,
vdaf::prg::{Prg, PrgSha3, SeedStream},
};
use prio::codec::ParameterizedEncode;
use serde::{Deserialize, Serialize};
use tracing::Instrument;
use worker::*;
Expand Down Expand Up @@ -122,16 +119,22 @@ impl LeaderCollectionJobQueue {
// enumerating collect URIs. Second, it provides a stable map from requests
// to URIs, which prevents us from processing the same collect request more
// than once.
let collect_req_bytes = collect_queue_req
.collect_req
.get_encoded_with_param(&DapVersion::Draft02);
let mut collection_job_id_bytes = [0; 16];
PrgSha3::seed_stream(
self.config.collection_job_id_key.as_ref().unwrap(),
b"collection job id",
&collect_req_bytes,
)
.fill(&mut collection_job_id_bytes);
let collection_job_id_bytes = {
let collect_req_bytes = collect_queue_req
.collect_req
.get_encoded_with_param(&DapVersion::Draft02);

let mut buf = [0; 16];
let key = ring::hmac::Key::new(
ring::hmac::HMAC_SHA256,
self.config.collection_job_id_key.as_ref().ok_or_else(|| {
Error::RustError("missing collection job ID key".into())
})?,
);
let tag = ring::hmac::sign(&key, &collect_req_bytes);
buf.copy_from_slice(&tag.as_ref()[..16]);
buf
};
CollectionJobId(collection_job_id_bytes)
};

Expand Down
6 changes: 3 additions & 3 deletions daphne_worker_test/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ fallthrough = false
DAP_AGGREGATOR_ROLE = "leader"
DAP_BASE_URL = "http://127.0.0.1:8787/"
DAP_ISSUE73_DISABLE_AGG_JOB_QUEUE_GARBAGE_COLLECTION = "true"
DAP_COLLECTION_JOB_ID_KEY = "b416a85d280591d6da14e5b75a7d6e31" # SECRET
DAP_REPORT_SHARD_KEY = "61cd9685547370cfea76c2eb8d156ad9" # SECRET
DAP_COLLECTION_JOB_ID_KEY = "12da249b0e3b1a9b5936d6ff6cdd2fa09964e2e11f5450e04eef11dc5e64daf1" # SECRET
DAP_REPORT_SHARD_KEY = "b6e5736df46d9dd7dc087cad1cd4f03c1ff399160a8eb30c5e7841aa696e737f" # SECRET
DAP_REPORT_SHARD_COUNT = "2"
DAP_GLOBAL_CONFIG = """{
"report_storage_epoch_duration": 604800,
Expand Down Expand Up @@ -103,7 +103,7 @@ DAP_ADMIN_BEARER_TOKEN = "administrator bearer token" # SECRET
DAP_AGGREGATOR_ROLE = "helper"
DAP_BASE_URL = "http://127.0.0.1:8788/"
DAP_ISSUE73_DISABLE_AGG_JOB_QUEUE_GARBAGE_COLLECTION = "true"
DAP_REPORT_SHARD_KEY = "f79c352056982bae1737e34bdac24d63" # SECRET
DAP_REPORT_SHARD_KEY = "896eccc5e89474dd656d490408226fdd4ce5b34d2d1349147c6fcbb1741ddb2d" # SECRET
DAP_REPORT_SHARD_COUNT = "2"
DAP_HELPER_STATE_STORE_GARBAGE_COLLECT_AFTER_SECS = "10"
DAP_GLOBAL_CONFIG = """{
Expand Down
Loading