diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 08ebdec500d3..a2fb2cfae2ee 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -38,8 +38,8 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX -from metadata.utils.importer import import_source_class from metadata.utils.logger import test_suite_logger +from metadata.utils.service_spec.service_spec import import_source_class logger = test_suite_logger() diff --git a/ingestion/src/metadata/ingestion/source/api/rest/service_spec.py b/ingestion/src/metadata/ingestion/source/api/rest/service_spec.py new file mode 100644 index 000000000000..63f4439254d3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/api/rest/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.api.rest.metadata import RestSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=RestSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/service_spec.py new file mode 100644 index 000000000000..005964bb2e0f --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/service_spec.py @@ -0,0 +1,6 @@ +from metadata.ingestion.source.dashboard.domodashboard.metadata import ( + DomodashboardSource, +) +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=DomodashboardSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/service_spec.py new file mode 100644 index 000000000000..f9e645eae315 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.lightdash.metadata import LightdashSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=LightdashSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/service_spec.py new file mode 100644 index 000000000000..660d08142402 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.looker.metadata import LookerSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=LookerSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase/service_spec.py new file mode 100644 index 000000000000..830fdd8eb208 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.metabase.metadata import MetabaseSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=MetabaseSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/mode/service_spec.py new file mode 100644 index 000000000000..fb39c9435e75 --- /dev/null +++ 
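A note on the pattern that repeats through the rest of this diff: `import_source_class` moves from `metadata.utils.importer` into `metadata.utils.service_spec.service_spec`, and every connector package gains a `service_spec.py` exposing a module-level `ServiceSpec` object. A minimal sketch of how such a spec module can be resolved at runtime, assuming only the module-path convention visible in this diff (the `load_service_spec` helper below is hypothetical, not the actual `import_source_class` implementation):

```python
# Hypothetical resolver for the ServiceSpec convention introduced here;
# import_source_class in metadata/utils/service_spec/service_spec.py
# presumably does something similar, but its exact code is not shown.
import importlib

def load_service_spec(service_type: str, source_type: str):
    module = importlib.import_module(
        f"metadata.ingestion.source.{service_type}.{source_type}.service_spec"
    )
    return module.ServiceSpec

# e.g. load_service_spec("dashboard", "looker").metadata_source_class
# points at LookerSource; the query connector further down stores the
# string "not.implemented" in that slot instead of a class.
```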
b/ingestion/src/metadata/ingestion/source/dashboard/mode/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.mode.metadata import ModeSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=ModeSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mstr/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/mstr/service_spec.py new file mode 100644 index 000000000000..f2ea7e03df8a --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/mstr/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.mstr.metadata import MstrSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=MstrSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/service_spec.py new file mode 100644 index 000000000000..dd6c2f4053be --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=PowerbiSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/service_spec.py new file mode 100644 index 000000000000..e59a7c3c4e24 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.qlikcloud.metadata import QlikcloudSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=QlikcloudSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/service_spec.py new file mode 100644 index 000000000000..bedf3c5ca269 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=QliksenseSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/service_spec.py new file mode 100644 index 000000000000..c8549572623e --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=QuicksightSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/service_spec.py new file mode 100644 index 000000000000..c5557d7732b0 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.redash.metadata import RedashSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=RedashSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/sigma/service_spec.py 
b/ingestion/src/metadata/ingestion/source/dashboard/sigma/service_spec.py new file mode 100644 index 000000000000..ddec090a90be --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/sigma/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.sigma.metadata import SigmaSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=SigmaSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/service_spec.py new file mode 100644 index 000000000000..c01d75ba1707 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.superset.metadata import SupersetSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=SupersetSource) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/service_spec.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/service_spec.py new file mode 100644 index 000000000000..5743360ec108 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.dashboard.tableau.metadata import TableauSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=TableauSource) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/service_spec.py b/ingestion/src/metadata/ingestion/source/database/athena/service_spec.py new file mode 100644 index 000000000000..3a02ba0f8cb8 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/athena/service_spec.py @@ -0,0 +1,10 @@ +from metadata.ingestion.source.database.athena.lineage import AthenaLineageSource +from metadata.ingestion.source.database.athena.metadata import AthenaSource +from metadata.ingestion.source.database.athena.usage import AthenaUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=AthenaSource, + lineage_source_class=AthenaLineageSource, + usage_source_class=AthenaUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/service_spec.py b/ingestion/src/metadata/ingestion/source/database/azuresql/service_spec.py new file mode 100644 index 000000000000..14072b8618a8 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/azuresql/service_spec.py @@ -0,0 +1,10 @@ +from metadata.ingestion.source.database.azuresql.lineage import AzuresqlLineageSource +from metadata.ingestion.source.database.azuresql.metadata import AzuresqlSource +from metadata.ingestion.source.database.azuresql.usage import AzuresqlUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=AzuresqlSource, + lineage_source_class=AzuresqlLineageSource, + usage_source_class=AzuresqlUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/__init__.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py new file mode 100644 index 000000000000..cceeafe3ea6e --- /dev/null +++ 
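The database connectors use `DefaultDatabaseSpec`, which adds optional lineage, usage, and profiler slots next to the mandatory metadata source, as the Athena and AzureSQL specs above illustrate. An illustrative stand-in for its shape (field names come from this diff; types and defaults are assumptions about `metadata/utils/service_spec/default.py`):

```python
# Stand-in for DefaultDatabaseSpec, for orientation only; the real class
# may be a pydantic model with validation rather than a plain dataclass.
from dataclasses import dataclass
from typing import Optional, Type, Union

@dataclass
class DefaultDatabaseSpecSketch:
    metadata_source_class: Union[Type, str]
    lineage_source_class: Optional[Union[Type, str]] = None
    usage_source_class: Optional[Union[Type, str]] = None
    profiler_class: Optional[Union[Type, str]] = None
```

Connectors that only extract metadata (Bigtable, Couchbase, and most others below) fill just the first slot.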
b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py @@ -0,0 +1,29 @@ +from typing import List, Type + +from metadata.generated.schema.entity.data.table import SystemProfile +from metadata.ingestion.source.database.bigquery.profiler.system import ( + BigQuerySystemMetricsComputer, +) +from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import ( + BigQueryProfilerInterface, +) +from metadata.profiler.metrics.system.system import System +from metadata.profiler.processor.runner import QueryRunner + + +class BigQueryProfiler(BigQueryProfilerInterface): + def _compute_system_metrics( + self, + metrics: Type[System], + runner: QueryRunner, + *args, + **kwargs, + ) -> List[SystemProfile]: + return self.system_metrics_computer.get_system_metrics( + runner.table, self.service_connection_config + ) + + def initialize_system_metrics_computer( + self, **kwargs + ) -> BigQuerySystemMetricsComputer: + return BigQuerySystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py new file mode 100644 index 000000000000..7aa6d787be82 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py @@ -0,0 +1,161 @@ +from typing import List + +from pydantic import TypeAdapter +from sqlalchemy.orm import DeclarativeMeta + +from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile +from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( + BigQueryConnection, +) +from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult +from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations +from metadata.profiler.metrics.system.system import ( + CacheProvider, + EmptySystemMetricsSource, + SQASessionProvider, + SystemMetricsComputer, +) +from metadata.utils.logger import profiler_logger +from metadata.utils.time_utils import datetime_to_timestamp + +logger = profiler_logger() + + +class BigQuerySystemMetricsSource( + SQASessionProvider, EmptySystemMetricsSource, CacheProvider +): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_kwargs( + self, + table: DeclarativeMeta, + service_connection: BigQueryConnection, + *args, + **kwargs, + ): + return { + "table": table.__table__.name, + "dataset_id": table.__table_args__["schema"], + "project_id": super().get_session().get_bind().url.host, + "usage_location": service_connection.usageLocation, + } + + def get_deletes( + self, table: str, project_id: str, usage_location: str, dataset_id: str + ) -> List[SystemProfile]: + return self.get_system_profile( + project_id, + dataset_id, + table, + list( + self.get_queries_by_operation( + usage_location, + project_id, + dataset_id, + [ + DatabaseDMLOperations.DELETE, + ], + ) + ), + "deleted_row_count", + DmlOperationType.DELETE, + ) + + def get_updates( + self, table: str, project_id: str, usage_location: str, dataset_id: str + ) -> List[SystemProfile]: + return self.get_system_profile( + project_id, + dataset_id, + table, + self.get_queries_by_operation( + usage_location, + project_id, + dataset_id, + [ + DatabaseDMLOperations.UPDATE, + DatabaseDMLOperations.MERGE, + ], + ), + "updated_row_count", + DmlOperationType.UPDATE, + ) + + def get_inserts( + self, table: str, project_id: str, usage_location: str, dataset_id: str + ) -> List[SystemProfile]: + 
return self.get_system_profile( + project_id, + dataset_id, + table, + self.get_queries_by_operation( + usage_location, + project_id, + dataset_id, + [ + DatabaseDMLOperations.INSERT, + DatabaseDMLOperations.MERGE, + ], + ), + "inserted_row_count", + DmlOperationType.INSERT, + ) + + def get_queries_by_operation( + self, + usage_location: str, + project_id: str, + dataset_id: str, + operations: List[DatabaseDMLOperations], + ) -> List[BigQueryQueryResult]: + ops = {op.value for op in operations} + yield from ( + query + for query in self.get_queries(usage_location, project_id, dataset_id) + if query.statement_type in ops + ) + + def get_queries( + self, usage_location: str, project_id: str, dataset_id: str + ) -> List[BigQueryQueryResult]: + return self.get_or_update_cache( + f"{project_id}.{dataset_id}", + BigQueryQueryResult.get_for_table, + session=super().get_session(), + usage_location=usage_location, + project_id=project_id, + dataset_id=dataset_id, + ) + + @staticmethod + def get_system_profile( + project_id: str, + dataset_id: str, + table: str, + query_results: List[BigQueryQueryResult], + rows_affected_field: str, + operation: DmlOperationType, + ) -> List[SystemProfile]: + if not BigQueryQueryResult.model_fields.get(rows_affected_field): + raise ValueError( + f"rows_affected_field [{rows_affected_field}] is not a valid field in BigQueryQueryResult." + ) + return TypeAdapter(List[SystemProfile]).validate_python( + [ + { + "timestamp": datetime_to_timestamp(q.start_time, milliseconds=True), + "operation": operation, + "rowsAffected": getattr(q, rows_affected_field), + } + for q in query_results + if getattr(q, rows_affected_field) > 0 + and q.project_id == project_id + and q.dataset_id == dataset_id + and q.table_name == table + ] + ) + + +class BigQuerySystemMetricsComputer(SystemMetricsComputer, BigQuerySystemMetricsSource): + pass diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py index 07a50b538c89..8886c9b4afac 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py @@ -13,6 +13,14 @@ """ import textwrap +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, TypeAdapter +from sqlalchemy import text +from sqlalchemy.orm import Session + +from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations BIGQUERY_STATEMENT = textwrap.dedent( """ @@ -172,3 +180,63 @@ AND resource.labels.dataset_id = "{dataset}" AND timestamp >= "{start_date}" """ + + +class BigQueryQueryResult(BaseModel): + project_id: str + dataset_id: str + table_name: str + inserted_row_count: Optional[int] = None + deleted_row_count: Optional[int] = None + updated_row_count: Optional[int] = None + start_time: datetime + statement_type: str + + @staticmethod + def get_for_table( + session: Session, + usage_location: str, + dataset_id: str, + project_id: str, + ): + rows = session.execute( + text( + JOBS.format( + usage_location=usage_location, + dataset_id=dataset_id, + project_id=project_id, + insert=DatabaseDMLOperations.INSERT.value, + update=DatabaseDMLOperations.UPDATE.value, + delete=DatabaseDMLOperations.DELETE.value, + merge=DatabaseDMLOperations.MERGE.value, + ) + ) + ) + + return TypeAdapter(List[BigQueryQueryResult]).validate_python(map(dict, rows)) + + +JOBS = """ + SELECT + statement_type, + start_time, + destination_table.project_id as 
project_id, + destination_table.dataset_id as dataset_id, + destination_table.table_id as table_name, + dml_statistics.inserted_row_count as inserted_row_count, + dml_statistics.deleted_row_count as deleted_row_count, + dml_statistics.updated_row_count as updated_row_count + FROM + `region-{usage_location}`.INFORMATION_SCHEMA.JOBS + WHERE + DATE(creation_time) >= CURRENT_DATE() - 1 AND + destination_table.dataset_id = '{dataset_id}' AND + destination_table.project_id = '{project_id}' AND + statement_type IN ( + '{insert}', + '{update}', + '{delete}', + '{merge}' + ) + ORDER BY creation_time DESC; +""" diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py b/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py new file mode 100644 index 000000000000..bf97171d0982 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py @@ -0,0 +1,14 @@ +from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource +from metadata.ingestion.source.database.bigquery.metadata import BigquerySource +from metadata.ingestion.source.database.bigquery.profiler.profiler import ( + BigQueryProfiler, +) +from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=BigquerySource, + lineage_source_class=BigqueryLineageSource, + usage_source_class=BigqueryUsageSource, + profiler_class=BigQueryProfiler, +) diff --git a/ingestion/src/metadata/ingestion/source/database/bigtable/service_spec.py b/ingestion/src/metadata/ingestion/source/database/bigtable/service_spec.py new file mode 100644 index 000000000000..08eb68c1b7d0 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/bigtable/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.bigtable.metadata import BigtableSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=BigtableSource) diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse/service_spec.py b/ingestion/src/metadata/ingestion/source/database/clickhouse/service_spec.py new file mode 100644 index 000000000000..43d14129bf17 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse/service_spec.py @@ -0,0 +1,12 @@ +from metadata.ingestion.source.database.clickhouse.lineage import ( + ClickhouseLineageSource, +) +from metadata.ingestion.source.database.clickhouse.metadata import ClickhouseSource +from metadata.ingestion.source.database.clickhouse.usage import ClickhouseUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=ClickhouseSource, + lineage_source_class=ClickhouseLineageSource, + usage_source_class=ClickhouseUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/couchbase/service_spec.py b/ingestion/src/metadata/ingestion/source/database/couchbase/service_spec.py new file mode 100644 index 000000000000..8a396949b58d --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/couchbase/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.couchbase.metadata import CouchbaseSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=CouchbaseSource) diff --git 
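The new `BigQuerySystemMetricsComputer` above is assembled purely through cooperative multiple inheritance: `SQASessionProvider`, `EmptySystemMetricsSource`, and `CacheProvider` each contribute one capability, and the `super().__init__(*args, **kwargs)` chain lets a single constructor call initialize every base. A reduced sketch of that wiring with stand-in classes (not the real providers from `metadata/profiler/metrics/system/system.py`):

```python
# Toy versions of the provider mixins, to show how super() threads
# initialization through the MRO.
class SessionProviderSketch:
    def __init__(self, session=None, **kwargs):
        self.session = session
        super().__init__(**kwargs)  # keep the cooperative chain going

class CacheProviderSketch:
    def __init__(self, **kwargs):
        self._cache = {}
        super().__init__(**kwargs)

    def get_or_update_cache(self, key, fetch, **fetch_kwargs):
        # compute once per key, then serve the cached result
        if key not in self._cache:
            self._cache[key] = fetch(**fetch_kwargs)
        return self._cache[key]

class MetricsSourceSketch(SessionProviderSketch, CacheProviderSketch):
    """Inherits both capabilities; no __init__ of its own is needed."""

source = MetricsSourceSketch(session="fake-session")
assert source.get_or_update_cache("db.schema", lambda: ["q1"]) == ["q1"]
```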
a/ingestion/src/metadata/ingestion/source/database/databricks/service_spec.py b/ingestion/src/metadata/ingestion/source/database/databricks/service_spec.py new file mode 100644 index 000000000000..3bf1978a8a99 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/databricks/service_spec.py @@ -0,0 +1,16 @@ +from metadata.ingestion.source.database.databricks.lineage import ( + DatabricksLineageSource, +) +from metadata.ingestion.source.database.databricks.metadata import DatabricksSource +from metadata.ingestion.source.database.databricks.usage import DatabricksUsageSource +from metadata.profiler.interface.sqlalchemy.databricks.profiler_interface import ( + DatabricksProfilerInterface, +) +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=DatabricksSource, + lineage_source_class=DatabricksLineageSource, + usage_source_class=DatabricksUsageSource, + profiler_class=DatabricksProfilerInterface, +) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/service_spec.py b/ingestion/src/metadata/ingestion/source/database/datalake/service_spec.py new file mode 100644 index 000000000000..bbd36b6f312c --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/datalake/service_spec.py @@ -0,0 +1,9 @@ +from metadata.ingestion.source.database.datalake.metadata import DatalakeSource +from metadata.profiler.interface.pandas.profiler_interface import ( + PandasProfilerInterface, +) +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=DatalakeSource, profiler_class=PandasProfilerInterface +) diff --git a/ingestion/src/metadata/ingestion/source/database/db2/service_spec.py b/ingestion/src/metadata/ingestion/source/database/db2/service_spec.py new file mode 100644 index 000000000000..e5ec7fdd3ed7 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/db2/service_spec.py @@ -0,0 +1,9 @@ +from metadata.ingestion.source.database.db2.metadata import Db2Source +from metadata.profiler.interface.sqlalchemy.db2.profiler_interface import ( + DB2ProfilerInterface, +) +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=Db2Source, profiler_class=DB2ProfilerInterface +) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/service_spec.py b/ingestion/src/metadata/ingestion/source/database/dbt/service_spec.py new file mode 100644 index 000000000000..40ae953002b1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/dbt/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.dbt.metadata import DbtSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DbtSource) diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake/service_spec.py b/ingestion/src/metadata/ingestion/source/database/deltalake/service_spec.py new file mode 100644 index 000000000000..83eaa31c628b --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/deltalake/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.deltalake.metadata import DeltalakeSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DeltalakeSource) diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/service_spec.py 
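`BigQueryQueryResult.get_for_table` earlier in this diff hands the rows from `session.execute` straight to `TypeAdapter(List[BigQueryQueryResult]).validate_python(map(dict, rows))`, so pydantic coerces and validates every row in one pass. The same idea in isolation, with a fabricated row payload:

```python
# TypeAdapter turns untyped mappings into validated models in bulk;
# QueryRowSketch and the sample row are made up for illustration.
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, TypeAdapter

class QueryRowSketch(BaseModel):
    table_name: str
    inserted_row_count: Optional[int] = None
    start_time: datetime
    statement_type: str

raw_rows = [
    {
        "table_name": "orders",
        "inserted_row_count": 42,
        "start_time": "2024-01-01T00:00:00",
        "statement_type": "INSERT",
    }
]
rows = TypeAdapter(List[QueryRowSketch]).validate_python(raw_rows)
assert rows[0].start_time.year == 2024  # string coerced to datetime
```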
b/ingestion/src/metadata/ingestion/source/database/domodatabase/service_spec.py new file mode 100644 index 000000000000..4f10d286a381 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.domodatabase.metadata import DomodatabaseSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DomodatabaseSource) diff --git a/ingestion/src/metadata/ingestion/source/database/doris/service_spec.py b/ingestion/src/metadata/ingestion/source/database/doris/service_spec.py new file mode 100644 index 000000000000..84937aca34ea --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/doris/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.doris.metadata import DorisSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DorisSource) diff --git a/ingestion/src/metadata/ingestion/source/database/druid/service_spec.py b/ingestion/src/metadata/ingestion/source/database/druid/service_spec.py new file mode 100644 index 000000000000..e83fe9a41a07 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/druid/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.druid.metadata import DruidSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DruidSource) diff --git a/ingestion/src/metadata/ingestion/source/database/dynamodb/service_spec.py b/ingestion/src/metadata/ingestion/source/database/dynamodb/service_spec.py new file mode 100644 index 000000000000..5c5555707dae --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/dynamodb/service_spec.py @@ -0,0 +1,7 @@ +from metadata.ingestion.source.database.dynamodb.metadata import DynamodbSource +from metadata.profiler.interface.nosql.profiler_interface import NoSQLProfilerInterface +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=DynamodbSource, profiler_class=NoSQLProfilerInterface +) diff --git a/ingestion/src/metadata/ingestion/source/database/glue/service_spec.py b/ingestion/src/metadata/ingestion/source/database/glue/service_spec.py new file mode 100644 index 000000000000..79e029904ed8 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/glue/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.glue.metadata import GlueSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=GlueSource) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/service_spec.py b/ingestion/src/metadata/ingestion/source/database/greenplum/service_spec.py new file mode 100644 index 000000000000..9fe3ac14cfa1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.greenplum.metadata import GreenplumSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=GreenplumSource) diff --git a/ingestion/src/metadata/ingestion/source/database/hive/service_spec.py b/ingestion/src/metadata/ingestion/source/database/hive/service_spec.py new file mode 100644 index 000000000000..e303ccfab773 --- /dev/null +++ 
b/ingestion/src/metadata/ingestion/source/database/hive/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.hive.metadata import HiveSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=HiveSource) diff --git a/ingestion/src/metadata/ingestion/source/database/iceberg/service_spec.py b/ingestion/src/metadata/ingestion/source/database/iceberg/service_spec.py new file mode 100644 index 000000000000..db888fd6e326 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/iceberg/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.iceberg.metadata import IcebergSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=IcebergSource) diff --git a/ingestion/src/metadata/ingestion/source/database/impala/service_spec.py b/ingestion/src/metadata/ingestion/source/database/impala/service_spec.py new file mode 100644 index 000000000000..51e302a88e42 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/impala/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.impala.metadata import ImpalaSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=ImpalaSource) diff --git a/ingestion/src/metadata/ingestion/source/database/mariadb/service_spec.py b/ingestion/src/metadata/ingestion/source/database/mariadb/service_spec.py new file mode 100644 index 000000000000..40f55327c905 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/mariadb/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.mariadb.metadata import MariadbSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=MariadbSource) diff --git a/ingestion/src/metadata/ingestion/source/database/mongodb/service_spec.py b/ingestion/src/metadata/ingestion/source/database/mongodb/service_spec.py new file mode 100644 index 000000000000..b3feafb4a665 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/mongodb/service_spec.py @@ -0,0 +1,7 @@ +from metadata.ingestion.source.database.mongodb.metadata import MongodbSource +from metadata.profiler.interface.nosql.profiler_interface import NoSQLProfilerInterface +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=MongodbSource, profiler_class=NoSQLProfilerInterface +) diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/service_spec.py b/ingestion/src/metadata/ingestion/source/database/mssql/service_spec.py new file mode 100644 index 000000000000..9d7fa7bf1fa3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/mssql/service_spec.py @@ -0,0 +1,10 @@ +from metadata.ingestion.source.database.mssql.lineage import MssqlLineageSource +from metadata.ingestion.source.database.mssql.metadata import MssqlSource +from metadata.ingestion.source.database.mssql.usage import MssqlUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=MssqlSource, + lineage_source_class=MssqlLineageSource, + usage_source_class=MssqlUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/mysql/service_spec.py b/ingestion/src/metadata/ingestion/source/database/mysql/service_spec.py new 
file mode 100644 index 000000000000..0cf927395161 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/mysql/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.mysql.metadata import MysqlSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=MysqlSource) diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/service_spec.py b/ingestion/src/metadata/ingestion/source/database/oracle/service_spec.py new file mode 100644 index 000000000000..3c89f9162117 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/oracle/service_spec.py @@ -0,0 +1,10 @@ +from metadata.ingestion.source.database.oracle.lineage import OracleLineageSource +from metadata.ingestion.source.database.oracle.metadata import OracleSource +from metadata.ingestion.source.database.oracle.usage import OracleUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=OracleSource, + lineage_source_class=OracleLineageSource, + usage_source_class=OracleUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/pinotdb/service_spec.py b/ingestion/src/metadata/ingestion/source/database/pinotdb/service_spec.py new file mode 100644 index 000000000000..ffc4b1885a17 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/pinotdb/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.pinotdb.metadata import PinotdbSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=PinotdbSource) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/service_spec.py b/ingestion/src/metadata/ingestion/source/database/postgres/service_spec.py new file mode 100644 index 000000000000..3bea308b164a --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/postgres/service_spec.py @@ -0,0 +1,10 @@ +from metadata.ingestion.source.database.postgres.lineage import PostgresLineageSource +from metadata.ingestion.source.database.postgres.metadata import PostgresSource +from metadata.ingestion.source.database.postgres.usage import PostgresUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=PostgresSource, + lineage_source_class=PostgresLineageSource, + usage_source_class=PostgresUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/presto/service_spec.py b/ingestion/src/metadata/ingestion/source/database/presto/service_spec.py new file mode 100644 index 000000000000..c88f3f69db55 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/presto/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.presto.metadata import PrestoSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=PrestoSource) diff --git a/ingestion/src/metadata/ingestion/source/database/query/service_spec.py b/ingestion/src/metadata/ingestion/source/database/query/service_spec.py new file mode 100644 index 000000000000..643f2d7c398b --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/query/service_spec.py @@ -0,0 +1,9 @@ +from metadata.ingestion.source.database.query.lineage import QueryLogLineageSource +from metadata.ingestion.source.database.query.usage import QueryLogUsageSource 
+from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec( + metadata_source_class="not.implemented", + lineage_source_class=QueryLogLineageSource, + usage_source_class=QueryLogUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 92267757dd91..270eae010920 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -96,7 +96,6 @@ ) from metadata.utils.filters import filter_by_database from metadata.utils.helpers import get_start_and_end -from metadata.utils.importer import import_side_effects from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import ( get_all_table_comments, @@ -106,9 +105,6 @@ logger = ingestion_logger() -import_side_effects( - "metadata.ingestion.source.database.redshift.profiler.system", -) STANDARD_TABLE_TYPES = { "r": TableType.Regular, diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py new file mode 100644 index 000000000000..b64fcd2a11b7 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py @@ -0,0 +1,12 @@ +from metadata.ingestion.source.database.redshift.profiler.system import ( + RedshiftSystemMetricsComputer, +) +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( + SQAProfilerInterface, +) +from metadata.profiler.metrics.system.system import SystemMetricsComputer + + +class RedshiftProfiler(SQAProfilerInterface): + def initialize_system_metrics_computer(self, **kwargs) -> SystemMetricsComputer: + return RedshiftSystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py index 8ea963e8ed4c..148178722a6f 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py @@ -1,55 +1,74 @@ -from typing import Dict, List +from typing import List -from pydantic import TypeAdapter -from sqlalchemy.orm import DeclarativeMeta, Session +from sqlalchemy.orm import DeclarativeMeta from metadata.generated.schema.entity.data.table import SystemProfile from metadata.ingestion.source.database.redshift.queries import ( STL_QUERY, - get_metric_result, get_query_results, ) from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations from metadata.profiler.metrics.system.system import ( - SYSTEM_QUERY_RESULT_CACHE, - get_system_metrics_for_dialect, + CacheProvider, + EmptySystemMetricsSource, + SQASessionProvider, + SystemMetricsComputer, ) -from metadata.profiler.orm.registry import Dialects from metadata.utils.logger import profiler_logger -from metadata.utils.profiler_utils import get_value_from_cache, set_cache +from metadata.utils.profiler_utils import QueryResult +from metadata.utils.time_utils import datetime_to_timestamp logger = profiler_logger() -@get_system_metrics_for_dialect.register(Dialects.Redshift) -def _( - dialect: str, - session: Session, - table: DeclarativeMeta, - *args, - **kwargs, -) -> List[SystemProfile]: - """List all the DML operations for reshifts tables +class RedshiftSystemMetricsSource( + SQASessionProvider, 
EmptySystemMetricsSource, CacheProvider +): + def __init__(self, *args, **kwargs): + # collaborative constructor that initializes the SQASessionProvider and CacheProvider + super().__init__(*args, **kwargs) - Args: - dialect (str): redshift - session (Session): session object - table (DeclarativeMeta): orm table + def get_inserts( + self, database: str, schema: str, table: str + ) -> List[SystemProfile]: + queries = self.get_or_update_cache( + f"{database}.{schema}", + self._get_insert_queries, + database=database, + schema=schema, + ) + return get_metric_result(queries, table) - Returns: - List[Dict]: - """ - logger.debug(f"Fetching system metrics for {dialect}") - database = session.get_bind().url.database - schema = table.__table_args__["schema"] # type: ignore + def get_kwargs(self, table: DeclarativeMeta, *args, **kwargs): + return { + "table": table.__table__.name, + "database": self.get_session().get_bind().url.database, + "schema": table.__table__.schema, + } - metric_results: List[Dict] = [] + def get_deletes( + self, database: str, schema: str, table: str + ) -> List[SystemProfile]: + queries = self.get_or_update_cache( + f"{database}.{schema}", + self._get_delete_queries, + database=database, + schema=schema, + ) + return get_metric_result(queries, table) + + def get_updates( + self, database: str, schema: str, table: str + ) -> List[SystemProfile]: + queries = self.get_or_update_cache( + f"{database}.{schema}", + self._get_update_queries, + database=database, + schema=schema, + ) + return get_metric_result(queries, table) - # get inserts ddl queries - inserts = get_value_from_cache( - SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.inserts" - ) - if not inserts: + def _get_insert_queries(self, database: str, schema: str) -> List[QueryResult]: insert_query = STL_QUERY.format( alias="si", join_type="LEFT", @@ -57,23 +76,13 @@ def _( database=database, schema=schema, ) - inserts = get_query_results( - session, + return get_query_results( + super().get_session(), insert_query, DatabaseDMLOperations.INSERT.value, ) - set_cache( - SYSTEM_QUERY_RESULT_CACHE, - f"{Dialects.Redshift}.{database}.{schema}.inserts", - inserts, - ) - metric_results.extend(get_metric_result(inserts, table.__tablename__)) - # get deletes ddl queries - deletes = get_value_from_cache( - SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.deletes" - ) - if not deletes: + def _get_delete_queries(self, database: str, schema: str) -> List[QueryResult]: delete_query = STL_QUERY.format( alias="sd", join_type="RIGHT", @@ -81,23 +90,13 @@ def _( database=database, schema=schema, ) - deletes = get_query_results( - session, + return get_query_results( + super().get_session(), delete_query, DatabaseDMLOperations.DELETE.value, ) - set_cache( - SYSTEM_QUERY_RESULT_CACHE, - f"{Dialects.Redshift}.{database}.{schema}.deletes", - deletes, - ) - metric_results.extend(get_metric_result(deletes, table.__tablename__)) # type: ignore - # get updates ddl queries - updates = get_value_from_cache( - SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.updates" - ) - if not updates: + def _get_update_queries(self, database: str, schema: str) -> List[QueryResult]: update_query = STL_QUERY.format( alias="si", join_type="INNER", @@ -105,16 +104,33 @@ def _( database=database, schema=schema, ) - updates = get_query_results( - session, + return get_query_results( + super().get_session(), update_query, DatabaseDMLOperations.UPDATE.value, ) - set_cache( - SYSTEM_QUERY_RESULT_CACHE, -
f"{Dialects.Redshift}.{database}.{schema}.updates", - updates, - ) - metric_results.extend(get_metric_result(updates, table.__tablename__)) # type: ignore - return TypeAdapter(List[SystemProfile]).validate_python(metric_results) + +def get_metric_result(ddls: List[QueryResult], table_name: str) -> List: + """Given query results, retur the metric result + + Args: + ddls (List[QueryResult]): list of query results + table_name (str): table name + + Returns: + List: + """ + return [ + { + "timestamp": datetime_to_timestamp(ddl.start_time, milliseconds=True), + "operation": ddl.query_type, + "rowsAffected": ddl.rows, + } + for ddl in ddls + if ddl.table_name == table_name + ] + + +class RedshiftSystemMetricsComputer(SystemMetricsComputer, RedshiftSystemMetricsSource): + pass diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py b/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py new file mode 100644 index 000000000000..6f010e9287ef --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py @@ -0,0 +1,14 @@ +from metadata.ingestion.source.database.redshift.lineage import RedshiftLineageSource +from metadata.ingestion.source.database.redshift.metadata import RedshiftSource +from metadata.ingestion.source.database.redshift.profiler.profiler import ( + RedshiftProfiler, +) +from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=RedshiftSource, + lineage_source_class=RedshiftLineageSource, + usage_source_class=RedshiftUsageSource, + profiler_class=RedshiftProfiler, +) diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/service_spec.py b/ingestion/src/metadata/ingestion/source/database/salesforce/service_spec.py new file mode 100644 index 000000000000..f0fc26d0b05a --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.salesforce.metadata import SalesforceSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=SalesforceSource) diff --git a/ingestion/src/metadata/ingestion/source/database/saperp/service_spec.py b/ingestion/src/metadata/ingestion/source/database/saperp/service_spec.py new file mode 100644 index 000000000000..8d63287440c8 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/saperp/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.saperp.metadata import SaperpSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=SaperpSource) diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/service_spec.py b/ingestion/src/metadata/ingestion/source/database/saphana/service_spec.py new file mode 100644 index 000000000000..733652ff7419 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/saphana/service_spec.py @@ -0,0 +1,7 @@ +from metadata.ingestion.source.database.saphana.lineage import SaphanaLineageSource +from metadata.ingestion.source.database.saphana.metadata import SaphanaSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=SaphanaSource, lineage_source_class=SaphanaLineageSource +) diff --git 
a/ingestion/src/metadata/ingestion/source/database/sas/service_spec.py b/ingestion/src/metadata/ingestion/source/database/sas/service_spec.py new file mode 100644 index 000000000000..9c6794d842bc --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/sas/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.sas.metadata import SasSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=SasSource) diff --git a/ingestion/src/metadata/ingestion/source/database/singlestore/service_spec.py b/ingestion/src/metadata/ingestion/source/database/singlestore/service_spec.py new file mode 100644 index 000000000000..3175e998f128 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/singlestore/service_spec.py @@ -0,0 +1,9 @@ +from metadata.ingestion.source.database.singlestore.metadata import SinglestoreSource +from metadata.profiler.interface.sqlalchemy.single_store.profiler_interface import ( + SingleStoreProfilerInterface, +) +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=SinglestoreSource, profiler_class=SingleStoreProfilerInterface +) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py new file mode 100644 index 000000000000..e028bb2a229c --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py @@ -0,0 +1,27 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Profiler for Snowflake +""" +from metadata.ingestion.source.database.snowflake.profiler.system import ( + SnowflakeSystemMetricsSource, +) +from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import ( + SnowflakeProfilerInterface, +) + + +class SnowflakeProfiler(SnowflakeProfilerInterface): + def initialize_system_metrics_computer( + self, **kwargs + ) -> SnowflakeSystemMetricsSource: + return SnowflakeSystemMetricsSource(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py index b7d398d79eac..fa92017db4db 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py @@ -1,17 +1,27 @@ +import hashlib import re import traceback from typing import List, Optional, Tuple import sqlalchemy.orm -from sqlalchemy.orm import DeclarativeMeta, Session +from pydantic import TypeAdapter +from sqlalchemy.orm import DeclarativeMeta +from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile from metadata.ingestion.source.database.snowflake.models import ( SnowflakeQueryLogEntry, SnowflakeQueryResult, ) +from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations +from metadata.profiler.metrics.system.system import ( + CacheProvider, + EmptySystemMetricsSource, + SQASessionProvider, +) from metadata.utils.logger import profiler_logger from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache from metadata.utils.profiler_utils import get_identifiers_from_string +from metadata.utils.time_utils import datetime_to_timestamp PUBLIC_SCHEMA = "PUBLIC" logger = profiler_logger() @@ -23,6 +33,16 @@ IDENTIFIER_PATTERN = r"(IDENTIFIER\(\')([\w._\"]+)(\'\))" +def sha256_hash(text: str) -> str: + """Return the SHA256 hash of the text""" + + return hashlib.sha256(text.encode()).hexdigest() + + +cache = LRUCache(LRU_CACHE_SIZE) + + +@cache.wrap(key_func=lambda query: sha256_hash(query.strip())) def _parse_query(query: str) -> Optional[str]: """Parse snowflake queries to extract the identifiers""" match = re.match(QUERY_PATTERN, query, re.IGNORECASE) @@ -48,7 +68,7 @@ def _parse_query(query: str) -> Optional[str]: class SnowflakeTableResovler: def __init__(self, session: sqlalchemy.orm.Session): - self._cache = LRUCache(LRU_CACHE_SIZE) + self._cache = LRUCache[bool](LRU_CACHE_SIZE) self.session = session def show_tables(self, db, schema, table): @@ -241,20 +261,129 @@ def get_snowflake_system_queries( return None -def build_snowflake_query_results( - session: Session, - table: DeclarativeMeta, -) -> List[SnowflakeQueryResult]: - """List and parse snowflake DML query results""" - query_results = [] - resolver = SnowflakeTableResovler( - session=session, - ) - for row in SnowflakeQueryLogEntry.get_for_table(session, table.__tablename__): - result = get_snowflake_system_queries( - query_log_entry=row, - resolver=resolver, +class SnowflakeSystemMetricsSource( + SQASessionProvider, EmptySystemMetricsSource, CacheProvider[SnowflakeQueryLogEntry] +): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resolver = SnowflakeTableResovler( + session=super().get_session(), + ) + + def get_kwargs(self, table: DeclarativeMeta, *args, **kwargs): + return { + "table": table.__table__.name, + "database": self.get_session().get_bind().url.database, + "schema": 
table.__table__.schema, + } + + def get_inserts( + self, database: str, schema: str, table: str + ) -> List[SystemProfile]: + return self.get_system_profile( + database, + schema, + table, + list( + self.get_queries_by_operation( + table, + [ + DatabaseDMLOperations.INSERT, + DatabaseDMLOperations.MERGE, + ], + ) + ), + "rows_inserted", + DmlOperationType.INSERT, + ) + + def get_updates( + self, database: str, schema: str, table: str + ) -> List[SystemProfile]: + return self.get_system_profile( + database, + schema, + table, + list( + self.get_queries_by_operation( + table, + [ + DatabaseDMLOperations.UPDATE, + DatabaseDMLOperations.MERGE, + ], + ) + ), + "rows_updated", + DmlOperationType.UPDATE, + ) + + def get_deletes( + self, database: str, schema: str, table: str + ) -> List[SystemProfile]: + return self.get_system_profile( + database, + schema, + table, + list( + self.get_queries_by_operation( + table, + [ + DatabaseDMLOperations.DELETE, + ], + ) + ), + "rows_deleted", + DmlOperationType.DELETE, ) - if result: - query_results.append(result) - return query_results + + @staticmethod + def get_system_profile( + db: str, + schema: str, + table: str, + query_results: List[SnowflakeQueryResult], + rows_affected_field: str, + operation: DmlOperationType, + ) -> List[SystemProfile]: + if not SnowflakeQueryResult.model_fields.get(rows_affected_field): + raise ValueError( + f"rows_affected_field [{rows_affected_field}] is not a valid field in SnowflakeQueryResult." + ) + return TypeAdapter(List[SystemProfile]).validate_python( + [ + { + "timestamp": datetime_to_timestamp(q.start_time, milliseconds=True), + "operation": operation, + "rowsAffected": getattr(q, rows_affected_field), + } + for q in query_results + if getattr(q, rows_affected_field) > 0 + and q.database_name == db + and q.schema_name == schema + and q.table_name == table + ] + ) + + def get_queries_by_operation( + self, table: str, operations: List[DatabaseDMLOperations] + ): + ops = [op.value for op in operations] + yield from ( + query for query in self.get_queries(table) if query.query_type in ops + ) + + def get_queries(self, table: str) -> List[SnowflakeQueryResult]: + queries = self.get_or_update_cache( + table, + SnowflakeQueryLogEntry.get_for_table, + session=super().get_session(), + tablename=table, + ) + results = [ + get_snowflake_system_queries( + query_log_entry=row, + resolver=self.resolver, + ) + for row in queries + ] + return [result for result in results if result is not None] diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py b/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py new file mode 100644 index 000000000000..51ffc62ed29e --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py @@ -0,0 +1,14 @@ +from metadata.ingestion.source.database.snowflake.lineage import SnowflakeLineageSource +from metadata.ingestion.source.database.snowflake.metadata import SnowflakeSource +from metadata.ingestion.source.database.snowflake.profiler.profiler import ( + SnowflakeProfiler, +) +from metadata.ingestion.source.database.snowflake.usage import SnowflakeUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=SnowflakeSource, + lineage_source_class=SnowflakeLineageSource, + usage_source_class=SnowflakeUsageSource, + profiler_class=SnowflakeProfiler, +) diff --git a/ingestion/src/metadata/ingestion/source/database/sqlite/service_spec.py 
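On the Snowflake side, `_parse_query` is now memoized with `@cache.wrap(key_func=lambda query: sha256_hash(query.strip()))`, so identical query texts are parsed once per hash rather than re-run through the regexes. A generic re-implementation of that decorator shape (a dict-backed sketch; the real `LRUCache.wrap` in `metadata/utils/lru_cache.py` is bounded and evicts):

```python
# Sketch of a key_func-driven memoizing decorator like LRUCache.wrap.
import hashlib
from functools import wraps

def sha256_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

def wrap(cache: dict, key_func):
    def decorator(fn):
        @wraps(fn)
        def inner(*args, **kwargs):
            key = key_func(*args, **kwargs)
            if key not in cache:
                cache[key] = fn(*args, **kwargs)
            return cache[key]
        return inner
    return decorator

_seen: dict = {}

@wrap(_seen, key_func=lambda query: sha256_hash(query.strip()))
def parse(query: str) -> str:
    return query.upper()  # stand-in for the identifier extraction

assert parse(" select 1 ") == parse("select 1")  # served from cache
```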
b/ingestion/src/metadata/ingestion/source/database/sqlite/service_spec.py new file mode 100644 index 000000000000..6ffbe21a16ce --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/sqlite/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.sqlite.metadata import SqliteSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=SqliteSource) diff --git a/ingestion/src/metadata/ingestion/source/database/teradata/service_spec.py b/ingestion/src/metadata/ingestion/source/database/teradata/service_spec.py new file mode 100644 index 000000000000..4fe31877cdf7 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/teradata/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.database.teradata.metadata import TeradataSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec(metadata_source_class=TeradataSource) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/service_spec.py b/ingestion/src/metadata/ingestion/source/database/trino/service_spec.py new file mode 100644 index 000000000000..4242ea01e9b3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/trino/service_spec.py @@ -0,0 +1,14 @@ +from metadata.ingestion.source.database.trino.lineage import TrinoLineageSource +from metadata.ingestion.source.database.trino.metadata import TrinoSource +from metadata.ingestion.source.database.trino.usage import TrinoUsageSource +from metadata.profiler.interface.sqlalchemy.trino.profiler_interface import ( + TrinoProfilerInterface, +) +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=TrinoSource, + lineage_source_class=TrinoLineageSource, + usage_source_class=TrinoUsageSource, + profiler_class=TrinoProfilerInterface, +) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/service_spec.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/service_spec.py new file mode 100644 index 000000000000..676941465306 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/service_spec.py @@ -0,0 +1,18 @@ +from metadata.ingestion.source.database.unitycatalog.lineage import ( + UnitycatalogLineageSource, +) +from metadata.ingestion.source.database.unitycatalog.metadata import UnitycatalogSource +from metadata.ingestion.source.database.unitycatalog.usage import ( + UnitycatalogUsageSource, +) +from metadata.profiler.interface.sqlalchemy.unity_catalog.profiler_interface import ( + UnityCatalogProfilerInterface, +) +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=UnitycatalogSource, + lineage_source_class=UnitycatalogLineageSource, + usage_source_class=UnitycatalogUsageSource, + profiler_class=UnityCatalogProfilerInterface, +) diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/service_spec.py b/ingestion/src/metadata/ingestion/source/database/vertica/service_spec.py new file mode 100644 index 000000000000..1cda23751a20 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/vertica/service_spec.py @@ -0,0 +1,10 @@ +from metadata.ingestion.source.database.vertica.lineage import VerticaLineageSource +from metadata.ingestion.source.database.vertica.metadata import VerticaSource +from metadata.ingestion.source.database.vertica.usage import 
VerticaUsageSource +from metadata.utils.service_spec.default import DefaultDatabaseSpec + +ServiceSpec = DefaultDatabaseSpec( + metadata_source_class=VerticaSource, + lineage_source_class=VerticaLineageSource, + usage_source_class=VerticaUsageSource, +) diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/service_spec.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/service_spec.py new file mode 100644 index 000000000000..3d789a62db53 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=KafkaSource) diff --git a/ingestion/src/metadata/ingestion/source/messaging/kinesis/service_spec.py b/ingestion/src/metadata/ingestion/source/messaging/kinesis/service_spec.py new file mode 100644 index 000000000000..feb043ff99c1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/messaging/kinesis/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.messaging.kinesis.metadata import KinesisSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=KinesisSource) diff --git a/ingestion/src/metadata/ingestion/source/messaging/redpanda/service_spec.py b/ingestion/src/metadata/ingestion/source/messaging/redpanda/service_spec.py new file mode 100644 index 000000000000..8b76e817ab49 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/messaging/redpanda/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.messaging.redpanda.metadata import RedpandaSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=RedpandaSource) diff --git a/ingestion/src/metadata/ingestion/source/metadata/alationsink/service_spec.py b/ingestion/src/metadata/ingestion/source/metadata/alationsink/service_spec.py new file mode 100644 index 000000000000..d539d57cefe0 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/metadata/alationsink/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.metadata.alationsink.metadata import AlationsinkSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=AlationsinkSource) diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen/service_spec.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen/service_spec.py new file mode 100644 index 000000000000..efeb8bb2c598 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.metadata.amundsen.metadata import AmundsenSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=AmundsenSource) diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/service_spec.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/service_spec.py new file mode 100644 index 000000000000..9e946f558e8a --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.metadata.atlas.metadata import AtlasSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=AtlasSource) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/service_spec.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/service_spec.py new file mode 100644 
index 000000000000..902f09311c39 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.mlmodel.mlflow.metadata import MlflowSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=MlflowSource) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/service_spec.py b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/service_spec.py new file mode 100644 index 000000000000..484de9795a8c --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.mlmodel.sagemaker.metadata import SagemakerSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=SagemakerSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/service_spec.py new file mode 100644 index 000000000000..4e46824ac397 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.airbyte.metadata import AirbyteSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=AirbyteSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/service_spec.py new file mode 100644 index 000000000000..a02e30d76306 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.airflow.metadata import AirflowSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=AirflowSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dagster/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/dagster/service_spec.py new file mode 100644 index 000000000000..9c8a9e5176fa --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/dagster/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.dagster.metadata import DagsterSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=DagsterSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/service_spec.py new file mode 100644 index 000000000000..a76b65df78aa --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/service_spec.py @@ -0,0 +1,6 @@ +from metadata.ingestion.source.pipeline.databrickspipeline.metadata import ( + DatabrickspipelineSource, +) +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=DatabrickspipelineSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/service_spec.py new file mode 100644 index 000000000000..34f69c2db0a2 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.dbtcloud.metadata import DbtcloudSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=DbtcloudSource) diff --git 
a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/service_spec.py new file mode 100644 index 000000000000..e720a39abc29 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.domopipeline.metadata import DomopipelineSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=DomopipelineSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/service_spec.py new file mode 100644 index 000000000000..34dd83a75f3e --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.fivetran.metadata import FivetranSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=FivetranSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/flink/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/flink/service_spec.py new file mode 100644 index 000000000000..4a773ef9a729 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/flink/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.flink.metadata import FlinkSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=FlinkSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/service_spec.py new file mode 100644 index 000000000000..b0c465f4c226 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.gluepipeline.metadata import GluepipelineSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=GluepipelineSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/service_spec.py new file mode 100644 index 000000000000..a9a2556ac975 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.kafkaconnect.metadata import KafkaconnectSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=KafkaconnectSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi/service_spec.py new file mode 100644 index 000000000000..531935883e22 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.nifi.metadata import NifiSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=NifiSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/service_spec.py new file mode 100644 index 000000000000..a556398938fa --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.openlineage.metadata 
import OpenlineageSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=OpenlineageSource) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/spline/service_spec.py b/ingestion/src/metadata/ingestion/source/pipeline/spline/service_spec.py new file mode 100644 index 000000000000..20527834826a --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/spline/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.pipeline.spline.metadata import SplineSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=SplineSource) diff --git a/ingestion/src/metadata/ingestion/source/search/elasticsearch/service_spec.py b/ingestion/src/metadata/ingestion/source/search/elasticsearch/service_spec.py new file mode 100644 index 000000000000..fa2da637fc2c --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/search/elasticsearch/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.search.elasticsearch.metadata import ElasticsearchSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=ElasticsearchSource) diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs/service_spec.py b/ingestion/src/metadata/ingestion/source/storage/gcs/service_spec.py new file mode 100644 index 000000000000..73df4a9f6200 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/storage/gcs/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.storage.gcs.metadata import GcsSource +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=GcsSource) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/service_spec.py b/ingestion/src/metadata/ingestion/source/storage/s3/service_spec.py new file mode 100644 index 000000000000..6a3a31e96ac2 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/storage/s3/service_spec.py @@ -0,0 +1,4 @@ +from metadata.ingestion.source.storage.s3.metadata import S3Source +from metadata.utils.service_spec import BaseSpec + +ServiceSpec = BaseSpec(metadata_source_class=S3Source) diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface.py b/ingestion/src/metadata/profiler/interface/profiler_interface.py index 721a16d5a120..884ceac4c69c 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface.py @@ -15,7 +15,7 @@ """ from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Type, Union from sqlalchemy import Column @@ -60,6 +60,7 @@ ) from metadata.profiler.metrics.core import MetricTypes from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.metrics.system.system import System from metadata.profiler.processor.runner import QueryRunner from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT from metadata.utils.partition import get_partition_details @@ -460,7 +461,7 @@ def _compute_window_metrics( @abstractmethod def _compute_system_metrics( self, - metrics: Metrics, + metrics: Type[System], runner, *args, **kwargs, diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py deleted file mode 100644 index dc443e047161..000000000000 --- a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py +++ /dev/null @@ -1,107 +0,0 
@@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Factory class for creating profiler interface objects -""" - -import importlib -from typing import Dict, cast - -from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( - BigQueryConnection, -) -from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( - DatabricksConnection, -) -from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( - DatalakeConnection, -) -from metadata.generated.schema.entity.services.connections.database.db2Connection import ( - Db2Connection, -) -from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import ( - DynamoDBConnection, -) -from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( - MariaDBConnection, -) -from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import ( - MongoDBConnection, -) -from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( - SingleStoreConnection, -) -from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( - SnowflakeConnection, -) -from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( - TrinoConnection, -) -from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( - UnityCatalogConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseConnection -from metadata.profiler.factory import Factory -from metadata.profiler.interface.profiler_interface import ProfilerInterface - - -class ProfilerInterfaceFactory(Factory): - def create(self, interface_type: str, *args, **kwargs): - """Create interface object based on interface type""" - interface_class_path = profiler_class_mapping.get( - interface_type, profiler_class_mapping[DatabaseConnection.__name__] - ) - try: - module_path, class_name = interface_class_path.rsplit(".", 1) - module = importlib.import_module(module_path) - profiler_class = getattr(module, class_name) - except (ImportError, AttributeError) as e: - raise ImportError(f"Error importing {class_name} from {module_path}: {e}") - profiler_class = cast(ProfilerInterface, profiler_class) - return profiler_class.create(*args, **kwargs) - - -profiler_interface_factory = ProfilerInterfaceFactory() - -BASE_PROFILER_PATH = "metadata.profiler.interface" -SQLALCHEMY_PROFILER_PATH = f"{BASE_PROFILER_PATH}.sqlalchemy" -NOSQL_PROFILER_PATH = ( - f"{BASE_PROFILER_PATH}.nosql.profiler_interface.NoSQLProfilerInterface" -) -PANDAS_PROFILER_PATH = ( - f"{BASE_PROFILER_PATH}.pandas.profiler_interface.PandasProfilerInterface" -) - -# Configuration for dynamic imports -profiler_class_mapping: Dict[str, str] = { - DatabaseConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".profiler_interface.SQAProfilerInterface", - BigQueryConnection.__name__: 
SQLALCHEMY_PROFILER_PATH - + ".bigquery.profiler_interface.BigQueryProfilerInterface", - SingleStoreConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".single_store.profiler_interface.SingleStoreProfilerInterface", - DatalakeConnection.__name__: PANDAS_PROFILER_PATH, - MariaDBConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".mariadb.profiler_interface.MariaDBProfilerInterface", - SnowflakeConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".snowflake.profiler_interface.SnowflakeProfilerInterface", - TrinoConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".trino.profiler_interface.TrinoProfilerInterface", - UnityCatalogConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".unity_catalog.profiler_interface.UnityCatalogProfilerInterface", - DatabricksConnection.__name__: SQLALCHEMY_PROFILER_PATH - + ".databricks.profiler_interface.DatabricksProfilerInterface", - Db2Connection.__name__: SQLALCHEMY_PROFILER_PATH - + ".db2.profiler_interface.DB2ProfilerInterface", - MongoDBConnection.__name__: NOSQL_PROFILER_PATH, - DynamoDBConnection.__name__: NOSQL_PROFILER_PATH, -} diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index 68ef06957677..b0263741d607 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -21,13 +21,17 @@ import traceback from collections import defaultdict from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Type from sqlalchemy import Column, inspect, text from sqlalchemy.exc import DBAPIError, ProgrammingError, ResourceClosedError from sqlalchemy.orm import scoped_session -from metadata.generated.schema.entity.data.table import CustomMetricProfile, TableData +from metadata.generated.schema.entity.data.table import ( + CustomMetricProfile, + SystemProfile, + TableData, +) from metadata.generated.schema.tests.customMetric import CustomMetric from metadata.ingestion.connections.session import create_and_bind_thread_safe_session from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin @@ -38,6 +42,7 @@ from metadata.profiler.metrics.static.mean import Mean from metadata.profiler.metrics.static.stddev import StdDev from metadata.profiler.metrics.static.sum import Sum +from metadata.profiler.metrics.system.system import System, SystemMetricsComputer from metadata.profiler.orm.functions.table_metric_computer import TableMetricComputer from metadata.profiler.orm.registry import Dialects from metadata.profiler.processor.metric_filter import MetricFilter @@ -105,6 +110,13 @@ def __init__( self._table = self._convert_table_to_orm_object(sqa_metadata) self.create_session() + self.system_metrics_computer = self.initialize_system_metrics_computer() + + def initialize_system_metrics_computer(self) -> SystemMetricsComputer: + """Initialize system metrics computer. Override this if you want to use a metric source with + state or other dependencies. 
+ """ + return SystemMetricsComputer() def create_session(self): self.session_factory = self._session_factory() @@ -363,12 +375,11 @@ def _compute_custom_metrics( def _compute_system_metrics( self, - metrics: Metrics, + metrics: Type[System], runner: QueryRunner, - session, *args, **kwargs, - ): + ) -> List[SystemProfile]: """Get system metric for tables Args: @@ -379,13 +390,8 @@ def _compute_system_metrics( Returns: dictionnary of results """ - try: - rows = metrics().sql(session, conn_config=self.service_connection_config) - return rows - except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.__tablename__}: {exc}" - handle_query_exception(msg, exc, session) - return None + logger.debug(f"Computing system metrics for {runner.table.__tablename__}") + return self.system_metrics_computer.get_system_metrics(runner.table) def _create_thread_safe_sampler( self, diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/bigquery.py b/ingestion/src/metadata/profiler/metrics/system/queries/bigquery.py deleted file mode 100644 index f54853604bcd..000000000000 --- a/ingestion/src/metadata/profiler/metrics/system/queries/bigquery.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Bigquery System Metric Queries -""" -from datetime import datetime - -from pydantic import BaseModel - -from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations - - -class BigQueryQueryResult(BaseModel): - table_name: dict - timestamp: datetime - query_type: str - dml_statistics: dict - - -DML_STAT_TO_DML_STATEMENT_MAPPING = { - "inserted_row_count": DatabaseDMLOperations.INSERT.value, - "deleted_row_count": DatabaseDMLOperations.DELETE.value, - "updated_row_count": DatabaseDMLOperations.UPDATE.value, -} - -JOBS = """ - SELECT - statement_type, - start_time, - destination_table, - dml_statistics - FROM - `region-{usage_location}`.INFORMATION_SCHEMA.JOBS - WHERE - DATE(creation_time) >= CURRENT_DATE() - 1 AND - destination_table.dataset_id = '{dataset_id}' AND - destination_table.project_id = '{project_id}' AND - statement_type IN ( - '{insert}', - '{update}', - '{delete}', - '{merge}' - ) - ORDER BY creation_time DESC; -""" diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index 47524c430a2d..7d07fc0c3202 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -13,38 +13,18 @@ System Metric """ -import traceback +from abc import ABC from collections import defaultdict -from typing import Dict, List, Optional +from typing import Callable, Generic, List, TypeVar -from pydantic import TypeAdapter -from sqlalchemy import text -from sqlalchemy.orm import DeclarativeMeta, Session +from sqlalchemy.orm import Session from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.generated.schema.entity.data.table import SystemProfile -from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( - BigQueryConnection, -) -from metadata.ingestion.source.database.snowflake.profiler.system import ( - build_snowflake_query_results, -) from metadata.profiler.metrics.core import SystemMetric -from metadata.profiler.metrics.system.dml_operation import ( - DML_OPERATION_MAP, - DatabaseDMLOperations, -) -from metadata.profiler.metrics.system.queries.bigquery import ( - DML_STAT_TO_DML_STATEMENT_MAPPING, - JOBS, - BigQueryQueryResult, -) -from metadata.profiler.orm.registry import Dialects -from metadata.utils.dispatch import valuedispatch from metadata.utils.helpers import deep_size_of_dict from metadata.utils.logger import profiler_logger -from metadata.utils.profiler_utils import get_value_from_cache, set_cache -from metadata.utils.time_utils import datetime_to_timestamp +from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache logger = profiler_logger() @@ -58,207 +38,74 @@ def recursive_dic(): SYSTEM_QUERY_RESULT_CACHE = recursive_dic() - -@valuedispatch -def get_system_metrics_for_dialect( - dialect: str, - session: Session, - table: DeclarativeMeta, - *args, - **kwargs, -) -> Optional[List[SystemProfile]]: - """_summary_ - - Args: - dialect (str): database API dialect - session (Session): session object - - Returns: - Optional[Dict]: For BigQuery, Snowflake, Redshift returns - { - timestamp: , - operationType: - rowsAffected: , - } else returns None - """ - logger.debug(f"System metrics not support for {dialect}. 
Skipping processing.") - - -@get_system_metrics_for_dialect.register(Dialects.BigQuery) -def _( - dialect: str, - session: Session, - table: DeclarativeMeta, - conn_config: BigQueryConnection, - *args, - **kwargs, -) -> List[SystemProfile]: - """Compute system metrics for bigquery - - Args: - dialect (str): bigquery - session (Session): session Object - table (DeclarativeMeta): orm table - - Returns: - List[Dict]: - """ - logger.debug(f"Fetching system metrics for {dialect}") - - project_id = session.get_bind().url.host - dataset_id = table.__table_args__["schema"] # type: ignore - - metric_results: List[Dict] = [] - - jobs = get_value_from_cache( - SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs" - ) - - if not jobs: - cursor_jobs = session.execute( - text( - JOBS.format( - usage_location=conn_config.usageLocation, - dataset_id=dataset_id, - project_id=project_id, - insert=DatabaseDMLOperations.INSERT.value, - update=DatabaseDMLOperations.UPDATE.value, - delete=DatabaseDMLOperations.DELETE.value, - merge=DatabaseDMLOperations.MERGE.value, - ) - ) - ) - jobs = [ - BigQueryQueryResult( - query_type=row.statement_type, - timestamp=row.start_time, - table_name=row.destination_table, - dml_statistics=row.dml_statistics, - ) - for row in cursor_jobs - ] - set_cache( - SYSTEM_QUERY_RESULT_CACHE, - f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs", - jobs, +T = TypeVar("T") + + +class CacheProvider(ABC, Generic[T]): + def __init__(self): + self.cache = LRUCache[T](LRU_CACHE_SIZE) + + def get_or_update_cache( + self, + cache_path: str, + get_queries_fn: Callable[..., List[T]], + *args, + **kwargs, + ): + if cache_path in self.cache: + return self.cache.get(cache_path) + result = get_queries_fn(*args, **kwargs) + self.cache.put(cache_path, result) + return result + + +class EmptySystemMetricsSource: + """Empty system metrics source that can be used as a default. Just returns an empty list of system metrics + for any resource.""" + + def get_inserts(self, *args, **kwargs) -> List[SystemProfile]: + """Get insert queries""" + return [] + + def get_deletes(self, *args, **kwargs) -> List[SystemProfile]: + """Get delete queries""" + return [] + + def get_updates(self, *args, **kwargs) -> List[SystemProfile]: + """Get update queries""" + return [] + + def get_kwargs(self, *args, **kwargs): + return {} + + +class SystemMetricsComputer(EmptySystemMetricsSource): + def __init__(self, *args, **kwargs): + # collaborative constructor that initalizes upstream classes + super().__init__(*args, **kwargs) + + def get_system_metrics(self, *args, **kwargs) -> List[SystemProfile]: + """Return system metrics for a given table. Actual passed object can be a variety of types based + on the underlying infrastructure. 
For example, in the case of SQLalchemy, it can be a Table object + and in the case of Mongo, it can be a collection object.""" + kwargs = super().get_kwargs(*args, **kwargs) + return ( + super().get_inserts(**kwargs) + + super().get_deletes(**kwargs) + + super().get_updates(**kwargs) ) - for job in jobs: - if job.table_name.get("table_id") == table.__tablename__: # type: ignore - rows_affected = None - try: - if job.query_type == DatabaseDMLOperations.INSERT.value: - rows_affected = job.dml_statistics.get("inserted_row_count") - if job.query_type == DatabaseDMLOperations.DELETE.value: - rows_affected = job.dml_statistics.get("deleted_row_count") - if job.query_type == DatabaseDMLOperations.UPDATE.value: - rows_affected = job.dml_statistics.get("updated_row_count") - except AttributeError: - logger.debug(traceback.format_exc()) - rows_affected = None - - if job.query_type == DatabaseDMLOperations.MERGE.value: - for indx, key in enumerate(job.dml_statistics): - if job.dml_statistics[key] != 0: - metric_results.append( - { - # Merge statement can include multiple DML operations - # We are padding timestamps by 0,1,2 millisesond to avoid - # duplicate timestamps - "timestamp": int(job.timestamp.timestamp() * 1000) - + indx, - "operation": DML_STAT_TO_DML_STATEMENT_MAPPING.get(key), - "rowsAffected": job.dml_statistics[key], - } - ) - continue - - metric_results.append( - { - "timestamp": int(job.timestamp.timestamp() * 1000), - "operation": job.query_type, - "rowsAffected": rows_affected, - } - ) - - return TypeAdapter(List[SystemProfile]).validate_python(metric_results) - - -@get_system_metrics_for_dialect.register(Dialects.Snowflake) -def _( - dialect: str, - session: Session, - table: DeclarativeMeta, - *args, - **kwargs, -) -> Optional[List[Dict]]: - """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request. - We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types - (INSERTS, MERGE, DELETE, UPDATE). - - :waring: Unlike redshift and bigquery results are not cached as we'll be looking - at DDL for each table - - To get the number of rows affected we'll use the specific query ID. 
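The classes above are designed for cooperative composition: get_system_metrics resolves get_kwargs/get_inserts/get_deletes/get_updates through super(), so a concrete source must come after SystemMetricsComputer in the MRO (the Snowflake source earlier in this diff is wired the same way). A minimal self-contained sketch with hypothetical class names; only the base classes come from this PR:

```python
from typing import List

from metadata.generated.schema.entity.data.table import (
    DmlOperationType,
    SystemProfile,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.profiler.metrics.system.system import (
    EmptySystemMetricsSource,
    SystemMetricsComputer,
)


class StaticMetricsSource(EmptySystemMetricsSource):
    """Hypothetical source that only knows about inserts; get_deletes and
    get_updates fall through to EmptySystemMetricsSource and return []."""

    def get_kwargs(self, table: str = "", **kwargs):
        # normalize the caller's arguments into what the getters expect
        return {"table": table}

    def get_inserts(self, table: str = "") -> List[SystemProfile]:
        return [
            SystemProfile(
                timestamp=Timestamp(root=0),
                operation=DmlOperationType.INSERT,
                rowsAffected=10,
            )
        ]


# the computer comes first so that super() inside get_system_metrics lands on
# StaticMetricsSource before reaching EmptySystemMetricsSource
class StaticMetricsComputer(SystemMetricsComputer, StaticMetricsSource):
    pass


profiles = StaticMetricsComputer().get_system_metrics(table="orders")
assert [p.rowsAffected for p in profiles] == [10]
```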
- - Args: - dialect (str): dialect - session (Session): session object - - Returns: - Dict: system metric - """ - logger.debug(f"Fetching system metrics for {dialect}") - - metric_results: List[Dict] = [] - - query_results = build_snowflake_query_results( - session=session, - table=table, - ) - - for query_result in query_results: - rows_affected = None - if query_result.query_type == DatabaseDMLOperations.INSERT.value: - rows_affected = query_result.rows_inserted - if query_result.query_type == DatabaseDMLOperations.DELETE.value: - rows_affected = query_result.rows_deleted - if query_result.query_type == DatabaseDMLOperations.UPDATE.value: - rows_affected = query_result.rows_updated - if query_result.query_type == DatabaseDMLOperations.MERGE.value: - if query_result.rows_inserted: - metric_results.append( - { - "timestamp": datetime_to_timestamp( - query_result.start_time, milliseconds=True - ), - "operation": DatabaseDMLOperations.INSERT.value, - "rowsAffected": query_result.rows_inserted, - } - ) - if query_result.rows_updated: - metric_results.append( - { - "timestamp": datetime_to_timestamp( - query_result.start_time, milliseconds=True - ), - "operation": DatabaseDMLOperations.UPDATE.value, - "rowsAffected": query_result.rows_updated, - } - ) - continue - - metric_results.append( - { - "timestamp": datetime_to_timestamp( - query_result.start_time, milliseconds=True - ), - "operation": DML_OPERATION_MAP.get(query_result.query_type), - "rowsAffected": rows_affected, - } - ) - return TypeAdapter(List[SystemProfile]).validate_python(metric_results) +class SQASessionProvider: + def __init__(self, *args, **kwargs): + self.session = kwargs.pop("session") + super().__init__(*args, **kwargs) + + def get_session(self): + return self.session + + def get_database(self) -> str: + return self.session.get_bind().url.database class System(SystemMetric): @@ -309,18 +156,6 @@ def _validate_attrs(self, attr_list: List[str]) -> None: ) def sql(self, session: Session, **kwargs): - """Implements the SQL logic to fetch system data""" - self._validate_attrs(["table", "ometa_client", "db_service"]) - - conn_config = kwargs.get("conn_config") - - system_metrics = get_system_metrics_for_dialect( - session.get_bind().dialect.name, - session=session, - table=self.table, # pylint: disable=no-member - conn_config=conn_config, - ometa_client=self.ometa_client, # pylint: disable=no-member - db_service=self.db_service, # pylint: disable=no-member + raise NotImplementedError( + "SQL method is not implemented for System metric. 
Use SystemMetricsComputer.get_system_metrics instead" ) - self._manage_cache() - return system_metrics diff --git a/ingestion/src/metadata/profiler/source/base/profiler_source.py b/ingestion/src/metadata/profiler/source/base/profiler_source.py index 3d3d19fd3e9e..b84aa994f2ae 100644 --- a/ingestion/src/metadata/profiler/source/base/profiler_source.py +++ b/ingestion/src/metadata/profiler/source/base/profiler_source.py @@ -14,7 +14,7 @@ its interface """ from copy import deepcopy -from typing import List, Optional, Tuple, cast +from typing import List, Optional, Tuple, Type, cast from sqlalchemy import MetaData @@ -31,6 +31,7 @@ DatabaseConnection, DatabaseService, ) +from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( DatabaseServiceProfilerPipeline, ) @@ -44,9 +45,14 @@ from metadata.profiler.processor.core import Profiler from metadata.profiler.processor.default import DefaultProfiler, get_default_metrics from metadata.profiler.source.profiler_source_interface import ProfilerSourceInterface +from metadata.utils.importer import import_from_module +from metadata.utils.logger import profiler_logger +from metadata.utils.service_spec.service_spec import BaseSpec NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,) +logger = profiler_logger() + class ProfilerSource(ProfilerSourceInterface): """ @@ -61,15 +67,14 @@ def __init__( global_profiler_configuration: ProfilerConfiguration, ): self.service_conn_config = self._copy_service_config(config, database) - self.source_config = config.source.sourceConfig.config - self.source_config = cast( - DatabaseServiceProfilerPipeline, self.source_config - ) # satisfy type checker + self.source_config = DatabaseServiceProfilerPipeline.model_validate( + config.source.sourceConfig.config + ) self.profiler_config = ProfilerProcessorConfig.model_validate( config.processor.model_dump().get("config") ) self.ometa_client = ometa_client - self.profiler_interface_type: str = self._get_profiler_interface_type(config) + self.profiler_interface_type: str = config.source.type.lower() self.sqa_metadata = self._set_sqa_metadata() self._interface = None self.global_profiler_configuration = global_profiler_configuration @@ -92,18 +97,6 @@ def _set_sqa_metadata(self): return MetaData() return None - def _get_profiler_interface_type(self, config) -> str: - """_summary_ - - Args: - config (_type_): profiler config - Returns: - str: - """ - if isinstance(self.service_conn_config, NON_SQA_DATABASE_CONNECTIONS): - return self.service_conn_config.__class__.__name__ - return config.source.serviceConnection.root.config.__class__.__name__ - @staticmethod def get_config_for_table(entity: Table, profiler_config) -> Optional[TableConfig]: """Get config for a specific entity @@ -196,12 +189,10 @@ def create_profiler_interface( db_service: Optional[DatabaseService], ) -> ProfilerInterface: """Create sqlalchemy profiler interface""" - from metadata.profiler.interface.profiler_interface_factory import ( # pylint: disable=import-outside-toplevel - profiler_interface_factory, + profiler_class = self.import_profiler_class( + ServiceType.Database, source_type=self.profiler_interface_type ) - - profiler_interface: ProfilerInterface = profiler_interface_factory.create( - self.profiler_interface_type, + profiler_interface: ProfilerInterface = profiler_class.create( entity, schema_entity, database_entity, @@ -217,6 +208,12 @@ def create_profiler_interface( self.interface = 
profiler_interface return self.interface + def import_profiler_class( + self, service_type: ServiceType, source_type: str + ) -> Type[ProfilerInterface]: + class_path = BaseSpec.get_for_source(service_type, source_type).profiler_class + return cast(Type[ProfilerInterface], import_from_module(class_path)) + def _get_context_entities( self, entity: Table ) -> Tuple[DatabaseSchema, Database, DatabaseService]: diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py index 9c00c11d57b0..922539ebddd0 100644 --- a/ingestion/src/metadata/profiler/source/metadata_ext.py +++ b/ingestion/src/metadata/profiler/source/metadata_ext.py @@ -21,7 +21,7 @@ """ import traceback from copy import deepcopy -from typing import Iterable, cast +from typing import Iterable, Type, cast from sqlalchemy.inspection import inspect @@ -29,6 +29,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) +from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) @@ -40,16 +41,18 @@ ) from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.profiler.interface.profiler_interface import ProfilerInterface from metadata.profiler.source.metadata import ( OpenMetadataSource, ProfilerSourceAndEntity, ) -from metadata.profiler.source.profiler_source_factory import profiler_source_factory from metadata.utils import fqn from metadata.utils.class_helper import get_service_type_from_source_type from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table -from metadata.utils.importer import import_source_class +from metadata.utils.importer import import_from_module from metadata.utils.logger import profiler_logger +from metadata.utils.service_spec import BaseSpec +from metadata.utils.service_spec.service_spec import import_source_class from metadata.utils.ssl_manager import get_ssl_connection logger = profiler_logger() @@ -145,8 +148,7 @@ def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]: ) continue - profiler_source = profiler_source_factory.create( - self.config.source.type.lower(), + profiler_source = self.import_profiler_interface()( self.config, database_entity, self.metadata, @@ -174,6 +176,14 @@ def get_table_names(self, schema_name: str) -> Iterable[str]: continue yield table_name + def import_profiler_interface(self) -> Type[ProfilerInterface]: + class_path = BaseSpec.get_for_source( + ServiceType.Database, + source_type=self.config.source.type.lower(), + ).profiler_class + profiler_source_class = import_from_module(class_path) + return cast(Type[ProfilerInterface], profiler_source_class) + def get_schema_names(self) -> Iterable[str]: if self.service_connection.__dict__.get("databaseSchema"): yield self.service_connection.databaseSchema @@ -206,9 +216,11 @@ def get_database_names(self) -> Iterable[str]: ) if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn - if self.source_config.useFqnForFiltering - else database, + ( + database_fqn + if self.source_config.useFqnForFiltering + else database + ), ): self.status.filter(database, "Database pattern not allowed") continue diff --git a/ingestion/src/metadata/utils/dispatch.py b/ingestion/src/metadata/utils/dispatch.py index ef802121a2b7..8795ef2e1a8a 100644 --- 
a/ingestion/src/metadata/utils/dispatch.py +++ b/ingestion/src/metadata/utils/dispatch.py @@ -14,9 +14,7 @@ """ from collections import namedtuple -from functools import update_wrapper -from types import MappingProxyType -from typing import Any, Callable, Type, TypeVar +from typing import Type, TypeVar from pydantic import BaseModel @@ -56,53 +54,3 @@ def inner(fn): Register = namedtuple("Register", ["add", "registry"]) return Register(add, registry) - - -def valuedispatch(func) -> Callable: - """Value dispatch for methods and functions - - Args: - func (_type_): function to run - - Returns: - Callable: wrapper - """ - - registry = {} - - def _is_valid_dispatch(value): - return isinstance(value, str) - - def dispatch(value: str) -> Callable: - try: - impl = registry[value] - except KeyError: - impl = registry[object] - return impl - - def register(value, func=None) -> Callable: - if _is_valid_dispatch(value): - if func is None: - return lambda f: register(value, f) - else: - raise TypeError( - "Invalid first argument to reigister()." f"{value} is not a string." - ) - - registry[value] = func - return func - - def wrapper(*args, **kwargs) -> Any: - if not args: - raise TypeError(f"{func_name} requires at least 1 argument") - if isinstance(args[0], (str, bytes)): - return dispatch(str(args[0]))(*args, **kwargs) - return dispatch(args[1])(*args, **kwargs) - - func_name = getattr(func, "__name__", "method value dispatch") - registry[object] = func - wrapper.register = register - wrapper.dispatch = dispatch - wrapper.registry = MappingProxyType(registry) # making registry read only - update_wrapper(wrapper, func) - return wrapper diff --git a/ingestion/src/metadata/utils/importer.py b/ingestion/src/metadata/utils/importer.py index 12c03eb2ecef..5686c09fdc15 100644 --- a/ingestion/src/metadata/utils/importer.py +++ b/ingestion/src/metadata/utils/importer.py @@ -25,7 +25,7 @@ ) from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink -from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage +from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Stage from metadata.utils.class_helper import get_service_type_from_source_type from metadata.utils.client_version import get_client_version from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX @@ -126,37 +126,21 @@ def import_from_module(key: str) -> Type[Any]: """ Dynamically import an object from a module path """ - + logger.debug("Importing: %s", key) module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1) try: obj = getattr(importlib.import_module(module_name), obj_name) return obj - except Exception as err: + except ModuleNotFoundError as err: logger.debug(traceback.format_exc()) raise DynamicImportException(module=module_name, key=obj_name, cause=err) -# module building strings read better with .format instead of f-strings -# pylint: disable=consider-using-f-string -def import_source_class( - service_type: ServiceType, source_type: str, from_: str = "ingestion" -) -> Type[Source]: - return import_from_module( - "metadata.{}.source.{}.{}.{}.{}Source".format( - from_, - service_type.name.lower(), - get_module_dir(source_type), - get_source_module_name(source_type), - get_class_name_root(source_type), - ) - ) - - def import_processor_class( processor_type: str, from_: str = "ingestion" ) -> Type[Processor]: return import_from_module( - "metadata.{}.processor.{}.{}Processor".format( + 
"metadata.{}.processor.{}.{}Processor".format( # pylint: disable=consider-using-f-string from_, get_module_name(processor_type), get_class_name_root(processor_type), @@ -166,7 +150,7 @@ def import_processor_class( def import_stage_class(stage_type: str, from_: str = "ingestion") -> Type[Stage]: return import_from_module( - "metadata.{}.stage.{}.{}Stage".format( + "metadata.{}.stage.{}.{}Stage".format( # pylint: disable=consider-using-f-string from_, get_module_name(stage_type), get_class_name_root(stage_type), @@ -176,7 +160,7 @@ def import_stage_class(stage_type: str, from_: str = "ingestion") -> Type[Stage] def import_sink_class(sink_type: str, from_: str = "ingestion") -> Type[Sink]: return import_from_module( - "metadata.{}.sink.{}.{}Sink".format( + "metadata.{}.sink.{}.{}Sink".format( # pylint: disable=consider-using-f-string from_, get_module_name(sink_type), get_class_name_root(sink_type), @@ -188,7 +172,7 @@ def import_bulk_sink_type( bulk_sink_type: str, from_: str = "ingestion" ) -> Type[BulkSink]: return import_from_module( - "metadata.{}.bulksink.{}.{}BulkSink".format( + "metadata.{}.bulksink.{}.{}BulkSink".format( # pylint: disable=consider-using-f-string from_, get_module_name(bulk_sink_type), get_class_name_root(bulk_sink_type), @@ -273,7 +257,7 @@ def import_test_case_class( test_definition[0].upper() + test_definition[1:] ) # change test names to camel case return import_from_module( - "metadata.data_quality.validations.{}.{}.{}.{}Validator".format( + "metadata.data_quality.validations.{}.{}.{}.{}Validator".format( # pylint: disable=consider-using-f-string test_type.lower(), runner_type, test_definition, @@ -302,3 +286,7 @@ def import_side_effects(self, *modules): def import_side_effects(*modules): SideEffectsLoader().import_side_effects(*modules) + + +def get_class_path(module): + return module.__module__ + "." + module.__name__ diff --git a/ingestion/src/metadata/utils/lru_cache.py b/ingestion/src/metadata/utils/lru_cache.py index bcbb78eb9434..c24db6111a4b 100644 --- a/ingestion/src/metadata/utils/lru_cache.py +++ b/ingestion/src/metadata/utils/lru_cache.py @@ -13,47 +13,108 @@ LRU cache """ +import threading from collections import OrderedDict +from typing import Callable, Generic, TypeVar LRU_CACHE_SIZE = 4096 +T = TypeVar("T") -class LRUCache: + +class LRUCache(Generic[T]): """Least Recently Used cache""" def __init__(self, capacity: int) -> None: self._cache = OrderedDict() self.capacity = capacity + self.lock = threading.Lock() def clear(self): - self._cache = OrderedDict() + with self.lock: + self._cache = OrderedDict() - def get(self, key): + def get(self, key) -> T: """ Returns the value associated to `key` if it exists, updating the cache usage. Raises `KeyError` if `key doesn't exist in the cache. + + Args: + key: The key to get the value for + + Returns: + The value associated to `key` """ - self._cache.move_to_end(key) - return self._cache[key] + with self.lock: + self._cache.move_to_end(key) + return self._cache[key] - def put(self, key, value) -> None: + def put(self, key: str, value: T) -> None: """ Assigns `value` to `key`, overwriting `key` if it already exists in the cache and updating the cache usage. If the size of the cache grows above capacity, pops the least used element. 
+ + Args: + key: The key to assign the value to + value: The value to assign to the key """ - self._cache[key] = value - self._cache.move_to_end(key) - if len(self._cache) > self.capacity: - self._cache.popitem(last=False) + with self.lock: + self._cache[key] = value + self._cache.move_to_end(key) + if len(self._cache) > self.capacity: + self._cache.popitem(last=False) def __contains__(self, key) -> bool: - if key not in self._cache: - return False - self._cache.move_to_end(key) - return True + with self.lock: + if key not in self._cache: + return False + self._cache.move_to_end(key) + return True def __len__(self) -> int: - return len(self._cache) + with self.lock: + return len(self._cache) + + def wrap(self, key_func: Callable[..., str]): + """Decorator to cache the result of a function based on its arguments. + + Example: + ```python + import time + from metadata.utils.lru_cache import LRUCache + cache = LRUCache(4096) + + @cache.wrap(lambda x, y: f"{x}-{y}") + def add(x, y): + time.sleep(1) + return x + y + start1 = time.time() + add(1, 2) # This will be cached and take 1 second + print('took', time.time() - start1, 'seconds') + start2 = time.time() + add(1, 2) # This will return the cached value and take no time + print('took', time.time() - start2, 'seconds') + ``` + Args: + key_func: A function that generates a key based on the arguments + of the decorated function. + + Returns: + A decorator that caches the result of the decorated function. + """ + + def wrapper(func: Callable[..., T]): + def wrapped(*args, **kwargs) -> T: + key = key_func(*args, **kwargs) + if key in self: + return self.get(key) + value = func(*args, **kwargs) + self.put(key, value) + return value + + return wrapped + + return wrapper diff --git a/ingestion/src/metadata/utils/service_spec/__init__.py b/ingestion/src/metadata/utils/service_spec/__init__.py new file mode 100644 index 000000000000..6543fd660cab --- /dev/null +++ b/ingestion/src/metadata/utils/service_spec/__init__.py @@ -0,0 +1,5 @@ +"""Module for the OpenMetadata Ingestion Service Specification (ServiceSpec)""" + +from metadata.utils.service_spec.service_spec import BaseSpec + +__all__ = ["BaseSpec"] diff --git a/ingestion/src/metadata/utils/service_spec/default.py b/ingestion/src/metadata/utils/service_spec/default.py new file mode 100644 index 000000000000..92558a1409a4 --- /dev/null +++ b/ingestion/src/metadata/utils/service_spec/default.py @@ -0,0 +1,15 @@ +""" +Default service specs for services.
+""" + +from typing import Optional + +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( + SQAProfilerInterface, +) +from metadata.utils.importer import get_class_path +from metadata.utils.service_spec.service_spec import BaseSpec + + +class DefaultDatabaseSpec(BaseSpec): + profiler_class: Optional[str] = get_class_path(SQAProfilerInterface) diff --git a/ingestion/src/metadata/utils/service_spec/service_spec.py b/ingestion/src/metadata/utils/service_spec/service_spec.py new file mode 100644 index 000000000000..a401ba0e8154 --- /dev/null +++ b/ingestion/src/metadata/utils/service_spec/service_spec.py @@ -0,0 +1,99 @@ +""" +Manifests are used to store class information +""" + +from typing import Optional, Type, cast + +from pydantic import model_validator + +from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.ingestion.api.steps import Source +from metadata.ingestion.models.custom_pydantic import BaseModel +from metadata.utils.importer import ( + TYPE_SEPARATOR, + get_class_path, + get_module_dir, + import_from_module, +) +from metadata.utils.logger import utils_logger + +logger = utils_logger() + + +class BaseSpec(BaseModel): + """ + # The OpenMetadata Ingestion Service Specification (Spec) + + This is the API for defining a service in OpenMetadata it needs to be in the classpath of the connector in + the form: + + metadata.ingestion.source.{service_type}.{service_name}.service_spec.ServiceSpec + + Example for postres: + + metadata.ingestion.source.database.postgres.service_spec.ServiceSpec + + You can supply either strings with the full classpath or concrete classes that will be converted to strings. + + The use of strings for the values gives us a few advantages: + 1. manifests can be defined using json/yaml and deserialized into this class. + 2. We can dynamically import the class when needed and avoid dependency issues. + 3. We avoid circular imports. + 4. We can hot-swap the class implementation without changing the manifest (example: for testing). + """ + + profiler_class: Optional[str] = None + metadata_source_class: str + lineage_source_class: Optional[str] = None + usage_source_class: Optional[str] = None + + @model_validator(mode="before") + @classmethod + def transform_fields(cls, values): + """This allows us to pass in the class directly instead of the string representation of the class. The + validator will convert the class to a string representation of the class.""" + for field in list(cls.model_fields.keys()): + if isinstance(values.get(field), type): + values[field] = get_class_path(values[field]) + return values + + @classmethod + def get_for_source( + cls, service_type: ServiceType, source_type: str, from_: str = "ingestion" + ) -> "BaseSpec": + """Retrieves the manifest for a given source type. If it does not exist will attempt to retrieve + a default manifest for the service type. + + Args: + service_type (ServiceType): The service type. + source_type (str): The source type. + from_ (str, optional): The module to import from. Defaults to "ingestion". + + Returns: + BaseSpec: The manifest for the source type. 
+ """ + return cls.model_validate( + import_from_module( + "metadata.{}.source.{}.{}.{}.ServiceSpec".format( # pylint: disable=C0209 + from_, + service_type.name.lower(), + get_module_dir(source_type), + "service_spec", + ) + ) + ) + + +def import_source_class( + service_type: ServiceType, source_type: str, from_: str = "ingestion" +) -> Type[Source]: + source_class_type = source_type.split(TYPE_SEPARATOR)[-1] + if source_class_type in ["usage", "lineage"]: + field = f"{source_class_type}_source_class" + else: + field = "metadata_source_class" + spec = BaseSpec.get_for_source(service_type, source_type, from_) + return cast( + Type[Source], + import_from_module(spec.model_dump()[field]), + ) diff --git a/ingestion/src/metadata/workflow/ingestion.py b/ingestion/src/metadata/workflow/ingestion.py index 1e28f5013178..1133304a8e7d 100644 --- a/ingestion/src/metadata/workflow/ingestion.py +++ b/ingestion/src/metadata/workflow/ingestion.py @@ -48,9 +48,9 @@ DynamicImportException, MissingPluginException, import_from_module, - import_source_class, ) from metadata.utils.logger import ingestion_logger +from metadata.utils.service_spec.service_spec import import_source_class from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException logger = ingestion_logger() diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 332bc852e0be..ec6aa8f07b40 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -13,7 +13,8 @@ Test database connectors with CLI """ from abc import abstractmethod -from typing import List, Optional +from datetime import datetime +from typing import List, Optional, Tuple from unittest import TestCase import pytest @@ -21,7 +22,7 @@ from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects from metadata.data_quality.api.models import TestCaseDefinition -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import SystemProfile, Table from metadata.generated.schema.tests.basic import TestCaseResult from metadata.generated.schema.tests.testCase import TestCase as OMTestCase from metadata.ingestion.api.status import Status @@ -72,6 +73,7 @@ def test_create_table_with_profiler(self) -> None: result = self.run_command("profile") sink_status, source_status = self.retrieve_statuses(result) self.assert_for_table_with_profiler(source_status, sink_status) + self.system_profile_assertions() @pytest.mark.order(3) def test_delete_table_is_marked_as_deleted(self) -> None: @@ -416,3 +418,34 @@ def get_expected_test_case_results(self) -> List[TestCaseResult]: def assert_status_for_data_quality(self, source_status, sink_status): pass + + def system_profile_assertions(self): + cases = self.get_system_profile_cases() + if not cases: + return + for table_fqn, expected_profile in cases: + actual_profiles = self.openmetadata.get_profile_data( + table_fqn, + start_ts=int((datetime.now().timestamp() - 600) * 1000), + end_ts=int(datetime.now().timestamp() * 1000), + profile_type=SystemProfile, + ).entities + actual_profiles = sorted( + actual_profiles, key=lambda x: x.timestamp.root + ) + actual_profiles = actual_profiles[-len(expected_profile) :] + assert len(expected_profile) == len(actual_profiles) + for expected, actual in zip(expected_profile, actual_profiles): + try: + assert_equal_pydantic_objects( + expected.model_copy(update={"timestamp": actual.timestamp}), + actual, + ) + except AssertionError 
as e: + raise AssertionError( + f"System metrics profile did not return expected results for table: {table_fqn}" ) from e + + def get_system_profile_cases(self) -> List[Tuple[str, List[SystemProfile]]]: + """Return a list of tuples with the table fqn and the expected system profile""" + return [] diff --git a/ingestion/tests/cli_e2e/test_cli_bigquery.py b/ingestion/tests/cli_e2e/test_cli_bigquery.py index 4396b762f066..77d333902982 100644 --- a/ingestion/tests/cli_e2e/test_cli_bigquery.py +++ b/ingestion/tests/cli_e2e/test_cli_bigquery.py @@ -14,6 +14,9 @@ """ -from typing import List +from typing import List, Tuple +from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile +from metadata.generated.schema.type.basic import Timestamp + from .common.test_cli_db import CliCommonDB from .common_e2e_sqa_mixins import SQACommonMethods @@ -33,8 +36,8 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): """ insert_data_queries: List[str] = [ - "INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES (1,'XBOX');", - "INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES (2,'PS');", + "INSERT INTO `open-metadata-beta.exclude_me`.orders (id, order_name) VALUES (1,'XBOX'), (2,'PS');", + "UPDATE `open-metadata-beta.exclude_me`.orders SET order_name = 'NINTENDO' WHERE id = 2", ] drop_table_query: str = """ @@ -126,3 +129,22 @@ def update_queries() -> List[str]: UPDATE `open-metadata-beta.exclude_me`.orders SET order_name = 'NINTENDO' WHERE id = 2 """, ] + + def get_system_profile_cases(self) -> List[Tuple[str, List[SystemProfile]]]: + return [ + ( + "local_bigquery.open-metadata-beta.exclude_me.orders", + [ + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.UPDATE, + rowsAffected=1, + ), + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=2, + ), + ], + ) + ] diff --git a/ingestion/tests/cli_e2e/test_cli_redshift.py b/ingestion/tests/cli_e2e/test_cli_redshift.py index 405dfe3e9d24..aa11e10f3102 100644 --- a/ingestion/tests/cli_e2e/test_cli_redshift.py +++ b/ingestion/tests/cli_e2e/test_cli_redshift.py @@ -12,9 +12,10 @@ """ Redshift E2E tests """ +from typing import List, Tuple -from typing import List - +from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile +from metadata.generated.schema.type.basic import Timestamp from metadata.ingestion.api.status import Status from .common.test_cli_db import CliCommonDB @@ -232,3 +233,17 @@ def update_queries() -> List[str]: UPDATE e2e_cli_tests.dbt_jaffle.persons SET full_name = 'Bruce Wayne' WHERE person_id = 3 """, ] + + def get_system_profile_cases(self) -> List[Tuple[str, List[SystemProfile]]]: + return [ + ( + "e2e_redshift.e2e_cli_tests.dbt_jaffle.persons", + [ + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=6, + ) + ], + ) + ] diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index 70eb1547dccf..7258882878dd 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -14,11 +14,10 @@ """ from datetime import datetime from time import sleep -from typing import List +from typing import List, Tuple import pytest -from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile from metadata.generated.schema.tests.basic import
TestCaseResult, TestCaseStatus from metadata.generated.schema.tests.testCase import TestCaseParameterValue @@ -167,7 +166,7 @@ def test_create_table_with_profiler(self) -> None: result = self.run_command("profile") sink_status, source_status = self.retrieve_statuses(result) self.assert_for_table_with_profiler(source_status, sink_status) - self.custom_profiler_assertions() + self.system_profile_assertions() @staticmethod def expected_tables() -> int: @@ -233,8 +232,8 @@ def update_queries() -> List[str]: """, ] - def custom_profiler_assertions(self): - cases = [ + def get_system_profile_cases(self) -> List[Tuple[str, List[SystemProfile]]]: + return [ ( "e2e_snowflake.E2E_DB.E2E_TEST.E2E_TABLE", [ @@ -286,22 +285,6 @@ def custom_profiler_assertions(self): ], ), ] - for table_fqn, expected_profile in cases: - actual_profiles = self.openmetadata.get_profile_data( - table_fqn, - start_ts=int((datetime.now().timestamp() - 600) * 1000), - end_ts=int(datetime.now().timestamp() * 1000), - profile_type=SystemProfile, - ).entities - actual_profiles = sorted(actual_profiles, key=lambda x: x.timestamp.root) - actual_profiles = actual_profiles[-len(expected_profile) :] - actual_profiles = [ - p.copy(update={"timestamp": Timestamp(root=0)}) for p in actual_profiles - ] - try: - assert_equal_pydantic_objects(expected_profile, actual_profiles) - except AssertionError as e: - raise AssertionError(f"Table: {table_fqn}\n{e}") @classmethod def wait_for_query_log(cls, timeout=600): diff --git a/ingestion/tests/integration/postgres/test_lineage.py b/ingestion/tests/integration/postgres/test_lineage.py index b6e6609ca342..fd2c9ca68cc3 100644 --- a/ingestion/tests/integration/postgres/test_lineage.py +++ b/ingestion/tests/integration/postgres/test_lineage.py @@ -5,6 +5,9 @@ import pytest from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( + DatabaseLineageConfigType, +) from metadata.ingestion.lineage.sql_lineage import search_cache from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.metadata import MetadataWorkflow @@ -19,7 +22,9 @@ def native_lineage_config(db_service, workflow_config, sink_config): "source": { "type": "postgres-lineage", "serviceName": db_service.fullyQualifiedName.root, - "sourceConfig": {"config": {}}, + "sourceConfig": { + "config": {"type": DatabaseLineageConfigType.DatabaseLineage.value} + }, }, "sink": sink_config, "workflowConfig": workflow_config, @@ -39,6 +44,7 @@ def native_lineage_config(db_service, workflow_config, sink_config): ), ) def test_native_lineage( + patch_passwords_for_db_services, source_config, expected_nodes, run_workflow, diff --git a/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py b/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py index 940a01174635..9c31d5ffca56 100644 --- a/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py +++ b/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py @@ -34,6 +34,7 @@ ) from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.metrics.system.system import SystemMetricsComputer from metadata.profiler.orm.functions.sum import SumFn from metadata.profiler.processor.core import Profiler @@ -928,11 +929,7 @@ def test_sum_function(self): assert res == 61 def test_system_metric(self): - system = add_props(table=User, ometa_client=None, db_service=None)( - Metrics.SYSTEM.value - ) - session = 
self.sqa_profiler_interface.session - system().sql(session) + assert SystemMetricsComputer().get_system_metrics() == [] def test_table_custom_metric(self): table_entity = Table( diff --git a/ingestion/tests/unit/profiler/test_profiler_interface.py b/ingestion/tests/unit/profiler/test_profiler_interface.py index 2c3026da0b86..4392914f021a 100644 --- a/ingestion/tests/unit/profiler/test_profiler_interface.py +++ b/ingestion/tests/unit/profiler/test_profiler_interface.py @@ -30,31 +30,6 @@ DataStorageConfig, SampleDataStorageConfig, ) -from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( - BigQueryConnection, -) -from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( - DatabricksConnection, -) -from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( - DatalakeConnection, -) -from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( - MariaDBConnection, -) -from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( - SingleStoreConnection, -) -from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( - SnowflakeConnection, -) -from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( - TrinoConnection, -) -from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( - UnityCatalogConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( DatabaseServiceProfilerPipeline, ) @@ -65,37 +40,7 @@ ProfileSampleConfig, TableConfig, ) -from metadata.profiler.interface.pandas.profiler_interface import ( - PandasProfilerInterface, -) from metadata.profiler.interface.profiler_interface import ProfilerInterface -from metadata.profiler.interface.profiler_interface_factory import ( - ProfilerInterfaceFactory, -) -from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import ( - BigQueryProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.databricks.profiler_interface import ( - DatabricksProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.mariadb.profiler_interface import ( - MariaDBProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.profiler_interface import ( - SQAProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.single_store.profiler_interface import ( - SingleStoreProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import ( - SnowflakeProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.trino.profiler_interface import ( - TrinoProfilerInterface, -) -from metadata.profiler.interface.sqlalchemy.unity_catalog.profiler_interface import ( - UnityCatalogProfilerInterface, -) class ProfilerInterfaceTest(TestCase): @@ -359,32 +304,3 @@ def test_table_config_casting(self): schema_config, table_fqn="demo" ), ) - - def test_register_many(self): - # Initialize factory - factory = ProfilerInterfaceFactory() - - # Define profiles dictionary - profiles = { - DatabaseConnection.__name__: SQAProfilerInterface, - BigQueryConnection.__name__: BigQueryProfilerInterface, - SingleStoreConnection.__name__: SingleStoreProfilerInterface, - DatalakeConnection.__name__: PandasProfilerInterface, - SnowflakeConnection.__name__: SnowflakeProfilerInterface, 
- TrinoConnection.__name__: TrinoProfilerInterface, - UnityCatalogConnection.__name__: UnityCatalogProfilerInterface, - DatabricksConnection.__name__: DatabricksProfilerInterface, - MariaDBConnection.__name__: MariaDBProfilerInterface, - } - - # Register profiles - factory.register_many(profiles) - - # Assert all expected interfaces are registered - expected_interfaces = set(profiles.keys()) - actual_interfaces = set(factory._interface_type.keys()) - assert expected_interfaces == actual_interfaces - - # Assert profiler classes match registered interfaces - for interface_type, interface_class in profiles.items(): - assert factory._interface_type[interface_type] == interface_class diff --git a/ingestion/tests/unit/profiler/test_profiler_interface_factory.py b/ingestion/tests/unit/profiler/test_profiler_interface_factory.py deleted file mode 100644 index 38a6263b0984..000000000000 --- a/ingestion/tests/unit/profiler/test_profiler_interface_factory.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Factory class for creating profiler interface objects -""" - -from typing import Dict -from unittest import TestCase - -from metadata.profiler.interface.profiler_interface_factory import ( - profiler_class_mapping, -) - - -class TestProfilerClassMapping(TestCase): - def setUp(self): - self.expected_mapping: Dict[str, str] = { - "DatabaseConnection": "metadata.profiler.interface.sqlalchemy.profiler_interface.SQAProfilerInterface", - "BigQueryConnection": "metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface.BigQueryProfilerInterface", - "SingleStoreConnection": "metadata.profiler.interface.sqlalchemy.single_store.profiler_interface.SingleStoreProfilerInterface", - "DatalakeConnection": "metadata.profiler.interface.pandas.profiler_interface.PandasProfilerInterface", - "MariaDBConnection": "metadata.profiler.interface.sqlalchemy.mariadb.profiler_interface.MariaDBProfilerInterface", - "SnowflakeConnection": "metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface.SnowflakeProfilerInterface", - "TrinoConnection": "metadata.profiler.interface.sqlalchemy.trino.profiler_interface.TrinoProfilerInterface", - "UnityCatalogConnection": "metadata.profiler.interface.sqlalchemy.unity_catalog.profiler_interface.UnityCatalogProfilerInterface", - "DatabricksConnection": "metadata.profiler.interface.sqlalchemy.databricks.profiler_interface.DatabricksProfilerInterface", - "Db2Connection": "metadata.profiler.interface.sqlalchemy.db2.profiler_interface.DB2ProfilerInterface", - "MongoDBConnection": "metadata.profiler.interface.nosql.profiler_interface.NoSQLProfilerInterface", - "DynamoDBConnection": "metadata.profiler.interface.nosql.profiler_interface.NoSQLProfilerInterface", - } - - def test_profiler_class_mapping(self): - self.assertEqual(len(profiler_class_mapping), len(self.expected_mapping)) - self.assertEqual(profiler_class_mapping, self.expected_mapping) diff --git a/ingestion/tests/unit/test_importer.py 
b/ingestion/tests/unit/test_importer.py index 522d8aa5a735..ce0ccb20b945 100644 --- a/ingestion/tests/unit/test_importer.py +++ b/ingestion/tests/unit/test_importer.py @@ -19,7 +19,6 @@ ) from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.utils.importer import ( - DynamicImportException, get_class_name_root, get_module_name, get_source_module_name, @@ -28,9 +27,9 @@ import_from_module, import_processor_class, import_sink_class, - import_source_class, import_stage_class, ) +from metadata.utils.service_spec.service_spec import import_source_class # pylint: disable=import-outside-toplevel @@ -61,12 +60,6 @@ def test_import_class(self) -> None: ) def test_import_source_class(self) -> None: - from metadata.ingestion.source.database.bigquery.lineage import ( - BigqueryLineageSource, - ) - from metadata.ingestion.source.database.bigquery.usage import ( - BigqueryUsageSource, - ) from metadata.ingestion.source.database.mysql.metadata import MysqlSource self.assertEqual( @@ -74,20 +67,6 @@ def test_import_source_class(self) -> None: MysqlSource, ) - self.assertEqual( - import_source_class( - service_type=ServiceType.Database, source_type="bigquery-lineage" - ), - BigqueryLineageSource, - ) - - self.assertEqual( - import_source_class( - service_type=ServiceType.Database, source_type="bigquery-usage" - ), - BigqueryUsageSource, - ) - def test_import_processor_class(self) -> None: from metadata.ingestion.processor.query_parser import QueryParserProcessor @@ -126,7 +105,7 @@ def test_import_get_connection(self) -> None: self.assertIsNotNone(get_connection_fn) self.assertRaises( - DynamicImportException, + AttributeError, import_connection_fn, connection=connection, function_name="random", diff --git a/ingestion/tests/unit/utils/test_service_spec.py b/ingestion/tests/unit/utils/test_service_spec.py new file mode 100644 index 000000000000..9c0588e7565c --- /dev/null +++ b/ingestion/tests/unit/utils/test_service_spec.py @@ -0,0 +1,16 @@ +from metadata.ingestion.source.database.mysql.metadata import MysqlSource +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( + SQAProfilerInterface, +) +from metadata.utils.importer import get_class_path +from metadata.utils.service_spec import BaseSpec +from metadata.utils.service_spec.default import DefaultDatabaseSpec + + +def test_service_spec(): + spec = BaseSpec(metadata_source_class=MysqlSource) + assert spec.metadata_source_class == get_class_path(MysqlSource) + + spec = DefaultDatabaseSpec(metadata_source_class=MysqlSource) + assert spec.metadata_source_class == get_class_path(MysqlSource) + assert spec.profiler_class == get_class_path(SQAProfilerInterface) diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md index d08ab34f29e1..dc7cbedc551f 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md @@ -26,6 +26,12 @@ From the Service Topology you can understand what methods you need to implement: Can be found in [`ingestion/src/metadata/ingestion/source/database/database_service.py`](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/source/database/database_service.py) 
+{%inlineCallout icon="description" bold="OpenMetadata 1.6.0 or later" href="/deployment"%} +Starting from 1.6.0, the OpenMetadata Ingestion Framework uses a ServiceSpec specification +to define the entrypoints for the ingestion process. +{%/inlineCallout%} + + ```python class DatabaseServiceTopology(ServiceTopology): """ diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/releases/releases/index.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/releases/releases/index.md index d0cb9071fa32..70c96bcc0b64 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/releases/releases/index.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/releases/releases/index.md @@ -14,6 +14,13 @@ version. To see what's coming in next releases, please check our [Roadmap](/rele {% partial file="/v1.5/releases/latest.md" /%} +# 1.6.0 Release + +## Breaking Changes + +- The Ingestion Framework now uses the OpenMetadata Ingestion Service Specification (OMISS) to specify +entrypoints to ingestion operations. [Click here](./todo-need-link) for more info. + # 1.5.6 Release {% note noteType="Tip" %}
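For reference, a minimal sketch of how the pieces in this patch fit together: a connector-level `service_spec.py` plus the dynamic resolution performed by `BaseSpec.get_for_source` and `import_source_class`. The `MysqlSource` usage mirrors the `test_service_spec.py` and `test_importer.py` tests above; everything else is illustrative under those assumptions, not part of the patch itself.

```python
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.ingestion.source.database.mysql.metadata import MysqlSource
from metadata.utils.service_spec import BaseSpec
from metadata.utils.service_spec.service_spec import import_source_class

# Each connector ships a module-level ServiceSpec, e.g. in
# metadata/ingestion/source/database/mysql/service_spec.py:
ServiceSpec = BaseSpec(metadata_source_class=MysqlSource)

# At runtime, BaseSpec.get_for_source builds the dotted path
# "metadata.ingestion.source.database.mysql.service_spec.ServiceSpec",
# imports it, and validates it into a spec. import_source_class then picks
# the spec field matching the pipeline type: "mysql" -> metadata_source_class,
# "mysql-usage" -> usage_source_class, "mysql-lineage" -> lineage_source_class.
source_class = import_source_class(
    service_type=ServiceType.Database, source_type="mysql"
)
assert source_class is MysqlSource
```

Usage and lineage pipelines resolve through the same path; their `*_source_class` fields live on the richer specs (such as `DefaultDatabaseSpec`, which also carries `profiler_class`) rather than on the base `BaseSpec` shown here.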