Skip to content

Commit

Permalink
Add traits for DO GC and Alarms
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Aug 3, 2023
1 parent 4eea658 commit 5bf7976
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 144 deletions.
41 changes: 30 additions & 11 deletions daphne_worker/src/durable/aggregate_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::{
use daphne::DapAggregateShare;
use worker::*;

use super::{DapDurableObject, GarbageCollectable};

pub(crate) const DURABLE_AGGREGATE_STORE_GET: &str = "/internal/do/aggregate_store/get";
pub(crate) const DURABLE_AGGREGATE_STORE_MERGE: &str = "/internal/do/aggregate_store/merge";
pub(crate) const DURABLE_AGGREGATE_STORE_MARK_COLLECTED: &str =
Expand Down Expand Up @@ -61,17 +63,13 @@ impl DurableObject for AggregateStore {

async fn fetch(&mut self, req: Request) -> Result<Response> {
let id_hex = self.state.id().to_string();
let ControlFlow::Continue(mut req) = super::run_garbage_collection(
req,
&self.state,
&self.env,
self.config.deployment,
&mut self.touched,
id_hex,
BINDING_DAP_AGGREGATE_STORE,
)
.await? else {
return Response::from_json(&());
let mut req = match self
.schedule_for_garbage_collection(req, id_hex, BINDING_DAP_AGGREGATE_STORE)
.await?
{
ControlFlow::Continue(req) => req,
// this req was a GC request and as such we must return from this funcition.
ControlFlow::Break(_) => return Response::from_json(&()),
};

match (req.path().as_ref(), req.method()) {
Expand Down Expand Up @@ -138,3 +136,24 @@ impl DurableObject for AggregateStore {
}
}
}

impl DapDurableObject for AggregateStore {
fn state(&self) -> &State {
&self.state
}

fn deployment(&self) -> crate::config::DaphneWorkerDeployment {
self.config.deployment
}
}

#[async_trait::async_trait(?Send)]
impl GarbageCollectable for AggregateStore {
fn touched(&mut self) -> &mut bool {
&mut self.touched
}

fn env(&self) -> &Env {
&self.env
}
}
24 changes: 20 additions & 4 deletions daphne_worker/src/durable/helper_state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use daphne::{messages::TaskId, DapVersion, MetaAggregationJobId};
use tracing::trace;
use worker::*;

use super::{Alarmed, DapDurableObject};

pub(crate) fn durable_helper_state_name(
version: &DapVersion,
task_id: &TaskId,
Expand Down Expand Up @@ -58,10 +60,7 @@ impl DurableObject for HelperStateStore {

async fn fetch(&mut self, mut req: Request) -> Result<Response> {
// Ensure this DO instance is garbage collected eventually.
super::ensure_alarmed(
&self.state,
self.config.deployment,
&mut self.alarmed,
self.ensure_alarmed(
self.config
.helper_state_store_garbage_collect_after_secs
.expect("Daphne-Worker not configured as helper"),
Expand Down Expand Up @@ -110,3 +109,20 @@ impl DurableObject for HelperStateStore {
Response::from_json(&())
}
}

impl DapDurableObject for HelperStateStore {
fn state(&self) -> &State {
&self.state
}

fn deployment(&self) -> crate::config::DaphneWorkerDeployment {
self.config.deployment
}
}

#[async_trait::async_trait(?Send)]
impl Alarmed for HelperStateStore {
fn alarmed(&mut self) -> &mut bool {
&mut self.alarmed
}
}
41 changes: 30 additions & 11 deletions daphne_worker/src/durable/leader_agg_job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::{
use tracing::debug;
use worker::*;

use super::{DapDurableObject, GarbageCollectable};

pub(crate) const DURABLE_LEADER_AGG_JOB_QUEUE_PUT: &str = "/internal/do/agg_job_queue/put";
pub(crate) const DURABLE_LEADER_AGG_JOB_QUEUE_GET: &str = "/internal/do/agg_job_queue/get";
pub(crate) const DURABLE_LEADER_AGG_JOB_QUEUE_FINISH: &str = "/internal/do/agg_job_queue/finish";
Expand Down Expand Up @@ -59,17 +61,13 @@ impl DurableObject for LeaderAggregationJobQueue {

async fn fetch(&mut self, req: Request) -> Result<Response> {
let id_hex = self.state.id().to_string();
let ControlFlow::Continue(mut req) = super::run_garbage_collection(
req,
&self.state,
&self.env,
self.config.deployment,
&mut self.touched,
id_hex,
BINDING_DAP_LEADER_AGG_JOB_QUEUE,
)
.await? else {
return Response::from_json(&());
let mut req = match self
.schedule_for_garbage_collection(req, id_hex, BINDING_DAP_LEADER_AGG_JOB_QUEUE)
.await?
{
ControlFlow::Continue(req) => req,
// this req was a GC request and as such we must return from this funcition.
ControlFlow::Break(_) => return Response::from_json(&()),
};

match (req.path().as_ref(), req.method()) {
Expand Down Expand Up @@ -123,3 +121,24 @@ impl DurableObject for LeaderAggregationJobQueue {
}
}
}

impl DapDurableObject for LeaderAggregationJobQueue {
fn state(&self) -> &State {
&self.state
}

fn deployment(&self) -> crate::config::DaphneWorkerDeployment {
self.config.deployment
}
}

#[async_trait::async_trait(?Send)]
impl GarbageCollectable for LeaderAggregationJobQueue {
fn touched(&mut self) -> &mut bool {
&mut self.touched
}

fn env(&self) -> &Env {
&self.env
}
}
41 changes: 30 additions & 11 deletions daphne_worker/src/durable/leader_batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use serde::{Deserialize, Serialize};
use tracing::debug;
use worker::*;

use super::{DapDurableObject, GarbageCollectable};

pub(crate) const DURABLE_LEADER_BATCH_QUEUE_ASSIGN: &str = "/internal/do/leader_batch_queue/assign";
pub(crate) const DURABLE_LEADER_BATCH_QUEUE_CURRENT: &str =
"/internal/do/leader_batch_queue/current";
Expand Down Expand Up @@ -106,17 +108,13 @@ impl DurableObject for LeaderBatchQueue {

async fn fetch(&mut self, req: Request) -> Result<Response> {
let id_hex = self.state.id().to_string();
let ControlFlow::Continue(mut req) = super::run_garbage_collection(
req,
&self.state,
&self.env,
self.config.deployment,
&mut self.touched,
id_hex.clone(),
BINDING_DAP_LEADER_BATCH_QUEUE,
)
.await? else {
return Response::from_json(&());
let mut req = match self
.schedule_for_garbage_collection(req, id_hex, BINDING_DAP_LEADER_BATCH_QUEUE)
.await?
{
ControlFlow::Continue(req) => req,
// this req was a GC request and as such we must return from this funcition.
ControlFlow::Break(_) => return Response::from_json(&()),
};

match (req.path().as_ref(), req.method()) {
Expand Down Expand Up @@ -207,3 +205,24 @@ impl DurableObject for LeaderBatchQueue {
fn lookup_key(batch_id_hex: &str) -> String {
format!("{PENDING_PREFIX}/id/{batch_id_hex}")
}

impl DapDurableObject for LeaderBatchQueue {
fn state(&self) -> &State {
&self.state
}

fn deployment(&self) -> crate::config::DaphneWorkerDeployment {
self.config.deployment
}
}

#[async_trait::async_trait(?Send)]
impl GarbageCollectable for LeaderBatchQueue {
fn touched(&mut self) -> &mut bool {
&mut self.touched
}

fn env(&self) -> &Env {
&self.env
}
}
41 changes: 30 additions & 11 deletions daphne_worker/src/durable/leader_col_job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use prio::{
use serde::{Deserialize, Serialize};
use worker::*;

use super::{DapDurableObject, GarbageCollectable};

const PENDING_PREFIX: &str = "pending";
const PROCESSED_PREFIX: &str = "processed";

Expand Down Expand Up @@ -84,17 +86,13 @@ impl DurableObject for LeaderCollectionJobQueue {

async fn fetch(&mut self, req: Request) -> Result<Response> {
let id_hex = self.state.id().to_string();
let ControlFlow::Continue(mut req) = super::run_garbage_collection(
req,
&self.state,
&self.env,
self.config.deployment,
&mut self.touched,
id_hex,
BINDING_DAP_LEADER_COL_JOB_QUEUE,
)
.await? else {
return Response::from_json(&());
let mut req = match self
.schedule_for_garbage_collection(req, id_hex, BINDING_DAP_LEADER_COL_JOB_QUEUE)
.await?
{
ControlFlow::Continue(req) => req,
// this req was a GC request and as such we must return from this funcition.
ControlFlow::Break(_) => return Response::from_json(&()),
};

match (req.path().as_ref(), req.method()) {
Expand Down Expand Up @@ -251,3 +249,24 @@ fn processed_key(task_id: &TaskId, collection_job_id: &CollectionJobId) -> Strin
collection_job_id.to_base64url()
)
}

impl DapDurableObject for LeaderCollectionJobQueue {
fn state(&self) -> &State {
&self.state
}

fn deployment(&self) -> crate::config::DaphneWorkerDeployment {
self.config.deployment
}
}

#[async_trait::async_trait(?Send)]
impl GarbageCollectable for LeaderCollectionJobQueue {
fn touched(&mut self) -> &mut bool {
&mut self.touched
}

fn env(&self) -> &Env {
&self.env
}
}
Loading

0 comments on commit 5bf7976

Please sign in to comment.