Skip to content

Commit

Permalink
graph,store,core - add active_assigments() for unpaused subgraphs and…
Browse files Browse the repository at this point in the history
… wire it in subgraph startup
  • Loading branch information
incrypto32 committed May 25, 2023
1 parent 9b30df5 commit 0f1ca0c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ where
let logger = self.logger.clone();
let node_id = self.node_id.clone();

future::result(self.store.assignments(&self.node_id))
future::result(self.store.active_assignments(&self.node_id))
.map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))
.and_then(move |deployments| {
// This operation should finish only after all subgraphs are
Expand Down
3 changes: 3 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub trait SubgraphStore: Send + Sync + 'static {

fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;

/// Returns assignments that are not paused
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;

/// Return `true` if a subgraph `name` exists, regardless of whether the
/// subgraph has any deployments attached to it
fn subgraph_exists(&self, name: &SubgraphName) -> Result<bool, StoreError>;
Expand Down
20 changes: 20 additions & 0 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,22 @@ mod queries {
.collect::<Result<Vec<Site>, _>>()
}

// All assignments for a node that are currently not paused
pub(super) fn active_assignments(
conn: &PgConnection,
node: &NodeId,
) -> Result<Vec<Site>, StoreError> {
ds::table
.inner_join(a::table.on(a::id.eq(ds::id)))
.filter(a::node_id.eq(node.as_str()))
.filter(a::paused_at.is_null())
.select(ds::all_columns)
.load::<Schema>(conn)?
.into_iter()
.map(Site::try_from)
.collect::<Result<Vec<Site>, _>>()
}

pub(super) fn fill_assignments(
conn: &PgConnection,
infos: &mut [status::Info],
Expand Down Expand Up @@ -1803,6 +1819,10 @@ impl Mirror {
self.read(|conn| queries::assignments(conn, node))
}

pub fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
self.read(|conn| queries::active_assignments(conn, node))
}

pub fn assigned_node(&self, site: &Site) -> Result<Option<NodeId>, StoreError> {
self.read(|conn| queries::assigned_node(conn, site))
}
Expand Down
6 changes: 6 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,12 @@ impl SubgraphStoreTrait for SubgraphStore {
.map(|sites| sites.iter().map(|site| site.into()).collect())
}

fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {
self.mirror
.active_assignments(node)
.map(|sites| sites.iter().map(|site| site.into()).collect())
}

fn subgraph_exists(&self, name: &SubgraphName) -> Result<bool, StoreError> {
self.mirror.subgraph_exists(name)
}
Expand Down

0 comments on commit 0f1ca0c

Please sign in to comment.