diff --git a/ingestion/tests/cli_e2e/base/config_builders/builders.py b/ingestion/tests/cli_e2e/base/config_builders/builders.py index 024f287214c5..44bc3c7aa232 100644 --- a/ingestion/tests/cli_e2e/base/config_builders/builders.py +++ b/ingestion/tests/cli_e2e/base/config_builders/builders.py @@ -53,14 +53,42 @@ def __init__(self, config: dict, config_args: dict) -> None: self.profilerSample = self.config_args.get("profilerSample", 100) # pylint: enable=invalid-name - def build(self) -> dict: """build profiler config""" del self.config["source"]["sourceConfig"]["config"] self.config["source"]["sourceConfig"] = { "config": { "type": "Profiler", - "generateSampleData": True, + "profileSample": self.profilerSample, + } + } + + if self.config_args.get("includes"): + self.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "includes": self.config_args.get("includes") + } + + self.config["processor"] = {"type": "orm-profiler", "config": {}} + return self.config + + +class AutoClassificationConfigBuilder(BaseBuilder): + """Builder class for the AutoClassification config""" + + # pylint: disable=invalid-name + def __init__(self, config: dict, config_args: dict) -> None: + super().__init__(config, config_args) + self.profilerSample = self.config_args.get("profilerSample", 100) + + # pylint: enable=invalid-name + def build(self) -> dict: + """build auto classification config""" + del self.config["source"]["sourceConfig"]["config"] + self.config["source"]["sourceConfig"] = { + "config": { + "type": "AutoClassification", + "storeSampleData": True, + "enableAutoClassification": False, "profileSample": self.profilerSample, } } @@ -186,6 +214,7 @@ def builder_factory(builder, config: dict, config_args: dict): E2EType.INGEST_DASHBOARD_FILTER_MIX.value: DashboardMixConfigBuilder, E2EType.INGEST_DASHBOARD_NOT_INCLUDING.value: DashboardConfigBuilder, E2EType.PROFILER_PROCESSOR.value: ProfilerProcessorConfigBuilder, + E2EType.AUTO_CLASSIFICATION.value: 
AutoClassificationConfigBuilder, } return builder_classes.get(builder, BaseBuilder)(config, config_args) diff --git a/ingestion/tests/cli_e2e/base/e2e_types.py b/ingestion/tests/cli_e2e/base/e2e_types.py index 442c5c27b884..00d4b9352f52 100644 --- a/ingestion/tests/cli_e2e/base/e2e_types.py +++ b/ingestion/tests/cli_e2e/base/e2e_types.py @@ -24,6 +24,7 @@ class E2EType(Enum): INGEST = "ingest" PROFILER = "profiler" PROFILER_PROCESSOR = "profiler-processor" + AUTO_CLASSIFICATION = "auto-classification" DATA_QUALITY = "test" INGEST_DB_FILTER_SCHEMA = "ingest-db-filter-schema" INGEST_DB_FILTER_TABLE = "ingest-db-filter-table" diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index d746a2265adc..838d40536d68 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -53,7 +53,7 @@ def test_vanilla_ingestion(self) -> None: @pytest.mark.order(2) def test_create_table_with_profiler(self) -> None: - """2. create a new table + deploy ingestion with views, sample data, and profiler. + """2. create a new table + deploy ingestion with views, and profiler. We will perform the following steps: 1. delete table in case it exists @@ -76,6 +76,25 @@ def test_create_table_with_profiler(self) -> None: self.system_profile_assertions() @pytest.mark.order(3) + def test_auto_classify_data(self) -> None: + """3. Run the auto classification workflow and validate the sample data + + We will perform the following steps: + 1. build config file for ingest + 2. run ingest with new tables `self.run_command()` defaults to `ingestion` + 3. build config file for auto classification + 4. 
run auto classification + """ + self.build_config_file() + self.run_command() + self.build_config_file( + E2EType.AUTO_CLASSIFICATION, {"includes": self.get_includes_schemas()} + ) + result = self.run_command("classify") + sink_status, source_status = self.retrieve_statuses(result) + self.assert_auto_classification_sample_data(source_status, sink_status) + + @pytest.mark.order(4) def test_delete_table_is_marked_as_deleted(self) -> None: """3. delete the new table + deploy marking tables as deleted @@ -93,7 +112,7 @@ def test_delete_table_is_marked_as_deleted(self) -> None: source_status, sink_status ) - @pytest.mark.order(4) + @pytest.mark.order(5) def test_schema_filter_includes(self) -> None: """4. vanilla ingestion + include schema filter pattern @@ -110,7 +129,7 @@ def test_schema_filter_includes(self) -> None: sink_status, source_status = self.retrieve_statuses(result) self.assert_filtered_schemas_includes(source_status, sink_status) - @pytest.mark.order(5) + @pytest.mark.order(6) def test_schema_filter_excludes(self) -> None: """5. vanilla ingestion + exclude schema filter pattern @@ -126,7 +145,7 @@ def test_schema_filter_excludes(self) -> None: sink_status, source_status = self.retrieve_statuses(result) self.assert_filtered_schemas_excludes(source_status, sink_status) - @pytest.mark.order(6) + @pytest.mark.order(7) def test_table_filter_includes(self) -> None: """6. Vanilla ingestion + include table filter pattern @@ -142,7 +161,7 @@ def test_table_filter_includes(self) -> None: sink_status, source_status = self.retrieve_statuses(result) self.assert_filtered_tables_includes(source_status, sink_status) - @pytest.mark.order(7) + @pytest.mark.order(8) def test_table_filter_excludes(self) -> None: """7. 
Vanilla ingestion + exclude table filter pattern @@ -157,7 +176,7 @@ def test_table_filter_excludes(self) -> None: sink_status, source_status = self.retrieve_statuses(result) self.assert_filtered_tables_excludes(source_status, sink_status) - @pytest.mark.order(8) + @pytest.mark.order(9) def test_table_filter_mix(self) -> None: """8. Vanilla ingestion + include schema filter pattern + exclude table filter pattern @@ -179,21 +198,21 @@ def test_table_filter_mix(self) -> None: sink_status, source_status = self.retrieve_statuses(result) self.assert_filtered_mix(source_status, sink_status) - @pytest.mark.order(9) + @pytest.mark.order(10) def test_usage(self) -> None: """9. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage This test will need to be implemented on the database specific test classes """ - @pytest.mark.order(10) + @pytest.mark.order(11) def test_lineage(self) -> None: """10. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage This test will need to be implemented on the database specific test classes """ - @pytest.mark.order(11) + @pytest.mark.order(12) def test_profiler_with_time_partition(self) -> None: """11. Test time partitioning for the profiler""" time_partition = self.get_profiler_time_partition() @@ -215,7 +234,7 @@ def test_profiler_with_time_partition(self) -> None: sink_status, ) - @pytest.mark.order(12) + @pytest.mark.order(13) def test_data_quality(self) -> None: """12. 
Test data quality for the connector""" if self.get_data_quality_table() is None: @@ -312,6 +331,12 @@ def assert_for_table_with_profiler( ): raise NotImplementedError() + @abstractmethod + def assert_auto_classification_sample_data( + self, source_status: Status, sink_status: Status + ): + raise NotImplementedError() + @abstractmethod def assert_for_table_with_profiler_time_partition( self, source_status: Status, sink_status: Status diff --git a/ingestion/tests/cli_e2e/common/test_cli_db.py b/ingestion/tests/cli_e2e/common/test_cli_db.py index 17fa666477d2..7ed847f64fdd 100644 --- a/ingestion/tests/cli_e2e/common/test_cli_db.py +++ b/ingestion/tests/cli_e2e/common/test_cli_db.py @@ -97,19 +97,26 @@ def assert_for_table_with_profiler( (len(sink_status.records) + len(sink_status.updated_records)), self.expected_profiled_tables(), ) - sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData - self.assertEqual(len(sample_data.rows), self.inserted_rows_count()) # Since we removed view lineage from metadata workflow as part # of https://github.com/open-metadata/OpenMetadata/pull/18558 # we need to introduce Lineage E2E base and add view lineage check there. 
+ def assert_auto_classification_sample_data( + self, source_status: Status, sink_status: Status + ): + self.assertEqual(len(source_status.failures), 0) + self.assertGreaterEqual( + (len(source_status.records) + len(source_status.updated_records)), + self.expected_profiled_tables(), + ) + sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData + self.assertEqual(len(sample_data.rows), self.inserted_rows_count()) + def assert_for_table_with_profiler_time_partition( self, source_status: Status, sink_status: Status ): self.assertEqual(len(source_status.failures), 0) self.assertEqual(len(sink_status.failures), 0) - sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData - self.assertLessEqual(len(sample_data.rows), self.inserted_rows_count()) profile = self.retrieve_profile(self.fqn_created_table()) expected_profiler_time_partition_results = ( self.get_profiler_time_partition_results()