diff --git a/backend/dataall/modules/catalog/__init__.py b/backend/dataall/modules/catalog/__init__.py
index c02f06803..1444359ed 100644
--- a/backend/dataall/modules/catalog/__init__.py
+++ b/backend/dataall/modules/catalog/__init__.py
@@ -16,6 +16,17 @@ def __init__(self):
from dataall.modules.catalog import tasks
+class CatalogAsyncHandlersModuleInterface(ModuleInterface):
+ """Implements ModuleInterface for datapipelines async lambda"""
+
+ @staticmethod
+ def is_supported(modes: Set[ImportMode]):
+ return ImportMode.HANDLERS in modes
+
+ def __init__(self):
+ import dataall.modules.catalog.handlers
+
+
class CatalogApiModuleInterface(ModuleInterface):
"""
Implements ModuleInterface for catalog code in GraphQl lambda.
diff --git a/backend/dataall/modules/catalog/api/mutations.py b/backend/dataall/modules/catalog/api/mutations.py
index f16188f53..4df0f2fdf 100644
--- a/backend/dataall/modules/catalog/api/mutations.py
+++ b/backend/dataall/modules/catalog/api/mutations.py
@@ -7,6 +7,7 @@
create_term,
approve_term_association,
dismiss_term_association,
+ start_reindex_catalog,
)
@@ -107,3 +108,10 @@
resolver=dismiss_term_association,
args=[gql.Argument(name='linkUri', type=gql.NonNullableType(gql.String))],
)
+
+startReindexCatalog = gql.MutationField(
+ name='startReindexCatalog',
+ args=[gql.Argument(name='handleDeletes', type=gql.NonNullableType(gql.Boolean))],
+ type=gql.Boolean,
+ resolver=start_reindex_catalog,
+)
diff --git a/backend/dataall/modules/catalog/api/resolvers.py b/backend/dataall/modules/catalog/api/resolvers.py
index 3d8052786..40ccec24d 100644
--- a/backend/dataall/modules/catalog/api/resolvers.py
+++ b/backend/dataall/modules/catalog/api/resolvers.py
@@ -1,5 +1,6 @@
from dataall.modules.catalog.api.enums import GlossaryRole
from dataall.modules.catalog.services.glossaries_service import GlossariesService
+from dataall.modules.catalog.services.catalog_service import CatalogService
from dataall.base.api.context import Context
from dataall.modules.catalog.db.glossary_models import TermLink, GlossaryNode
from dataall.base.db import exceptions
@@ -157,3 +158,7 @@ def search_glossary(context: Context, source, filter: dict = None):
if not filter:
filter = {}
return GlossariesService.search_glossary_terms(data=filter)
+
+
+def start_reindex_catalog(context: Context, source, handleDeletes: bool):
+ return CatalogService.start_reindex_catalog(with_deletes=handleDeletes)
diff --git a/backend/dataall/modules/catalog/handlers/__init__.py b/backend/dataall/modules/catalog/handlers/__init__.py
new file mode 100644
index 000000000..a1f6ee221
--- /dev/null
+++ b/backend/dataall/modules/catalog/handlers/__init__.py
@@ -0,0 +1,3 @@
+from dataall.modules.catalog.handlers import ecs_catalog_handlers
+
+__all__ = ['ecs_catalog_handlers']
diff --git a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
new file mode 100644
index 000000000..b215a933f
--- /dev/null
+++ b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
@@ -0,0 +1,27 @@
+import logging
+import os
+
+from dataall.core.tasks.service_handlers import Worker
+from dataall.core.stacks.aws.ecs import Ecs
+from dataall.core.tasks.db.task_models import Task
+from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask
+
+log = logging.getLogger(__name__)
+
+
+class EcsCatalogIndexHandler:
+ @staticmethod
+ @Worker.handler(path='ecs.reindex.catalog')
+ def run_ecs_reindex_catalog_task(engine, task: Task):
+ envname = os.environ.get('envname', 'local')
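+        # Run the indexer in-process for local/docker-compose environments;
+        # otherwise launch the catalog indexer ECS task.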
+ if envname in ['local', 'dkrcompose']:
+ CatalogIndexerTask.index_objects(engine, str(task.payload.get('with_deletes', False)))
+ else:
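+            # The with_deletes flag is forwarded to the container as a string
+            # environment variable through the ECS task context.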
+ ecs_task_arn = Ecs.run_ecs_task(
+                task_definition_param='ecs/task_def_arn/catalog_indexer',
+                container_name_param='ecs/container/catalog_indexer',
+ context=[
+ {'name': 'with_deletes', 'value': str(task.payload.get('with_deletes', False))},
+ ],
+ )
+ return {'task_arn': ecs_task_arn}
diff --git a/backend/dataall/modules/catalog/indexers/base_indexer.py b/backend/dataall/modules/catalog/indexers/base_indexer.py
index 05a51d9ad..98ce693e7 100644
--- a/backend/dataall/modules/catalog/indexers/base_indexer.py
+++ b/backend/dataall/modules/catalog/indexers/base_indexer.py
@@ -52,6 +52,17 @@ def _index(cls, doc_id, doc):
log.error(f'ES config is missing doc {doc} for id {doc_id} was not indexed')
return False
+ @classmethod
+ def search(cls, query):
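+        # Run a raw OpenSearch query against the catalog index; the re-index
+        # task uses this to find documents that should be deleted.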
+ es = cls.es()
+ if es:
+ res = es.search(index=cls._INDEX, body=query)
+ log.info(f'Search query {query} returned {res["hits"]["total"]["value"]} records')
+ return res
+ else:
+ log.error(f'ES config is missing, search query {query} failed')
+ return {}
+
@staticmethod
def _get_target_glossary_terms(session, target_uri):
q = (
diff --git a/backend/dataall/modules/catalog/indexers/catalog_indexer.py b/backend/dataall/modules/catalog/indexers/catalog_indexer.py
index 2f27c7df8..035d46b6f 100644
--- a/backend/dataall/modules/catalog/indexers/catalog_indexer.py
+++ b/backend/dataall/modules/catalog/indexers/catalog_indexer.py
@@ -12,5 +12,5 @@ def __init__(self):
def all():
return CatalogIndexer._INDEXERS
- def index(self, session) -> int:
+ def index(self, session) -> List[str]:
raise NotImplementedError('index is not implemented')
diff --git a/backend/dataall/modules/catalog/services/catalog_service.py b/backend/dataall/modules/catalog/services/catalog_service.py
new file mode 100644
index 000000000..92ae33d67
--- /dev/null
+++ b/backend/dataall/modules/catalog/services/catalog_service.py
@@ -0,0 +1,27 @@
+import logging
+
+from dataall.base.context import get_context
+from dataall.core.permissions.services.tenant_policy_service import TenantPolicyValidationService
+from dataall.core.tasks.db.task_models import Task
+from dataall.core.tasks.service_handlers import Worker
+
+
+logger = logging.getLogger(__name__)
+
+
+class CatalogService:
+ @staticmethod
+ def start_reindex_catalog(with_deletes: bool) -> bool:
+ context = get_context()
+ groups = context.groups if context.groups is not None else []
+ if not TenantPolicyValidationService.is_tenant_admin(groups):
+            raise Exception('Only data.all admin group members can start the catalog re-index task')
+
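+        # Persist a Task record and queue it for the async Worker, which
+        # routes it to the 'ecs.reindex.catalog' handler.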
+ with context.db_engine.scoped_session() as session:
+ reindex_catalog_task: Task = Task(
+ action='ecs.reindex.catalog', targetUri='ALL', payload={'with_deletes': with_deletes}
+ )
+ session.add(reindex_catalog_task)
+
+ Worker.queue(engine=context.db_engine, task_ids=[reindex_catalog_task.taskUri])
+ return True
diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
index 570c8b6c3..7fd628465 100644
--- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
+++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -1,8 +1,10 @@
import logging
import os
import sys
+from typing import List
from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
+from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
from dataall.base.db import get_engine
from dataall.base.loader import load_modules, ImportMode
from dataall.base.utils.alarm_service import AlarmService
@@ -14,23 +16,44 @@
log = logging.getLogger(__name__)
-def index_objects(engine):
- try:
- indexed_objects_counter = 0
- with engine.scoped_session() as session:
- for indexer in CatalogIndexer.all():
- indexed_objects_counter += indexer.index(session)
+class CatalogIndexerTask:
+ """
+    Reads all catalog objects from the database, indexes them in OpenSearch and,
+    when with_deletes is enabled, deletes documents for objects that no longer exist.
+ """
- log.info(f'Successfully indexed {indexed_objects_counter} objects')
- return indexed_objects_counter
- except Exception as e:
- AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
- raise e
+ @classmethod
+ def index_objects(cls, engine, with_deletes='False'):
+ try:
+ indexed_object_uris = []
+ with engine.scoped_session() as session:
+ for indexer in CatalogIndexer.all():
+ indexed_object_uris += indexer.index(session)
+
+ log.info(f'Successfully indexed {len(indexed_object_uris)} objects')
+
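+            # with_deletes arrives as the string 'True'/'False' because it is
+            # passed through an ECS environment variable.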
+ if with_deletes == 'True':
+ CatalogIndexerTask._delete_old_objects(indexed_object_uris)
+ return len(indexed_object_uris)
+ except Exception as e:
+ AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
+ raise e
+
+ @classmethod
+ def _delete_old_objects(cls, indexed_object_uris: List[str]) -> None:
+ # Search for documents in opensearch without an ID in the indexed_object_uris list
+ query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}}
+ # Delete All "Outdated" Objects from Index
+ docs = BaseIndexer.search(query)
+ for doc in docs.get('hits', {}).get('hits', []):
+ log.info(f'Deleting document {doc["_id"]}...')
+ BaseIndexer.delete_doc(doc_id=doc['_id'])
+
+ log.info(f'Deleted {len(docs.get("hits", {}).get("hits", []))} records')
if __name__ == '__main__':
+ load_modules({ImportMode.CATALOG_INDEXER_TASK})
ENVNAME = os.environ.get('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)
-
- load_modules({ImportMode.CATALOG_INDEXER_TASK})
- index_objects(engine=ENGINE)
+ with_deletes = os.environ.get('with_deletes', 'False')
+ CatalogIndexerTask.index_objects(engine=ENGINE, with_deletes=with_deletes)
diff --git a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
index 7cc0884fc..474d6dc5d 100644
--- a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
+++ b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
@@ -1,5 +1,7 @@
import logging
+from typing import List
+
from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
from dataall.modules.dashboards.db.dashboard_models import Dashboard
from dataall.modules.dashboards.indexers.dashboard_indexer import DashboardIndexer
@@ -8,11 +10,14 @@
class DashboardCatalogIndexer(CatalogIndexer):
- def index(self, session) -> int:
- all_dashboards: [Dashboard] = session.query(Dashboard).all()
+ def index(self, session) -> List[str]:
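+        # Return the URIs of all indexed dashboards so the caller can delete
+        # catalog documents that no longer exist.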
+ all_dashboards: List[Dashboard] = session.query(Dashboard).all()
+ all_dashboard_uris = []
+
log.info(f'Found {len(all_dashboards)} dashboards')
dashboard: Dashboard
for dashboard in all_dashboards:
+ all_dashboard_uris.append(dashboard.dashboardUri)
DashboardIndexer.upsert(session=session, dashboard_uri=dashboard.dashboardUri)
- return len(all_dashboards)
+ return all_dashboard_uris
diff --git a/backend/dataall/modules/s3_datasets/__init__.py b/backend/dataall/modules/s3_datasets/__init__.py
index c828e7993..f0b73a6d0 100644
--- a/backend/dataall/modules/s3_datasets/__init__.py
+++ b/backend/dataall/modules/s3_datasets/__init__.py
@@ -90,8 +90,9 @@ def is_supported(modes: Set[ImportMode]):
@staticmethod
def depends_on() -> List[Type['ModuleInterface']]:
from dataall.modules.datasets_base import DatasetBaseModuleInterface
+ from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface
- return [DatasetBaseModuleInterface]
+ return [DatasetBaseModuleInterface, CatalogAsyncHandlersModuleInterface]
def __init__(self):
import dataall.modules.s3_datasets.handlers
diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
index a93510fad..3e4853bca 100644
--- a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
@@ -1,5 +1,6 @@
import logging
+from typing import List
from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.modules.s3_datasets.indexers.location_indexer import DatasetLocationIndexer
from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
@@ -16,13 +17,18 @@ class DatasetCatalogIndexer(CatalogIndexer):
Register automatically itself when CatalogIndexer instance is created
"""
- def index(self, session) -> int:
- all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session)
+ def index(self, session) -> List[str]:
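+        # Return the URIs of every indexed table, folder and dataset so the
+        # caller can purge stale catalog documents.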
+ all_datasets: List[S3Dataset] = DatasetRepository.list_all_active_datasets(session)
+ all_dataset_uris = []
log.info(f'Found {len(all_datasets)} datasets')
- indexed = 0
for dataset in all_datasets:
tables = DatasetTableIndexer.upsert_all(session, dataset.datasetUri)
+ all_dataset_uris += [table.tableUri for table in tables]
+
folders = DatasetLocationIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
+ all_dataset_uris += [folder.locationUri for folder in folders]
+
DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
- indexed += len(tables) + len(folders)
- return indexed + len(all_datasets)
+ all_dataset_uris.append(dataset.datasetUri)
+
+ return all_dataset_uris
diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
index e245bcea1..2d2a8c5d5 100644
--- a/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
@@ -14,14 +14,15 @@ class DatasetIndexer(BaseIndexer):
@classmethod
def upsert(cls, session, dataset_uri: str):
dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
- env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
- org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
-
- count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
- count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
- count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')
if dataset:
+ env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+ org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+
+ count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
+ count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
+ count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')
+
glossary = BaseIndexer._get_target_glossary_terms(session, dataset_uri)
BaseIndexer._index(
doc_id=dataset_uri,
diff --git a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
index b5216dacb..82170e8f9 100644
--- a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py
@@ -6,19 +6,18 @@
from dataall.core.organizations.db.organization_repositories import OrganizationRepository
from dataall.modules.s3_datasets.db.dataset_location_repositories import DatasetLocationRepository
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
-from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
class DatasetLocationIndexer(BaseIndexer):
@classmethod
- def upsert(cls, session, folder_uri: str):
+ def upsert(cls, session, folder_uri: str, dataset=None, env=None, org=None):
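+        # dataset/env/org may be supplied by upsert_all so they are fetched
+        # once per dataset instead of once per folder.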
folder = DatasetLocationRepository.get_location_by_uri(session, folder_uri)
if folder:
- dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri)
- env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
- org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+ dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri) if not dataset else dataset
+ env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
+ org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
glossary = BaseIndexer._get_target_glossary_terms(session, folder_uri)
BaseIndexer._index(
@@ -46,12 +45,16 @@ def upsert(cls, session, folder_uri: str):
'glossary': glossary,
},
)
- DatasetIndexer.upsert(session=session, dataset_uri=folder.datasetUri)
return folder
@classmethod
def upsert_all(cls, session, dataset_uri: str):
folders = DatasetLocationRepository.get_dataset_folders(session, dataset_uri)
+ dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
+ env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+ org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
for folder in folders:
- DatasetLocationIndexer.upsert(session=session, folder_uri=folder.locationUri)
+ DatasetLocationIndexer.upsert(
+ session=session, folder_uri=folder.locationUri, dataset=dataset, env=env, org=org
+ )
return folders
diff --git a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
index 15b6320c1..cf24c4852 100644
--- a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
+++ b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py
@@ -12,13 +12,13 @@
class DatasetTableIndexer(BaseIndexer):
@classmethod
- def upsert(cls, session, table_uri: str):
+ def upsert(cls, session, table_uri: str, dataset=None, env=None, org=None):
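+        # As with folders, dataset/env/org may be supplied by upsert_all to
+        # avoid one database lookup per table.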
table = DatasetTableRepository.get_dataset_table_by_uri(session, table_uri)
if table:
- dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri)
- env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
- org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+ dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri) if not dataset else dataset
+ env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
+ org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
glossary = BaseIndexer._get_target_glossary_terms(session, table_uri)
tags = table.tags if table.tags else []
@@ -48,14 +48,16 @@ def upsert(cls, session, table_uri: str):
'glossary': glossary,
},
)
- DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri)
return table
@classmethod
def upsert_all(cls, session, dataset_uri: str):
tables = DatasetTableRepository.find_all_active_tables(session, dataset_uri)
+ dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
+ env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+ org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
for table in tables:
- DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri)
+ DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri, dataset=dataset, env=env, org=org)
return tables
@classmethod
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
index 9da13bf61..a4ac2b33f 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py
@@ -1,3 +1,4 @@
+from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.base.context import get_context
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
@@ -48,6 +49,7 @@ def create_storage_location(uri: str, data: dict):
S3LocationClient(location, dataset).create_bucket_prefix()
DatasetLocationIndexer.upsert(session=session, folder_uri=location.locationUri)
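+            # Folder upserts no longer refresh the parent dataset document,
+            # so update it explicitly.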
+ DatasetIndexer.upsert(session, dataset.datasetUri)
return location
@staticmethod
@@ -77,6 +79,7 @@ def update_storage_location(uri: str, data: dict):
DatasetLocationService._create_glossary_links(session, location, data['terms'])
DatasetLocationIndexer.upsert(session, folder_uri=location.locationUri)
+ DatasetIndexer.upsert(session, location.datasetUri)
return location
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
index 50b63c61e..804156912 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py
@@ -9,6 +9,7 @@
from dataall.modules.s3_datasets.aws.glue_dataset_client import DatasetCrawler
from dataall.modules.s3_datasets.db.dataset_table_repositories import DatasetTableRepository
from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
+from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.modules.s3_datasets.services.dataset_permissions import (
UPDATE_DATASET_TABLE,
MANAGE_DATASETS,
@@ -58,6 +59,7 @@ def update_table(uri: str, table_data: dict = None):
)
DatasetTableIndexer.upsert(session, table_uri=table.tableUri)
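+            # Keep the parent dataset document in sync now that table upserts
+            # no longer cascade to it.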
+ DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri)
return table
@staticmethod
@@ -115,6 +117,7 @@ def sync_tables_for_dataset(cls, uri):
cls.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables)
DatasetTableIndexer.upsert_all(session=session, dataset_uri=dataset.datasetUri)
DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri)
+ DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
return DatasetRepository.paginated_dataset_tables(
session=session,
uri=uri,
diff --git a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
index 0fbf52709..e530a9e6a 100644
--- a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
+++ b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
@@ -13,6 +13,7 @@
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
+from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.modules.s3_datasets.services.dataset_alarm_service import DatasetAlarmService
root = logging.getLogger()
@@ -68,6 +69,7 @@ def sync_tables(engine):
processed_tables.extend(tables)
DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
+ DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
except Exception as e:
log.error(
f'Failed to sync tables for dataset '
diff --git a/backend/local_graphql_server.py b/backend/local_graphql_server.py
index e6b48a7c7..9d1ddd714 100644
--- a/backend/local_graphql_server.py
+++ b/backend/local_graphql_server.py
@@ -31,7 +31,7 @@
es = connect(envname=ENVNAME)
logger.info('Connected')
# create_schema_and_tables(engine, envname=ENVNAME)
-load_modules(modes={ImportMode.API, ImportMode.HANDLERS, ImportMode.SHARES_TASK})
+load_modules(modes={ImportMode.API, ImportMode.HANDLERS, ImportMode.SHARES_TASK, ImportMode.CATALOG_INDEXER_TASK})
Base.metadata.create_all(engine.engine)
CDKPROXY_URL = 'http://cdkproxy:2805' if ENVNAME == 'dkrcompose' else 'http://localhost:2805'
config.set_property('cdk_proxy_url', CDKPROXY_URL)
diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py
index aebd00d7d..a9981b621 100644
--- a/deploy/stacks/container.py
+++ b/deploy/stacks/container.py
@@ -181,10 +181,11 @@ def __init__(
@run_if(['modules.s3_datasets.active', 'modules.dashboards.active'])
def add_catalog_indexer_task(self):
+ container_id = 'container'
catalog_indexer_task, catalog_indexer_task_def = self.set_scheduled_task(
cluster=self.ecs_cluster,
command=['python3.9', '-m', 'dataall.modules.catalog.tasks.catalog_indexer_task'],
- container_id='container',
+ container_id=container_id,
ecr_repository=self._ecr_repository,
environment=self._create_env('INFO'),
image_tag=self._cdkproxy_image_tag,
@@ -198,6 +199,20 @@ def add_catalog_indexer_task(self):
prod_sizing=self._prod_sizing,
)
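+        # Publish the task definition ARN and container name to SSM so the
+        # async Worker handler can run this ECS task on demand.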
+ ssm.StringParameter(
+ self,
+ f'CatalogIndexerTaskARNSSM{self._envname}',
+ parameter_name=f'/dataall/{self._envname}/ecs/task_def_arn/catalog_indexer',
+ string_value=catalog_indexer_task_def.task_definition_arn,
+ )
+
+ ssm.StringParameter(
+ self,
+ f'CatalogIndexerTaskContainerSSM{self._envname}',
+ parameter_name=f'/dataall/{self._envname}/ecs/container/catalog_indexer',
+ string_value=container_id,
+ )
+
self.ecs_task_definitions_families.append(catalog_indexer_task.task_definition.family)
@run_if(['modules.s3_datasets.active'])
diff --git a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
index 2feb3713e..99c022145 100644
--- a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
+++ b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js
@@ -10,6 +10,7 @@ import {
IconButton,
MenuItem,
TextField,
+ Switch,
Typography
} from '@mui/material';
import React, { useCallback, useEffect, useState } from 'react';
@@ -19,7 +20,8 @@ import { Label } from 'design';
import {
getMaintenanceStatus,
stopMaintenanceWindow,
- startMaintenanceWindow
+ startMaintenanceWindow,
+ startReindexCatalog
} from '../services';
import { useClient } from 'services';
import { SET_ERROR, useDispatch } from 'globalErrors';
@@ -162,12 +164,112 @@ export const MaintenanceConfirmationPopUp = (props) => {
);
};
+export const ReIndexConfirmationPopUp = (props) => {
+ const { popUpReIndex, setPopUpReIndex, setUpdatingReIndex } = props;
+ const client = useClient();
+ const dispatch = useDispatch();
+ const { enqueueSnackbar } = useSnackbar();
+ const [withDelete, setWithDelete] = useState(false);
+
+ const handlePopUpModal = async () => {
+    if (!client) {
+      dispatch({
+        type: SET_ERROR,
+        error: 'Client not initialized for re-indexing catalog task'
+      });
+      return;
+    }
+    setUpdatingReIndex(true);
+ const response = await client.mutate(
+ startReindexCatalog({ handleDeletes: withDelete })
+ );
+ if (!response.errors && response.data.startReindexCatalog != null) {
+ const respData = response.data.startReindexCatalog;
+ if (respData === true) {
+        enqueueSnackbar('Re-index task has started. Please check its status', {
+ anchorOrigin: {
+ horizontal: 'right',
+ vertical: 'top'
+ },
+ variant: 'success'
+ });
+ } else {
+        enqueueSnackbar('Could not start the re-index task', {
+ anchorOrigin: {
+ horizontal: 'right',
+ vertical: 'top'
+ },
+          variant: 'error'
+ });
+ }
+ } else {
+ const error = response.errors
+ ? response.errors[0].message
+        : 'Something went wrong while starting the re-index task. Please check the GraphQL logs';
+ dispatch({ type: SET_ERROR, error });
+ }
+ setPopUpReIndex(false);
+ setUpdatingReIndex(false);
+ };
+
+  return (
+    <Dialog maxWidth="sm" fullWidth onClose={() => setPopUpReIndex(false)} open={popUpReIndex}>
+      <Box sx={{ p: 3 }}>
+        <Typography align="center" color="textPrimary" gutterBottom variant="h5">
+          Start the catalog re-index task?
+        </Typography>
+        <Typography align="center" color="textSecondary" variant="subtitle2">
+          Toggle on to also delete catalog documents for objects that no longer
+          exist in data.all (handleDeletes)
+        </Typography>
+        <Switch
+          checked={withDelete}
+          onChange={() => setWithDelete(!withDelete)}
+        />
+        <Button onClick={handlePopUpModal} sx={{ m: 1 }} variant="contained">
+          Start Re-Index
+        </Button>
+        <Button onClick={() => setPopUpReIndex(false)} sx={{ m: 1 }} variant="outlined">
+          Cancel
+        </Button>
+      </Box>
+    </Dialog>
+  );
+};
+
export const MaintenanceViewer = () => {
const client = useClient();
const [refreshing, setRefreshing] = useState(false);
+  const refreshingReIndex = false;
+ const [updatingReIndex, setUpdatingReIndex] = useState(false);
const [updating, setUpdating] = useState(false);
const [mode, setMode] = useState('');
const [popUp, setPopUp] = useState(false);
+ const [popUpReIndex, setPopUpReIndex] = useState(false);
const [confirmedMode, setConfirmedMode] = useState('');
const [maintenanceButtonText, setMaintenanceButtonText] =
useState(START_MAINTENANCE);
@@ -339,6 +441,33 @@ export const MaintenanceViewer = () => {
return (
+      {refreshingReIndex ? (
+        <CircularProgress />
+      ) : (
+        <Box>
+          <Card>
+            <CardHeader title={<Box>Re-Index Data.all Catalog</Box>} />
+            <Divider />
+            <CardContent>
+              <Button
+                onClick={() => setPopUpReIndex(true)}
+                startIcon={}
+                sx={{ m: 1 }}
+                variant="contained"
+              >
+                Start Re-Index Catalog Task
+              </Button>
+            </CardContent>
+          </Card>
+          <ReIndexConfirmationPopUp
+            popUpReIndex={popUpReIndex}
+            setPopUpReIndex={setPopUpReIndex}
+            setUpdatingReIndex={setUpdatingReIndex}
+          />
+        </Box>
+      )}
{refreshing ? (
) : (
diff --git a/frontend/src/modules/Maintenance/services/index.js b/frontend/src/modules/Maintenance/services/index.js
index ca55e15e2..0f057a7e6 100644
--- a/frontend/src/modules/Maintenance/services/index.js
+++ b/frontend/src/modules/Maintenance/services/index.js
@@ -1,3 +1,4 @@
export * from './getMaintenanceStatus';
export * from './stopMaintenanceWindow';
export * from './startMaintenanceWindow';
+export * from './startReindexCatalog';
diff --git a/frontend/src/modules/Maintenance/services/startReindexCatalog.js b/frontend/src/modules/Maintenance/services/startReindexCatalog.js
new file mode 100644
index 000000000..cfde0a568
--- /dev/null
+++ b/frontend/src/modules/Maintenance/services/startReindexCatalog.js
@@ -0,0 +1,10 @@
+import { gql } from 'apollo-boost';
+
+export const startReindexCatalog = ({ handleDeletes }) => ({
+ variables: { handleDeletes },
+ mutation: gql`
+ mutation startReindexCatalog($handleDeletes: Boolean!) {
+ startReindexCatalog(handleDeletes: $handleDeletes)
+ }
+ `
+});
diff --git a/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py b/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
index 00f607dae..259a05d1c 100644
--- a/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
+++ b/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
@@ -1,6 +1,6 @@
import pytest
-from dataall.modules.catalog.tasks.catalog_indexer_task import index_objects
+from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
@@ -54,6 +54,6 @@ def test_catalog_indexer(db, org, env, sync_dataset, table, mocker):
mocker.patch(
'dataall.modules.s3_datasets.indexers.dataset_indexer.DatasetIndexer.upsert', return_value=sync_dataset
)
- indexed_objects_counter = index_objects(engine=db)
- # Count should be One table + One Dataset = 2
+ indexed_objects_counter = CatalogIndexerTask.index_objects(engine=db)
+ # Count should be One table + One Dataset = 2
assert indexed_objects_counter == 2