Skip to content

Commit

Permalink
Part of #12998 - Query Service & Lineage filter processed queries (#1…
Browse files Browse the repository at this point in the history
…3215)

* Update mapping

* Prep

* prep

* Prep query migration

* Add query index deletion fix

* Docs and Maven CI fix

* Fix tests

* Add service filter

* Add query entity FQN col migration

* Fix lint

* supported serviceFQN in query api

* Prep repo

* Prep ES query search

* Do not recompute lineage

* Format

* Fix test

---------

Co-authored-by: Ashish Gupta <ashish@getcollate.io>
  • Loading branch information
pmbrull and Ashish8689 authored Sep 19, 2023
1 parent 1895fbd commit 22b0f44
Show file tree
Hide file tree
Showing 25 changed files with 502 additions and 40 deletions.
3 changes: 3 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,6 @@ SET json = JSON_INSERT(
JSON_EXTRACT(json, '$.sourceConfig.config.viewParsingTimeoutLimit')
)
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';

-- Query Entity supports service, which requires FQN for name
-- NOTE(review): MySQL CHANGE COLUMN re-declares the full column definition;
-- this keeps VARCHAR(256) but drops any explicit charset/collation the old
-- nameHash column may have carried — confirm against the original schema.
ALTER TABLE query_entity CHANGE COLUMN nameHash fqnHash VARCHAR(256);
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,6 @@ SET json = jsonb_set(
true
)
WHERE json #>> '{pipelineType}' = 'metadata';

-- Query Entity supports service, which requires FQN for name
-- Postgres RENAME COLUMN preserves the column's type, constraints and
-- indexes, so no re-declaration is needed here (unlike the MySQL migration).
ALTER TABLE query_entity RENAME COLUMN nameHash TO fqnHash;
40 changes: 39 additions & 1 deletion ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
To be used by OpenMetadata class
"""
import functools
import json
import traceback
from typing import Generic, List, Optional, Type, TypeVar
from typing import Generic, List, Optional, Set, Type, TypeVar

from pydantic import BaseModel
from requests.utils import quote

from metadata.generated.schema.api.createEventPublisherJob import (
CreateEventPublisherJob,
)
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.system.eventPublisherJob import EventPublisherResult
from metadata.ingestion.ometa.client import REST, APIError
from metadata.utils.elasticsearch import ES_INDEX_MAP
Expand Down Expand Up @@ -139,3 +142,38 @@ def get_reindex_job_status(self, job_id: str) -> Optional[EventPublisherResult]:
logger.debug(traceback.format_exc())
logger.debug(f"Failed to fetch reindex job status due to {err}")
return None

@staticmethod
def get_query_with_lineage_filter(service_name: str) -> str:
query_lineage_filter = {
"query": {
"bool": {
"must": [
{"term": {"processedLineage": True}},
{"term": {"service.name.keyword": service_name}},
]
}
}
}
return quote(json.dumps(query_lineage_filter))

# NOTE(review): lru_cache on an instance method (ruff B019) keys on `self`
# and keeps the instance alive for the cache's lifetime; acceptable for a
# long-lived client, but confirm this object is not created per-request.
@functools.lru_cache(maxsize=12)
def es_get_queries_with_lineage(self, service_name: str) -> Optional[Set[str]]:
    """Get a set of query checksums that have already been processed for lineage"""
    try:
        # Single ES search against the Query index, filtered server-side to
        # queries of this service with processedLineage=True; we only pull
        # back the checksum/processedLineage source fields.
        resp = self.client.get(
            f"/search/query?q=&index={ES_INDEX_MAP[Query.__name__]}"
            "&include_source_fields=checksum&include_source_fields="
            f"processedLineage&query_filter={self.get_query_with_lineage_filter(service_name)}"
        )
        return {elem["_source"]["checksum"] for elem in resp["hits"]["hits"]}

    except APIError as err:
        # Known client/API failure: log and degrade to "unknown" (None)
        logger.debug(traceback.format_exc())
        logger.warning(f"Could not get queries from ES due to [{err}]")
        return None

    except Exception as err:
        # Anything else (e.g. unexpected response shape) is also best-effort
        logger.debug(traceback.format_exc())
        logger.warning(f"Unknown error extracting results from ES query [{err}]")
        return None
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
:return: tables or views, depending on config
"""
schema_name = self.context.database_schema.name.__root__
try:
schema_name = self.context.database_schema.name.__root__
if self.source_config.includeTables:
for table_and_type in self.query_table_names_and_types(schema_name):
table_name = self.standardize_table_name(
Expand Down
56 changes: 43 additions & 13 deletions ingestion/src/metadata/ingestion/source/database/lineage_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
import csv
import traceback
from abc import ABC
from typing import Iterable, Iterator
from typing import Iterable, Iterator, Union

from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand Down Expand Up @@ -99,23 +102,50 @@ def yield_table_query(self) -> Iterator[TableQuery]:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {query_dict}: {exc}")

def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]:
def _query_already_processed(self, table_query: TableQuery) -> bool:
    """
    Check if a query has already been processed by validating if it exists
    in ES with processedLineage as True.

    :param table_query: query record coming from the service's query logs
    :return: True if the query's checksum is already indexed in ES
    """
    checksums = self.metadata.es_get_queries_with_lineage(
        service_name=table_query.serviceName,
    )
    # `in` binds tighter than `or`, so the previous expression
    # `x in checksums or {}` evaluated as `(x in checksums) or {}` and
    # raised TypeError when the ES lookup returned None. Parenthesize the
    # fallback so a failed lookup means "not processed".
    return fqn.get_query_checksum(table_query.query) in (checksums or set())

def _iter(
    self, *_, **__
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
    """
    Based on the query logs, prepare the lineage
    and send it to the sink.

    Queries whose lineage was already processed (per the ES check in
    `_query_already_processed`) are skipped entirely to avoid recomputing
    lineage on every run.
    """
    connection_type = str(self.service_connection.type.value)
    # Map the connection type to the SQL dialect used by the lineage parser
    dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
    for table_query in self.get_table_query():
        if not self._query_already_processed(table_query):
            lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
                self.metadata,
                query=table_query.query,
                service_name=table_query.serviceName,
                database_name=table_query.databaseName,
                schema_name=table_query.databaseSchema,
                dialect=dialect,
                timeout_seconds=self.source_config.parsingTimeoutLimit,
            )

            for lineage_request in lineages or []:
                yield lineage_request

                # If we identified lineage properly, ingest the original query
                # flagged with processedLineage=True, so the next run's ES
                # filter excludes it.
                if lineage_request.right:
                    yield Either(
                        right=CreateQueryRequest(
                            query=SqlQuery(__root__=table_query.query),
                            query_type=table_query.query_type,
                            duration=table_query.duration,
                            processedLineage=True,
                            service=FullyQualifiedEntityName(
                                __root__=self.config.serviceName
                            ),
                        )
                    )
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,6 @@ def yield_procedure_query(
type="storedProcedure",
),
processedLineage=bool(self.context.stored_procedure_query_lineage),
service=self.context.database_service.name.__root__,
)
)
2 changes: 2 additions & 0 deletions ingestion/src/metadata/ingestion/stage/table_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def _add_sql_query(self, record, table):
queryDate=record.date,
usedBy=used_by,
duration=record.duration,
service=record.serviceName,
)
)
else:
Expand All @@ -113,6 +114,7 @@ def _add_sql_query(self, record, table):
queryDate=record.date,
usedBy=used_by,
duration=record.duration,
service=record.serviceName,
)
]

Expand Down
24 changes: 24 additions & 0 deletions ingestion/src/metadata/utils/fqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Filter information has been taken from the
ES indexes definitions
"""
import hashlib
import re
from typing import Dict, List, Optional, Type, TypeVar, Union

Expand All @@ -33,6 +34,7 @@
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.data.topic import Topic
Expand Down Expand Up @@ -433,6 +435,20 @@ def _(
return _build(service_name, "model", data_model_name)


@fqn_build_registry.add(Query)
def _(
    _: Optional[OpenMetadata],  # ES Index not necessary for Query FQN building
    *,
    service_name: str,
    query_checksum: str,
) -> str:
    """
    Build a Query FQN as `<service_name>.<query_checksum>`.

    :raises FQNBuildingException: if either argument is empty/None
    """
    if not service_name or not query_checksum:
        # Fixed stale "dashboard" copy-paste in the comment above and the
        # stray trailing double backtick in this message.
        raise FQNBuildingException(
            f"Args should be informed, but got service=`{service_name}`,"
            f" query_checksum=`{query_checksum}`"
        )
    return _build(service_name, query_checksum)


def split_table_name(table_name: str) -> Dict[str, Optional[str]]:
"""
Given a table name, try to extract database, schema and
Expand Down Expand Up @@ -531,3 +547,11 @@ def search_table_from_es(
return get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
)


def get_query_checksum(query: str) -> str:
    """
    Compute the MD5 checksum of a query's string representation.

    The checksum doubles as the query entity's name.
    """
    digest = hashlib.md5(query.encode())
    return digest.hexdigest()
91 changes: 87 additions & 4 deletions ingestion/tests/integration/ometa/test_ometa_es_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
import time
from unittest import TestCase

from requests.utils import quote

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
Expand All @@ -41,9 +45,12 @@
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.basic import SqlQuery
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn

QUERY_CHECKSUM = fqn.get_query_checksum("select * from awesome")


class OMetaESTest(TestCase):
"""
Expand Down Expand Up @@ -75,6 +82,19 @@ class OMetaESTest(TestCase):
)
),
)
another_service = CreateDatabaseServiceRequest(
name="another-test-service-es",
serviceType=DatabaseServiceType.Mysql,
connection=DatabaseConnection(
config=MysqlConnection(
username="username",
authType=BasicAuth(
password="password",
),
hostPort="http://localhost:1234",
)
),
)
service_type = "databaseService"

@classmethod
Expand All @@ -85,13 +105,23 @@ def check_es_index(cls) -> None:
logging.info("Checking ES index status...")
tries = 0

res = None
while not res and tries <= 5: # Kill in 5 seconds
res = cls.metadata.es_search_from_fqn(
table_res = None
query_res = None
while not table_res and not query_res and tries <= 5: # Kill in 5 seconds
table_res = cls.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string="test-service-es.test-db-es.test-schema-es.test-es",
)
if not res:
query_res = cls.metadata.es_search_from_fqn(
entity_type=Query,
fqn_search_string=fqn.build(
metadata=None,
entity_type=Query,
service_name="test-service-es",
query_checksum=QUERY_CHECKSUM,
),
)
if not table_res or query_res:
tries += 1
time.sleep(1)

Expand Down Expand Up @@ -125,6 +155,32 @@ def setUpClass(cls) -> None:

cls.entity = cls.metadata.create_or_update(create)

# Create queries for the given service
query = CreateQueryRequest(
query=SqlQuery(__root__="select * from awesome"),
service=cls.service_entity.fullyQualifiedName,
processedLineage=True, # Only 1 with processed lineage
)
cls.metadata.create_or_update(query)

query2 = CreateQueryRequest(
query=SqlQuery(__root__="select * from another_awesome"),
service=cls.service_entity.fullyQualifiedName,
)
cls.metadata.create_or_update(query2)

# Create queries for another service
cls.another_service_entity = cls.metadata.create_or_update(
data=cls.another_service
)

another_query = CreateQueryRequest(
query=SqlQuery(__root__="select * from awesome"),
service=cls.another_service_entity.fullyQualifiedName,
processedLineage=True,
)
cls.metadata.create_or_update(another_query)

# Leave some time for indexes to get updated, otherwise this happens too fast
cls.check_es_index()

Expand All @@ -147,6 +203,19 @@ def tearDownClass(cls) -> None:
hard_delete=True,
)

another_service_id = str(
cls.metadata.get_by_name(
entity=DatabaseService, fqn=cls.another_service.name.__root__
).id.__root__
)

cls.metadata.delete(
entity=DatabaseService,
entity_id=another_service_id,
recursive=True,
hard_delete=True,
)

# Disabling this test because it fails with
# this pr: https://github.com/open-metadata/OpenMetadata/pull/11879
# and failure is reproducible only with docker deployment
Expand Down Expand Up @@ -211,3 +280,17 @@ def test_es_search_from_service_table_empty(self):
)

self.assertIsNone(res)

def test_get_query_with_lineage_filter(self):
    """Check we are building the proper filter"""
    res = self.metadata.get_query_with_lineage_filter("my_service")
    expected = (
        '{"query": {"bool": {"must": [{"term": {"processedLineage": true}},'
        ' {"term": {"service.name.keyword": "my_service"}}]}}}'
    )
    # assertEquals is a deprecated alias removed in Python 3.12; use the
    # canonical assertEqual.
    self.assertEqual(res, quote(expected))

def test_get_queries_with_lineage(self):
    """Validate the checksum payload we get back from ES"""
    checksums = self.metadata.es_get_queries_with_lineage(
        self.service.name.__root__
    )
    self.assertIn(QUERY_CHECKSUM, checksums)
Loading

0 comments on commit 22b0f44

Please sign in to comment.