Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[GEN-356] Use ServiceSpec for loading sources based on connectors (#18322)

* ref(profiler): use di for system profile
  - use source classes that can be overridden in system profiles
  - use a manifest class instead of factory to specify which class to resolve for connectors
  - example usage can be seen in redshift and snowflake
* - added manifests for all custom profilers
  - used super() dependency injection in order for system metrics source
  - formatting
* - implement spec for all source types
  - added docs for the new specification
  - added some pylint ignores in the importer module
* remove TYPE_CHECKING in core.py
* - deleted valuedispatch function
  - deleted get_system_metrics_by_dialect
  - implemented BigQueryProfiler with a system metrics source
  - moved import_source_class to BaseSpec
* - removed tests related to the profiler factory
* - reverted start_time
  - removed DML_STAT_TO_DML_STATEMENT_MAPPING
  - removed unused logger
* - reverted start_time
  - removed DML_STAT_TO_DML_STATEMENT_MAPPING
  - removed unused logger
* fixed tests
* format
* bigquery system profile e2e tests
* fixed module docstring
* - removed import_side_effects from redshift. we still use it in postgres for the orm conversion maps.
  - removed leftover methods
* - tests for BaseSpec
  - moved get_class_path to importer
* - moved constructors around to get rid of useless kwargs
* - changed test_system_metric
* - added lineage and usage to service_spec
  - fixed postgres native lineage test
* add comments on collaborative constructors
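A minimal sketch of the lookup the commit message describes, where each connector exposes a module-level `ServiceSpec` and the loader resolves classes from it rather than from a factory. `SketchSpec`, `load_spec`, and the `sketch_sources` package are hypothetical names for illustration and are not the project's API; only the `ServiceSpec` attribute name and the `metadata_source_class` / `profiler_class` keywords appear in this diff.

```python
import importlib
import sys
import types
from dataclasses import dataclass
from typing import Optional, Type


@dataclass
class SketchSpec:
    """Hypothetical stand-in for BaseSpec: names the classes a connector provides."""

    metadata_source_class: Type
    profiler_class: Optional[Type] = None


def load_spec(package: str, service_type: str, connector: str) -> SketchSpec:
    """Import `<package>.<service_type>.<connector>.service_spec` and return its ServiceSpec."""
    module = importlib.import_module(
        f"{package}.{service_type}.{connector}.service_spec"
    )
    return getattr(module, "ServiceSpec")


class FakeRestSource:
    """Hypothetical source class standing in for a real connector source."""


# Register a fake connector module so the sketch runs without a real package.
fake_module = types.ModuleType("sketch_sources.api.rest.service_spec")
fake_module.ServiceSpec = SketchSpec(metadata_source_class=FakeRestSource)
sys.modules[fake_module.__name__] = fake_module

spec = load_spec("sketch_sources", "api", "rest")
source_class = spec.metadata_source_class
```

The point of the design is that adding a connector means adding one `service_spec.py` file rather than editing a central factory.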
Showing 116 changed files with 1,453 additions and 809 deletions.
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/api/rest/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.api.rest.metadata import RestSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=RestSource)
6 changes: 6 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/domodashboard/service_spec.py
@@ -0,0 +1,6 @@
+from metadata.ingestion.source.dashboard.domodashboard.metadata import (
+    DomodashboardSource,
+)
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=DomodashboardSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/lightdash/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.lightdash.metadata import LightdashSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=LightdashSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/looker/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.looker.metadata import LookerSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=LookerSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/metabase/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.metabase.metadata import MetabaseSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=MetabaseSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/mode/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.mode.metadata import ModeSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=ModeSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/mstr/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.mstr.metadata import MstrSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=MstrSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/powerbi/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=PowerbiSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.qlikcloud.metadata import QlikcloudSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=QlikcloudSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/qliksense/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=QliksenseSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/quicksight/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=QuicksightSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/redash/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.redash.metadata import RedashSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=RedashSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/sigma/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.sigma.metadata import SigmaSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=SigmaSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/superset/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.superset.metadata import SupersetSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=SupersetSource)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/dashboard/tableau/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.dashboard.tableau.metadata import TableauSource
+from metadata.utils.service_spec import BaseSpec
+
+ServiceSpec = BaseSpec(metadata_source_class=TableauSource)
10 changes: 10 additions & 0 deletions
ingestion/src/metadata/ingestion/source/database/athena/service_spec.py
@@ -0,0 +1,10 @@
+from metadata.ingestion.source.database.athena.lineage import AthenaLineageSource
+from metadata.ingestion.source.database.athena.metadata import AthenaSource
+from metadata.ingestion.source.database.athena.usage import AthenaUsageSource
+from metadata.utils.service_spec.default import DefaultDatabaseSpec
+
+ServiceSpec = DefaultDatabaseSpec(
+    metadata_source_class=AthenaSource,
+    lineage_source_class=AthenaLineageSource,
+    usage_source_class=AthenaUsageSource,
+)
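Database connectors in this diff pass only the classes they customize to `DefaultDatabaseSpec`, which suggests the spec fills in the rest. The sketch below shows one way such defaults could work; the class names `SketchBaseSpec`, `SketchDefaultDatabaseSpec`, and `GenericProfiler` are hypothetical, and the choice of which fields carry defaults is an assumption rather than something this diff confirms.

```python
from dataclasses import dataclass
from typing import Optional, Type


class GenericProfiler:
    """Hypothetical fallback profiler used when a connector does not supply one."""


class CustomProfiler(GenericProfiler):
    """Hypothetical connector-specific profiler."""


class FakeSource:
    """Hypothetical metadata source class."""


@dataclass(frozen=True)
class SketchBaseSpec:
    # Only the metadata source is required; everything else is optional.
    metadata_source_class: Type
    lineage_source_class: Optional[Type] = None
    usage_source_class: Optional[Type] = None
    profiler_class: Optional[Type] = None


@dataclass(frozen=True)
class SketchDefaultDatabaseSpec(SketchBaseSpec):
    # Assumption: database services fall back to a generic profiler.
    profiler_class: Optional[Type] = GenericProfiler


# A connector that customizes nothing beyond its source class.
plain_spec = SketchDefaultDatabaseSpec(metadata_source_class=FakeSource)

# A connector that overrides the profiler, as the BigQuery spec does with profiler_class.
custom_spec = SketchDefaultDatabaseSpec(
    metadata_source_class=FakeSource,
    profiler_class=CustomProfiler,
)
```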
10 changes: 10 additions & 0 deletions
ingestion/src/metadata/ingestion/source/database/azuresql/service_spec.py
@@ -0,0 +1,10 @@
+from metadata.ingestion.source.database.azuresql.lineage import AzuresqlLineageSource
+from metadata.ingestion.source.database.azuresql.metadata import AzuresqlSource
+from metadata.ingestion.source.database.azuresql.usage import AzuresqlUsageSource
+from metadata.utils.service_spec.default import DefaultDatabaseSpec
+
+ServiceSpec = DefaultDatabaseSpec(
+    metadata_source_class=AzuresqlSource,
+    lineage_source_class=AzuresqlLineageSource,
+    usage_source_class=AzuresqlUsageSource,
+)
Empty file.
29 changes: 29 additions & 0 deletions
ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py
@@ -0,0 +1,29 @@
+from typing import List, Type
+
+from metadata.generated.schema.entity.data.table import SystemProfile
+from metadata.ingestion.source.database.bigquery.profiler.system import (
+    BigQuerySystemMetricsComputer,
+)
+from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
+    BigQueryProfilerInterface,
+)
+from metadata.profiler.metrics.system.system import System
+from metadata.profiler.processor.runner import QueryRunner
+
+
+class BigQueryProfiler(BigQueryProfilerInterface):
+    def _compute_system_metrics(
+        self,
+        metrics: Type[System],
+        runner: QueryRunner,
+        *args,
+        **kwargs,
+    ) -> List[SystemProfile]:
+        return self.system_metrics_computer.get_system_metrics(
+            runner.table, self.service_connection_config
+        )
+
+    def initialize_system_metrics_computer(
+        self, **kwargs
+    ) -> BigQuerySystemMetricsComputer:
+        return BigQuerySystemMetricsComputer(session=self.session)
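The commit message mentions "super() dependency injection" and "collaborative constructors", and the computer above is built with a single `session=` keyword. A minimal sketch of how cooperative `__init__` methods can route such a keyword to the mixin that needs it follows. All class names here are hypothetical stand-ins, and the real `SQASessionProvider` and `CacheProvider` constructors may differ.

```python
class SessionProvider:
    """Hypothetical mixin that consumes the `session` keyword."""

    def __init__(self, session=None, **kwargs):
        self._session = session
        # Pass the remaining keywords to the next class in the MRO.
        super().__init__(**kwargs)

    def get_session(self):
        return self._session


class CacheProvider:
    """Hypothetical mixin that consumes an optional `cache_size` keyword."""

    def __init__(self, cache_size=8, **kwargs):
        self.cache_size = cache_size
        super().__init__(**kwargs)


class MetricsSource(SessionProvider, CacheProvider):
    """Combines the mixins; defines no constructor of its own."""


class MetricsComputer:
    """Hypothetical computer base that forwards every keyword it does not use."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class Computer(MetricsComputer, MetricsSource):
    pass


# `session` travels MetricsComputer -> SessionProvider, which consumes it;
# CacheProvider then falls back to its default because nothing else was passed.
computer = Computer(session="fake-session")
mro_names = [cls.__name__ for cls in Computer.__mro__]
```

Each constructor takes only the keywords it owns and forwards the rest, so the classes can be recombined without any of them knowing the full inheritance chain.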
161 changes: 161 additions & 0 deletions
ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py
@@ -0,0 +1,161 @@
+from typing import List
+
+from pydantic import TypeAdapter
+from sqlalchemy.orm import DeclarativeMeta
+
+from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
+from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
+    BigQueryConnection,
+)
+from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult
+from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
+from metadata.profiler.metrics.system.system import (
+    CacheProvider,
+    EmptySystemMetricsSource,
+    SQASessionProvider,
+    SystemMetricsComputer,
+)
+from metadata.utils.logger import profiler_logger
+from metadata.utils.time_utils import datetime_to_timestamp
+
+logger = profiler_logger()
+
+
+class BigQuerySystemMetricsSource(
+    SQASessionProvider, EmptySystemMetricsSource, CacheProvider
+):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def get_kwargs(
+        self,
+        table: DeclarativeMeta,
+        service_connection: BigQueryConnection,
+        *args,
+        **kwargs,
+    ):
+        return {
+            "table": table.__table__.name,
+            "dataset_id": table.__table_args__["schema"],
+            "project_id": super().get_session().get_bind().url.host,
+            "usage_location": service_connection.usageLocation,
+        }
+
+    def get_deletes(
+        self, table: str, project_id: str, usage_location: str, dataset_id: str
+    ) -> List[SystemProfile]:
+        return self.get_system_profile(
+            project_id,
+            dataset_id,
+            table,
+            list(
+                self.get_queries_by_operation(
+                    usage_location,
+                    project_id,
+                    dataset_id,
+                    [
+                        DatabaseDMLOperations.DELETE,
+                    ],
+                )
+            ),
+            "deleted_row_count",
+            DmlOperationType.DELETE,
+        )
+
+    def get_updates(
+        self, table: str, project_id: str, usage_location: str, dataset_id: str
+    ) -> List[SystemProfile]:
+        return self.get_system_profile(
+            project_id,
+            dataset_id,
+            table,
+            self.get_queries_by_operation(
+                usage_location,
+                project_id,
+                dataset_id,
+                [
+                    DatabaseDMLOperations.UPDATE,
+                    DatabaseDMLOperations.MERGE,
+                ],
+            ),
+            "updated_row_count",
+            DmlOperationType.UPDATE,
+        )
+
+    def get_inserts(
+        self, table: str, project_id: str, usage_location: str, dataset_id: str
+    ) -> List[SystemProfile]:
+        return self.get_system_profile(
+            project_id,
+            dataset_id,
+            table,
+            self.get_queries_by_operation(
+                usage_location,
+                project_id,
+                dataset_id,
+                [
+                    DatabaseDMLOperations.INSERT,
+                    DatabaseDMLOperations.MERGE,
+                ],
+            ),
+            "inserted_row_count",
+            DmlOperationType.INSERT,
+        )
+
+    def get_queries_by_operation(
+        self,
+        usage_location: str,
+        project_id: str,
+        dataset_id: str,
+        operations: List[DatabaseDMLOperations],
+    ) -> List[BigQueryQueryResult]:
+        ops = {op.value for op in operations}
+        yield from (
+            query
+            for query in self.get_queries(usage_location, project_id, dataset_id)
+            if query.statement_type in ops
+        )
+
+    def get_queries(
+        self, usage_location: str, project_id: str, dataset_id: str
+    ) -> List[BigQueryQueryResult]:
+        return self.get_or_update_cache(
+            f"{project_id}.{dataset_id}",
+            BigQueryQueryResult.get_for_table,
+            session=super().get_session(),
+            usage_location=usage_location,
+            project_id=project_id,
+            dataset_id=dataset_id,
+        )
+
+    @staticmethod
+    def get_system_profile(
+        project_id: str,
+        dataset_id: str,
+        table: str,
+        query_results: List[BigQueryQueryResult],
+        rows_affected_field: str,
+        operation: DmlOperationType,
+    ) -> List[SystemProfile]:
+        if not BigQueryQueryResult.model_fields.get(rows_affected_field):
+            raise ValueError(
+                f"rows_affected_field [{rows_affected_field}] is not a valid field in BigQueryQueryResult."
+            )
+        return TypeAdapter(List[SystemProfile]).validate_python(
+            [
+                {
+                    "timestamp": datetime_to_timestamp(q.start_time, milliseconds=True),
+                    "operation": operation,
+                    "rowsAffected": getattr(q, rows_affected_field),
+                }
+                for q in query_results
+                if getattr(q, rows_affected_field) > 0
+                and q.project_id == project_id
+                and q.dataset_id == dataset_id
+                and q.table_name == table
+            ]
+        )
+
+
+class BigQuerySystemMetricsComputer(SystemMetricsComputer, BigQuerySystemMetricsSource):
+    pass
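In the file above, `get_queries` fetches the query log once per `project_id.dataset_id` key through `get_or_update_cache`, and `get_queries_by_operation` then filters the cached rows by statement type. The sketch below illustrates that fetch-once, filter-many pattern. `SketchCache`, `QueryRow`, and `fetch_queries` are hypothetical; the body of `get_or_update_cache` is an assumption, with only its call shape (key, callable, keyword arguments) taken from the diff.

```python
from typing import Callable, Dict, Iterable, Iterator, List, NamedTuple


class QueryRow(NamedTuple):
    """Hypothetical, simplified stand-in for a query log row."""

    statement_type: str
    rows: int


class SketchCache:
    """Hypothetical cache: calls the fetch function only on the first request for a key."""

    def __init__(self) -> None:
        self._cache: Dict[str, List[QueryRow]] = {}

    def get_or_update_cache(
        self, key: str, fetch: Callable[..., Iterable[QueryRow]], **kwargs
    ) -> List[QueryRow]:
        if key not in self._cache:
            # Materialize the result so it can be iterated more than once.
            self._cache[key] = list(fetch(**kwargs))
        return self._cache[key]


fetch_calls: List[str] = []


def fetch_queries(dataset_id: str) -> List[QueryRow]:
    """Pretend to query the log; records each call so the caching is observable."""
    fetch_calls.append(dataset_id)
    return [QueryRow("INSERT", 3), QueryRow("DELETE", 1), QueryRow("MERGE", 2)]


def by_operation(rows: Iterable[QueryRow], operations: Iterable[str]) -> Iterator[QueryRow]:
    """Lazily yield only the rows whose statement type is in `operations`."""
    ops = set(operations)
    return (row for row in rows if row.statement_type in ops)


cache = SketchCache()
inserts = list(
    by_operation(
        cache.get_or_update_cache("proj.ds", fetch_queries, dataset_id="ds"),
        ["INSERT", "MERGE"],
    )
)
deletes = list(
    by_operation(
        cache.get_or_update_cache("proj.ds", fetch_queries, dataset_id="ds"),
        ["DELETE"],
    )
)
```

Because the filter returns a lazy iterator, a caller that needs to traverse the result more than once has to wrap it in `list(...)`, as `get_deletes` does in the diff.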
14 changes: 14 additions & 0 deletions
ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py
@@ -0,0 +1,14 @@
+from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource
+from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
+from metadata.ingestion.source.database.bigquery.profiler.profiler import (
+    BigQueryProfiler,
+)
+from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource
+from metadata.utils.service_spec.default import DefaultDatabaseSpec
+
+ServiceSpec = DefaultDatabaseSpec(
+    metadata_source_class=BigquerySource,
+    lineage_source_class=BigqueryLineageSource,
+    usage_source_class=BigqueryUsageSource,
+    profiler_class=BigQueryProfiler,
+)
4 changes: 4 additions & 0 deletions
ingestion/src/metadata/ingestion/source/database/bigtable/service_spec.py
@@ -0,0 +1,4 @@
+from metadata.ingestion.source.database.bigtable.metadata import BigtableSource
+from metadata.utils.service_spec.default import DefaultDatabaseSpec
+
+ServiceSpec = DefaultDatabaseSpec(metadata_source_class=BigtableSource)