Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to pause and resume subgraphs #4642

Merged
merged 6 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

- `graphman rewind` has changed, block-number and block-hash are now flags instead of arguments
- `graphman rewind` now has an extra flag `--start-block` which will rewind to the startBlock set in manifest or to the genesis block if no startBlock is set

- `graphman` now has two new commands `pause` and `resume` that can be used to pause and resume a deployment

<!--
Note: the changes in this PR were technically released in v0.31.0, but the feature also requires changes to graph-cli, which at the time of writing has NOT been released. This feature will make it into the release notes of graph-node only once graph-cli has been updated.

Expand Down
13 changes: 10 additions & 3 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,20 @@ where
match operation {
EntityChangeOperation::Set => {
store
.assigned_node(&deployment)
.assignment_status(&deployment)
.map_err(|e| {
anyhow!("Failed to get subgraph assignment entity: {}", e)
})
.map(|assigned| -> Box<dyn Stream<Item = _, Error = _> + Send> {
if let Some(assigned) = assigned {
if let Some((assigned,is_paused)) = assigned {
if assigned == node_id {

if is_paused{
// Subgraph is paused, so we don't start it
debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused);
return Box::new(stream::empty());
}

// Start subgraph on this node
debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id);
Box::new(stream::once(Ok(AssignmentEvent::Add {
Expand Down Expand Up @@ -215,7 +222,7 @@ where
let logger = self.logger.clone();
let node_id = self.node_id.clone();

future::result(self.store.assignments(&self.node_id))
future::result(self.store.active_assignments(&self.node_id))
.map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))
.and_then(move |deployments| {
// This operation should finish only after all subgraphs are
Expand Down
12 changes: 12 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,20 @@ pub trait SubgraphStore: Send + Sync + 'static {

fn assigned_node(&self, deployment: &DeploymentLocator) -> Result<Option<NodeId>, StoreError>;

/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
fn assignment_status(
&self,
deployment: &DeploymentLocator,
) -> Result<Option<(NodeId, bool)>, StoreError>;

fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;

/// Returns assignments that are not paused
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;

/// Return `true` if a subgraph `name` exists, regardless of whether the
/// subgraph has any deployments attached to it
fn subgraph_exists(&self, name: &SubgraphName) -> Result<bool, StoreError>;
Expand Down
18 changes: 18 additions & 0 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ pub enum Command {
/// The deployment (see `help info`)
deployment: DeploymentSearch,
},
/// Pause a deployment
Pause {
/// The deployment (see `help info`)
deployment: DeploymentSearch,
},
/// Resume a deployment
Resume {
/// The deployment (see `help info`)
deployment: DeploymentSearch,
},
/// Rewind a subgraph to a specific block
Rewind {
/// Force rewinding even if the block hash is not found in the local
Expand Down Expand Up @@ -1091,6 +1101,14 @@ async fn main() -> anyhow::Result<()> {
let sender = ctx.notification_sender();
commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node)
}
Pause { deployment } => {
let sender = ctx.notification_sender();
commands::assign::pause_or_resume(ctx.primary_pool(), &sender, &deployment, true)
}
Resume { deployment } => {
let sender = ctx.notification_sender();
commands::assign::pause_or_resume(ctx.primary_pool(), &sender, &deployment, false)
}
Rewind {
force,
sleep,
Expand Down
40 changes: 40 additions & 0 deletions node/src/manager/commands/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,43 @@ pub fn reassign(
}
Ok(())
}

pub fn pause_or_resume(
primary: ConnectionPool,
sender: &NotificationSender,
search: &DeploymentSearch,
should_pause: bool,
) -> Result<(), Error> {
let locator = search.locate_unique(&primary)?;

let conn = primary.get()?;
let conn = catalog::Connection::new(conn);

let site = conn
.locate_site(locator.clone())?
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;

let change = match conn.assignment_status(&site)? {
Some((_, is_paused)) => {
if should_pause {
if is_paused {
println!("deployment {locator} is already paused");
return Ok(());
}
println!("pausing {locator}");
conn.pause_subgraph(&site)?
} else {
println!("resuming {locator}");
conn.resume_subgraph(&site)?
}
}
None => {
println!("deployment {locator} not found");
return Ok(());
}
};
println!("Operation completed");
conn.send_store_event(sender, &StoreEvent::new(change))?;

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Define the 'down' migration to remove the 'paused_at' and 'assigned_at' fields from 'subgraph_deployment_assignment' table
ALTER TABLE subgraphs.subgraph_deployment_assignment
DROP COLUMN paused_at,
DROP COLUMN assigned_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Define the 'up' migration to add the 'paused_at' and 'assigned_at' fields to 'subgraph_deployment_assignment' table
ALTER TABLE subgraphs.subgraph_deployment_assignment
ADD COLUMN paused_at TIMESTAMPTZ NULL,
ADD COLUMN assigned_at TIMESTAMPTZ NULL;
114 changes: 114 additions & 0 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ table! {
subgraphs.subgraph_deployment_assignment {
id -> Integer,
node_id -> Text,
paused_at -> Nullable<Timestamptz>,
assigned_at -> Nullable<Timestamptz>,
}
}

Expand Down Expand Up @@ -403,6 +405,7 @@ pub fn make_dummy_site(deployment: DeploymentHash, namespace: Namespace, network
/// mirrored through `Mirror::refresh_tables` and must be queries, i.e.,
/// read-only
mod queries {
use diesel::data_types::PgTimestamp;
use diesel::dsl::{any, exists, sql};
use diesel::pg::PgConnection;
use diesel::prelude::{
Expand Down Expand Up @@ -585,6 +588,22 @@ mod queries {
.collect::<Result<Vec<Site>, _>>()
}

// All assignments for a node that are currently not paused
pub(super) fn active_assignments(
conn: &PgConnection,
node: &NodeId,
) -> Result<Vec<Site>, StoreError> {
ds::table
.inner_join(a::table.on(a::id.eq(ds::id)))
.filter(a::node_id.eq(node.as_str()))
.filter(a::paused_at.is_null())
.select(ds::all_columns)
.load::<Schema>(conn)?
.into_iter()
.map(Site::try_from)
.collect::<Result<Vec<Site>, _>>()
}

pub(super) fn fill_assignments(
conn: &PgConnection,
infos: &mut [status::Info],
Expand Down Expand Up @@ -624,6 +643,36 @@ mod queries {
.transpose()
}

/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
pub(super) fn assignment_status(
conn: &PgConnection,
site: &Site,
) -> Result<Option<(NodeId, bool)>, StoreError> {
a::table
.filter(a::id.eq(site.id))
.select((a::node_id, a::paused_at))
.first::<(String, Option<PgTimestamp>)>(conn)
.optional()?
.map(|(node, ts)| {
let node_id = NodeId::new(&node).map_err(|()| {
constraint_violation!(
"invalid node id `{}` in assignment for `{}`",
node,
site.deployment
)
})?;

match ts {
Some(_) => Ok((node_id, true)),
None => Ok((node_id, false)),
}
})
.transpose()
}

pub(super) fn version_info(
conn: &PgConnection,
version: &str,
Expand Down Expand Up @@ -979,6 +1028,51 @@ impl<'a> Connection<'a> {
}
}

pub fn pause_subgraph(&self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_ref();

let updates = update(a::table.filter(a::id.eq(site.id)))
.set(a::paused_at.eq(sql("now()")))
.execute(conn)?;
match updates {
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
1 => {
let change =
EntityChange::for_assignment(site.into(), EntityChangeOperation::Removed);
Ok(vec![change])
}
_ => {
// `id` is the primary key of the subgraph_deployment_assignment table,
// and we can therefore only update no or one entry
unreachable!()
}
}
}

pub fn resume_subgraph(&self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_ref();

let updates = update(a::table.filter(a::id.eq(site.id)))
.set(a::paused_at.eq(sql("null")))
.execute(conn)?;
match updates {
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
1 => {
let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set);
Ok(vec![change])
}
_ => {
// `id` is the primary key of the subgraph_deployment_assignment table,
// and we can therefore only update no or one entry
unreachable!()
}
}
}

pub fn reassign_subgraph(
&self,
site: &Site,
Expand Down Expand Up @@ -1108,6 +1202,14 @@ impl<'a> Connection<'a> {
queries::assigned_node(self.conn.as_ref(), site)
}

/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
queries::assignment_status(self.conn.as_ref(), site)
}

/// Create a copy of the site `src` in the shard `shard`, but mark it as
/// not active. If there already is a site in `shard`, return that
/// instead.
Expand Down Expand Up @@ -1717,10 +1819,22 @@ impl Mirror {
self.read(|conn| queries::assignments(conn, node))
}

pub fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
self.read(|conn| queries::active_assignments(conn, node))
}

pub fn assigned_node(&self, site: &Site) -> Result<Option<NodeId>, StoreError> {
self.read(|conn| queries::assigned_node(conn, site))
}

/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
self.read(|conn| queries::assignment_status(conn, site))
}

pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result<Option<Site>, StoreError> {
self.read(|conn| queries::find_active_site(conn, subgraph))
}
Expand Down
18 changes: 18 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,12 +1286,30 @@ impl SubgraphStoreTrait for SubgraphStore {
self.mirror.assigned_node(site.as_ref())
}

/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
fn assignment_status(
&self,
deployment: &DeploymentLocator,
) -> Result<Option<(NodeId, bool)>, StoreError> {
let site = self.find_site(deployment.id.into())?;
self.mirror.assignment_status(site.as_ref())
}

fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {
self.mirror
.assignments(node)
.map(|sites| sites.iter().map(|site| site.into()).collect())
}

fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {
self.mirror
.active_assignments(node)
.map(|sites| sites.iter().map(|site| site.into()).collect())
}

fn subgraph_exists(&self, name: &SubgraphName) -> Result<bool, StoreError> {
self.mirror.subgraph_exists(name)
}
Expand Down