diff --git a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql index 0cb82c116729..58de5ecb7eb9 100644 --- a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql @@ -93,3 +93,6 @@ SET json = JSON_INSERT( JSON_EXTRACT(json, '$.sourceConfig.config.viewParsingTimeoutLimit') ) WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata'; + +-- Query Entity supports service, which requires FQN for name +ALTER TABLE query_entity CHANGE COLUMN nameHash fqnHash VARCHAR(256); \ No newline at end of file diff --git a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql index 9f42077c6108..9707d3ba0740 100644 --- a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql @@ -101,3 +101,6 @@ SET json = jsonb_set( true ) WHERE json #>> '{pipelineType}' = 'metadata'; + +-- Query Entity supports service, which requires FQN for name +ALTER TABLE query_entity RENAME COLUMN nameHash TO fqnHash; diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index b5898a09c7cf..8d79806cae28 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -14,14 +14,17 @@ To be used by OpenMetadata class """ import functools +import json import traceback -from typing import Generic, List, Optional, Type, TypeVar +from typing import Generic, List, Optional, Set, Type, TypeVar from pydantic import BaseModel +from requests.utils import quote from metadata.generated.schema.api.createEventPublisherJob import ( CreateEventPublisherJob, ) +from metadata.generated.schema.entity.data.query import Query from 
metadata.generated.schema.system.eventPublisherJob import EventPublisherResult from metadata.ingestion.ometa.client import REST, APIError from metadata.utils.elasticsearch import ES_INDEX_MAP @@ -139,3 +142,38 @@ def get_reindex_job_status(self, job_id: str) -> Optional[EventPublisherResult]: logger.debug(traceback.format_exc()) logger.debug(f"Failed to fetch reindex job status due to {err}") return None + + @staticmethod + def get_query_with_lineage_filter(service_name: str) -> str: + query_lineage_filter = { + "query": { + "bool": { + "must": [ + {"term": {"processedLineage": True}}, + {"term": {"service.name.keyword": service_name}}, + ] + } + } + } + return quote(json.dumps(query_lineage_filter)) + + @functools.lru_cache(maxsize=12) + def es_get_queries_with_lineage(self, service_name: str) -> Optional[Set[str]]: + """Get a set of query checksums that have already been processed for lineage""" + try: + resp = self.client.get( + f"/search/query?q=&index={ES_INDEX_MAP[Query.__name__]}" + "&include_source_fields=checksum&include_source_fields=" + f"processedLineage&query_filter={self.get_query_with_lineage_filter(service_name)}" + ) + return {elem["_source"]["checksum"] for elem in resp["hits"]["hits"]} + + except APIError as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Could not get queries from ES due to [{err}]") + return None + + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Unknown error extracting results from ES query [{err}]") + return None diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 0c726db61f06..8f320d7f577f 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -254,8 +254,8 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: :return: tables or views, 
depending on config """ + schema_name = self.context.database_schema.name.__root__ try: - schema_name = self.context.database_schema.name.__root__ if self.source_config.includeTables: for table_and_type in self.query_table_names_and_types(schema_name): table_name = self.standardize_table_name( diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 2cdf73bb9082..dc0217fddeca 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -14,14 +14,17 @@ import csv import traceback from abc import ABC -from typing import Iterable, Iterator +from typing import Iterable, Iterator, Union +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.models import Either from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.source.database.query_parser_source import QueryParserSource +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -99,7 +102,19 @@ def yield_table_query(self) -> Iterator[TableQuery]: logger.debug(traceback.format_exc()) logger.warning(f"Error processing query_dict {query_dict}: {exc}") - def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: + def _query_already_processed(self, table_query: TableQuery) -> bool: + """ + Check if a query has already been processed by validating if exists + in ES with lineageProcessed as True + """ + checksums = self.metadata.es_get_queries_with_lineage( + 
service_name=table_query.serviceName, + ) + return fqn.get_query_checksum(table_query.query) in (checksums or {}) + + def _iter( + self, *_, **__ + ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: """ Based on the query logs, prepare the lineage and send it to the sink @@ -107,15 +122,30 @@ def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: connection_type = str(self.service_connection.type.value) dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) for table_query in self.get_table_query(): - lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( - self.metadata, - query=table_query.query, - service_name=table_query.serviceName, - database_name=table_query.databaseName, - schema_name=table_query.databaseSchema, - dialect=dialect, - timeout_seconds=self.source_config.parsingTimeoutLimit, - ) + if not self._query_already_processed(table_query): + lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( + self.metadata, + query=table_query.query, + service_name=table_query.serviceName, + database_name=table_query.databaseName, + schema_name=table_query.databaseSchema, + dialect=dialect, + timeout_seconds=self.source_config.parsingTimeoutLimit, + ) + + for lineage_request in lineages or []: + yield lineage_request - for lineage_request in lineages or []: - yield lineage_request + # If we identified lineage properly, ingest the original query + if lineage_request.right: + yield Either( + right=CreateQueryRequest( + query=SqlQuery(__root__=table_query.query), + query_type=table_query.query_type, + duration=table_query.duration, + processedLineage=True, + service=FullyQualifiedEntityName( + __root__=self.config.serviceName + ) + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py index 55300a5bf7c3..da9800e93340 100644 ---
a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -179,5 +179,6 @@ def yield_procedure_query( type="storedProcedure", ), processedLineage=bool(self.context.stored_procedure_query_lineage), + service=self.context.database_service.name.__root__, ) ) diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index 342ec8f58cb5..a8587d577abc 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -101,6 +101,7 @@ def _add_sql_query(self, record, table): queryDate=record.date, usedBy=used_by, duration=record.duration, + service=record.serviceName, ) ) else: @@ -113,6 +114,7 @@ def _add_sql_query(self, record, table): queryDate=record.date, usedBy=used_by, duration=record.duration, + service=record.serviceName, ) ] diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 63d317d07c18..6d437f404f5a 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -13,6 +13,7 @@ Filter information has been taken from the ES indexes definitions """ +import hashlib import re from typing import Dict, List, Optional, Type, TypeVar, Union @@ -33,6 +34,7 @@ from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.query import Query from metadata.generated.schema.entity.data.searchIndex import SearchIndex from metadata.generated.schema.entity.data.table import Column, DataModel, Table from metadata.generated.schema.entity.data.topic import Topic @@ -433,6 +435,20 @@ def _( return _build(service_name, "model", data_model_name) +@fqn_build_registry.add(Query) +def _( + _: 
Optional[OpenMetadata], # ES Index not necessary for query FQN building + *, + service_name: str, + query_checksum: str, +) -> str: + if not service_name or not query_checksum: + raise FQNBuildingException( + f"Args should be informed, but got service=`{service_name}`, query_checksum=`{query_checksum}`" + ) + return _build(service_name, query_checksum) + + def split_table_name(table_name: str) -> Dict[str, Optional[str]]: """ Given a table name, try to extract database, schema and @@ -531,3 +547,11 @@ def search_table_from_es( return get_entity_from_es_result( entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities ) + + +def get_query_checksum(query: str) -> str: + """ + Prepare the query checksum from its string representation. + The checksum is used as the query's name. + """ + return hashlib.md5(query.encode()).hexdigest() diff --git a/ingestion/tests/integration/ometa/test_ometa_es_api.py b/ingestion/tests/integration/ometa/test_ometa_es_api.py index 636ed58b4b03..e7a424b3c14d 100644 --- a/ingestion/tests/integration/ometa/test_ometa_es_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_es_api.py @@ -15,14 +15,18 @@ import time from unittest import TestCase +from requests.utils import quote + from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.services.createDatabaseService import ( CreateDatabaseServiceRequest, ) +from metadata.generated.schema.entity.data.query import Query from metadata.generated.schema.entity.data.table import Column, DataType, Table from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( BasicAuth, ) @@ -41,9 +45,12 @@ from
metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( OpenMetadataJWTClientConfig, ) +from metadata.generated.schema.type.basic import SqlQuery from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn +QUERY_CHECKSUM = fqn.get_query_checksum("select * from awesome") + class OMetaESTest(TestCase): """ @@ -75,6 +82,19 @@ class OMetaESTest(TestCase): ) ), ) + another_service = CreateDatabaseServiceRequest( + name="another-test-service-es", + serviceType=DatabaseServiceType.Mysql, + connection=DatabaseConnection( + config=MysqlConnection( + username="username", + authType=BasicAuth( + password="password", + ), + hostPort="http://localhost:1234", + ) + ), + ) service_type = "databaseService" @classmethod @@ -85,13 +105,23 @@ def check_es_index(cls) -> None: logging.info("Checking ES index status...") tries = 0 - res = None - while not res and tries <= 5: # Kill in 5 seconds - res = cls.metadata.es_search_from_fqn( + table_res = None + query_res = None + while (not table_res or not query_res) and tries <= 5: # Kill in 5 seconds + table_res = cls.metadata.es_search_from_fqn( entity_type=Table, fqn_search_string="test-service-es.test-db-es.test-schema-es.test-es", ) - if not res: + query_res = cls.metadata.es_search_from_fqn( + entity_type=Query, + fqn_search_string=fqn.build( + metadata=None, + entity_type=Query, + service_name="test-service-es", + query_checksum=QUERY_CHECKSUM, + ), + ) + if not table_res or not query_res: + tries += 1 time.sleep(1) @@ -125,6 +155,32 @@ def setUpClass(cls) -> None: cls.entity = cls.metadata.create_or_update(create) + # Create queries for the given service + query = CreateQueryRequest( + query=SqlQuery(__root__="select * from awesome"), + service=cls.service_entity.fullyQualifiedName, + processedLineage=True, # Only 1 with processed lineage + ) + cls.metadata.create_or_update(query) + + query2 = CreateQueryRequest( + query=SqlQuery(__root__="select * from another_awesome"),
service=cls.service_entity.fullyQualifiedName, + ) + cls.metadata.create_or_update(query2) + + # Create queries for another service + cls.another_service_entity = cls.metadata.create_or_update( + data=cls.another_service + ) + + another_query = CreateQueryRequest( + query=SqlQuery(__root__="select * from awesome"), + service=cls.another_service_entity.fullyQualifiedName, + processedLineage=True, + ) + cls.metadata.create_or_update(another_query) + # Leave some time for indexes to get updated, otherwise this happens too fast cls.check_es_index() @@ -147,6 +203,19 @@ def tearDownClass(cls) -> None: hard_delete=True, ) + another_service_id = str( + cls.metadata.get_by_name( + entity=DatabaseService, fqn=cls.another_service.name.__root__ + ).id.__root__ + ) + + cls.metadata.delete( + entity=DatabaseService, + entity_id=another_service_id, + recursive=True, + hard_delete=True, + ) + # Disabling this test because it fails with # this pr: https://github.com/open-metadata/OpenMetadata/pull/11879 # and failure is repoducible only with docker deployment @@ -211,3 +280,17 @@ def test_es_search_from_service_table_empty(self): ) self.assertIsNone(res) + + def test_get_query_with_lineage_filter(self): + """Check we are building the proper filter""" + res = self.metadata.get_query_with_lineage_filter("my_service") + expected = ( + '{"query": {"bool": {"must": [{"term": {"processedLineage": true}},' + ' {"term": {"service.name.keyword": "my_service"}}]}}}' + ) + self.assertEquals(res, quote(expected)) + + def test_get_queries_with_lineage(self): + """Check the payload from ES""" + res = self.metadata.es_get_queries_with_lineage(self.service.name.__root__) + self.assertIn(QUERY_CHECKSUM, res) diff --git a/ingestion/tests/integration/ometa/test_ometa_table_api.py b/ingestion/tests/integration/ometa/test_ometa_table_api.py index 0439ddd31a1c..47375f0c6f6a 100644 --- a/ingestion/tests/integration/ometa/test_ometa_table_api.py +++ 
b/ingestion/tests/integration/ometa/test_ometa_table_api.py @@ -63,6 +63,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( OpenMetadataJWTClientConfig, ) +from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -453,7 +454,10 @@ def test_table_queries(self): entity=Table, fqn=self.entity.fullyQualifiedName ) - query_no_user = CreateQueryRequest(query="select * from awesome") + query_no_user = CreateQueryRequest( + query=SqlQuery(__root__="select * from awesome"), + service=FullyQualifiedEntityName(__root__=self.service.name.__root__), + ) self.metadata.ingest_entity_queries_data(entity=res, queries=[query_no_user]) table_with_query: List[Query] = self.metadata.get_entity_queries( @@ -466,7 +470,9 @@ def test_table_queries(self): # Validate that we can properly add user information query_with_user = CreateQueryRequest( - query="select * from awesome", users=[self.owner.fullyQualifiedName] + query="select * from awesome", + users=[self.owner.fullyQualifiedName], + service=FullyQualifiedEntityName(__root__=self.service.name.__root__), ) self.metadata.ingest_entity_queries_data(entity=res, queries=[query_with_user]) diff --git a/openmetadata-docs/content/partials/v1.2/deployment/upgrade/upgrade-prerequisites.md b/openmetadata-docs/content/partials/v1.2/deployment/upgrade/upgrade-prerequisites.md index 155d54bff44c..fb25a4680d87 100644 --- a/openmetadata-docs/content/partials/v1.2/deployment/upgrade/upgrade-prerequisites.md +++ b/openmetadata-docs/content/partials/v1.2/deployment/upgrade/upgrade-prerequisites.md @@ -90,16 +90,25 @@ pip install openmetadata-ingestion[]==x.y.z The `plugin` parameter is a list of the sources that we want to ingest. 
An example would look like this `openmetadata-ingestion[mysql,snowflake,s3]==1.1.1`. You will find specific instructions for each connector [here](/connectors). -## 1.1.1 - Stable Release 🎉 +## 1.2 - Stable Release 🎉 -OpenMetadata 1.1 is a stable release. Please check the [release notes](/releases/latest-release). +OpenMetadata 1.2 is a stable release. Please check the [release notes](/releases/latest-release). If you are upgrading production this is the recommended version to upgrade to. ## Deprecation Notice +- OpenMetadata only supports Python version 3.8 to 3.10. -## Breaking Changes for 1.1 Stable Release +## Breaking Changes for 1.2 Stable Release + +### Query Entity + +The Query Entity now has the `service` property, linking the Query to the Database Service that it belongs to. Note +that `service` is a required property both for the Query Entity and the Create Query Entity. + +During the migrations, we pick up the service from the tables from `queryUsedIn`. If this information is not available, +then there is no way to link a query to a service and the query will be removed. 
### Service Connection Changes diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 5ab1965f6c4a..c32f9bd8f59d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -1893,7 +1893,7 @@ default Class getEntityClass() { @Override default String getNameHashColumn() { - return "nameHash"; + return "fqnHash"; } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryRepository.java index 813aff015d0e..131a7f5fbece 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryRepository.java @@ -2,6 +2,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.Entity.DATABASE_SERVICE; import static org.openmetadata.service.Entity.USER; import java.util.*; @@ -9,6 +10,7 @@ import javax.ws.rs.core.UriInfo; import lombok.SneakyThrows; import org.openmetadata.schema.entity.data.Query; +import org.openmetadata.schema.entity.services.DatabaseService; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.type.ChangeDescription; import org.openmetadata.schema.type.ChangeEvent; @@ -20,6 +22,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.resources.query.QueryResource; import org.openmetadata.service.util.EntityUtil; +import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.RestUtil; @@ -28,8 +31,8 @@ public class 
QueryRepository extends EntityRepository { private static final String QUERY_USERS_FIELD = "users"; private static final String QUERY_USED_BY_FIELD = "usedBy"; - private static final String QUERY_PATCH_FIELDS = "users,query,queryUsedIn"; - private static final String QUERY_UPDATE_FIELDS = "users,queryUsedIn"; + private static final String QUERY_PATCH_FIELDS = "users,query,queryUsedIn,processedLineage"; + private static final String QUERY_UPDATE_FIELDS = "users,queryUsedIn,processedLineage"; public QueryRepository(CollectionDAO dao) { super( @@ -43,6 +46,11 @@ public QueryRepository(CollectionDAO dao) { supportsSearchIndex = true; } + @Override + public void setFullyQualifiedName(Query query) { + query.setFullyQualifiedName(FullyQualifiedName.add(query.getService().getFullyQualifiedName(), query.getName())); + } + @Override public Query setFields(Query entity, EntityUtil.Fields fields) { entity.setQueryUsedIn(fields.contains(QUERY_USED_IN_FIELD) ? getQueryUsage(entity) : entity.getQueryUsedIn()); @@ -76,6 +84,8 @@ public void prepare(Query entity, boolean update) { entity.setName(checkSum); } entity.setUsers(EntityUtil.populateEntityReferences(entity.getUsers())); + DatabaseService service = Entity.getEntity(entity.getService(), "", Include.ALL); + entity.setService(service.getEntityReference()); } @Override @@ -100,6 +110,9 @@ public void storeRelationships(Query queryEntity) { // Store Query Used in Relation storeQueryUsedIn(queryEntity.getId(), queryEntity.getQueryUsedIn(), null); + // The service contains the query + addRelationship( + queryEntity.getService().getId(), queryEntity.getId(), DATABASE_SERVICE, Entity.QUERY, Relationship.CONTAINS); } @Override @@ -217,6 +230,8 @@ public void entitySpecificUpdate() { added, deleted, EntityUtil.entityReferenceMatch); + // Store processed Lineage + recordChange("processedLineage", original.getProcessedLineage(), updated.getProcessedLineage()); // Store Query Used in Relation recordChange("usedBy", original.getUsedBy(), 
updated.getUsedBy(), true); storeQueryUsedIn(updated.getId(), added, deleted); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v120/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v120/Migration.java new file mode 100644 index 000000000000..9936c41c8edd --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v120/Migration.java @@ -0,0 +1,32 @@ +package org.openmetadata.service.migration.mysql.v120; + +import static org.openmetadata.service.migration.utils.v120.MigrationUtil.addQueryService; + +import lombok.SneakyThrows; +import org.jdbi.v3.core.Handle; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.migration.api.MigrationProcessImpl; +import org.openmetadata.service.migration.utils.MigrationFile; + +public class Migration extends MigrationProcessImpl { + private CollectionDAO collectionDAO; + + private Handle handle; + + public Migration(MigrationFile migrationFile) { + super(migrationFile); + } + + @Override + public void initialize(Handle handle) { + super.initialize(handle); + this.handle = handle; + this.collectionDAO = handle.attach(CollectionDAO.class); + } + + @Override + @SneakyThrows + public void runDataMigration() { + addQueryService(handle, collectionDAO); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v120/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v120/Migration.java new file mode 100644 index 000000000000..90ecc0f33d8a --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v120/Migration.java @@ -0,0 +1,32 @@ +package org.openmetadata.service.migration.postgres.v120; + +import static org.openmetadata.service.migration.utils.v120.MigrationUtil.addQueryService; + +import lombok.SneakyThrows; +import org.jdbi.v3.core.Handle; +import 
org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.migration.api.MigrationProcessImpl; +import org.openmetadata.service.migration.utils.MigrationFile; + +public class Migration extends MigrationProcessImpl { + private CollectionDAO collectionDAO; + + private Handle handle; + + public Migration(MigrationFile migrationFile) { + super(migrationFile); + } + + @Override + public void initialize(Handle handle) { + super.initialize(handle); + this.handle = handle; + this.collectionDAO = handle.attach(CollectionDAO.class); + } + + @Override + @SneakyThrows + public void runDataMigration() { + addQueryService(handle, collectionDAO); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v120/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v120/MigrationUtil.java new file mode 100644 index 000000000000..f940b143a123 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v120/MigrationUtil.java @@ -0,0 +1,112 @@ +package org.openmetadata.service.migration.utils.v120; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.openmetadata.schema.entity.data.Query; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.QueryRepository; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class MigrationUtil { + private MigrationUtil() { + /* Cannot create object util class*/ + } + + // Try to find the service for each of the queries by going through + // the table service hierarchy relationships + private static final String QUERY_LIST_SERVICE = + "SELECT " + + " q.id AS query_id, " + + " q.json AS query_json, " + + " er_table_query.fromId AS table_id, " + + " er_schema_table.fromId AS schema_id, " + + " er_database_schema.fromId AS database_id, " + + " 
er_service_database.fromId AS service_id, " + + " db_service.name AS service_name " + + "FROM query_entity q " + + "LEFT JOIN entity_relationship er_table_query " + + " ON er_table_query.fromEntity = 'table' " + + " AND er_table_query.toEntity = 'query' " + + " AND er_table_query.toId = q.id " + + "LEFT JOIN entity_relationship er_schema_table " + + " ON er_schema_table.fromEntity = 'databaseSchema' " + + " AND er_schema_table.toEntity = 'table' " + + " AND er_table_query.fromId = er_schema_table.toId " + + "LEFT JOIN entity_relationship er_database_schema " + + " ON er_database_schema.fromEntity = 'database' " + + " AND er_database_schema.toEntity = 'databaseSchema' " + + " AND er_schema_table.fromId = er_database_schema.toId " + + "LEFT JOIN entity_relationship er_service_database " + + " ON er_service_database.fromEntity = 'databaseService' " + + " AND er_service_database.toEntity = 'database' " + + " AND er_database_schema.fromId = er_service_database.toId " + + "LEFT JOIN dbservice_entity db_service " + + " ON db_service.id = er_service_database.fromId"; + + private static final String DELETE_QUERY = "DELETE FROM query_entity WHERE id = :id"; + private static final String DELETE_RELATIONSHIP = "DELETE FROM entity_relationship WHERE fromId = :id or toId = :id"; + + /** + * Queries have a `queryUsedIn` field as a list of EntityRef. We'll pick up the first element of the list, since the + * tables should normally be in the same service, and: 1. Get the table from the ID 2. Identify the service 3. 
Update + * the Query.service EntityRef + */ + public static void addQueryService(Handle handle, CollectionDAO collectionDAO) { + QueryRepository queryRepository = new QueryRepository(collectionDAO); + + try (handle) { + handle + .createQuery(QUERY_LIST_SERVICE) + .mapToMap() + .forEach( + row -> { + try { + + JsonObject queryJson = JsonUtils.readJson((String) row.get("query_json")).asJsonObject(); + String serviceName = (String) row.get("service_name"); + String serviceId = (String) row.get("service_id"); + + if (serviceId == null) { + LOG.warn( + String.format( + "Query [%s] cannot be linked to a service. Deleting...", queryJson.getString("id"))); + // We cannot directly call the queryRepository for deletion, since the Query object is missing + // the new `service` property we introduced and the `delete` operation would fail. + // We need to delete the query entry and the relationships from/to this ID by hand. + // It should be OK since queries are simple structures without any children. We should only + // have relationship table <> query & user <> query + handle.createUpdate(DELETE_QUERY).bind("id", queryJson.getString("id")).execute(); + handle.createUpdate(DELETE_RELATIONSHIP).bind("id", queryJson.getString("id")).execute(); + + } else { + // Since the query does not have the service yet, it cannot be cast to the Query class. 
+ + JsonObject serviceJson = + Json.createObjectBuilder() + .add("id", serviceId) + .add("name", serviceName) + .add("fullyQualifiedName", serviceName) + .add("type", "databaseService") + .build(); + + JsonObjectBuilder queryWithService = Json.createObjectBuilder(); + queryJson.forEach(queryWithService::add); + queryWithService.add("service", serviceJson); + + Query query = JsonUtils.readValue(queryWithService.build().toString(), Query.class); + queryRepository.setFullyQualifiedName(query); + collectionDAO.queryDAO().update(query); + } + } catch (Exception ex) { + LOG.warn(String.format("Error updating query [%s] due to [%s]", row, ex)); + } + }); + } catch (Exception ex) { + LOG.warn("Error running the query migration ", ex); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java index f7625eea4ba6..bcb7760511fa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java @@ -116,6 +116,9 @@ public ResultList listQueries( @Parameter(description = "UUID of the entity for which to list the Queries", schema = @Schema(type = "UUID")) @QueryParam("entityId") UUID entityId, + @Parameter(description = "Filter Queries by service Fully Qualified Name", schema = @Schema(type = "string")) + @QueryParam("service") + String service, @Parameter(description = "Limit the number queries returned. 
" + "(1 to 1000000, default = 10)") @DefaultValue("10") @Min(0) @@ -132,6 +135,7 @@ public ResultList listQueries( if (!CommonUtil.nullOrEmpty(entityId)) { filter.addQueryParam("entityId", entityId.toString()); } + filter.addQueryParam("service", service); ResultList queries = super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); return PIIMasker.getQueries(queries, authorizer, securityContext); @@ -509,6 +513,7 @@ private Query getQuery(CreateQuery create, String user) { return copy(new Query(), create, user) .withTags(create.getTags()) .withQuery(create.getQuery()) + .withService(getEntityReference(Entity.DATABASE_SERVICE, create.getService())) .withDuration(create.getDuration()) .withVotes(new Votes().withUpVotes(0).withDownVotes(0)) .withUsers(getEntityReferences(USER, create.getUsers())) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java index 467f5bb118b9..737efb0559c5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java @@ -343,7 +343,8 @@ public Response search(SearchRequest request) throws IOException { /* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */ if (request.getIndex().equalsIgnoreCase("domain_search_index") - || request.getIndex().equalsIgnoreCase("data_products_search_index")) { + || request.getIndex().equalsIgnoreCase("data_products_search_index") + || request.getIndex().equalsIgnoreCase("query_search_index")) { searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query())); } else { searchSourceBuilder.query( diff --git 
a/openmetadata-service/src/main/java/org/openmetadata/service/search/openSearch/OpenSearchClientImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/openSearch/OpenSearchClientImpl.java index 2c088d005513..c18815790559 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/openSearch/OpenSearchClientImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/openSearch/OpenSearchClientImpl.java @@ -284,8 +284,6 @@ public Response search(SearchRequest request) throws IOException { searchSourceBuilder = buildTableSearchBuilder(request.getQuery(), request.getFrom(), request.getSize()); break; case "user_search_index": - searchSourceBuilder = buildUserOrTeamSearchBuilder(request.getQuery(), request.getFrom(), request.getSize()); - break; case "team_search_index": searchSourceBuilder = buildUserOrTeamSearchBuilder(request.getQuery(), request.getFrom(), request.getSize()); break; @@ -337,7 +335,8 @@ public Response search(SearchRequest request) throws IOException { /* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */ if (request.getIndex().equalsIgnoreCase("domain_search_index") - || request.getIndex().equalsIgnoreCase("data_products_search_index")) { + || request.getIndex().equalsIgnoreCase("data_products_search_index") + || request.getIndex().equalsIgnoreCase("query_search_index")) { searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query())); } else { searchSourceBuilder.query( diff --git a/openmetadata-service/src/main/resources/elasticsearch/en/query_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/en/query_index_mapping.json index 90c4eac3c0df..ada6cabde6d2 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/en/query_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/en/query_index_mapping.json @@ -40,6 +40,18 @@ "id": { "type": 
"keyword" }, + "checksum": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "processedLineage": { + "type": "boolean" + }, "name": { "type": "text", "fields": { @@ -53,6 +65,43 @@ "type": "keyword", "normalizer": "lowercase_normalizer" }, + "service": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "keyword" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "text" + }, + "href": { + "type": "text" + } + } + }, "displayName": { "type": "text", "analyzer": "om_analyzer", diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java index 2ae42bcb4e27..49e9816ee8ca 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java @@ -76,7 +76,8 @@ public CreateQuery createRequest(String type) { .withQueryUsedIn(List.of(TABLE_REF)) .withQuery(QUERY) .withDuration(0.0) - .withQueryDate(1673857635064L); + .withQueryDate(1673857635064L) + .withService(SNOWFLAKE_REFERENCE.getFullyQualifiedName()); } @Override diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json index de343720ca8d..e5fcebcffcd8 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createQuery.json @@ -81,8 +81,12 @@ "description": "Flag if this query has 
already been successfully processed for lineage", "type": "boolean", "default": false + }, + "service": { + "description": "Link to the database service fully qualified name where this query has been run", + "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName" } }, - "required": ["query"], + "required": ["query", "service"], "additionalProperties": false } \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json index 011f2192d54e..250df5697e79 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/query.json @@ -119,8 +119,12 @@ "description": "Flag if this query has already been successfully processed for lineage", "type": "boolean", "default": false + }, + "service": { + "description": "Link to the service this query belongs to.", + "$ref": "../../type/entityReference.json" } }, - "required": ["name","query"], + "required": ["name", "query", "service"], "additionalProperties": false } diff --git a/openmetadata-ui/src/main/resources/ui/src/mocks/Queries.mock.ts b/openmetadata-ui/src/main/resources/ui/src/mocks/Queries.mock.ts index 7be91180f0dd..2b6cd658e2ea 100644 --- a/openmetadata-ui/src/main/resources/ui/src/mocks/Queries.mock.ts +++ b/openmetadata-ui/src/main/resources/ui/src/mocks/Queries.mock.ts @@ -33,6 +33,11 @@ export const MOCK_QUERIES = [ ], previousVersion: 0.2, }, + service: { + id: '51286e5d-0590-457b-a1ec-bc53c1effa1ee', + type: 'databaseService', + fullyQualifiedName: 'redshift', + }, owner: { id: '471353cb-f925-4c4e-be6c-14da2c0b00ce', type: 'user', @@ -194,6 +199,11 @@ export const MOCK_QUERIES = [ fieldsDeleted: [], previousVersion: 0.1, }, + service: { + id: '44c71a8d-130a-4857-aa88-23bf7e371d5ee', + type: 'databaseService', + fullyQualifiedName: 'redshift', + }, votes: { upVotes: 1, downVotes: 1, diff 
--git a/openmetadata-ui/src/main/resources/ui/src/pages/AddQueryPage/AddQueryPage.component.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/AddQueryPage/AddQueryPage.component.tsx index 13b95c9ac38f..49daf6f7748d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/AddQueryPage/AddQueryPage.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/AddQueryPage/AddQueryPage.component.tsx @@ -40,7 +40,7 @@ import { useParams } from 'react-router-dom'; import { searchData } from 'rest/miscAPI'; import { postQuery } from 'rest/queryAPI'; import { getTableDetailsByFQN } from 'rest/tableAPI'; -import { getCurrentUserId } from 'utils/CommonUtils'; +import { getCurrentUserId, getPartialNameFromFQN } from 'utils/CommonUtils'; import { getCurrentMillis } from 'utils/date-time/DateTimeUtils'; import { getEntityBreadcrumbs, getEntityName } from 'utils/EntityUtils'; import { showErrorToast, showSuccessToast } from 'utils/ToastUtils'; @@ -54,7 +54,6 @@ const AddQueryPage = () => { TitleBreadcrumbProps['titleLinks'] >([]); const [description, setDescription] = useState(''); - const [sqlQuery, setSqlQuery] = useState(''); const [table, setTable] = useState(); const [initialOptions, setInitialOptions] = useState(); const [isSaving, setIsSaving] = useState(false); @@ -133,7 +132,7 @@ const AddQueryPage = () => { history.back(); }; - const handleSubmit: FormProps['onFinish'] = async (values) => { + const handleSubmit: FormProps['onFinish'] = async (values): Promise => { const updatedValues: CreateQuery = { ...values, description: isEmpty(description) ? undefined : description, @@ -152,6 +151,7 @@ const AddQueryPage = () => { })), ], queryDate: getCurrentMillis(), + service: getPartialNameFromFQN(datasetFQN, ['service']), }; try { @@ -214,15 +214,14 @@ const AddQueryPage = () => { field: t('label.sql-uppercase-query'), }), }, - ]}> + ]} + trigger="onChange"> setSqlQuery(value)} />