Skip to content

Commit

Permalink
Merge pull request #176 from divviup/timg/janus-main-function
Browse files Browse the repository at this point in the history
Introduce `janus_main`
  • Loading branch information
tgeoghegan authored May 19, 2022
2 parents d8e6041 + 4571d84 commit 4e909d7
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 313 deletions.
107 changes: 33 additions & 74 deletions janus_server/src/bin/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,72 @@
use anyhow::Context;
use anyhow::Result;
use futures::future::try_join_all;
use itertools::Itertools;
use janus::message::{Nonce, Role, TaskId, Time};
use janus::message::{Nonce, Time};
use janus::message::{Role, TaskId};
use janus::time::{Clock, RealClock};
use janus_server::binary_utils::datastore;
use janus_server::binary_utils::{janus_main, BinaryOptions, CommonBinaryOptions};
use janus_server::config::AggregationJobCreatorConfig;
use janus_server::datastore::models::{
AggregationJob, AggregationJobState, ReportAggregation, ReportAggregationState,
};
use janus_server::datastore::{self, Datastore};
use janus_server::message::AggregationJobId;
use janus_server::task::Task;
use janus_server::trace::install_trace_subscriber;
use prio::codec::Encode;
use prio::vdaf;
use prio::vdaf::prio3::{Prio3Aes128Count, Prio3Aes128Histogram, Prio3Aes128Sum};
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, fmt::Debug, fs};
use structopt::StructOpt;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::time::{Instant, MissedTickBehavior};
use tokio::{select, time};
use tokio::select;
use tokio::{
sync::oneshot::{self, Receiver, Sender},
time::{self, Instant, MissedTickBehavior},
};
use tracing::{debug, error, info};

#[derive(StructOpt)]
#[derive(Debug, StructOpt)]
#[structopt(
name = "janus-aggregation-job-creator",
about = "Janus aggregation job creator",
rename_all = "kebab-case",
version = env!("CARGO_PKG_VERSION"),
)]
struct Options {
/// Path to configuration YAML.
#[structopt(
long,
env = "CONFIG_FILE",
parse(from_os_str),
takes_value = true,
required(true),
help = "path to configuration file"
)]
config_file: PathBuf,

/// Password for the PostgreSQL database connection. If specified, must not be specified in the
/// connection string.
#[structopt(long, env = "PGPASSWORD", help = "PostgreSQL password")]
database_password: Option<String>,

/// Datastore encryption keys.
#[structopt(
long,
env = "DATASTORE_KEYS",
takes_value = true,
use_delimiter = true,
required(true),
help = "datastore encryption keys, encoded in base64 then comma-separated"
)]
datastore_keys: Vec<String>,
#[structopt(flatten)]
common: CommonBinaryOptions,
}

impl Debug for Options {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Options")
.field("config_file", &self.config_file)
.finish()
impl BinaryOptions for Options {
fn common_options(&self) -> &CommonBinaryOptions {
&self.common
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Read arguments, read & parse config.
let options = Options::from_args();
let config: AggregationJobCreatorConfig = {
let config_content = fs::read_to_string(&options.config_file)
.with_context(|| format!("couldn't read config file {:?}", options.config_file))?;
serde_yaml::from_str(&config_content)
.with_context(|| format!("couldn't parse config file {:?}", options.config_file))?
};
install_trace_subscriber(&config.logging_config)
.context("couldn't install tracing subscriber")?;

info!(?options, ?config, "Starting aggregation job creator");

// Connect to database.
let datastore = datastore(
janus_main::<Options, _, _, _, _>(
RealClock::default(),
config.database,
options.database_password,
options.datastore_keys,
|clock, config: AggregationJobCreatorConfig, datastore| async move {
// Start creating aggregation jobs.
Arc::new(AggregationJobCreator {
datastore,
clock,
tasks_update_frequency: Duration::from_secs(config.tasks_update_frequency_secs),
aggregation_job_creation_interval: Duration::from_secs(
config.aggregation_job_creation_interval_secs,
),
min_aggregation_job_size: config.min_aggregation_job_size,
max_aggregation_job_size: config.max_aggregation_job_size,
})
.run()
.await;

Ok(())
},
)
.context("couldn't connect to database")?;

// Start creating aggregation jobs.
Arc::new(AggregationJobCreator {
datastore,
clock: RealClock::default(),
tasks_update_frequency: Duration::from_secs(config.tasks_update_frequency_secs),
aggregation_job_creation_interval: Duration::from_secs(
config.aggregation_job_creation_interval_secs,
),
min_aggregation_job_size: config.min_aggregation_job_size,
max_aggregation_job_size: config.max_aggregation_job_size,
})
.run()
.await
}

Expand Down
134 changes: 42 additions & 92 deletions janus_server/src/bin/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use janus::{
time::{Clock, RealClock},
};
use janus_server::{
binary_utils::datastore,
binary_utils::{janus_main, BinaryOptions, CommonBinaryOptions},
config::AggregationJobDriverConfig,
datastore::{
self,
Expand All @@ -21,7 +21,6 @@ use janus_server::{
TransitionError, TransitionTypeSpecificData,
},
task::{Task, VdafInstance},
trace::install_trace_subscriber,
};
use prio::{
codec::{Decode, Encode, ParameterizedDecode},
Expand All @@ -31,61 +30,26 @@ use prio::{
PrepareTransition,
},
};
use std::{
fmt::{self, Debug, Formatter},
fs,
path::PathBuf,
sync::Arc,
};
use std::{fmt::Debug, sync::Arc};
use structopt::StructOpt;
use tokio::{
sync::Semaphore,
task,
time::{self},
};
use tokio::{sync::Semaphore, task, time};
use tracing::{debug, error, info, info_span, Instrument};

#[derive(StructOpt)]
#[derive(Debug, StructOpt)]
#[structopt(
name = "janus-aggregation-job-driver",
about = "Janus aggregation job driver",
rename_all = "kebab-case",
version = env!("CARGO_PKG_VERSION"),
)]
struct Options {
/// Path to configuration YAML.
#[structopt(
long,
env = "CONFIG_FILE",
parse(from_os_str),
takes_value = true,
required(true),
help = "path to configuration file"
)]
config_file: PathBuf,

/// Password for the PostgreSQL database connection. If specified, must not be specified in the
/// connection string.
#[structopt(long, env = "PGPASSWORD", help = "PostgreSQL password")]
database_password: Option<String>,

/// Datastore encryption keys.
#[structopt(
long,
env = "DATASTORE_KEYS",
takes_value = true,
use_delimiter = true,
required(true),
help = "datastore encryption keys, encoded in base64 then comma-separated"
)]
datastore_keys: Vec<String>,
#[structopt(flatten)]
common: CommonBinaryOptions,
}

impl Debug for Options {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Options")
.field("config_file", &self.config_file)
.finish()
impl BinaryOptions for Options {
fn common_options(&self) -> &CommonBinaryOptions {
&self.common
}
}

Expand All @@ -100,54 +64,40 @@ const DAP_AUTH_HEADER: &str = "DAP-Auth-Token";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Read arguments, read & parse config.
let options = Options::from_args();
let config: AggregationJobDriverConfig = {
let config_content = fs::read_to_string(&options.config_file)
.with_context(|| format!("couldn't read config file {:?}", options.config_file))?;
serde_yaml::from_str(&config_content)
.with_context(|| format!("couldn't parse config file {:?}", options.config_file))?
};
install_trace_subscriber(&config.logging_config)
.context("couldn't install tracing subscriber")?;

info!(?options, ?config, "Starting aggregation job driver");

// Connect to database & create other dependencies for AggregationJobDriver.
let clock = RealClock::default();
let datastore = datastore(
clock,
config.database,
options.database_password,
options.datastore_keys,
janus_main::<Options, _, _, _, _>(
RealClock::default(),
|clock, config: AggregationJobDriverConfig, datastore| async move {
let http_client = reqwest::Client::builder()
.user_agent(CLIENT_USER_AGENT)
.build()
.context("couldn't create HTTP client")?;

// Start running.
Arc::new(AggregationJobDriver {
datastore,
clock,
http_client,
min_aggregation_job_discovery_delay: Duration::from_seconds(
config.min_aggregation_job_discovery_delay_secs,
),
max_aggregation_job_discovery_delay: Duration::from_seconds(
config.max_aggregation_job_discovery_delay_secs,
),
max_concurrent_aggregation_job_workers: config
.max_concurrent_aggregation_job_workers,
aggregation_worker_lease_duration: Duration::from_seconds(
config.aggregation_worker_lease_duration_secs,
),
aggregation_worker_lease_clock_skew_allowance: Duration::from_seconds(
config.aggregation_worker_lease_clock_skew_allowance_secs,
),
})
.run()
.await;

Ok(())
},
)
.context("couldn't connect to database")?;

let http_client = reqwest::Client::builder()
.user_agent(CLIENT_USER_AGENT)
.build()
.context("couldn't create HTTP client")?;

// Start running.
Arc::new(AggregationJobDriver {
datastore,
clock,
http_client,
min_aggregation_job_discovery_delay: Duration::from_seconds(
config.min_aggregation_job_discovery_delay_secs,
),
max_aggregation_job_discovery_delay: Duration::from_seconds(
config.max_aggregation_job_discovery_delay_secs,
),
max_concurrent_aggregation_job_workers: config.max_concurrent_aggregation_job_workers,
aggregation_worker_lease_duration: Duration::from_seconds(
config.aggregation_worker_lease_duration_secs,
),
aggregation_worker_lease_clock_skew_allowance: Duration::from_seconds(
config.aggregation_worker_lease_clock_skew_allowance_secs,
),
})
.run()
.await
}

Expand Down
Loading

0 comments on commit 4e909d7

Please sign in to comment.