Skip to content

Commit

Permalink
return the old VID generation for ols subgraphs
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Jan 27, 2025
1 parent 16b47d4 commit ec452e8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
30 changes: 28 additions & 2 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ use graph::{
};
use itertools::Itertools;

use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table};
use crate::{
catalog,
copy::AdaptiveBatchSize,
deployment,
relational::{Table, VID_COLUMN},
};

use super::{Catalog, Layout, Namespace};

Expand Down Expand Up @@ -68,6 +73,7 @@ struct TablePair {
// has the same name as `src` but is in a different namespace
dst: Arc<Table>,
src_nsp: Namespace,
dst_nsp: Namespace,
}

impl TablePair {
Expand All @@ -94,7 +100,12 @@ impl TablePair {
}
conn.batch_execute(&query)?;

Ok(TablePair { src, dst, src_nsp })
Ok(TablePair {
src,
dst,
src_nsp,
dst_nsp,
})
}

/// Copy all entity versions visible between `earliest_block` and
Expand Down Expand Up @@ -228,6 +239,12 @@ impl TablePair {
let src_qname = &self.src.qualified_name;
let dst_qname = &self.dst.qualified_name;
let src_nsp = &self.src_nsp;
let dst_nsp = &self.dst_nsp;

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.new_vid_form();

let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
Expand All @@ -237,6 +254,15 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
)?;
}

writeln!(query, "drop table {src_qname};")?;
writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?;
conn.transaction(|conn| conn.batch_execute(&query))?;
Expand Down
10 changes: 8 additions & 2 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5096,6 +5096,8 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.new_vid_form();

// Construct a query
// insert into {dst}({columns})
// select {columns} from {src}
Expand All @@ -5116,7 +5118,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(")\nselect ");
for column in &self.columns {
Expand Down Expand Up @@ -5182,7 +5186,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(" from ");
out.push_sql(self.src.qualified_name.as_str());
Expand Down

0 comments on commit ec452e8

Please sign in to comment.