Skip to content

Commit

Permalink
[GEN-356] Use ServiceSpec for loading sources based on connectors (#18322)
Browse files Browse the repository at this point in the history

* ref(profiler): use di for system profile

- use source classes that can be overridden in system profiles
- use a manifest class instead of factory to specify which class to resolve for connectors
- example usage can be seen in redshift and snowflake

* - added manifests for all custom profilers
- used super() dependency injection in order for system metrics source
- formatting

* - implement spec for all source types
- added docs for the new specification
- added some pylint ignores in the importer module

* remove TYPE_CHECKING in core.py

* - deleted valuedispatch function
- deleted get_system_metrics_by_dialect
- implemented BigQueryProfiler with a system metrics source
- moved import_source_class to BaseSpec

* - removed tests related to the profiler factory

* - reverted start_time
- removed DML_STAT_TO_DML_STATEMENT_MAPPING
- removed unused logger

* - reverted start_time
- removed DML_STAT_TO_DML_STATEMENT_MAPPING
- removed unused logger

* fixed tests

* format

* bigquery system profile e2e tests

* fixed module docstring

* - removed import_side_effects from redshift. we still use it in postgres for the orm conversion maps.
- removed leftover methods

* - tests for BaseSpec
- moved get_class_path to importer

* - moved constructors around to get rid of useless kwargs

* - changed test_system_metric

* - added lineage and usage to service_spec
- fixed postgres native lineage test

* add comments on collaborative constructors
  • Loading branch information
sushi30 authored and harshach committed Nov 3, 2024
1 parent 5a7d248 commit ec7cd4e
Show file tree
Hide file tree
Showing 116 changed files with 1,453 additions and 809 deletions.
2 changes: 1 addition & 1 deletion ingestion/src/metadata/data_quality/source/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import import_source_class
from metadata.utils.logger import test_suite_logger
from metadata.utils.service_spec.service_spec import import_source_class

logger = test_suite_logger()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.api.rest.metadata import RestSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: the importer resolves this module's ServiceSpec to
# find the source class for the REST API service.
ServiceSpec = BaseSpec(metadata_source_class=RestSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from metadata.ingestion.source.dashboard.domodashboard.metadata import (
    DomodashboardSource,
)
from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Domo Dashboard service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=DomodashboardSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.lightdash.metadata import LightdashSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Lightdash service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=LightdashSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.looker.metadata import LookerSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Looker service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=LookerSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.metabase.metadata import MetabaseSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Metabase service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=MetabaseSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.mode.metadata import ModeSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Mode service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=ModeSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.mstr.metadata import MstrSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the MicroStrategy service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=MstrSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the PowerBI service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=PowerbiSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.qlikcloud.metadata import QlikcloudSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Qlik Cloud service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=QlikcloudSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Qlik Sense service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=QliksenseSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the QuickSight service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=QuicksightSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.redash.metadata import RedashSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Redash service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=RedashSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.sigma.metadata import SigmaSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Sigma service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=SigmaSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.superset.metadata import SupersetSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Superset service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=SupersetSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.dashboard.tableau.metadata import TableauSource

from metadata.utils.service_spec import BaseSpec

# Connector entry point: maps the Tableau service to its source class.
ServiceSpec = BaseSpec(metadata_source_class=TableauSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from metadata.ingestion.source.database.athena.lineage import AthenaLineageSource
from metadata.ingestion.source.database.athena.metadata import AthenaSource
from metadata.ingestion.source.database.athena.usage import AthenaUsageSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

# Connector entry point: Athena supports metadata, lineage and usage workflows.
ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=AthenaSource,
    lineage_source_class=AthenaLineageSource,
    usage_source_class=AthenaUsageSource,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from metadata.ingestion.source.database.azuresql.lineage import AzuresqlLineageSource
from metadata.ingestion.source.database.azuresql.metadata import AzuresqlSource
from metadata.ingestion.source.database.azuresql.usage import AzuresqlUsageSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

# Connector entry point: AzureSQL supports metadata, lineage and usage workflows.
ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=AzuresqlSource,
    lineage_source_class=AzuresqlLineageSource,
    usage_source_class=AzuresqlUsageSource,
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import List, Type

from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.source.database.bigquery.profiler.system import (
BigQuerySystemMetricsComputer,
)
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.processor.runner import QueryRunner


class BigQueryProfiler(BigQueryProfilerInterface):
    """BigQuery profiler that plugs a BigQuery-specific system metrics
    computer into the generic profiler interface."""

    def initialize_system_metrics_computer(
        self, **kwargs
    ) -> BigQuerySystemMetricsComputer:
        """Build the system metrics computer bound to this profiler's session."""
        return BigQuerySystemMetricsComputer(session=self.session)

    def _compute_system_metrics(
        self,
        metrics: Type[System],
        runner: QueryRunner,
        *args,
        **kwargs,
    ) -> List[SystemProfile]:
        """Delegate system metric computation for the runner's table to the
        dedicated computer."""
        computer = self.system_metrics_computer
        return computer.get_system_metrics(
            runner.table, self.service_connection_config
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from typing import List

from pydantic import TypeAdapter
from sqlalchemy.orm import DeclarativeMeta

from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
CacheProvider,
EmptySystemMetricsSource,
SQASessionProvider,
SystemMetricsComputer,
)
from metadata.utils.logger import profiler_logger
from metadata.utils.time_utils import datetime_to_timestamp

logger = profiler_logger()


class BigQuerySystemMetricsSource(
    SQASessionProvider, EmptySystemMetricsSource, CacheProvider
):
    """System metrics source for BigQuery.

    Reads DML job statistics (insert/update/delete row counts) from
    BigQuery's INFORMATION_SCHEMA.JOBS and converts them into SystemProfile
    records. Raw query results are cached per ``project.dataset`` so every
    table in a dataset shares one JOBS scan.
    """

    def __init__(self, *args, **kwargs):
        # Collaborative constructor: forward everything so each mixin in the
        # MRO (SQASessionProvider, EmptySystemMetricsSource, CacheProvider)
        # can consume its own arguments.
        super().__init__(*args, **kwargs)

    def get_kwargs(
        self,
        table: DeclarativeMeta,
        service_connection: BigQueryConnection,
        *args,
        **kwargs,
    ):
        """Extract the arguments consumed by get_inserts/get_updates/get_deletes."""
        return {
            "table": table.__table__.name,
            "dataset_id": table.__table_args__["schema"],
            # The BigQuery project id travels as the host part of the
            # SQLAlchemy engine URL.
            "project_id": super().get_session().get_bind().url.host,
            "usage_location": service_connection.usageLocation,
        }

    def get_deletes(
        self, table: str, project_id: str, usage_location: str, dataset_id: str
    ) -> List[SystemProfile]:
        """Return DELETE system profiles for the given table."""
        return self.get_system_profile(
            project_id,
            dataset_id,
            table,
            self.get_queries_by_operation(
                usage_location,
                project_id,
                dataset_id,
                [
                    DatabaseDMLOperations.DELETE,
                ],
            ),
            "deleted_row_count",
            DmlOperationType.DELETE,
        )

    def get_updates(
        self, table: str, project_id: str, usage_location: str, dataset_id: str
    ) -> List[SystemProfile]:
        """Return UPDATE system profiles (UPDATE and MERGE statements)."""
        return self.get_system_profile(
            project_id,
            dataset_id,
            table,
            self.get_queries_by_operation(
                usage_location,
                project_id,
                dataset_id,
                [
                    DatabaseDMLOperations.UPDATE,
                    DatabaseDMLOperations.MERGE,
                ],
            ),
            "updated_row_count",
            DmlOperationType.UPDATE,
        )

    def get_inserts(
        self, table: str, project_id: str, usage_location: str, dataset_id: str
    ) -> List[SystemProfile]:
        """Return INSERT system profiles (INSERT and MERGE statements)."""
        return self.get_system_profile(
            project_id,
            dataset_id,
            table,
            self.get_queries_by_operation(
                usage_location,
                project_id,
                dataset_id,
                [
                    DatabaseDMLOperations.INSERT,
                    DatabaseDMLOperations.MERGE,
                ],
            ),
            "inserted_row_count",
            DmlOperationType.INSERT,
        )

    def get_queries_by_operation(
        self,
        usage_location: str,
        project_id: str,
        dataset_id: str,
        operations: List[DatabaseDMLOperations],
    ) -> List[BigQueryQueryResult]:
        """Filter the cached JOBS results down to the given DML operations.

        Fix: this used to be a generator (``yield from``) despite the ``List``
        annotation, making the result single-use — ``get_deletes`` had to wrap
        it in ``list()`` while its siblings did not. It now materializes a
        list so the annotation is honest and the result is re-iterable.
        """
        wanted = {op.value for op in operations}
        return [
            query
            for query in self.get_queries(usage_location, project_id, dataset_id)
            if query.statement_type in wanted
        ]

    def get_queries(
        self, usage_location: str, project_id: str, dataset_id: str
    ) -> List[BigQueryQueryResult]:
        """Fetch (or reuse cached) JOBS rows for the project/dataset pair."""
        return self.get_or_update_cache(
            f"{project_id}.{dataset_id}",
            BigQueryQueryResult.get_for_table,
            session=super().get_session(),
            usage_location=usage_location,
            project_id=project_id,
            dataset_id=dataset_id,
        )

    @staticmethod
    def get_system_profile(
        project_id: str,
        dataset_id: str,
        table: str,
        query_results: List[BigQueryQueryResult],
        rows_affected_field: str,
        operation: DmlOperationType,
    ) -> List[SystemProfile]:
        """Build SystemProfile entries for one table from raw query results.

        Args:
            rows_affected_field: name of the BigQueryQueryResult field that
                holds the affected-row count for ``operation``.

        Raises:
            ValueError: if ``rows_affected_field`` is not a declared field of
                BigQueryQueryResult.
        """
        if not BigQueryQueryResult.model_fields.get(rows_affected_field):
            raise ValueError(
                f"rows_affected_field [{rows_affected_field}] is not a valid field in BigQueryQueryResult."
            )
        return TypeAdapter(List[SystemProfile]).validate_python(
            [
                {
                    "timestamp": datetime_to_timestamp(q.start_time, milliseconds=True),
                    "operation": operation,
                    "rowsAffected": getattr(q, rows_affected_field),
                }
                for q in query_results
                # Fix: the row-count fields are Optional[int]; coalesce None
                # to 0 so a missing statistic does not raise TypeError on the
                # ``> 0`` comparison.
                if (getattr(q, rows_affected_field) or 0) > 0
                and q.project_id == project_id
                and q.dataset_id == dataset_id
                and q.table_name == table
            ]
        )


class BigQuerySystemMetricsComputer(SystemMetricsComputer, BigQuerySystemMetricsSource):
    """Concrete computer combining the generic SystemMetricsComputer driver
    with the BigQuery-specific metrics source via cooperative inheritance."""

    pass
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
"""

import textwrap
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, TypeAdapter
from sqlalchemy import text
from sqlalchemy.orm import Session

from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations

BIGQUERY_STATEMENT = textwrap.dedent(
"""
Expand Down Expand Up @@ -172,3 +180,63 @@
AND resource.labels.dataset_id = "{dataset}"
AND timestamp >= "{start_date}"
"""


class BigQueryQueryResult(BaseModel):
    """One DML job row returned by the JOBS query against BigQuery's
    INFORMATION_SCHEMA.JOBS view. The three row-count fields are optional:
    only the counters relevant to the statement type are expected to be set.
    """

    project_id: str
    dataset_id: str
    table_name: str
    inserted_row_count: Optional[int] = None
    deleted_row_count: Optional[int] = None
    updated_row_count: Optional[int] = None
    start_time: datetime
    statement_type: str

    @staticmethod
    def get_for_table(
        session: Session,
        usage_location: str,
        dataset_id: str,
        project_id: str,
    ):
        """Run the JOBS query for a project/dataset and parse each row into a
        BigQueryQueryResult via pydantic validation."""
        rendered_query = JOBS.format(
            usage_location=usage_location,
            dataset_id=dataset_id,
            project_id=project_id,
            insert=DatabaseDMLOperations.INSERT.value,
            update=DatabaseDMLOperations.UPDATE.value,
            delete=DatabaseDMLOperations.DELETE.value,
            merge=DatabaseDMLOperations.MERGE.value,
        )
        rows = session.execute(text(rendered_query))
        adapter = TypeAdapter(List[BigQueryQueryResult])
        return adapter.validate_python(map(dict, rows))


# SQL template for BigQuery's INFORMATION_SCHEMA.JOBS: pulls DML jobs from the
# last day that targeted {project_id}.{dataset_id}. Placeholders are filled in
# by BigQueryQueryResult.get_for_table.
JOBS = """
SELECT
statement_type,
start_time,
destination_table.project_id as project_id,
destination_table.dataset_id as dataset_id,
destination_table.table_id as table_name,
dml_statistics.inserted_row_count as inserted_row_count,
dml_statistics.deleted_row_count as deleted_row_count,
dml_statistics.updated_row_count as updated_row_count
FROM
`region-{usage_location}`.INFORMATION_SCHEMA.JOBS
WHERE
DATE(creation_time) >= CURRENT_DATE() - 1 AND
destination_table.dataset_id = '{dataset_id}' AND
destination_table.project_id = '{project_id}' AND
statement_type IN (
'{insert}',
'{update}',
'{delete}',
'{merge}'
)
ORDER BY creation_time DESC;
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource
from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
from metadata.ingestion.source.database.bigquery.profiler.profiler import (
    BigQueryProfiler,
)
from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

# Connector entry point: BigQuery supports metadata, lineage and usage
# workflows plus a custom profiler class.
ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=BigquerySource,
    lineage_source_class=BigqueryLineageSource,
    usage_source_class=BigqueryUsageSource,
    profiler_class=BigQueryProfiler,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from metadata.ingestion.source.database.bigtable.metadata import BigtableSource

from metadata.utils.service_spec.default import DefaultDatabaseSpec

# Connector entry point: Bigtable supports the metadata workflow only.
ServiceSpec = DefaultDatabaseSpec(metadata_source_class=BigtableSource)
Loading

0 comments on commit ec7cd4e

Please sign in to comment.