From 3d6bc5b88bae5bab6121bc9427825cc8b9b87c3b Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 24 May 2023 08:49:27 +0530 Subject: [PATCH 1/6] store: Add paused_at and assigned_at to subgraph_deployment_assignmen table --- .../down.sql | 4 ++++ .../up.sql | 4 ++++ store/postgres/src/primary.rs | 2 ++ 3 files changed, 10 insertions(+) create mode 100644 store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/down.sql create mode 100644 store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/up.sql diff --git a/store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/down.sql b/store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/down.sql new file mode 100644 index 00000000000..0bb6211f8b3 --- /dev/null +++ b/store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/down.sql @@ -0,0 +1,4 @@ +-- Define the 'down' migration to remove the 'paused_at' and 'assigned_at' fields from 'subgraph_deployment_assignment' table +ALTER TABLE subgraphs.subgraph_deployment_assignment +DROP COLUMN paused_at, +DROP COLUMN assigned_at; \ No newline at end of file diff --git a/store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/up.sql b/store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/up.sql new file mode 100644 index 00000000000..ee76b78db8d --- /dev/null +++ b/store/postgres/migrations/2023-05-23-1715-update-subgraph-deployment-assignment/up.sql @@ -0,0 +1,4 @@ +-- Define the 'up' migration to add the 'paused_at' and 'assigned_at' fields to 'subgraph_deployment_assignment' table +ALTER TABLE subgraphs.subgraph_deployment_assignment +ADD COLUMN paused_at TIMESTAMPTZ NULL, +ADD COLUMN assigned_at TIMESTAMPTZ NULL; \ No newline at end of file diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 90bdfae26a0..9c5ea002509 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -93,6 +93,8 @@ table! { subgraphs.subgraph_deployment_assignment { id -> Integer, node_id -> Text, + paused_at -> Nullable, + assigned_at -> Nullable, } } From 0c5ad008d53cf5910fa5de4a3facee7fdd90f42a Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 24 May 2023 15:31:15 +0530 Subject: [PATCH 2/6] store: postgres - add pause_subgraph and resume_subgraph methods --- store/postgres/src/primary.rs | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 9c5ea002509..4c77798fd91 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -981,6 +981,44 @@ impl<'a> Connection<'a> { } } + pub fn pause_subgraph(&self, site: &Site) -> Result<(), StoreError> { + use subgraph_deployment_assignment as a; + + let conn = self.conn.as_ref(); + + let updates = update(a::table.filter(a::id.eq(site.id))) + .set(a::paused_at.eq(sql("now()"))) + .execute(conn)?; + match updates { + 0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())), + 1 => Ok(()), + _ => { + // `id` is the primary key of the subgraph_deployment_assignment table, + // and we can therefore only update no or one entry + unreachable!() + } + } + } + + pub fn resume_subgraph(&self, site: &Site) -> Result<(), StoreError> { + use subgraph_deployment_assignment as a; + + let conn = self.conn.as_ref(); + + let updates = update(a::table.filter(a::id.eq(site.id))) + .set(a::paused_at.eq(sql("null"))) + .execute(conn)?; + match updates { + 0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())), + 1 => Ok(()), + _ => { + // `id` is the primary key of the subgraph_deployment_assignment table, + // and we can therefore only update no or one entry + unreachable!() + } + } + } + pub fn reassign_subgraph( &self, site: &Site, From f0cc9d38055f15ac4114d418e26ac25468bac3de Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 24 May 2023 17:59:35 +0530 Subject: [PATCH 3/6] node: Add graphman commands to pause and resume subgraphs --- node/src/bin/manager.rs | 18 ++++++++++++++++++ node/src/manager/commands/assign.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 0e6b6cd26e8..3f831343306 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -158,6 +158,16 @@ pub enum Command { /// The deployment (see `help info`) deployment: DeploymentSearch, }, + /// Pause a deployment + Pause { + /// The deployment (see `help info`) + deployment: DeploymentSearch, + }, + /// Resume a deployment + Resume { + /// The deployment (see `help info`) + deployment: DeploymentSearch, + }, /// Rewind a subgraph to a specific block Rewind { /// Force rewinding even if the block hash is not found in the local @@ -1091,6 +1101,14 @@ async fn main() -> anyhow::Result<()> { let sender = ctx.notification_sender(); commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node) } + Pause { deployment } => { + let sender = ctx.notification_sender(); + commands::assign::pause_or_resume(ctx.primary_pool(), &sender, &deployment, true) + } + Resume { deployment } => { + let sender = ctx.notification_sender(); + commands::assign::pause_or_resume(ctx.primary_pool(), &sender, &deployment, false) + } Rewind { force, sleep, diff --git a/node/src/manager/commands/assign.rs b/node/src/manager/commands/assign.rs index c9b69e693bc..4907f6292da 100644 --- a/node/src/manager/commands/assign.rs +++ b/node/src/manager/commands/assign.rs @@ -69,3 +69,31 @@ pub fn reassign( } Ok(()) } + +pub fn pause_or_resume( + primary: ConnectionPool, + sender: &NotificationSender, + search: &DeploymentSearch, + pause: bool, +) -> Result<(), Error> { + let locator = search.locate_unique(&primary)?; + + let conn = primary.get()?; + let conn = catalog::Connection::new(conn); + + let site = conn + .locate_site(locator.clone())? + .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; + + if pause { + println!("pausing {locator}"); + conn.pause_subgraph(&site)?; + println!("paused {locator}") + } else { + println!("resuming {locator}"); + conn.resume_subgraph(&site)?; + println!("resumed {locator}") + } + + Ok(()) +} From 9b30df51d4aff739c162c25322239c62d2037eb5 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 24 May 2023 20:22:37 +0530 Subject: [PATCH 4/6] core,graph,node,store: send StoreEvents for Pause and Resume and handle it --- core/src/subgraph/registrar.rs | 11 ++++- graph/src/components/store/traits.rs | 9 ++++ node/src/manager/commands/assign.rs | 26 ++++++++---- store/postgres/src/primary.rs | 62 ++++++++++++++++++++++++++-- store/postgres/src/subgraph_store.rs | 12 ++++++ 5 files changed, 105 insertions(+), 15 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7f706fcd622..65d66250abb 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -167,13 +167,20 @@ where match operation { EntityChangeOperation::Set => { store - .assigned_node(&deployment) + .assignment_status(&deployment) .map_err(|e| { anyhow!("Failed to get subgraph assignment entity: {}", e) }) .map(|assigned| -> Box + Send> { - if let Some(assigned) = assigned { + if let Some((assigned,is_paused)) = assigned { if assigned == node_id { + + if is_paused{ + // Subgraph is paused, so we don't start it + debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused); + return Box::new(stream::empty()); + } + // Start subgraph on this node debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id); Box::new(stream::once(Ok(AssignmentEvent::Add { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 01875b9bfa2..a9e06e264a7 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -95,6 +95,15 @@ pub trait SubgraphStore: Send + Sync + 'static { fn assigned_node(&self, deployment: &DeploymentLocator) -> Result, StoreError>; + /// Returns Option<(node_id,is_paused)> where `node_id` is the node that + /// the subgraph is assigned to, and `is_paused` is true if the + /// subgraph is paused. + /// Returns None if the deployment does not exist. + fn assignment_status( + &self, + deployment: &DeploymentLocator, + ) -> Result, StoreError>; + fn assignments(&self, node: &NodeId) -> Result, StoreError>; /// Return `true` if a subgraph `name` exists, regardless of whether the diff --git a/node/src/manager/commands/assign.rs b/node/src/manager/commands/assign.rs index 4907f6292da..dd6e9212ad5 100644 --- a/node/src/manager/commands/assign.rs +++ b/node/src/manager/commands/assign.rs @@ -85,15 +85,23 @@ pub fn pause_or_resume( .locate_site(locator.clone())? .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; - if pause { - println!("pausing {locator}"); - conn.pause_subgraph(&site)?; - println!("paused {locator}") - } else { - println!("resuming {locator}"); - conn.resume_subgraph(&site)?; - println!("resumed {locator}") - } + let change = match conn.assignment_status(&site)? { + Some((_, paused)) => { + if paused == pause { + println!("deployment {locator} is already {paused}"); + vec![] + } else { + println!("pausing {locator}"); + conn.pause_subgraph(&site)? + } + } + None => { + println!("resuming {locator}"); + conn.resume_subgraph(&site)? + } + }; + println!("Operation completed"); + conn.send_store_event(sender, &StoreEvent::new(change))?; Ok(()) } diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 4c77798fd91..f2e5cb42df2 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -405,6 +405,7 @@ pub fn make_dummy_site(deployment: DeploymentHash, namespace: Namespace, network /// mirrored through `Mirror::refresh_tables` and must be queries, i.e., /// read-only mod queries { + use diesel::data_types::PgTimestamp; use diesel::dsl::{any, exists, sql}; use diesel::pg::PgConnection; use diesel::prelude::{ @@ -626,6 +627,36 @@ mod queries { .transpose() } + /// Returns Option<(node_id,is_paused)> where `node_id` is the node that + /// the subgraph is assigned to, and `is_paused` is true if the + /// subgraph is paused. + /// Returns None if the deployment does not exist. + pub(super) fn assignment_status( + conn: &PgConnection, + site: &Site, + ) -> Result, StoreError> { + a::table + .filter(a::id.eq(site.id)) + .select((a::node_id, a::paused_at)) + .first::<(String, Option)>(conn) + .optional()? + .map(|(node, ts)| { + let node_id = NodeId::new(&node).map_err(|()| { + constraint_violation!( + "invalid node id `{}` in assignment for `{}`", + node, + site.deployment + ) + })?; + + match ts { + Some(_) => Ok((node_id, true)), + None => Ok((node_id, false)), + } + }) + .transpose() + } + pub(super) fn version_info( conn: &PgConnection, version: &str, @@ -981,7 +1012,7 @@ impl<'a> Connection<'a> { } } - pub fn pause_subgraph(&self, site: &Site) -> Result<(), StoreError> { + pub fn pause_subgraph(&self, site: &Site) -> Result, StoreError> { use subgraph_deployment_assignment as a; let conn = self.conn.as_ref(); @@ -991,7 +1022,11 @@ impl<'a> Connection<'a> { .execute(conn)?; match updates { 0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())), - 1 => Ok(()), + 1 => { + let change = + EntityChange::for_assignment(site.into(), EntityChangeOperation::Removed); + Ok(vec![change]) + } _ => { // `id` is the primary key of the subgraph_deployment_assignment table, // and we can therefore only update no or one entry @@ -1000,7 +1035,7 @@ impl<'a> Connection<'a> { } } - pub fn resume_subgraph(&self, site: &Site) -> Result<(), StoreError> { + pub fn resume_subgraph(&self, site: &Site) -> Result, StoreError> { use subgraph_deployment_assignment as a; let conn = self.conn.as_ref(); @@ -1010,7 +1045,10 @@ impl<'a> Connection<'a> { .execute(conn)?; match updates { 0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())), - 1 => Ok(()), + 1 => { + let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set); + Ok(vec![change]) + } _ => { // `id` is the primary key of the subgraph_deployment_assignment table, // and we can therefore only update no or one entry @@ -1148,6 +1186,14 @@ impl<'a> Connection<'a> { queries::assigned_node(self.conn.as_ref(), site) } + /// Returns Option<(node_id,is_paused)> where `node_id` is the node that + /// the subgraph is assigned to, and `is_paused` is true if the + /// subgraph is paused. + /// Returns None if the deployment does not exist. + pub fn assignment_status(&self, site: &Site) -> Result, StoreError> { + queries::assignment_status(self.conn.as_ref(), site) + } + /// Create a copy of the site `src` in the shard `shard`, but mark it as /// not active. If there already is a site in `shard`, return that /// instead. @@ -1761,6 +1807,14 @@ impl Mirror { self.read(|conn| queries::assigned_node(conn, site)) } + /// Returns Option<(node_id,is_paused)> where `node_id` is the node that + /// the subgraph is assigned to, and `is_paused` is true if the + /// subgraph is paused. + /// Returns None if the deployment does not exist. + pub fn assignment_status(&self, site: &Site) -> Result, StoreError> { + self.read(|conn| queries::assignment_status(conn, site)) + } + pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result, StoreError> { self.read(|conn| queries::find_active_site(conn, subgraph)) } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 0ce48b4ac88..0c338ee42ad 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1286,6 +1286,18 @@ impl SubgraphStoreTrait for SubgraphStore { self.mirror.assigned_node(site.as_ref()) } + /// Returns Option<(node_id,is_paused)> where `node_id` is the node that + /// the subgraph is assigned to, and `is_paused` is true if the + /// subgraph is paused. + /// Returns None if the deployment does not exist. + fn assignment_status( + &self, + deployment: &DeploymentLocator, + ) -> Result, StoreError> { + let site = self.find_site(deployment.id.into())?; + self.mirror.assignment_status(site.as_ref()) + } + fn assignments(&self, node: &NodeId) -> Result, StoreError> { self.mirror .assignments(node) From 046dd41103009d08ddfeaebb7041e986f87d55e3 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Thu, 25 May 2023 13:55:16 +0530 Subject: [PATCH 5/6] graph,store,core - add active_assigments() for unpaused subgraphs and wire it in subgraph startup --- core/src/subgraph/registrar.rs | 2 +- graph/src/components/store/traits.rs | 3 +++ node/src/manager/commands/assign.rs | 20 ++++++++++++-------- store/postgres/src/primary.rs | 20 ++++++++++++++++++++ store/postgres/src/subgraph_store.rs | 6 ++++++ 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 65d66250abb..3e90c167987 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -222,7 +222,7 @@ where let logger = self.logger.clone(); let node_id = self.node_id.clone(); - future::result(self.store.assignments(&self.node_id)) + future::result(self.store.active_assignments(&self.node_id)) .map_err(|e| anyhow!("Error querying subgraph assignments: {}", e)) .and_then(move |deployments| { // This operation should finish only after all subgraphs are diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index a9e06e264a7..9ca494a4f62 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -106,6 +106,9 @@ pub trait SubgraphStore: Send + Sync + 'static { fn assignments(&self, node: &NodeId) -> Result, StoreError>; + /// Returns assignments that are not paused + fn active_assignments(&self, node: &NodeId) -> Result, StoreError>; + /// Return `true` if a subgraph `name` exists, regardless of whether the /// subgraph has any deployments attached to it fn subgraph_exists(&self, name: &SubgraphName) -> Result; diff --git a/node/src/manager/commands/assign.rs b/node/src/manager/commands/assign.rs index dd6e9212ad5..74b1961fb68 100644 --- a/node/src/manager/commands/assign.rs +++ b/node/src/manager/commands/assign.rs @@ -74,7 +74,7 @@ pub fn pause_or_resume( primary: ConnectionPool, sender: &NotificationSender, search: &DeploymentSearch, - pause: bool, + should_pause: bool, ) -> Result<(), Error> { let locator = search.locate_unique(&primary)?; @@ -86,18 +86,22 @@ pub fn pause_or_resume( .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; let change = match conn.assignment_status(&site)? { - Some((_, paused)) => { - if paused == pause { - println!("deployment {locator} is already {paused}"); - vec![] - } else { + Some((_, is_paused)) => { + if should_pause { + if is_paused { + println!("deployment {locator} is already paused"); + return Ok(()); + } println!("pausing {locator}"); conn.pause_subgraph(&site)? + } else { + println!("resuming {locator}"); + conn.resume_subgraph(&site)? } } None => { - println!("resuming {locator}"); - conn.resume_subgraph(&site)? + println!("deployment {locator} not found"); + return Ok(()); } }; println!("Operation completed"); diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index f2e5cb42df2..29aec86d670 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -588,6 +588,22 @@ mod queries { .collect::, _>>() } + // All assignments for a node that are currently not paused + pub(super) fn active_assignments( + conn: &PgConnection, + node: &NodeId, + ) -> Result, StoreError> { + ds::table + .inner_join(a::table.on(a::id.eq(ds::id))) + .filter(a::node_id.eq(node.as_str())) + .filter(a::paused_at.is_null()) + .select(ds::all_columns) + .load::(conn)? + .into_iter() + .map(Site::try_from) + .collect::, _>>() + } + pub(super) fn fill_assignments( conn: &PgConnection, infos: &mut [status::Info], @@ -1803,6 +1819,10 @@ impl Mirror { self.read(|conn| queries::assignments(conn, node)) } + pub fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { + self.read(|conn| queries::active_assignments(conn, node)) + } + pub fn assigned_node(&self, site: &Site) -> Result, StoreError> { self.read(|conn| queries::assigned_node(conn, site)) } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 0c338ee42ad..a44f59bb718 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1304,6 +1304,12 @@ impl SubgraphStoreTrait for SubgraphStore { .map(|sites| sites.iter().map(|site| site.into()).collect()) } + fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { + self.mirror + .active_assignments(node) + .map(|sites| sites.iter().map(|site| site.into()).collect()) + } + fn subgraph_exists(&self, name: &SubgraphName) -> Result { self.mirror.subgraph_exists(name) } From 6b6aa7f50f74a31d369dbef5131a73da9b8a1e9a Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Fri, 26 May 2023 23:17:42 +0530 Subject: [PATCH 6/6] NEWS.md : add graphman commands pause and resume --- NEWS.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 176faf76aef..1ae4cf93ba1 100644 --- a/NEWS.md +++ b/NEWS.md @@ -4,7 +4,8 @@ - `graphman rewind` has changed, block-number and block-hash are now flags instead of arguments - `graphman rewind` now has an extra flag `--start-block` which will rewind to the startBlock set in manifest or to the genesis block if no startBlock is set - +- `graphman` now has two new commands `pause` and `resume` that can be used to pause and resume a deployment +