Skip to content

Commit

Permalink
Implement snapshot restore logic (#2398)
Browse files Browse the repository at this point in the history
* Implement basic restore logic

* Implement restore logic for accounts

* Implement restore logic for ontology

* Implement restore logic for entities

* Satisfy rustfmt

* Move ontology metadata into own module

* Use `INTEGER` instead of `INT4`

* Update apps/hash-graph/lib/graph/src/snapshot.rs

Co-authored-by: Ahmad Sattar <thehabbos007@gmail.com>

* Clarify documentation on `restore` function

* Use `try_fold` instead of `fold`

---------

Co-authored-by: Ahmad Sattar <thehabbos007@gmail.com>
  • Loading branch information
TimDiekmann and thehabbos007 authored Apr 13, 2023
1 parent 5df489a commit a92b712
Show file tree
Hide file tree
Showing 31 changed files with 2,812 additions and 199 deletions.
11 changes: 7 additions & 4 deletions apps/hash-graph/bin/cli/src/subcommand/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ pub async fn snapshot(args: SnapshotArgs) -> Result<(), GraphError> {
}
SnapshotCommand::Restore(_) => {
store
.restore_snapshot(FramedRead::new(
io::BufReader::new(io::stdin()),
codec::JsonLinesDecoder::default(),
))
.restore_snapshot(
FramedRead::new(
io::BufReader::new(io::stdin()),
codec::JsonLinesDecoder::default(),
),
10_000,
)
.await
.change_context(GraphError)
.attach_printable("Failed to restore snapshot")?;
Expand Down
2 changes: 1 addition & 1 deletion apps/hash-graph/lib/graph/src/shared/provenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ define_provenance_id!(RecordCreatedById);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct ProvenanceMetadata {
record_created_by_id: RecordCreatedById,
pub record_created_by_id: RecordCreatedById,
}

impl ProvenanceMetadata {
Expand Down
178 changes: 109 additions & 69 deletions apps/hash-graph/lib/graph/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
pub mod account;
pub mod codec;
pub mod entity;

mod entity;
mod error;
mod metadata;
mod ontology;
mod restore;

use std::pin::pin;

use error_stack::{ensure, Context, Report, Result, ResultExt};
use async_trait::async_trait;
use error_stack::{Context, IntoReport, Report, Result, ResultExt};
use futures::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use type_system::{DataType, EntityType, PropertyType};

pub use self::{
entity::EntitySnapshotRecord,
error::{SnapshotDumpError, SnapshotRestoreError},
metadata::{BlockProtocolModuleVersions, CustomGlobalMetadata},
ontology::{
Expand All @@ -24,7 +26,8 @@ pub use self::{
pub use crate::snapshot::metadata::SnapshotMetadata;
use crate::{
knowledge::Entity,
store::{crud::Read, query::Filter},
snapshot::{entity::EntitySnapshotRecord, restore::SnapshotRecordBatch},
store::{crud::Read, query::Filter, AsClient, InsertionError, PostgresStore},
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -92,32 +95,28 @@ impl SnapshotEntry {
}
}

pub struct SnapshotStore<S>(S);
#[async_trait]
trait WriteBatch<C> {
async fn begin(postgres_client: &PostgresStore<C>) -> Result<(), InsertionError>;
async fn write(&self, postgres_client: &PostgresStore<C>) -> Result<(), InsertionError>;
async fn commit(postgres_client: &PostgresStore<C>) -> Result<(), InsertionError>;
}

pub struct SnapshotStore<C>(PostgresStore<C>);

impl<S> SnapshotStore<S> {
pub const fn new(store: S) -> Self {
impl<C> SnapshotStore<C> {
pub const fn new(store: PostgresStore<C>) -> Self {
Self(store)
}
}

#[expect(
clippy::trait_duplication_in_bounds,
reason = "False positive: the generics are different"
)]
impl<S> SnapshotStore<S>
where
S: Read<OntologyTypeSnapshotRecord<DataType>>
+ Read<OntologyTypeSnapshotRecord<PropertyType>>
+ Read<OntologyTypeSnapshotRecord<EntityType>>
+ Read<Entity>
+ Send,
{
impl<C: AsClient> SnapshotStore<C> {
/// Convenience function to create a stream of snapshot entries.
async fn create_dump_stream<T>(
&self,
) -> Result<impl Stream<Item = Result<T, SnapshotDumpError>> + Send, SnapshotDumpError>
where
S: Read<T>,
PostgresStore<C>: Read<T>,
{
Ok(Read::<T>::read(&self.0, &Filter::All(vec![]), None)
.await
Expand Down Expand Up @@ -155,15 +154,24 @@ where
// entries or even use multiple connections to the database.
// see https://app.asana.com/0/0/1204347352251098/f

let data_type_stream = pin!(self.create_dump_stream().await?);
let data_type_stream = pin!(
self.create_dump_stream::<OntologyTypeSnapshotRecord<DataType>>()
.await?
);
sink.send_all(&mut data_type_stream.map_ok(SnapshotEntry::DataType))
.await?;

let property_type_stream = pin!(self.create_dump_stream().await?);
let property_type_stream = pin!(
self.create_dump_stream::<OntologyTypeSnapshotRecord<PropertyType>>()
.await?
);
sink.send_all(&mut property_type_stream.map_ok(SnapshotEntry::PropertyType))
.await?;

let entity_type_stream = pin!(self.create_dump_stream().await?);
let entity_type_stream = pin!(
self.create_dump_stream::<OntologyTypeSnapshotRecord<EntityType>>()
.await?
);
sink.send_all(&mut entity_type_stream.map_ok(SnapshotEntry::EntityType))
.await?;

Expand All @@ -176,57 +184,89 @@ where

/// Reads the snapshot from from the stream into the store.
///
/// The data emitted by the stream is read in a separate thread and is sent to different
/// channels for each record type. Each channel holds a buffer of `chunk_size` entries. The
/// receivers of the channels are then used to insert the records into the store. When a write
/// operation to the store succeeds, the next entry is read from the channel, even if the
/// buffer of the channel is not full yet. This ensures, that the store is continuously writing
/// to the database and does not wait for the buffer to be full.
///
/// Writing to the store happens in three stages:
/// 1. The first stage is the `begin` stage. This stage is executed before any records are
/// read from the stream. It is used to create a transaction, so a possible rollback is
/// possible. For each data, which is inserted, a temporary table is created. This table
/// is used to insert the data into the store without locking the store and avoiding
/// yet unfulfilled foreign key constraints.
/// 2. The second stage is the `write` stage. This stage is executed for each record type. It
/// reads the batch of records from the channels and inserts them into the temporary
/// tables, which were created above.
/// 3. The third stage is the `commit` stage. This stage is executed after all records have
/// been read from the stream. It is used to insert the data from the temporary tables
/// into the store and to drop the temporary tables. As foreign key constraints are now
/// enabled, this stage might fail. In this case, the transaction is rolled back and the
/// error is returned.
///
/// If the input stream contains an `Err` value, the snapshot restore is aborted and the error
/// is returned.
///
/// # Errors
///
/// - If writing the record into the datastore fails
#[expect(
clippy::todo,
clippy::missing_panics_doc,
reason = "This will be done in a follow-up"
)]
/// - If reading a record from the provided stream fails
/// - If writing a record into the datastore fails
pub async fn restore_snapshot(
&mut self,
snapshot: impl Stream<Item = Result<SnapshotEntry, impl Context>> + Send,
snapshot: impl Stream<Item = Result<SnapshotEntry, impl Context>> + Send + 'static,
chunk_size: usize,
) -> Result<(), SnapshotRestoreError> {
let mut snapshot = pin!(snapshot);
while let Some(entry) = snapshot.next().await {
let entry = entry.change_context(SnapshotRestoreError::Canceled)?;

match entry {
SnapshotEntry::Snapshot(global) => {
ensure!(
global.block_protocol_module_versions.graph
== semver::Version::new(0, 3, 0),
SnapshotRestoreError::Unsupported
);
}
SnapshotEntry::DataType(data_type) => {
tracing::trace!(
"Inserting data type: {:?}",
data_type.metadata.record_id.base_url
);
}
SnapshotEntry::PropertyType(property_type) => {
tracing::trace!(
"Inserting property type: {:?}",
property_type.metadata.record_id.base_url
);
}
SnapshotEntry::EntityType(entity_type) => {
tracing::trace!(
"Inserting entity type: {:?}",
entity_type.metadata.record_id.base_url
);
}
SnapshotEntry::Entity(entity) => {
tracing::trace!(
"Inserting entity: {:?}",
entity.metadata.record_id.entity_id.entity_uuid
);
}
}
}
tracing::info!("snapshot restore started");

let (snapshot_record_tx, snapshot_record_rx) = restore::channel(chunk_size);

let read_thread = tokio::spawn(
snapshot
.map_err(|report| report.change_context(SnapshotRestoreError::Read))
.forward(
snapshot_record_tx
.sink_map_err(|report| report.change_context(SnapshotRestoreError::Buffer)),
),
);

todo!("https://app.asana.com/0/0/1204216809501006/f")
let client = self
.0
.transaction()
.await
.change_context(SnapshotRestoreError::Write)?;

SnapshotRecordBatch::begin(&client)
.await
.change_context(SnapshotRestoreError::Write)?;

let client = snapshot_record_rx
.map(Ok::<_, Report<SnapshotRestoreError>>)
.try_fold(client, |client, records: SnapshotRecordBatch| async move {
records
.write(&client)
.await
.change_context(SnapshotRestoreError::Write)?;
Ok(client)
})
.await?;

tracing::info!("snapshot reading finished, committing...");

SnapshotRecordBatch::commit(&client)
.await
.change_context(SnapshotRestoreError::Write)?;

client
.commit()
.await
.change_context(SnapshotRestoreError::Write)
.attach_printable("unable to commit snapshot to the store")?;

read_thread
.await
.into_report()
.change_context(SnapshotRestoreError::Read)?
}
}
9 changes: 9 additions & 0 deletions apps/hash-graph/lib/graph/src/snapshot/account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod batch;
mod channel;
mod table;

pub use self::{
batch::AccountRowBatch,
channel::{channel, AccountReceiver, AccountSender},
table::AccountRow,
};
73 changes: 73 additions & 0 deletions apps/hash-graph/lib/graph/src/snapshot/account/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use async_trait::async_trait;
use error_stack::{IntoReport, Result, ResultExt};
use tokio_postgres::GenericClient;

use crate::{
snapshot::{account::AccountRow, WriteBatch},
store::{AsClient, InsertionError, PostgresStore},
};

pub enum AccountRowBatch {
Accounts(Vec<AccountRow>),
}

#[async_trait]
impl<C: AsClient> WriteBatch<C> for AccountRowBatch {
async fn begin(postgres_client: &PostgresStore<C>) -> Result<(), InsertionError> {
postgres_client
.as_client()
.client()
.simple_query(
r"
CREATE TEMPORARY TABLE accounts_tmp
(LIKE accounts INCLUDING ALL)
ON COMMIT DROP;
",
)
.await
.into_report()
.change_context(InsertionError)?;

Ok(())
}

async fn write(&self, postgres_client: &PostgresStore<C>) -> Result<(), InsertionError> {
let client = postgres_client.as_client().client();
match self {
Self::Accounts(accounts) => {
let rows = client
.query(
r"
INSERT INTO accounts_tmp
SELECT DISTINCT * FROM UNNEST($1::accounts[])
ON CONFLICT DO NOTHING
RETURNING 1;
",
&[accounts],
)
.await
.into_report()
.change_context(InsertionError)?;
if !rows.is_empty() {
tracing::info!("Read {} accounts", rows.len());
}
}
}
Ok(())
}

async fn commit(postgres_client: &PostgresStore<C>) -> Result<(), InsertionError> {
postgres_client
.as_client()
.client()
.simple_query(
r"
INSERT INTO accounts SELECT * FROM accounts_tmp;
",
)
.await
.into_report()
.change_context(InsertionError)?;
Ok(())
}
}
Loading

0 comments on commit a92b712

Please sign in to comment.