Skip to content

Commit

Permalink
store: Reduce the amount of data getting mirrored from the primary
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Oct 13, 2022
1 parent fdb9806 commit 000a33c
Showing 1 changed file with 38 additions and 16 deletions.
54 changes: 38 additions & 16 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,19 +1583,25 @@ impl Mirror {
"subgraph_version",
];

fn run_query(conn: &PgConnection, query: String) -> Result<(), StoreError> {
conn.batch_execute(&query).map_err(StoreError::from)
}

fn copy_table(
conn: &PgConnection,
src_nsp: &str,
dst_nsp: &str,
table_name: &str,
) -> Result<(), StoreError> {
let query = format!(
"insert into {dst_nsp}.{table_name} select * from {src_nsp}.{table_name};",
src_nsp = src_nsp,
dst_nsp = dst_nsp,
table_name = table_name
);
conn.batch_execute(&query).map_err(StoreError::from)
run_query(
conn,
format!(
"insert into {dst_nsp}.{table_name} select * from {src_nsp}.{table_name};",
src_nsp = src_nsp,
dst_nsp = dst_nsp,
table_name = table_name
),
)
}

let check_cancel = || {
Expand All @@ -1622,6 +1628,7 @@ impl Mirror {
conn.batch_execute(&query)?;
check_cancel()?;

// Repopulate `PUBLIC_TABLES` by copying their data wholesale
for table_name in PUBLIC_TABLES {
copy_table(
conn,
Expand All @@ -1631,15 +1638,30 @@ impl Mirror {
)?;
check_cancel()?;
}
for table_name in SUBGRAPHS_TABLES {
copy_table(
conn,
&ForeignServer::metadata_schema(&*PRIMARY_SHARD),
NAMESPACE_SUBGRAPHS,
table_name,
)?;
check_cancel()?;
}

// Repopulate `SUBGRAPHS_TABLES` but only copy the data we actually
// need to respond to queries when the primary is down
let src_nsp = ForeignServer::metadata_schema(&*PRIMARY_SHARD);
let dst_nsp = NAMESPACE_SUBGRAPHS;

run_query(
conn,
format!(
"insert into {dst_nsp}.subgraph \
select * from {src_nsp}.subgraph
where current_version is not null;"
),
)?;
run_query(
conn,
format!(
"insert into {dst_nsp}.subgraph_version \
select v.* from {src_nsp}.subgraph_version v, {src_nsp}.subgraph s
where v.id = s.current_version;"
),
)?;
copy_table(conn, &src_nsp, dst_nsp, "subgraph_deployment_assignment")?;

Ok(())
}

Expand Down

0 comments on commit 000a33c

Please sign in to comment.