diff --git a/src/adapter/graphql/tests/tests/mod.rs b/src/adapter/graphql/tests/tests/mod.rs index 6b13cb57f..7aeca3e1d 100644 --- a/src/adapter/graphql/tests/tests/mod.rs +++ b/src/adapter/graphql/tests/tests/mod.rs @@ -18,6 +18,7 @@ mod test_gql_dataset_flow_runs; mod test_gql_datasets; mod test_gql_metadata; mod test_gql_metadata_chain; +mod test_gql_remote_statuses; mod test_gql_search; mod test_guards; mod test_update_schema; diff --git a/src/adapter/graphql/tests/tests/test_gql_metadata.rs b/src/adapter/graphql/tests/tests/test_gql_metadata.rs index 02b1a538d..f6aefedb4 100644 --- a/src/adapter/graphql/tests/tests/test_gql_metadata.rs +++ b/src/adapter/graphql/tests/tests/test_gql_metadata.rs @@ -7,22 +7,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use async_graphql::*; use database_common::NoOpDatabasePlugin; use dill::*; use indoc::indoc; -use internal_error::InternalError; use kamu::testing::MetadataFactory; use kamu::*; -use kamu_core::utils::metadata_chain_comparator::CompareChainsResult; use kamu_core::*; use messaging_outbox::DummyOutboxImpl; use opendatafabric::*; -use tempfile::TempDir; use time_source::SystemTimeSourceDefault; -use url::Url; use crate::utils::authentication_catalogs; @@ -175,196 +169,4 @@ async fn test_current_push_sources() { ); } -#[test_log::test(tokio::test)] -async fn test_push_statuses() { - let harness = PushStatusesTestHarness::new(); - - // Init dataset with no sources - let (_, catalog_authorized) = authentication_catalogs(&harness.base_catalog).await; - - let create_dataset_from_snapshot = catalog_authorized - .get_one::() - .unwrap(); - let create_result = create_dataset_from_snapshot - .execute( - MetadataFactory::dataset_snapshot() - .kind(DatasetKind::Root) - .name("foo") - .build(), - Default::default(), - ) - .await - .unwrap(); - - let request_code = indoc!( - r#" - { - datasets { - byId (datasetId: "") { - metadata { - pushSyncStatuses { - statuses { - remote, - result { - ... on CompareChainsResultStatus { message } - ... on CompareChainsResultError { reason { message } } - } - } - } - } - } - } - } - "# - ) - .replace("", &create_result.dataset_handle.id.to_string()); - - let schema = kamu_adapter_graphql::schema_quiet(); - let res = schema - .execute(Request::new(request_code.clone()).data(catalog_authorized.clone())) - .await; - - assert!(res.is_ok(), "{res:?}"); - - let expected = value!({ - "datasets": { - "byId": { - "metadata": { - "pushSyncStatuses": { - "statuses": [ - { - "remote": "https://example.com/ahead", - "result": { - "message": "AHEAD" - } - }, - { - "remote": "https://example.com/behind", - "result": { - "message": "BEHIND" - } - }, - { - "remote": "https://example.com/diverged", - "result": { - "message": "DIVERGED" - } - }, - { - "remote": "https://example.com/equal", - "result": { - "message": "EQUAL" - } - }, - { - "remote": "https://example.com/not-found", - "result": { - "reason": { - "message": "Remote dataset not found" - } - } - } - ] - } - } - } - } - }); - assert_eq!(res.data, expected); -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -struct PushStatusesTestHarness { - base_catalog: Catalog, - _tempdir: TempDir, -} - -impl PushStatusesTestHarness { - fn new() -> Self { - let tempdir = tempfile::tempdir().unwrap(); - let datasets_dir = tempdir.path().join("datasets"); - std::fs::create_dir(&datasets_dir).unwrap(); - - let base_catalog = { - let mut b = CatalogBuilder::new(); - - b.add_value(RunInfoDir::new(tempdir.path().join("run"))) - .add::() - .add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir)) - .bind::() - .bind::() - .add::() - .add::() - .add::() - .add_value(TenancyConfig::SingleTenant) - .add_value(FakeRemoteStatusService {}) - .bind::(); - - NoOpDatabasePlugin::init_database_components(&mut b); - - b.build() - }; - - Self { - base_catalog, - _tempdir: tempdir, - } - } -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -pub struct FakeRemoteStatusService {} - -#[async_trait::async_trait] -impl RemoteStatusService for FakeRemoteStatusService { - async fn check_remotes_status( - &self, - _dataset_handle: &DatasetHandle, - ) -> std::result::Result { - Ok(DatasetPushStatuses { - statuses: vec![ - PushStatus { - remote: DatasetRefRemote::Url(Arc::new( - Url::parse("https://example.com/ahead").unwrap(), - )), - check_result: Ok(CompareChainsResult::LhsBehind { - rhs_ahead_blocks: vec![], - }), - }, - PushStatus { - remote: DatasetRefRemote::Url(Arc::new( - Url::parse("https://example.com/behind").unwrap(), - )), - check_result: Ok(CompareChainsResult::LhsAhead { - lhs_ahead_blocks: vec![], - }), - }, - PushStatus { - remote: DatasetRefRemote::Url(Arc::new( - Url::parse("https://example.com/diverged").unwrap(), - )), - check_result: Ok(CompareChainsResult::Divergence { - uncommon_blocks_in_lhs: 0, - uncommon_blocks_in_rhs: 0, - }), - }, - PushStatus { - remote: DatasetRefRemote::Url(Arc::new( - Url::parse("https://example.com/equal").unwrap(), - )), - check_result: Ok(CompareChainsResult::Equal), - }, - PushStatus { - remote: DatasetRefRemote::Url(Arc::new( - Url::parse("https://example.com/not-found").unwrap(), - )), - check_result: Err(StatusCheckError::RemoteDatasetNotFound), - }, - ], - }) - } -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/tests/tests/test_gql_remote_statuses.rs b/src/adapter/graphql/tests/tests/test_gql_remote_statuses.rs new file mode 100644 index 000000000..f9b2798ae --- /dev/null +++ b/src/adapter/graphql/tests/tests/test_gql_remote_statuses.rs @@ -0,0 +1,223 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use async_graphql::*; +use database_common::NoOpDatabasePlugin; +use dill::*; +use indoc::indoc; +use internal_error::InternalError; +use kamu::testing::MetadataFactory; +use kamu::*; +use kamu_core::utils::metadata_chain_comparator::CompareChainsResult; +use kamu_core::*; +use messaging_outbox::DummyOutboxImpl; +use opendatafabric::*; +use tempfile::TempDir; +use time_source::SystemTimeSourceDefault; +use url::Url; + +use crate::utils::authentication_catalogs; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_remote_push_statuses() { + let harness = PushStatusesTestHarness::new(); + + // Init dataset with no sources + let (_, catalog_authorized) = authentication_catalogs(&harness.base_catalog).await; + + let create_dataset_from_snapshot = catalog_authorized + .get_one::() + .unwrap(); + let create_result = create_dataset_from_snapshot + .execute( + MetadataFactory::dataset_snapshot() + .kind(DatasetKind::Root) + .name("foo") + .build(), + Default::default(), + ) + .await + .unwrap(); + + let request_code = indoc!( + r#" + { + datasets { + byId (datasetId: "") { + metadata { + pushSyncStatuses { + statuses { + remote, + result { + ... on CompareChainsResultStatus { message } + ... on CompareChainsResultError { reason { message } } + } + } + } + } + } + } + } + "# + ) + .replace("", &create_result.dataset_handle.id.to_string()); + + let schema = kamu_adapter_graphql::schema_quiet(); + let res = schema + .execute(Request::new(request_code.clone()).data(catalog_authorized.clone())) + .await; + + assert!(res.is_ok(), "{res:?}"); + + let expected = value!({ + "datasets": { + "byId": { + "metadata": { + "pushSyncStatuses": { + "statuses": [ + { + "remote": "https://example.com/ahead", + "result": { + "message": "AHEAD" + } + }, + { + "remote": "https://example.com/behind", + "result": { + "message": "BEHIND" + } + }, + { + "remote": "https://example.com/diverged", + "result": { + "message": "DIVERGED" + } + }, + { + "remote": "https://example.com/equal", + "result": { + "message": "EQUAL" + } + }, + { + "remote": "https://example.com/not-found", + "result": { + "reason": { + "message": "Remote dataset not found" + } + } + } + ] + } + } + } + } + }); + assert_eq!(res.data, expected); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +struct PushStatusesTestHarness { + base_catalog: Catalog, + _tempdir: TempDir, +} + +impl PushStatusesTestHarness { + fn new() -> Self { + let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + + let base_catalog = { + let mut b = CatalogBuilder::new(); + + b.add_value(RunInfoDir::new(tempdir.path().join("run"))) + .add::() + .add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir)) + .bind::() + .bind::() + .add::() + .add::() + .add::() + .add_value(TenancyConfig::SingleTenant) + .add_value(FakeRemoteStatusService {}) + .bind::(); + + NoOpDatabasePlugin::init_database_components(&mut b); + + b.build() + }; + + Self { + base_catalog, + _tempdir: tempdir, + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub struct FakeRemoteStatusService {} + +#[async_trait::async_trait] +impl RemoteStatusService for FakeRemoteStatusService { + async fn check_remotes_status( + &self, + _dataset_handle: &DatasetHandle, + ) -> std::result::Result { + Ok(DatasetPushStatuses { + statuses: vec![ + PushStatus { + remote: DatasetRefRemote::Url(Arc::new( + Url::parse("https://example.com/ahead").unwrap(), + )), + check_result: Ok(CompareChainsResult::LhsBehind { + rhs_ahead_blocks: vec![], + }), + }, + PushStatus { + remote: DatasetRefRemote::Url(Arc::new( + Url::parse("https://example.com/behind").unwrap(), + )), + check_result: Ok(CompareChainsResult::LhsAhead { + lhs_ahead_blocks: vec![], + }), + }, + PushStatus { + remote: DatasetRefRemote::Url(Arc::new( + Url::parse("https://example.com/diverged").unwrap(), + )), + check_result: Ok(CompareChainsResult::Divergence { + uncommon_blocks_in_lhs: 0, + uncommon_blocks_in_rhs: 0, + }), + }, + PushStatus { + remote: DatasetRefRemote::Url(Arc::new( + Url::parse("https://example.com/equal").unwrap(), + )), + check_result: Ok(CompareChainsResult::Equal), + }, + PushStatus { + remote: DatasetRefRemote::Url(Arc::new( + Url::parse("https://example.com/not-found").unwrap(), + )), + check_result: Err(StatusCheckError::RemoteDatasetNotFound), + }, + ], + }) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_delete_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_delete_command.rs index 941f0f24c..58ff4a35c 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/test_delete_command.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/test_delete_command.rs @@ -173,6 +173,8 @@ pub async fn test_delete_dataset_all(kamu: KamuCliPuppet) { } } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub async fn test_delete_warning(mut kamu_node_api_client: KamuApiServerClient) { let kamu: KamuCliPuppet = KamuCliPuppet::new_workspace_tmp(true).await; diff --git a/src/infra/core/tests/tests/test_remote_status_service.rs b/src/infra/core/tests/tests/test_remote_status_service.rs index ceba4e207..b02891975 100644 --- a/src/infra/core/tests/tests/test_remote_status_service.rs +++ b/src/infra/core/tests/tests/test_remote_status_service.rs @@ -46,7 +46,7 @@ async fn test_check_remotes_status_equal() { let remote = harness.push_dataset(&local_ds.dataset_handle).await; let result = harness - .service + .remote_status_service .check_remotes_status(&local_ds.dataset_handle) .await .unwrap(); @@ -58,6 +58,8 @@ async fn test_check_remotes_status_equal() { assert_matches!(status.check_result, Ok(CompareChainsResult::Equal)); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[tokio::test] async fn test_check_remotes_status_remote_behind() { let harness = RemoteStatusTestHarness::new(); @@ -75,7 +77,7 @@ async fn test_check_remotes_status_remote_behind() { .unwrap(); let result = harness - .service + .remote_status_service .check_remotes_status(&local_ds.dataset_handle) .await .unwrap(); @@ -90,6 +92,8 @@ async fn test_check_remotes_status_remote_behind() { ); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[tokio::test] async fn test_check_remotes_status_remote_ahead() { let harness = RemoteStatusTestHarness::new(); @@ -119,7 +123,7 @@ async fn test_check_remotes_status_remote_ahead() { .unwrap(); let result = harness - .service + .remote_status_service .check_remotes_status(&local_ds.dataset_handle) .await .unwrap(); @@ -134,6 +138,8 @@ async fn test_check_remotes_status_remote_ahead() { ); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[tokio::test] async fn test_check_remotes_status_remote_diverge() { let harness = RemoteStatusTestHarness::new(); @@ -180,7 +186,7 @@ async fn test_check_remotes_status_remote_diverge() { .unwrap(); let result = harness - .service + .remote_status_service .check_remotes_status(&local_ds.dataset_handle) .await .unwrap(); @@ -195,6 +201,8 @@ async fn test_check_remotes_status_remote_diverge() { ); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[tokio::test] async fn test_check_remotes_status_not_found() { let harness = RemoteStatusTestHarness::new(); @@ -205,7 +213,7 @@ async fn test_check_remotes_status_not_found() { fs::remove_dir_all(harness.remote_repos_dir.join("repo1")).unwrap(); let result = harness - .service + .remote_status_service .check_remotes_status(&local_ds.dataset_handle) .await .unwrap(); @@ -220,9 +228,12 @@ async fn test_check_remotes_status_not_found() { ); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[oop::extend(BaseRepoHarness, base_repo_harness)] struct RemoteStatusTestHarness { base_repo_harness: BaseRepoHarness, - service: Arc, + remote_status_service: Arc, sync_service: Arc, sync_builder: Arc, remote_aliases_reg: Arc, @@ -251,7 +262,7 @@ impl RemoteStatusTestHarness { Self { base_repo_harness, - service: catalog.get_one().unwrap(), + remote_status_service: catalog.get_one().unwrap(), sync_service: catalog.get_one().unwrap(), sync_builder: catalog.get_one().unwrap(), remote_aliases_reg: catalog.get_one().unwrap(), @@ -266,8 +277,7 @@ impl RemoteStatusTestHarness { MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build()) .build_typed(); - self.base_repo_harness - .dataset_repo_writer() + self.dataset_repo_writer() .create_dataset(&local_alias, seed_block) .await .unwrap() @@ -313,3 +323,5 @@ impl RemoteStatusTestHarness { .build() } } + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////