Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GEN-356] Use ServiceSpec for loading sources based on connectors #18322

Merged
merged 20 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ingestion/src/metadata/data_quality/source/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import import_source_class
from metadata.utils.logger import test_suite_logger
from metadata.utils.service_spec.service_spec import import_source_class

# Module-level logger shared by the data-quality test-suite source.
logger = test_suite_logger()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the REST API connector: declares the source class that
# implements metadata ingestion so it can be resolved from this module.
from metadata.ingestion.source.api.rest.metadata import RestSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=RestSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# ServiceSpec for the Domo Dashboard connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.domodashboard.metadata import (
    DomodashboardSource,
)
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=DomodashboardSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Lightdash connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.lightdash.metadata import LightdashSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=LightdashSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Looker connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.looker.metadata import LookerSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=LookerSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Metabase connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.metabase.metadata import MetabaseSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=MetabaseSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Mode connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.mode.metadata import ModeSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=ModeSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the MicroStrategy (MSTR) connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.mstr.metadata import MstrSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=MstrSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Power BI connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=PowerbiSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Qlik Cloud connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.qlikcloud.metadata import QlikcloudSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=QlikcloudSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Qlik Sense connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=QliksenseSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the QuickSight connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=QuicksightSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Redash connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.redash.metadata import RedashSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=RedashSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Sigma connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.sigma.metadata import SigmaSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=SigmaSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Superset connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.superset.metadata import SupersetSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=SupersetSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Tableau connector: declares its metadata source class.
from metadata.ingestion.source.dashboard.tableau.metadata import TableauSource
from metadata.utils.service_spec import BaseSpec

ServiceSpec = BaseSpec(metadata_source_class=TableauSource)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ServiceSpec for the Athena connector: declares the source classes for
# metadata, lineage, and usage workflows.
from metadata.ingestion.source.database.athena.lineage import AthenaLineageSource
from metadata.ingestion.source.database.athena.metadata import AthenaSource
from metadata.ingestion.source.database.athena.usage import AthenaUsageSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=AthenaSource,
    lineage_source_class=AthenaLineageSource,
    usage_source_class=AthenaUsageSource,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ServiceSpec for the Azure SQL connector: declares the source classes for
# metadata, lineage, and usage workflows.
from metadata.ingestion.source.database.azuresql.lineage import AzuresqlLineageSource
from metadata.ingestion.source.database.azuresql.metadata import AzuresqlSource
from metadata.ingestion.source.database.azuresql.usage import AzuresqlUsageSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=AzuresqlSource,
    lineage_source_class=AzuresqlLineageSource,
    usage_source_class=AzuresqlUsageSource,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import List, Type

from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.source.database.bigquery.profiler.system import (
BigQuerySystemMetricsComputer,
)
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.processor.runner import QueryRunner


class BigQueryProfiler(BigQueryProfilerInterface):
    """Profiler interface for BigQuery that delegates system (DML) metrics
    to a dedicated BigQuery metrics computer instead of the generic path."""

    def _compute_system_metrics(
        self,
        metrics: Type[System],
        runner: QueryRunner,
        *args,
        **kwargs,
    ) -> List[SystemProfile]:
        """Compute system profiles for the table bound to *runner*.

        NOTE(review): the `metrics` argument is ignored here — the computer
        derives everything from the table and connection config; confirm
        this matches the base-class contract.
        """
        return self.system_metrics_computer.get_system_metrics(
            runner.table, self.service_connection_config
        )

    def initialize_system_metrics_computer(
        self, **kwargs
    ) -> BigQuerySystemMetricsComputer:
        """Build the BigQuery-specific metrics computer bound to this session."""
        return BigQuerySystemMetricsComputer(session=self.session)
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from typing import List

from pydantic import TypeAdapter
from sqlalchemy.orm import DeclarativeMeta

from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
CacheProvider,
EmptySystemMetricsSource,
SQASessionProvider,
SystemMetricsComputer,
)
from metadata.utils.logger import profiler_logger
from metadata.utils.time_utils import datetime_to_timestamp

logger = profiler_logger()


class BigQuerySystemMetricsSource(
    SQASessionProvider, EmptySystemMetricsSource, CacheProvider
):
    """Source of DML system metrics (inserts/updates/deletes) for BigQuery
    tables, read from INFORMATION_SCHEMA.JOBS and cached per project/dataset.
    """

    def get_kwargs(
        self,
        table: DeclarativeMeta,
        service_connection: BigQueryConnection,
        *args,
        **kwargs,
    ):
        """Build the keyword arguments consumed by get_inserts/get_updates/get_deletes."""
        return {
            "table": table.__table__.name,
            "dataset_id": table.__table_args__["schema"],
            # The BigQuery project id is the host component of the engine URL.
            "project_id": super().get_session().get_bind().url.host,
            "usage_location": service_connection.usageLocation,
        }

    def get_deletes(
        self, table: str, project_id: str, usage_location: str, dataset_id: str
    ) -> List[SystemProfile]:
        """Return DELETE profiles for the table."""
        return self.get_system_profile(
            project_id,
            dataset_id,
            table,
            # No list() wrapper needed: get_queries_by_operation returns a list
            # (the original inconsistently wrapped only this call site).
            self.get_queries_by_operation(
                usage_location,
                project_id,
                dataset_id,
                [
                    DatabaseDMLOperations.DELETE,
                ],
            ),
            "deleted_row_count",
            DmlOperationType.DELETE,
        )

    def get_updates(
        self, table: str, project_id: str, usage_location: str, dataset_id: str
    ) -> List[SystemProfile]:
        """Return UPDATE profiles for the table (MERGE counts as UPDATE)."""
        return self.get_system_profile(
            project_id,
            dataset_id,
            table,
            self.get_queries_by_operation(
                usage_location,
                project_id,
                dataset_id,
                [
                    DatabaseDMLOperations.UPDATE,
                    DatabaseDMLOperations.MERGE,
                ],
            ),
            "updated_row_count",
            DmlOperationType.UPDATE,
        )

    def get_inserts(
        self, table: str, project_id: str, usage_location: str, dataset_id: str
    ) -> List[SystemProfile]:
        """Return INSERT profiles for the table (MERGE counts as INSERT)."""
        return self.get_system_profile(
            project_id,
            dataset_id,
            table,
            self.get_queries_by_operation(
                usage_location,
                project_id,
                dataset_id,
                [
                    DatabaseDMLOperations.INSERT,
                    DatabaseDMLOperations.MERGE,
                ],
            ),
            "inserted_row_count",
            DmlOperationType.INSERT,
        )

    def get_queries_by_operation(
        self,
        usage_location: str,
        project_id: str,
        dataset_id: str,
        operations: List[DatabaseDMLOperations],
    ) -> List[BigQueryQueryResult]:
        """Filter the cached job results down to the given DML operations.

        Fixed: the original used `yield from`, making this a generator while
        the annotation promised a List; now it actually returns a list.
        """
        operation_names = {op.value for op in operations}
        return [
            query
            for query in self.get_queries(usage_location, project_id, dataset_id)
            if query.statement_type in operation_names
        ]

    def get_queries(
        self, usage_location: str, project_id: str, dataset_id: str
    ) -> List[BigQueryQueryResult]:
        """Fetch (or reuse cached) job results for a project/dataset pair."""
        return self.get_or_update_cache(
            f"{project_id}.{dataset_id}",
            BigQueryQueryResult.get_for_table,
            session=super().get_session(),
            usage_location=usage_location,
            project_id=project_id,
            dataset_id=dataset_id,
        )

    @staticmethod
    def get_system_profile(
        project_id: str,
        dataset_id: str,
        table: str,
        query_results: List[BigQueryQueryResult],
        rows_affected_field: str,
        operation: DmlOperationType,
    ) -> List[SystemProfile]:
        """Convert job results for one table into validated SystemProfile rows.

        Raises:
            ValueError: if *rows_affected_field* is not a BigQueryQueryResult field,
                guarding against silent AttributeError-driven misconfiguration.
        """
        if not BigQueryQueryResult.model_fields.get(rows_affected_field):
            raise ValueError(
                f"rows_affected_field [{rows_affected_field}] is not a valid field in BigQueryQueryResult."
            )
        return TypeAdapter(List[SystemProfile]).validate_python(
            [
                {
                    "timestamp": datetime_to_timestamp(q.start_time, milliseconds=True),
                    "operation": operation,
                    "rowsAffected": getattr(q, rows_affected_field),
                }
                for q in query_results
                # Keep only rows for this exact table with a positive row count.
                if getattr(q, rows_affected_field) > 0
                and q.project_id == project_id
                and q.dataset_id == dataset_id
                and q.table_name == table
            ]
        )


class BigQuerySystemMetricsComputer(SystemMetricsComputer, BigQuerySystemMetricsSource):
    """Composes the generic SystemMetricsComputer with the BigQuery source via
    MRO: computer methods resolve their get_* calls to the BigQuery-specific
    implementations defined in BigQuerySystemMetricsSource above."""

    pass
Comment on lines +160 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do? I am wondering why it does not override the base SystemMetricsComputer directly?

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
"""

import textwrap
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, TypeAdapter
from sqlalchemy import text
from sqlalchemy.orm import Session

from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations

BIGQUERY_STATEMENT = textwrap.dedent(
"""
Expand Down Expand Up @@ -172,3 +180,63 @@
AND resource.labels.dataset_id = "{dataset}"
AND timestamp >= "{start_date}"
"""


class BigQueryQueryResult(BaseModel):
    """One DML job row from BigQuery's INFORMATION_SCHEMA.JOBS.

    The three row-count fields are optional because each job type populates
    only the counters relevant to its statement type.
    """

    project_id: str
    dataset_id: str
    table_name: str
    inserted_row_count: Optional[int] = None
    deleted_row_count: Optional[int] = None
    updated_row_count: Optional[int] = None
    start_time: datetime
    statement_type: str

    @staticmethod
    def get_for_table(
        session: Session,
        usage_location: str,
        dataset_id: str,
        project_id: str,
    ):
        """Run the JOBS query (module-level constant defined below) for a
        project/dataset and parse every row into a BigQueryQueryResult.
        """
        rows = session.execute(
            text(
                JOBS.format(
                    usage_location=usage_location,
                    dataset_id=dataset_id,
                    project_id=project_id,
                    insert=DatabaseDMLOperations.INSERT.value,
                    update=DatabaseDMLOperations.UPDATE.value,
                    delete=DatabaseDMLOperations.DELETE.value,
                    merge=DatabaseDMLOperations.MERGE.value,
                )
            )
        )

        # Each SQLAlchemy row is mapped to a dict and validated by pydantic.
        return TypeAdapter(List[BigQueryQueryResult]).validate_python(map(dict, rows))


JOBS = """
SELECT
statement_type,
start_time,
destination_table.project_id as project_id,
destination_table.dataset_id as dataset_id,
destination_table.table_id as table_name,
dml_statistics.inserted_row_count as inserted_row_count,
dml_statistics.deleted_row_count as deleted_row_count,
dml_statistics.updated_row_count as updated_row_count
FROM
`region-{usage_location}`.INFORMATION_SCHEMA.JOBS
WHERE
DATE(creation_time) >= CURRENT_DATE() - 1 AND
destination_table.dataset_id = '{dataset_id}' AND
destination_table.project_id = '{project_id}' AND
statement_type IN (
'{insert}',
'{update}',
'{delete}',
'{merge}'
)
ORDER BY creation_time DESC;
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# ServiceSpec for the BigQuery connector: declares the source classes for
# metadata, lineage, and usage workflows, plus its custom profiler.
from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource
from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
from metadata.ingestion.source.database.bigquery.profiler.profiler import (
    BigQueryProfiler,
)
from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=BigquerySource,
    lineage_source_class=BigqueryLineageSource,
    usage_source_class=BigqueryUsageSource,
    profiler_class=BigQueryProfiler,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceSpec for the Bigtable connector: metadata ingestion only.
from metadata.ingestion.source.database.bigtable.metadata import BigtableSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(metadata_source_class=BigtableSource)
Loading
Loading