Skip to content

Commit

Permalink
Fixed image building (#977)
Browse files Browse the repository at this point in the history
Replaced cascade deletion of dataset entries in the dependency graph with explicit removal events, allowing orphaned upstream dependencies for which only the dataset ID is known
  • Loading branch information
zaychenko-sergei authored Dec 2, 2024
1 parent b71715e commit b1a0a01
Show file tree
Hide file tree
Showing 15 changed files with 185 additions and 78 deletions.
5 changes: 3 additions & 2 deletions migrations/postgres/20241125193114_dataset_dependencies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

CREATE TABLE dataset_dependencies
(
    -- Note: no foreign keys here, as external orphans are possible in the graph
    -- (an upstream dataset may be referenced by ID only, without a local entry;
    -- cleanup on entry removal is handled by DatasetEntryRemovalListener instead
    -- of ON DELETE CASCADE)
    upstream_dataset_id   VARCHAR(100) NOT NULL,
    downstream_dataset_id VARCHAR(100) NOT NULL
);

CREATE UNIQUE INDEX idx_dataset_dependencies
Expand Down
5 changes: 3 additions & 2 deletions migrations/sqlite/20241125192943_dataset_dependencies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

CREATE TABLE dataset_dependencies
(
    -- Note: no foreign keys here, as external orphans are possible in the graph
    -- (an upstream dataset may be referenced by ID only, without a local entry;
    -- cleanup on entry removal is handled by DatasetEntryRemovalListener instead
    -- of ON DELETE CASCADE)
    upstream_dataset_id   VARCHAR(100) NOT NULL,
    downstream_dataset_id VARCHAR(100) NOT NULL
);

CREATE UNIQUE INDEX idx_dataset_dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,10 @@ pub enum DeleteEntryDatasetError {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Callback interface fired after a dataset entry has been removed.
///
/// Implementors (e.g. dependency repositories) use this to clean up state
/// that previously relied on `ON DELETE CASCADE` foreign keys — the schema
/// no longer enforces referential integrity so that orphan upstream
/// dependencies (known only by ID) can remain in the graph.
#[async_trait::async_trait]
pub trait DatasetEntryRemovalListener: Send + Sync {
/// Invoked with the ID of the entry that was just deleted.
/// Returns an `InternalError` if the listener's cleanup fails.
async fn on_dataset_entry_removed(&self, dataset_id: &DatasetID) -> Result<(), InternalError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use internal_error::InternalError;
use kamu_datasets::*;
use opendatafabric::DatasetID;

use super::InMemoryDatasetEntryRemovalListener;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Default)]
Expand All @@ -33,7 +31,7 @@ pub struct InMemoryDatasetDependencyRepository {

#[component(pub)]
#[interface(dyn DatasetDependencyRepository)]
#[interface(dyn InMemoryDatasetEntryRemovalListener)]
#[interface(dyn DatasetEntryRemovalListener)]
#[scope(Singleton)]
impl InMemoryDatasetDependencyRepository {
pub fn new() -> Self {
Expand Down Expand Up @@ -163,7 +161,7 @@ impl DatasetDependencyRepository for InMemoryDatasetDependencyRepository {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
impl InMemoryDatasetEntryRemovalListener for InMemoryDatasetDependencyRepository {
impl DatasetEntryRemovalListener for InMemoryDatasetDependencyRepository {
async fn on_dataset_entry_removed(&self, dataset_id: &DatasetID) -> Result<(), InternalError> {
let mut guard = self.state.lock().unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ impl State {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct InMemoryDatasetEntryRepository {
listeners: Vec<Arc<dyn InMemoryDatasetEntryRemovalListener>>,
listeners: Vec<Arc<dyn DatasetEntryRemovalListener>>,
state: Arc<Mutex<State>>,
}

#[component(pub)]
#[interface(dyn DatasetEntryRepository)]
#[scope(Singleton)]
impl InMemoryDatasetEntryRepository {
pub fn new(listeners: Vec<Arc<dyn InMemoryDatasetEntryRemovalListener>>) -> Self {
pub fn new(listeners: Vec<Arc<dyn DatasetEntryRemovalListener>>) -> Self {
Self {
listeners,
state: Arc::new(Mutex::new(State::new())),
Expand Down Expand Up @@ -278,10 +278,3 @@ impl DatasetEntryRepository for InMemoryDatasetEntryRepository {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// In-memory-scoped callback interface fired after a dataset entry is removed,
/// letting sibling in-memory repositories drop state tied to that entry.
/// NOTE(review): this trait appears superseded by the shared
/// `DatasetEntryRemovalListener` defined in the domain crate — confirm before reuse.
#[async_trait::async_trait]
pub trait InMemoryDatasetEntryRemovalListener: Send + Sync {
/// Invoked with the ID of the entry that was just deleted.
async fn on_dataset_entry_removed(&self, dataset_id: &DatasetID) -> Result<(), InternalError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct PostgresDatasetDependencyRepository {

#[component(pub)]
#[interface(dyn DatasetDependencyRepository)]
#[interface(dyn DatasetEntryRemovalListener)]
impl PostgresDatasetDependencyRepository {
pub fn new(transaction: TransactionRef) -> Self {
Self {
Expand Down Expand Up @@ -183,3 +184,28 @@ impl DatasetDependencyRepository for PostgresDatasetDependencyRepository {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
impl DatasetEntryRemovalListener for PostgresDatasetDependencyRepository {
    /// Purges every dependency edge that references the removed dataset,
    /// regardless of whether it appeared on the upstream or downstream side.
    async fn on_dataset_entry_removed(&self, dataset_id: &DatasetID) -> Result<(), InternalError> {
        let mut guard = self.transaction.lock().await;
        let conn = guard.connection_mut().await?;

        // Stack-allocated DID string avoids a heap allocation for the bind parameter
        let id_stack = dataset_id.as_did_str().to_stack_string();
        let id_str = id_stack.as_str();

        sqlx::query!(
            r#"
DELETE FROM dataset_dependencies WHERE downstream_dataset_id = $1 OR upstream_dataset_id = $1
"#,
            id_str,
        )
        .execute(&mut *conn)
        .await
        .int_err()?;

        Ok(())
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use std::collections::HashSet;
use std::sync::Arc;

use database_common::{PaginationOpts, TransactionRef, TransactionRefT};
use dill::{component, interface};
Expand All @@ -19,14 +20,19 @@ use opendatafabric::{AccountID, DatasetID, DatasetName};

pub struct PostgresDatasetEntryRepository {
transaction: TransactionRefT<sqlx::Postgres>,
listeners: Vec<Arc<dyn DatasetEntryRemovalListener>>,
}

#[component(pub)]
#[interface(dyn DatasetEntryRepository)]
impl PostgresDatasetEntryRepository {
pub fn new(transaction: TransactionRef) -> Self {
pub fn new(
transaction: TransactionRef,
listeners: Vec<Arc<dyn DatasetEntryRemovalListener>>,
) -> Self {
Self {
transaction: transaction.into(),
listeners,
}
}
}
Expand Down Expand Up @@ -345,24 +351,33 @@ impl DatasetEntryRepository for PostgresDatasetEntryRepository {
&self,
dataset_id: &DatasetID,
) -> Result<(), DeleteEntryDatasetError> {
let mut tr = self.transaction.lock().await;
{
let mut tr = self.transaction.lock().await;

let connection_mut = tr.connection_mut().await?;
let connection_mut = tr.connection_mut().await?;

let stack_dataset_id = dataset_id.as_did_str().to_stack_string();
let stack_dataset_id = dataset_id.as_did_str().to_stack_string();

let delete_result = sqlx::query!(
r#"
DELETE FROM dataset_entries WHERE dataset_id = $1
"#,
stack_dataset_id.as_str(),
)
.execute(&mut *connection_mut)
.await
.int_err()?;
let delete_result = sqlx::query!(
r#"
DELETE FROM dataset_entries WHERE dataset_id = $1
"#,
stack_dataset_id.as_str(),
)
.execute(&mut *connection_mut)
.await
.int_err()?;

if delete_result.rows_affected() == 0 {
return Err(DatasetEntryNotFoundError::new(dataset_id.clone()).into());
if delete_result.rows_affected() == 0 {
return Err(DatasetEntryNotFoundError::new(dataset_id.clone()).into());
}
}

for listener in &self.listeners {
listener
.on_dataset_entry_removed(dataset_id)
.await
.int_err()?;
}

Ok(())
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct SqliteDatasetDependencyRepository {

#[component(pub)]
#[interface(dyn DatasetDependencyRepository)]
#[interface(dyn DatasetEntryRemovalListener)]
impl SqliteDatasetDependencyRepository {
pub fn new(transaction: TransactionRef) -> Self {
Self {
Expand Down Expand Up @@ -192,3 +193,29 @@ impl DatasetDependencyRepository for SqliteDatasetDependencyRepository {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
impl DatasetEntryRemovalListener for SqliteDatasetDependencyRepository {
    /// Purges every dependency edge that references the removed dataset,
    /// regardless of whether it appeared on the upstream or downstream side.
    async fn on_dataset_entry_removed(&self, dataset_id: &DatasetID) -> Result<(), InternalError> {
        let mut guard = self.transaction.lock().await;
        let conn = guard.connection_mut().await?;

        // Stack-allocated DID string; bound through a named local because the
        // SQLite sqlx macro requires the parameter to outlive the query value
        let id_stack = dataset_id.as_did_str().to_stack_string();
        let id_str = id_stack.as_str();

        sqlx::query!(
            r#"
DELETE FROM dataset_dependencies WHERE downstream_dataset_id = $1 OR upstream_dataset_id = $1
"#,
            id_str,
        )
        .execute(&mut *conn)
        .await
        .int_err()?;

        Ok(())
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Loading

0 comments on commit b1a0a01

Please sign in to comment.