Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lineage-1: Move view lineage processing to lineage workflow #18220

Merged
merged 8 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import quote
from metadata.ingestion.source.models import TableView
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger

Expand Down Expand Up @@ -90,7 +92,7 @@ class ESMixin(Generic[T]):

# sort_field needs to be unique for the pagination to work, so we can use the FQN
paginate_query = (
"/search/query?q=&size={size}&deleted=false{filter}&index={index}"
"/search/query?q=&size={size}&deleted=false{filter}&index={index}{include_fields}"
"&sort_field=fullyQualifiedName{after}"
)

Expand Down Expand Up @@ -301,13 +303,19 @@ def es_get_queries_with_lineage(self, service_name: str) -> Optional[Set[str]]:
logger.warning(f"Unknown error extracting results from ES query [{err}]")
return None

def paginate_es(
def _get_include_fields_query(self, fields: Optional[List[str]]) -> str:
"""Get the include fields query"""
if fields:
return "&include_source_fields=" + "&include_source_fields=".join(fields)
return ""

def _paginate_es_internal(
self,
entity: Type[T],
query_filter: Optional[str] = None,
size: int = 100,
fields: Optional[List[str]] = None,
) -> Iterator[T]:
include_fields: Optional[List[str]] = None,
) -> Iterator[ESResponse]:
"""Paginate through the ES results, ignoring individual errors"""
after: Optional[str] = None
error_pages = 0
Expand All @@ -316,6 +324,7 @@ def paginate_es(
index=ES_INDEX_MAP[entity.__name__],
filter="&query_filter=" + quote_plus(query_filter) if query_filter else "",
size=size,
include_fields=self._get_include_fields_query(include_fields),
)
while True:
query_string = query(
Expand All @@ -330,10 +339,7 @@ def paginate_es(
continue
else:
break

yield from self._yield_hits_from_api(
response=response, entity=entity, fields=fields
)
yield response

# Get next page
last_hit = response.hits.hits[-1] if response.hits.hits else None
Expand All @@ -343,6 +349,18 @@ def paginate_es(

after = ",".join(last_hit.sort)

def paginate_es(
self,
entity: Type[T],
query_filter: Optional[str] = None,
size: int = 100,
fields: Optional[List[str]] = None,
) -> Iterator[T]:
for response in self._paginate_es_internal(entity, query_filter, size):
yield from self._yield_hits_from_api(
response=response, entity=entity, fields=fields
)

def _get_es_response(self, query_string: str) -> Optional[ESResponse]:
"""Get the Elasticsearch response"""
try:
Expand All @@ -369,3 +387,43 @@ def _yield_hits_from_api(
logger.warning(
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
)

def yield_es_view_def(
self,
service_name: str,
) -> Iterable[TableView]:
"""
Get the view definition from ES
"""

from metadata.utils import fqn

query = {
"query": {
"bool": {
"must": [
{"term": {"service.name.keyword": service_name}},
{"term": {"tableType": TableType.View.value}},
{"term": {"deleted": False}},
{"exists": {"field": "schemaDefinition"}},
]
}
}
}
query = json.dumps(query)
for response in self._paginate_es_internal(
entity=Table,
query_filter=query,
include_fields=["schemaDefinition", "fullyQualifiedName"],
):
for hit in response.hits.hits:
_, database_name, schema_name, table_name = fqn.split(
hit.source["fullyQualifiedName"]
)
yield TableView(
view_definition=hit.source["schemaDefinition"],
service_name=service_name,
db_name=database_name,
schema_name=schema_name,
table_name=table_name,
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,26 @@
"""
Handle big query lineage extraction
"""
from metadata.ingestion.source.database.bigquery.queries import BIGQUERY_STATEMENT
from typing import Dict, List

from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_GET_STORED_PROCEDURE_QUERIES,
BIGQUERY_STATEMENT,
)
from metadata.ingestion.source.database.bigquery.query_parser import (
BigqueryQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.helpers import get_start_and_end


class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
class BigqueryLineageSource(
BigqueryQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
"""
Implements the necessary methods to extract
Database lineage from Bigquery Source
Expand All @@ -32,3 +44,18 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
OR (statement_type = "INSERT" and UPPER(query) like '%%INSERT%%INTO%%SELECT%%')
)
"""

def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
BigQueryStoredProcedure,
)
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_GET_STORED_PROCEDURE_QUERIES,
BIGQUERY_GET_STORED_PROCEDURES,
BIGQUERY_LIFE_CYCLE_QUERY,
BIGQUERY_SCHEMA_DESCRIPTION,
Expand All @@ -95,14 +94,9 @@
LifeCycleQueryMixin,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.filters import filter_by_database, filter_by_schema
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_ddls,
Expand Down Expand Up @@ -223,9 +217,7 @@ def _build_formatted_table_id(table):
Inspector.get_table_ddl = get_table_ddl


class BigquerySource(
LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
):
class BigquerySource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Bigquery Source
Expand Down Expand Up @@ -850,22 +842,6 @@ def yield_stored_procedure(
)
)

def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
)

return queries_dict

def mark_tables_as_deleted(self):
"""
Use the current inspector to mark tables as deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
AND job_type = "QUERY"
AND state = "DONE"
AND error_result is NULL
AND query LIKE 'CALL%%'
AND UPPER(query) LIKE 'CALL%%'
),
Q_HISTORY AS (
SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class DatabaseServiceTopology(ServiceTopology):
# until we have finished ingesting all the metadata from the source.
post_process=[
"yield_view_lineage",
"yield_procedure_lineage_and_queries",
"yield_external_table_lineage",
],
)
Expand Down
46 changes: 46 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/db2/lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Db2 lineage module
"""
from typing import Optional

from metadata.generated.schema.entity.services.connections.database.db2Connection import (
Db2Connection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class Db2LineageSource(LineageSource):
    """Db2 lineage source: produces view lineage for Db2 services."""

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """Validate the workflow config and build a Db2LineageSource from it.

        Raises InvalidSourceException when the service connection in the
        config is not a Db2 connection.
        """
        config = WorkflowSource.model_validate(config_dict)
        connection = config.serviceConnection.root.config
        if isinstance(connection, Db2Connection):
            return cls(config, metadata)
        raise InvalidSourceException(
            f"Expected Db2Connection, but got {connection}"
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Druid lineage module
"""
from typing import Optional

from metadata.generated.schema.entity.services.connections.database.druidConnection import (
DruidConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class DruidLineageSource(LineageSource):
    """Druid lineage source: produces view lineage for Druid services."""

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """Validate the workflow config and build a DruidLineageSource from it.

        Raises InvalidSourceException when the service connection in the
        config is not a Druid connection.
        """
        config = WorkflowSource.model_validate(config_dict)
        connection = config.serviceConnection.root.config
        if isinstance(connection, DruidConnection):
            return cls(config, metadata)
        raise InvalidSourceException(
            f"Expected DruidConnection, but got {connection}"
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Greenplum lineage module
"""
from typing import Optional

from metadata.generated.schema.entity.services.connections.database.greenplumConnection import (
GreenplumConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class GreenplumLineageSource(LineageSource):
    """Greenplum lineage source: produces view lineage for Greenplum services."""

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """Validate the workflow config and build a GreenplumLineageSource from it.

        Raises InvalidSourceException when the service connection in the
        config is not a Greenplum connection.
        """
        config = WorkflowSource.model_validate(config_dict)
        connection = config.serviceConnection.root.config
        if isinstance(connection, GreenplumConnection):
            return cls(config, metadata)
        raise InvalidSourceException(
            f"Expected GreenplumConnection, but got {connection}"
        )
Loading
Loading