From 721103b5cbd3e132200fa2a0721d1d6148cf9c09 Mon Sep 17 00:00:00 2001
From: Noah Paige
Date: Tue, 25 Jun 2024 22:26:26 -0400
Subject: [PATCH 1/6] Add delete docs not found when re-indexing in catalog task

---
 .../modules/catalog/indexers/base_indexer.py  | 11 +++++++
 .../catalog/indexers/catalog_indexer.py       |  2 +-
 .../catalog/tasks/catalog_indexer_task.py     | 31 ++++++++++++++++---
 .../indexers/dashboard_catalog_indexer.py     | 11 +++++--
 .../indexers/dataset_catalog_indexer.py       | 16 +++++---
 .../s3_datasets/indexers/location_indexer.py  |  1 -
 .../s3_datasets/indexers/table_indexer.py     |  1 -
 .../services/dataset_location_service.py      |  3 ++
 .../services/dataset_table_service.py         |  2 ++
 .../s3_datasets/tasks/tables_syncer.py        |  2 ++
 10 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/backend/dataall/modules/catalog/indexers/base_indexer.py b/backend/dataall/modules/catalog/indexers/base_indexer.py
index 05a51d9ad..78cfcc051 100644
--- a/backend/dataall/modules/catalog/indexers/base_indexer.py
+++ b/backend/dataall/modules/catalog/indexers/base_indexer.py
@@ -52,6 +52,17 @@ def _index(cls, doc_id, doc):
             log.error(f'ES config is missing doc {doc} for id {doc_id} was not indexed')
             return False
 
+    @classmethod
+    def search(cls, query):
+        es = cls.es()
+        if es:
+            res = es.search(index=cls._INDEX, body=query)
+            log.info(f'Search query {query} returned {res["hits"]["total"]["value"]} records')
+            return res
+        else:
+            log.error(f'ES config is missing, search query {query} failed')
+            return None
+
     @staticmethod
     def _get_target_glossary_terms(session, target_uri):
         q = (
diff --git a/backend/dataall/modules/catalog/indexers/catalog_indexer.py b/backend/dataall/modules/catalog/indexers/catalog_indexer.py
index 2f27c7df8..035d46b6f 100644
--- a/backend/dataall/modules/catalog/indexers/catalog_indexer.py
+++ b/backend/dataall/modules/catalog/indexers/catalog_indexer.py
@@ -12,5 +12,5 @@ def __init__(self):
     def all():
         return CatalogIndexer._INDEXERS
 
-    def index(self, session) -> int:
+    def index(self, session) -> List[str]:
         raise NotImplementedError('index is not implemented')
diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
index 570c8b6c3..b653e4e36 100644
--- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
+++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -3,6 +3,7 @@
 import sys
 
 from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
+from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
 from dataall.base.db import get_engine
 from dataall.base.loader import load_modules, ImportMode
 from dataall.base.utils.alarm_service import AlarmService
@@ -16,13 +17,33 @@
 
 def index_objects(engine):
     try:
-        indexed_objects_counter = 0
+        indexed_object_uris = []
         with engine.scoped_session() as session:
             for indexer in CatalogIndexer.all():
-                indexed_objects_counter += indexer.index(session)
-
-        log.info(f'Successfully indexed {indexed_objects_counter} objects')
-        return indexed_objects_counter
+                indexed_object_uris += indexer.index(session)
+
+        log.info(f'Successfully indexed {len(indexed_object_uris)} objects')
+
+        # Search for documents in opensearch without an ID in the indexed_object_uris list
+        query = {
+            "query": {
+                "bool": {
+                    "must_not": {
+                        "terms": {
+                            "_id": indexed_object_uris
+                        }
+                    }
+                }
+            }
+        }
+
+        docs = BaseIndexer.search(query)
+        for doc in docs["hits"]["hits"]:
+            log.info(f'Deleting document {doc["_id"]}...')
+            BaseIndexer.delete_doc(doc_id=doc["_id"])
+
+        log.info(f'Deleted {len(indexed_object_uris)} records')
+        return len(indexed_object_uris)
     except Exception as e:
         AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
         raise e
diff --git a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
index 7cc0884fc..8d09a42b3 100644
--- a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
+++ b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
@@ -1,5 +1,7 @@
 import logging
 
+from typing import List
+
 from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
 from dataall.modules.dashboards.db.dashboard_models import Dashboard
 from dataall.modules.dashboards.indexers.dashboard_indexer import DashboardIndexer
@@ -8,11 +10,14 @@
 
 
 class DashboardCatalogIndexer(CatalogIndexer):
-    def index(self, session) -> int:
-        all_dashboards: [Dashboard] = session.query(Dashboard).all()
+    def index(self, session) -> List[str]:
+        all_dashboards: List[Dashboard] = session.query(Dashboard).all()
+        all_dashboard_uris = []
+
         log.info(f'Found {len(all_dashboards)} dashboards')
         dashboard: Dashboard
         for dashboard in all_dashboards:
+            all_dashboard_uris += dashboard.dashboardUri
             DashboardIndexer.upsert(session=session, dashboard_uri=dashboard.dashboardUri)
 
-        return len(all_dashboards)
+        return all_dashboard_uris
diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
index 7140d64f1..bece2e98a 100644
--- a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
@@ -1,5 +1,6 @@
 import logging
 
+from typing import List
 from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.modules.s3_datasets.indexers.location_indexer import DatasetLocationIndexer
 from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
@@ -16,13 +17,18 @@ class DatasetCatalogIndexer(CatalogIndexer):
     Register automatically itself when CatalogIndexer instance is created
     """
 
-    def index(self, session) -> int:
-        all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session)
+    def index(self, session) -> List[str]:
+        all_datasets: List[S3Dataset] = DatasetRepository.list_all_active_datasets(session)
+        all_doc_uris = []
         log.info(f'Found {len(all_datasets)} datasets')
-        indexed = 0
         for dataset in all_datasets:
             tables = DatasetTableIndexer.upsert_all(session, dataset.datasetUri)
+            all_doc_uris += [table.tableUri for table in tables]
+
             folders = DatasetLocationIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
+            all_doc_uris += [folder.locationUri for folder in folders]
+
             DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
-            indexed += len(tables) + len(folders) + 1
-        return indexed
+            all_doc_uris.append(dataset.datasetUri)
+
+        return all_doc_uris
diff --git a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
index b5216dacb..ff8fa8720 100644
--- a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
@@ -46,7 +46,6 @@ def upsert(cls, session, folder_uri: str):
                 'glossary': glossary,
             },
         )
-        DatasetIndexer.upsert(session=session, dataset_uri=folder.datasetUri)
         return folder
 
     @classmethod
diff --git a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
index 15b6320c1..91cf77b07 100644
--- a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
@@ -48,7 +48,6 @@ def upsert(cls, session, table_uri: str):
                 'glossary': glossary,
             },
         )
-        DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri)
         return table
 
     @classmethod
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
index 9da13bf61..3796c55ea 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
@@ -1,3 +1,4 @@
+from backend.dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.base.context import get_context
 from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
 from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
@@ -48,6 +49,7 @@ def create_storage_location(uri: str, data: dict):
                 S3LocationClient(location, dataset).create_bucket_prefix()
 
             DatasetLocationIndexer.upsert(session=session, folder_uri=location.locationUri)
+            DatasetIndexer.upsert(session, dataset.datasetUri)
         return location
 
     @staticmethod
@@ -77,6 +79,7 @@ def update_storage_location(uri: str, data: dict):
                 DatasetLocationService._create_glossary_links(session, location, data['terms'])
 
             DatasetLocationIndexer.upsert(session, folder_uri=location.locationUri)
+            DatasetIndexer.upsert(session, dataset.datasetUri)
 
         return location
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
index 50b63c61e..96f9921b6 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
@@ -9,6 +9,7 @@
 from dataall.modules.s3_datasets.aws.glue_dataset_client import DatasetCrawler
 from dataall.modules.s3_datasets.db.dataset_table_repositories import DatasetTableRepository
 from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
+from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.modules.s3_datasets.services.dataset_permissions import (
     UPDATE_DATASET_TABLE,
     MANAGE_DATASETS,
@@ -115,6 +116,7 @@ def sync_tables_for_dataset(cls, uri):
             cls.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables)
             DatasetTableIndexer.upsert_all(session=session, dataset_uri=dataset.datasetUri)
             DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri)
+            DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
             return DatasetRepository.paginated_dataset_tables(
                 session=session,
                 uri=uri,
diff --git a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
index 0fbf52709..e530a9e6a 100644
--- a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
+++ b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
@@ -13,6 +13,7 @@
 from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
 from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
 from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
+from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.modules.s3_datasets.services.dataset_alarm_service import DatasetAlarmService
 
 root = logging.getLogger()
@@ -68,6 +69,7 @@ def sync_tables(engine):
                     processed_tables.extend(tables)
 
                     DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
+                    DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
             except Exception as e:
                 log.error(
                     f'Failed to sync tables for dataset '

From 431669f2b38d3fbd07e636985998ee01f1b99bd3 Mon Sep 17 00:00:00 2001
From: Noah Paige
Date: Thu, 27 Jun 2024 21:09:42 -0400
Subject: [PATCH 2/6] Add FE and API to start re-index task

---
 .../dataall/modules/catalog/api/mutations.py  |   8 +
 .../dataall/modules/catalog/api/resolvers.py  |   5 +
 .../modules/catalog/handlers/__init__.py      |   3 +
 .../catalog/handlers/ecs_catalog_handlers.py  |  27 ++++
 .../modules/catalog/indexers/base_indexer.py  |   2 +-
 .../catalog/services/catalog_service.py       | 145 ++++++++++++++++++
 .../catalog/tasks/catalog_indexer_task.py     |  65 ++++----
 .../indexers/dashboard_catalog_indexer.py     |   2 +-
 .../indexers/dataset_catalog_indexer.py       |  10 +-
 .../s3_datasets/indexers/dataset_indexer.py   |  13 +-
 .../s3_datasets/indexers/location_indexer.py  |  16 +-
 .../s3_datasets/indexers/table_indexer.py     |  13 +-
 .../services/dataset_location_service.py      |   2 +-
 .../services/dataset_table_service.py         |   1 +
 deploy/stacks/container.py                    |  14 ++
 .../components/MaintenanceViewer.js           | 131 +++++++++++++++-
 .../src/modules/Maintenance/services/index.js |   1 +
 .../services/startReindexCatalog.js           |  10 ++
 18 files changed, 409 insertions(+), 59 deletions(-)
 create mode 100644 backend/dataall/modules/catalog/handlers/__init__.py
 create mode 100644 backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
 create mode 100644 backend/dataall/modules/catalog/services/catalog_service.py
 create mode 100644 frontend/src/modules/Maintenance/services/startReindexCatalog.js

diff --git a/backend/dataall/modules/catalog/api/mutations.py b/backend/dataall/modules/catalog/api/mutations.py
index f16188f53..4df0f2fdf 100644
--- a/backend/dataall/modules/catalog/api/mutations.py
+++ b/backend/dataall/modules/catalog/api/mutations.py
@@ -7,6 +7,7 @@
     create_term,
     approve_term_association,
     dismiss_term_association,
+    start_reindex_catalog,
 )
 
 
@@ -107,3 +108,10 @@
     resolver=dismiss_term_association,
     args=[gql.Argument(name='linkUri', type=gql.NonNullableType(gql.String))],
 )
+
+startReindexCatalog = gql.MutationField(
+    name='startReindexCatalog',
+    args=[gql.Argument(name='handleDeletes', type=gql.NonNullableType(gql.Boolean))],
+    type=gql.Boolean,
+    resolver=start_reindex_catalog,
+)
diff --git a/backend/dataall/modules/catalog/api/resolvers.py b/backend/dataall/modules/catalog/api/resolvers.py
index 3d8052786..40ccec24d 100644
--- a/backend/dataall/modules/catalog/api/resolvers.py
+++ b/backend/dataall/modules/catalog/api/resolvers.py
@@ -1,5 +1,6 @@
 from dataall.modules.catalog.api.enums import GlossaryRole
 from dataall.modules.catalog.services.glossaries_service import GlossariesService
+from dataall.modules.catalog.services.catalog_service import CatalogService
 from dataall.base.api.context import Context
 from dataall.modules.catalog.db.glossary_models import TermLink, GlossaryNode
 from dataall.base.db import exceptions
@@ -157,3 +158,7 @@ def search_glossary(context: Context, source, filter: dict = None):
     if not filter:
         filter = {}
     return GlossariesService.search_glossary_terms(data=filter)
+
+
+def start_reindex_catalog(context: Context, source, handleDeletes: bool):
+    return CatalogService.start_reindex_catalog(with_deletes=handleDeletes)
diff --git a/backend/dataall/modules/catalog/handlers/__init__.py b/backend/dataall/modules/catalog/handlers/__init__.py
new file mode 100644
index 000000000..0305d3b87
--- /dev/null
+++ b/backend/dataall/modules/catalog/handlers/__init__.py
@@ -0,0 +1,3 @@
+from dataall.modules.catalog.handlers import ecs_catalog_handlers
+
+__all__ = ['ecs_catalog_handler']
diff --git a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
new file mode 100644
index 000000000..99240112f
--- /dev/null
+++ b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
@@ -0,0 +1,27 @@
+import logging
+import os
+
+from dataall.core.tasks.service_handlers import Worker
+from dataall.core.stacks.aws.ecs import Ecs
+from dataall.core.tasks.db.task_models import Task
+from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask
+
+log = logging.getLogger(__name__)
+
+
+class EcsMaintenanceHandler:
+    @staticmethod
+    @Worker.handler(path='ecs.reindex.catalog')
+    def run_ecs_reindex_catalog_task(engine, task: Task):
+        envname = os.environ.get('envname', 'local')
+        if envname in ['local', 'dkrcompose']:
+            CatalogIndexerTask.index_objects(engine, str(task.payload.get('with_deletes', False)))
+        else:
+            ecs_task_arn = Ecs.run_ecs_task(
+                task_definition_param='ecs/task_def_arn/catalog_indexer',
+                container_name_param='ecs/container/catalog_indexer',
+                context=[
+                    {'name': 'with_deletes', 'value': str(task.payload.get('with_deletes', False))},
+                ],
+            )
+            return {'task_arn': ecs_task_arn}
diff --git a/backend/dataall/modules/catalog/indexers/base_indexer.py b/backend/dataall/modules/catalog/indexers/base_indexer.py
index 78cfcc051..98ce693e7 100644
--- a/backend/dataall/modules/catalog/indexers/base_indexer.py
+++ b/backend/dataall/modules/catalog/indexers/base_indexer.py
@@ -61,7 +61,7 @@ def search(cls, query):
             return res
         else:
             log.error(f'ES config is missing, search query {query} failed')
-            return None
+            return {}
 
     @staticmethod
     def _get_target_glossary_terms(session, target_uri):
diff --git a/backend/dataall/modules/catalog/services/catalog_service.py b/backend/dataall/modules/catalog/services/catalog_service.py
new file mode 100644
index 000000000..b46e6b755
--- /dev/null
+++ b/backend/dataall/modules/catalog/services/catalog_service.py
@@ -0,0 +1,145 @@
+import logging
+
+from dataall.base.context import get_context
+from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
+
+from dataall.modules.catalog.db.glossary_repositories import GlossaryRepository
+from dataall.modules.catalog.db.glossary_models import GlossaryNode
+from dataall.modules.catalog.services.glossaries_permissions import MANAGE_GLOSSARIES
+from dataall.modules.catalog.indexers.registry import GlossaryRegistry
+from dataall.core.permissions.services.tenant_policy_service import TenantPolicyValidationService
+from dataall.core.tasks.db.task_models import Task
+from dataall.core.tasks.service_handlers import Worker
+
+
+logger = logging.getLogger(__name__)
+
+
+def _session():
+    return get_context().db_engine.scoped_session()
+
+
+class CatalogService:
+    @staticmethod
+    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
+    def create_glossary(data: dict = None) -> GlossaryNode:
+        with _session() as session:
+            return GlossaryRepository.create_glossary(session=session, data=data)
+
+    @staticmethod
+    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
+    def create_category(uri: str, data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.create_category(session=session, uri=uri, data=data)
+
+    @staticmethod
+    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
+    def create_term(uri: str, data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.create_term(session=session, uri=uri, data=data)
+
+    @staticmethod
+    def list_glossaries(data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.list_glossaries(session=session, data=data)
+
+    @staticmethod
+    def list_categories(uri: str, data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.list_categories(session=session, uri=uri, data=data)
+
+    @staticmethod
+    def list_terms(uri: str, data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.list_terms(session=session, uri=uri, data=data)
+
+    @staticmethod
+    def list_node_children(path: str, filter: dict = None):
+        with _session() as session:
+            return GlossaryRepository.list_node_children(session=session, path=path, filter=filter)
+
+    @staticmethod
+    def get_node_tree(path: str, filter: dict = None):
+        with _session() as session:
+            return GlossaryRepository.get_node_tree(session=session, path=path, filter=filter)
+
+    @staticmethod
+    def get_node_link_to_target(
+        uri: str,
+        targetUri: str,
+    ):
+        with _session() as session:
+            return GlossaryRepository.get_node_link_to_target(
+                session=session, username=get_context().username, uri=uri, targetUri=targetUri
+            )
+
+    @staticmethod
+    def get_glossary_categories_terms_and_associations(path: str):
+        with _session() as session:
+            return GlossaryRepository.get_glossary_categories_terms_and_associations(session=session, path=path)
+
+    @staticmethod
+    def list_term_associations(node: GlossaryNode, filter: dict = None):
+        with _session() as session:
+            return GlossaryRepository.list_term_associations(
+                session=session, node=node, filter=filter, target_model_definitions=GlossaryRegistry.definitions()
+            )
+
+    @staticmethod
+    def get_node(uri: str):
+        with _session() as session:
+            return GlossaryRepository.get_node(session=session, uri=uri)
+
+    @staticmethod
+    def get_link_target(targetUri: str, targetType: str):
+        with _session() as session:
+            model = GlossaryRegistry.find_model(targetType)
+            target = session.query(model).get(targetUri)
+            return target
+
+    @staticmethod
+    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
+    def update_node(uri: str = None, data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.update_node(session=session, uri=uri, data=data)
+
+    @staticmethod
+    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
+    def delete_node(uri: str = None):
+        with _session() as session:
+            return GlossaryRepository.delete_node(session=session, uri=uri)
+
+    @staticmethod
+    def approve_term_association(linkUri: str):
+        with _session() as session:
+            return GlossaryRepository.approve_term_association(
+                session=session, username=get_context().username, groups=get_context().groups, linkUri=linkUri
+            )
+
+    @staticmethod
+    def dismiss_term_association(linkUri: str):
+        with _session() as session:
+            return GlossaryRepository.dismiss_term_association(
+                session=session, username=get_context().username, groups=get_context().groups, linkUri=linkUri
+            )
+
+    @staticmethod
+    def search_glossary_terms(data: dict = None):
+        with _session() as session:
+            return GlossaryRepository.search_glossary_terms(session=session, data=data)
+
+    @staticmethod
+    def start_reindex_catalog(with_deletes: bool) -> bool:
+        context = get_context()
+        groups = context.groups if context.groups is not None else []
+        if not TenantPolicyValidationService.is_tenant_admin(groups):
+            raise Exception('Only data.all admin group members can start re-index catalog task')
+
+        with context.db_engine.scoped_session() as session:
+            reindex_catalog_task: Task = Task(
+                action='ecs.reindex.catalog', targetUri='ALL', payload={'with_deletes': with_deletes}
+            )
+            session.add(reindex_catalog_task)
+
+        Worker.queue(engine=context.db_engine, task_ids=[reindex_catalog_task.taskUri])
+        return True
diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
index b653e4e36..0a6825023 100644
--- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
+++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import sys
+from typing import List
 
 from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
 from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
@@ -15,43 +16,41 @@
 log = logging.getLogger(__name__)
 
 
-def index_objects(engine):
-    try:
-        indexed_object_uris = []
-        with engine.scoped_session() as session:
-            for indexer in CatalogIndexer.all():
-                indexed_object_uris += indexer.index(session)
-
-        log.info(f'Successfully indexed {len(indexed_object_uris)} objects')
-
-        # Search for documents in opensearch without an ID in the indexed_object_uris list
-        query = {
-            "query": {
-                "bool": {
-                    "must_not": {
-                        "terms": {
-                            "_id": indexed_object_uris
-                        }
-                    }
-                }
-            }
-        }
-
-        docs = BaseIndexer.search(query)
-        for doc in docs["hits"]["hits"]:
-            log.info(f'Deleting document {doc["_id"]}...')
-            BaseIndexer.delete_doc(doc_id=doc["_id"])
-
-        log.info(f'Deleted {len(indexed_object_uris)} records')
-        return len(indexed_object_uris)
-    except Exception as e:
-        AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
-        raise e
+class CatalogIndexerTask:
+    @classmethod
+    def index_objects(cls, engine, with_deletes):
+        try:
+            indexed_object_uris = []
+            with engine.scoped_session() as session:
+                for indexer in CatalogIndexer.all():
+                    indexed_object_uris += indexer.index(session)
+
+            log.info(f'Successfully indexed {len(indexed_object_uris)} objects')
+
+            if with_deletes == 'True':
+                CatalogIndexerTask._delete_old_objects(indexed_object_uris)
+            return len(indexed_object_uris)
+        except Exception as e:
+            AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
+            raise e
+
+    @classmethod
+    def _delete_old_objects(cls, indexed_object_uris: List[str]) -> None:
+        # Search for documents in opensearch without an ID in the indexed_object_uris list
+        query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}}
+        # Delete All "Outdated" Objects from Index
+        docs = BaseIndexer.search(query)
+        for doc in docs.get('hits', {}).get('hits', []):
+            log.info(f'Deleting document {doc["_id"]}...')
+            BaseIndexer.delete_doc(doc_id=doc['_id'])
+
+        log.info(f'Deleted {len(docs.get("hits", {}).get("hits", []))} records')
 
 
 if __name__ == '__main__':
     ENVNAME = os.environ.get('envname', 'local')
     ENGINE = get_engine(envname=ENVNAME)
+    with_deletes = os.environ.get('with_deletes', 'False')
 
     load_modules({ImportMode.CATALOG_INDEXER_TASK})
-    index_objects(engine=ENGINE)
+    CatalogIndexerTask.index_objects(engine=ENGINE, with_deletes=with_deletes)
diff --git a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
index 8d09a42b3..474d6dc5d 100644
--- a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
+++ b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
@@ -17,7 +17,7 @@ def index(self, session) -> List[str]:
         log.info(f'Found {len(all_dashboards)} dashboards')
         dashboard: Dashboard
         for dashboard in all_dashboards:
-            all_dashboard_uris += dashboard.dashboardUri
+            all_dashboard_uris.append(dashboard.dashboardUri)
             DashboardIndexer.upsert(session=session, dashboard_uri=dashboard.dashboardUri)
 
         return all_dashboard_uris
diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
index bece2e98a..3e4853bca 100644
--- a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
@@ -19,16 +19,16 @@ class DatasetCatalogIndexer(CatalogIndexer):
 
     def index(self, session) -> List[str]:
         all_datasets: List[S3Dataset] = DatasetRepository.list_all_active_datasets(session)
-        all_doc_uris = []
+        all_dataset_uris = []
         log.info(f'Found {len(all_datasets)} datasets')
         for dataset in all_datasets:
             tables = DatasetTableIndexer.upsert_all(session, dataset.datasetUri)
-            all_doc_uris += [table.tableUri for table in tables]
+            all_dataset_uris += [table.tableUri for table in tables]
 
             folders = DatasetLocationIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
-            all_doc_uris += [folder.locationUri for folder in folders]
+            all_dataset_uris += [folder.locationUri for folder in folders]
 
             DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
-            all_doc_uris.append(dataset.datasetUri)
+            all_dataset_uris.append(dataset.datasetUri)
 
-        return all_doc_uris
+        return all_dataset_uris
diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
index e245bcea1..2d2a8c5d5 100644
--- a/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
@@ -14,14 +14,15 @@ class DatasetIndexer(BaseIndexer):
     @classmethod
     def upsert(cls, session, dataset_uri: str):
         dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
-        env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
-        org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
-
-        count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
-        count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
-        count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')
 
         if dataset:
+            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+
+            count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
+            count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
+            count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')
+
             glossary = BaseIndexer._get_target_glossary_terms(session, dataset_uri)
             BaseIndexer._index(
                 doc_id=dataset_uri,
diff --git a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
index ff8fa8720..82170e8f9 100644
--- a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
@@ -6,19 +6,18 @@
 from dataall.core.organizations.db.organization_repositories import OrganizationRepository
 from dataall.modules.s3_datasets.db.dataset_location_repositories import DatasetLocationRepository
 from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
-from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
 
 
 class DatasetLocationIndexer(BaseIndexer):
     @classmethod
-    def upsert(cls, session, folder_uri: str):
+    def upsert(cls, session, folder_uri: str, dataset=None, env=None, org=None):
         folder = DatasetLocationRepository.get_location_by_uri(session, folder_uri)
 
         if folder:
-            dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri)
-            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
-            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+            dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri) if not dataset else dataset
+            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
+            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
             glossary = BaseIndexer._get_target_glossary_terms(session, folder_uri)
 
             BaseIndexer._index(
@@ -51,6 +50,11 @@ def upsert(cls, session, folder_uri: str):
     @classmethod
     def upsert_all(cls, session, dataset_uri: str):
         folders = DatasetLocationRepository.get_dataset_folders(session, dataset_uri)
+        dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
+        env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+        org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
        for folder in folders:
-            DatasetLocationIndexer.upsert(session=session, folder_uri=folder.locationUri)
+            DatasetLocationIndexer.upsert(
+                session=session, folder_uri=folder.locationUri, dataset=dataset, env=env, org=org
+            )
         return folders
diff --git a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
index 91cf77b07..cf24c4852 100644
--- a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
@@ -12,13 +12,13 @@
 
 class DatasetTableIndexer(BaseIndexer):
     @classmethod
-    def upsert(cls, session, table_uri: str):
+    def upsert(cls, session, table_uri: str, dataset=None, env=None, org=None):
         table = DatasetTableRepository.get_dataset_table_by_uri(session, table_uri)
 
         if table:
-            dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri)
-            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
-            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+            dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri) if not dataset else dataset
+            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
+            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
             glossary = BaseIndexer._get_target_glossary_terms(session, table_uri)
             tags = table.tags if table.tags else []
 
@@ -53,8 +53,11 @@ def upsert(cls, session, table_uri: str):
     @classmethod
     def upsert_all(cls, session, dataset_uri: str):
         tables = DatasetTableRepository.find_all_active_tables(session, dataset_uri)
+        dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
+        env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+        org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
         for table in tables:
-            DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri)
+            DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri, dataset=dataset, env=env, org=org)
         return tables
 
     @classmethod
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
index 3796c55ea..7e90d52fd 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
@@ -79,7 +79,7 @@ def update_storage_location(uri: str, data: dict):
                 DatasetLocationService._create_glossary_links(session, location, data['terms'])
 
             DatasetLocationIndexer.upsert(session, folder_uri=location.locationUri)
-            DatasetIndexer.upsert(session, dataset.datasetUri)
+            DatasetIndexer.upsert(session, location.datasetUri)
 
         return location
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
index 96f9921b6..804156912 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
@@ -59,6 +59,7 @@ def update_table(uri: str, table_data: dict = None):
             )
 
             DatasetTableIndexer.upsert(session, table_uri=table.tableUri)
+            DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri)
             return table
 
     @staticmethod
diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py
index aebd00d7d..6429bd0fe 100644
--- a/deploy/stacks/container.py
+++ b/deploy/stacks/container.py
@@ -198,6 +198,20 @@ def add_catalog_indexer_task(self):
             prod_sizing=self._prod_sizing,
         )
 
+        ssm.StringParameter(
+            self,
+            f'CatalogIndexerTaskARNSSM{self._envname}',
+            parameter_name=f'/dataall/{self._envname}/ecs/task_def_arn/catalog_indexer',
+            string_value=catalog_indexer_task_def.task_definition_arn,
+        )
+
+        ssm.StringParameter(
+            self,
+            f'CatalogIndexerTaskContainerSSM{self._envname}',
+            parameter_name=f'/dataall/{self._envname}/ecs/container/catalog_indexer',
+            string_value=catalog_indexer_task.container_name,
+        )
+
         self.ecs_task_definitions_families.append(catalog_indexer_task.task_definition.family)
 
     @run_if(['modules.s3_datasets.active'])
diff --git a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
index 2feb3713e..1cd0a323f 100644
--- a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
+++ b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
@@ -10,6 +10,7 @@
   IconButton,
   MenuItem,
   TextField,
+  Switch,
   Typography
 } from '@mui/material';
 import React, { useCallback, useEffect, useState } from 'react';
@@ -19,7 +20,8 @@ import { Label } from 'design';
 import {
   getMaintenanceStatus,
   stopMaintenanceWindow,
-  startMaintenanceWindow
+  startMaintenanceWindow,
+  startReindexCatalog
 } from '../services';
 import { useClient } from 'services';
 import { SET_ERROR, useDispatch } from 'globalErrors';
@@ -162,12 +164,112 @@ export const MaintenanceConfirmationPopUp = (props) => {
   );
 };
 
+export const ReIndexConfirmationPopUp = (props) => {
+  const { popUpReIndex, setPopUpReIndex, setUpdatingReIndex } = props;
+  const client = useClient();
+  const dispatch = useDispatch();
+  const { enqueueSnackbar } = useSnackbar();
+  const [withDelete, setWithDelete] = useState(false);
+
+  const handlePopUpModal = async () => {
+    setUpdatingReIndex(true);
+    if (!client) {
+      dispatch({
+        type: SET_ERROR,
+        error: 'Client not initialized for re-indexing catalog task'
+      });
+    }
+    const response = await client.mutate(
+      startReindexCatalog({ handleDeletes: withDelete })
+    );
+    if (!response.errors && response.data.startReindexCatalog != null) {
+      const respData = response.data.startReindexCatalog;
+      if (respData === true) {
+        enqueueSnackbar('Re Index Task has Started. Please check the status', {
+          anchorOrigin: {
+            horizontal: 'right',
+            vertical: 'top'
+          },
+          variant: 'success'
+        });
+      } else {
+        enqueueSnackbar('Could not start re index task', {
+          anchorOrigin: {
+            horizontal: 'right',
+            vertical: 'top'
+          },
+          variant: 'error'
+        });
+      }
+    } else {
+      const error = response.errors
+        ? response.errors[0].message
+        : 'Something went wrong while starting re index task. Please check gql logs';
+      dispatch({ type: SET_ERROR, error });
+    }
+    setPopUpReIndex(false);
+    setUpdatingReIndex(false);
+  };
+
+  return (
+
+
+
+            Are you sure you want to start re-indexing the ENTIRE data.all
+            Catalog?
+
+          }
+        />
+
+          {
+            setWithDelete(!withDelete);
+          }}
+          edge="start"
+          name="withDelete"
+        />
+
+
+
+
+
+
+
+  );
+};
+
 export const MaintenanceViewer = () => {
   const client = useClient();
   const [refreshing, setRefreshing] = useState(false);
+  const [refreshingReIndex, setRefreshingReIndex] = useState(false);
+  const [updatingReIndex, setUpdatingReIndex] = useState(false);
   const [updating, setUpdating] = useState(false);
   const [mode, setMode] = useState('');
   const [popUp, setPopUp] = useState(false);
+  const [popUpReIndex, setPopUpReIndex] = useState(false);
   const [confirmedMode, setConfirmedMode] = useState('');
   const [maintenanceButtonText, setMaintenanceButtonText] =
     useState(START_MAINTENANCE);
@@ -339,6 +441,33 @@ export const MaintenanceViewer = () => {
 
   return (
+      {refreshingReIndex ? (
+
+      ) : (
+
+
+            Re-Index Data.all Catalog} />
+
+
+                  setPopUpReIndex(true)}
+                  startIcon={}
+                  sx={{ m: 1 }}
+                  variant="contained"
+                >
+                  Start Re-Index Catalog Task
+
+
+
+
+
+
+      )}
       {refreshing ? (
 
       ) : (
diff --git a/frontend/src/modules/Maintenance/services/index.js b/frontend/src/modules/Maintenance/services/index.js
index ca55e15e2..0f057a7e6 100644
--- a/frontend/src/modules/Maintenance/services/index.js
+++ b/frontend/src/modules/Maintenance/services/index.js
@@ -1,3 +1,4 @@
 export * from './getMaintenanceStatus';
 export * from './stopMaintenanceWindow';
 export * from './startMaintenanceWindow';
+export * from './startReindexCatalog';
diff --git a/frontend/src/modules/Maintenance/services/startReindexCatalog.js b/frontend/src/modules/Maintenance/services/startReindexCatalog.js
new file mode 100644
index 000000000..cfde0a568
--- /dev/null
+++ b/frontend/src/modules/Maintenance/services/startReindexCatalog.js
@@ -0,0 +1,10 @@
+import { gql } from 'apollo-boost';
+
+export const startReindexCatalog = ({ handleDeletes }) => ({
+  variables: { handleDeletes },
+  mutation: gql`
+    mutation startReindexCatalog($handleDeletes: Boolean!) {
+      startReindexCatalog(handleDeletes: $handleDeletes)
+    }
+  `
+});

From 8d238f97982f51053e6a796f86dbeb908ab044c3 Mon Sep 17 00:00:00 2001
From: Noah Paige
Date: Thu, 27 Jun 2024 21:15:27 -0400
Subject: [PATCH 3/6] Clean up Catalog Service

---
 .../catalog/services/catalog_service.py       | 108 ------------------
 1 file changed, 108 deletions(-)

diff --git a/backend/dataall/modules/catalog/services/catalog_service.py b/backend/dataall/modules/catalog/services/catalog_service.py
index b46e6b755..92ae33d67 100644
--- a/backend/dataall/modules/catalog/services/catalog_service.py
+++ b/backend/dataall/modules/catalog/services/catalog_service.py
@@ -20,114 +20,6 @@
 
 
 class CatalogService:
-    @staticmethod
-    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
-    def create_glossary(data: dict = None) -> GlossaryNode:
-        with _session() as session:
-            return GlossaryRepository.create_glossary(session=session, data=data)
-
-    @staticmethod
-    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
-    def create_category(uri: str, data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.create_category(session=session, uri=uri, data=data)
-
-    @staticmethod
-    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
-    def create_term(uri: str, data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.create_term(session=session, uri=uri, data=data)
-
-    @staticmethod
-    def list_glossaries(data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.list_glossaries(session=session, data=data)
-
-    @staticmethod
-    def list_categories(uri: str, data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.list_categories(session=session, uri=uri, data=data)
-
-    @staticmethod
-    def list_terms(uri: str, data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.list_terms(session=session, uri=uri, data=data)
-
-    @staticmethod
-    def list_node_children(path: str, filter: dict = None):
-        with _session() as session:
-            return GlossaryRepository.list_node_children(session=session, path=path, filter=filter)
-
-    @staticmethod
-    def get_node_tree(path: str, filter: dict = None):
-        with _session() as session:
-            return GlossaryRepository.get_node_tree(session=session, path=path, filter=filter)
-
-    @staticmethod
-    def get_node_link_to_target(
-        uri: str,
-        targetUri: str,
-    ):
-        with _session() as session:
-            return GlossaryRepository.get_node_link_to_target(
-                session=session, username=get_context().username, uri=uri, targetUri=targetUri
-            )
-
-    @staticmethod
-    def get_glossary_categories_terms_and_associations(path: str):
-        with _session() as session:
-            return GlossaryRepository.get_glossary_categories_terms_and_associations(session=session, path=path)
-
-    @staticmethod
-    def list_term_associations(node: GlossaryNode, filter: dict = None):
-        with _session() as session:
-            return GlossaryRepository.list_term_associations(
-                session=session, node=node, filter=filter, target_model_definitions=GlossaryRegistry.definitions()
-            )
-
-    @staticmethod
-    def get_node(uri: str):
-        with _session() as session:
-            return GlossaryRepository.get_node(session=session, uri=uri)
-
-    @staticmethod
-    def get_link_target(targetUri: str, targetType: str):
-        with _session() as session:
-            model = GlossaryRegistry.find_model(targetType)
-            target = session.query(model).get(targetUri)
-            return target
-
-    @staticmethod
-    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
-    def update_node(uri: str = None, data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.update_node(session=session, uri=uri, data=data)
-
-    @staticmethod
-    @TenantPolicyService.has_tenant_permission(MANAGE_GLOSSARIES)
-    def delete_node(uri: str = None):
-        with _session() as session:
-            return GlossaryRepository.delete_node(session=session, uri=uri)
-
-    @staticmethod
-    def approve_term_association(linkUri: str):
-        with _session() as session:
-            return GlossaryRepository.approve_term_association(
-                session=session, username=get_context().username, groups=get_context().groups, linkUri=linkUri
-            )
-
-    @staticmethod
-    def dismiss_term_association(linkUri: str):
-        with _session() as session:
-            return GlossaryRepository.dismiss_term_association(
-                session=session, username=get_context().username, groups=get_context().groups, linkUri=linkUri
-            )
-
-    @staticmethod
-    def search_glossary_terms(data: dict = None):
-        with _session() as session:
-            return GlossaryRepository.search_glossary_terms(session=session, data=data)
-
     @staticmethod
     def start_reindex_catalog(with_deletes: bool) -> bool:
         context = get_context()

From 6fa937af88c2c4aea5cf4cd20df9bf0e2bf426ac Mon Sep 17 00:00:00 2001
From: Noah Paige
Date: Thu, 27 Jun 2024 23:36:37 -0400
Subject: [PATCH 4/6] Add module interface and fix imports

---
 backend/dataall/modules/catalog/__init__.py          | 11 +++++++++++
 backend/dataall/modules/catalog/handlers/__init__.py |  2 +-
 .../modules/catalog/handlers/ecs_catalog_handlers.py |  2 +-
 .../modules/catalog/tasks/catalog_indexer_task.py    |  4 ++++
 backend/dataall/modules/s3_datasets/__init__.py      |  3 ++-
 .../s3_datasets/services/dataset_location_service.py |  2 +-
 .../Maintenance/components/MaintenanceViewer.js      |  4 ++--
 7 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/backend/dataall/modules/catalog/__init__.py b/backend/dataall/modules/catalog/__init__.py
index c02f06803..1444359ed 100644
--- a/backend/dataall/modules/catalog/__init__.py
+++ b/backend/dataall/modules/catalog/__init__.py
@@ -16,6 +16,17 @@ def __init__(self):
         from dataall.modules.catalog import tasks
 
 
+class CatalogAsyncHandlersModuleInterface(ModuleInterface):
+    """Implements ModuleInterface for datapipelines async lambda"""
+
+    @staticmethod
+    def is_supported(modes: Set[ImportMode]):
+        return ImportMode.HANDLERS in modes
+
+    def __init__(self):
+        import dataall.modules.catalog.handlers
+
+
 class CatalogApiModuleInterface(ModuleInterface):
     """
     Implements ModuleInterface for catalog code in GraphQl lambda.
diff --git a/backend/dataall/modules/catalog/handlers/__init__.py b/backend/dataall/modules/catalog/handlers/__init__.py
index 0305d3b87..a1f6ee221 100644
--- a/backend/dataall/modules/catalog/handlers/__init__.py
+++ b/backend/dataall/modules/catalog/handlers/__init__.py
@@ -1,3 +1,3 @@
 from dataall.modules.catalog.handlers import ecs_catalog_handlers
 
-__all__ = ['ecs_catalog_handler']
+__all__ = ['ecs_catalog_handlers']
diff --git a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
index 99240112f..b215a933f 100644
--- a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
+++ b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
@@ -9,7 +9,7 @@
 log = logging.getLogger(__name__)
 
 
-class EcsMaintenanceHandler:
+class EcsCatalogIndexHandler:
     @staticmethod
     @Worker.handler(path='ecs.reindex.catalog')
     def run_ecs_reindex_catalog_task(engine, task: Task):
diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
index 0a6825023..ed57eea8c 100644
--- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
+++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -17,6 +17,10 @@
 
 
 class CatalogIndexerTask:
+    """
+    This class is responsible for indexing objects in the catalog.
+    """
+
     @classmethod
     def index_objects(cls, engine, with_deletes):
         try:
diff --git a/backend/dataall/modules/s3_datasets/__init__.py b/backend/dataall/modules/s3_datasets/__init__.py
index c828e7993..f0b73a6d0 100644
--- a/backend/dataall/modules/s3_datasets/__init__.py
+++ b/backend/dataall/modules/s3_datasets/__init__.py
@@ -90,8 +90,9 @@ def is_supported(modes: Set[ImportMode]):
     @staticmethod
     def depends_on() -> List[Type['ModuleInterface']]:
         from dataall.modules.datasets_base import DatasetBaseModuleInterface
+        from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface
 
-        return [DatasetBaseModuleInterface]
+        return [DatasetBaseModuleInterface, CatalogAsyncHandlersModuleInterface]
 
     def __init__(self):
         import dataall.modules.s3_datasets.handlers
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
index 7e90d52fd..a4ac2b33f 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
@@ -1,4 +1,4 @@
-from backend.dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
+from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.base.context import get_context
 from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
 from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
diff --git a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
index 1cd0a323f..99c022145 100644
--- a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
+++ b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
@@ -226,7 +226,6 @@ export const ReIndexConfirmationPopUp = (props) => {
-
           {
             setWithDelete(!withDelete);
           }}
          edge="start"
           name="withDelete"
         />
@@ -264,7 +263,8 @@ export const ReIndexConfirmationPopUp = (props) => {
 export const MaintenanceViewer = () => {
   const client = useClient();
   const [refreshing, setRefreshing] = useState(false);
-  const [refreshingReIndex, setRefreshingReIndex] = useState(false);
+  // const [refreshingReIndex, setRefreshingReIndex] = useState(false);
+  const refreshingReIndex = false;
   const [updatingReIndex, setUpdatingReIndex] = useState(false);
   const [updating, setUpdating] = useState(false);
   const [mode, setMode] = useState('');

From bb111c431920223a2a856d21fb701b5549408440 Mon Sep 17 00:00:00 2001
From: Noah Paige
Date: Fri, 28 Jun 2024 09:52:23 -0400
Subject: [PATCH 5/6] fix import catalog indexers

---
 .../dataall/modules/catalog/tasks/catalog_indexer_task.py | 3 +--
 backend/local_graphql_server.py                           | 2 +-
 deploy/stacks/container.py                                | 5 +++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
index ed57eea8c..74fab7e3d 100644
--- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
+++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -52,9 +52,8 @@ def _delete_old_objects(cls, indexed_object_uris: List[str]) -> None:
 
 
 if __name__ == '__main__':
+    load_modules({ImportMode.CATALOG_INDEXER_TASK})
     ENVNAME = os.environ.get('envname', 'local')
     ENGINE = get_engine(envname=ENVNAME)
     with_deletes = os.environ.get('with_deletes', 'False')
-
-    load_modules({ImportMode.CATALOG_INDEXER_TASK})
     CatalogIndexerTask.index_objects(engine=ENGINE, with_deletes=with_deletes)
diff --git a/backend/local_graphql_server.py b/backend/local_graphql_server.py
index 339016b42..1ea96a732 100644
--- a/backend/local_graphql_server.py
+++ b/backend/local_graphql_server.py
@@ -31,7 +31,7 @@
 es = connect(envname=ENVNAME)
 logger.info('Connected')
 # create_schema_and_tables(engine, envname=ENVNAME)
-load_modules(modes={ImportMode.API, ImportMode.HANDLERS, ImportMode.SHARES_TASK})
+load_modules(modes={ImportMode.API, ImportMode.HANDLERS, ImportMode.SHARES_TASK, ImportMode.CATALOG_INDEXER_TASK})
 Base.metadata.create_all(engine.engine)
 CDKPROXY_URL = 'http://cdkproxy:2805' if ENVNAME == 'dkrcompose' else 'http://localhost:2805'
 config.set_property('cdk_proxy_url', CDKPROXY_URL)
diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py
index 6429bd0fe..a9981b621 100644
--- a/deploy/stacks/container.py
+++ b/deploy/stacks/container.py
@@ -181,10 +181,11 @@ def __init__(
 
     @run_if(['modules.s3_datasets.active', 'modules.dashboards.active'])
     def add_catalog_indexer_task(self):
+        container_id = 'container'
         catalog_indexer_task, catalog_indexer_task_def = self.set_scheduled_task(
             cluster=self.ecs_cluster,
             command=['python3.9', '-m', 'dataall.modules.catalog.tasks.catalog_indexer_task'],
-            container_id='container',
+            container_id=container_id,
             ecr_repository=self._ecr_repository,
             environment=self._create_env('INFO'),
             image_tag=self._cdkproxy_image_tag,
@@ -209,7 +210,7 @@ def add_catalog_indexer_task(self):
             self,
             f'CatalogIndexerTaskContainerSSM{self._envname}',
             parameter_name=f'/dataall/{self._envname}/ecs/container/catalog_indexer',
-            string_value=catalog_indexer_task.container_name,
+            string_value=container_id,
         )
 
         self.ecs_task_definitions_families.append(catalog_indexer_task.task_definition.family)

From 4a2524376372fe223fcff83467132c241abb80d4 Mon Sep 17 00:00:00 2001
From: Noah Paige
Date: Fri, 28 Jun 2024 10:03:41 -0400
Subject: [PATCH 6/6] Fix Tests

---
 backend/dataall/modules/catalog/tasks/catalog_indexer_task.py | 2 +-
 .../modules/s3_datasets/tasks/test_dataset_catalog_indexer.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
index 74fab7e3d..7fd628465 100644
--- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
+++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -22,7 +22,7 @@ class CatalogIndexerTask:
     """
 
     @classmethod
-    def index_objects(cls, engine, with_deletes):
+    def index_objects(cls, engine, with_deletes='False'):
         try:
             indexed_object_uris = []
             with engine.scoped_session() as session:
diff --git a/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py b/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
index 7b67c3335..259a05d1c 100644
--- a/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
+++ b/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
@@ -1,6 +1,6 @@
 import pytest
 
-from dataall.modules.catalog.tasks.catalog_indexer_task import index_objects
+from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask
 from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
 
 
@@ -54,6 +54,6 @@ def test_catalog_indexer(db, org, env, sync_dataset, table, mocker):
     mocker.patch(
         'dataall.modules.s3_datasets.indexers.dataset_indexer.DatasetIndexer.upsert', return_value=sync_dataset
     )
-    indexed_objects_counter = index_objects(engine=db)
+    indexed_objects_counter = CatalogIndexerTask.index_objects(engine=db)
    # Count should be One table + One Dataset = 2
     assert indexed_objects_counter == 2
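-- 
The stale-document cleanup that CatalogIndexerTask._delete_old_objects performs can
be reproduced outside the ECS task with a few lines of opensearch-py. The following
is a minimal sketch, not data.all code: the endpoint and the 'dataall-index' index
name are assumptions (BaseIndexer owns the real client and its _INDEX), and it uses
delete_by_query instead of the task's search-then-delete loop, which only sees the
first page of hits (10 by default) per run.

from opensearchpy import OpenSearch

# Hypothetical local endpoint and index name, for illustration only.
client = OpenSearch(hosts=[{'host': 'localhost', 'port': 9200}])
index = 'dataall-index'  # stand-in for BaseIndexer._INDEX


def delete_stale_docs(indexed_object_uris: list) -> int:
    # Match every document whose _id is NOT among the URIs that were just
    # re-indexed; anything matched is an orphaned catalog entry.
    query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}}
    # delete_by_query removes all matches server-side in a single call,
    # avoiding the pagination concern of a search-then-delete loop.
    resp = client.delete_by_query(index=index, body=query)
    return resp['deleted']

The same path can also be exercised end to end through the new API once deployed:
mutation { startReindexCatalog(handleDeletes: true) }.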