Add deletion of docs not found when re-indexing in catalog task #1365

Merged
12 commits, merged Jul 1, 2024
11 changes: 11 additions & 0 deletions backend/dataall/modules/catalog/__init__.py
@@ -16,6 +16,17 @@ def __init__(self):
from dataall.modules.catalog import tasks


class CatalogAsyncHandlersModuleInterface(ModuleInterface):
    """Implements ModuleInterface for catalog async lambda"""

    @staticmethod
    def is_supported(modes: Set[ImportMode]):
        return ImportMode.HANDLERS in modes

    def __init__(self):
        import dataall.modules.catalog.handlers


class CatalogApiModuleInterface(ModuleInterface):
    """
    Implements ModuleInterface for catalog code in GraphQl lambda.
8 changes: 8 additions & 0 deletions backend/dataall/modules/catalog/api/mutations.py
@@ -7,6 +7,7 @@
     create_term,
     approve_term_association,
     dismiss_term_association,
+    start_reindex_catalog,
 )


@@ -107,3 +108,10 @@
     resolver=dismiss_term_association,
     args=[gql.Argument(name='linkUri', type=gql.NonNullableType(gql.String))],
 )
+
+startReindexCatalog = gql.MutationField(
+    name='startReindexCatalog',
+    args=[gql.Argument(name='handleDeletes', type=gql.NonNullableType(gql.Boolean))],
+    type=gql.Boolean,
+    resolver=start_reindex_catalog,
+)
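For reference, a sketch of how a client might call the new mutation. The query document follows from the field definition above; the run_query helper is an illustrative assumption, not part of this PR:

# Hypothetical GraphQL client call, shown for illustration only.
START_REINDEX = """
mutation startReindexCatalog($handleDeletes: Boolean!) {
  startReindexCatalog(handleDeletes: $handleDeletes)
}
"""
# run_query(START_REINDEX, variables={'handleDeletes': False})  # -> True once the task is queued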
5 changes: 5 additions & 0 deletions backend/dataall/modules/catalog/api/resolvers.py
@@ -1,5 +1,6 @@
 from dataall.modules.catalog.api.enums import GlossaryRole
 from dataall.modules.catalog.services.glossaries_service import GlossariesService
+from dataall.modules.catalog.services.catalog_service import CatalogService
 from dataall.base.api.context import Context
 from dataall.modules.catalog.db.glossary_models import TermLink, GlossaryNode
 from dataall.base.db import exceptions
@@ -157,3 +158,7 @@ def search_glossary(context: Context, source, filter: dict = None):
     if not filter:
         filter = {}
     return GlossariesService.search_glossary_terms(data=filter)


def start_reindex_catalog(context: Context, source, handleDeletes: bool):
    return CatalogService.start_reindex_catalog(with_deletes=handleDeletes)
3 changes: 3 additions & 0 deletions backend/dataall/modules/catalog/handlers/__init__.py
@@ -0,0 +1,3 @@
from dataall.modules.catalog.handlers import ecs_catalog_handlers

__all__ = ['ecs_catalog_handlers']
27 changes: 27 additions & 0 deletions backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
@@ -0,0 +1,27 @@
import logging
import os

from dataall.core.tasks.service_handlers import Worker
from dataall.core.stacks.aws.ecs import Ecs
from dataall.core.tasks.db.task_models import Task
from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask

log = logging.getLogger(__name__)


class EcsCatalogIndexHandler:
    @staticmethod
    @Worker.handler(path='ecs.reindex.catalog')
    def run_ecs_reindex_catalog_task(engine, task: Task):
        envname = os.environ.get('envname', 'local')
        if envname in ['local', 'dkrcompose']:
            CatalogIndexerTask.index_objects(engine, str(task.payload.get('with_deletes', False)))
        else:
            ecs_task_arn = Ecs.run_ecs_task(
                task_definition_param='ecs/task_def_arn/catalog_indexer',
                container_name_param='ecs/container/catalog_indexer',
                context=[
                    {'name': 'with_deletes', 'value': str(task.payload.get('with_deletes', False))},
                ],
            )
            return {'task_arn': ecs_task_arn}
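Note that the flag is stringified in both branches: ECS context values travel as environment-variable strings, so the indexer task later compares against 'True' rather than a boolean. A minimal sketch of the round trip, with assumed values:

# Illustration only: the boolean payload value crosses the ECS boundary as a string.
payload = {'with_deletes': True}                  # as persisted by CatalogService below
value = str(payload.get('with_deletes', False))   # -> 'True', usable as an ECS env value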
11 changes: 11 additions & 0 deletions backend/dataall/modules/catalog/indexers/base_indexer.py
@@ -52,6 +52,17 @@ def _index(cls, doc_id, doc):
            log.error(f'ES config is missing doc {doc} for id {doc_id} was not indexed')
            return False

    @classmethod
Review comment (Contributor): the search method could replace run_query from backend/dataall/base/searchproxy/search.py.

Reply (Contributor Author): it can, but that has larger implications; leaving it as-is for now to keep the scope of this PR targeted.

    def search(cls, query):
        es = cls.es()
        if es:
            res = es.search(index=cls._INDEX, body=query)
            log.info(f'Search query {query} returned {res["hits"]["total"]["value"]} records')
            return res
        else:
            log.error(f'ES config is missing, search query {query} failed')
            return {}
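For reference, a minimal sketch of the shapes this method works with, assuming an OpenSearch-style response (URIs and counts are illustrative):

# Hypothetical usage of BaseIndexer.search:
query = {'query': {'bool': {'must_not': {'terms': {'_id': ['uri-1', 'uri-2']}}}}}
res = BaseIndexer.search(query)
# res resembles:
# {'hits': {'total': {'value': 1}, 'hits': [{'_id': 'uri-3', '_source': {...}}]}}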

    @staticmethod
    def _get_target_glossary_terms(session, target_uri):
        q = (
backend/dataall/modules/catalog/indexers/catalog_indexer.py
@@ -12,5 +12,5 @@ def __init__(self):
     def all():
         return CatalogIndexer._INDEXERS

-    def index(self, session) -> int:
+    def index(self, session) -> List[str]:
         raise NotImplementedError('index is not implemented')
28 changes: 28 additions & 0 deletions backend/dataall/modules/catalog/services/catalog_service.py
@@ -0,0 +1,28 @@
import logging

from dataall.base.context import get_context

from dataall.core.permissions.services.tenant_policy_service import TenantPolicyValidationService
from dataall.core.tasks.db.task_models import Task
from dataall.core.tasks.service_handlers import Worker


logger = logging.getLogger(__name__)


class CatalogService:
    @staticmethod
    def start_reindex_catalog(with_deletes: bool) -> bool:
        context = get_context()
        groups = context.groups if context.groups is not None else []
        if not TenantPolicyValidationService.is_tenant_admin(groups):
            raise Exception('Only data.all admin group members can start re-index catalog task')

        with context.db_engine.scoped_session() as session:
            reindex_catalog_task: Task = Task(
                action='ecs.reindex.catalog', targetUri='ALL', payload={'with_deletes': with_deletes}
            )
            session.add(reindex_catalog_task)

        Worker.queue(engine=context.db_engine, task_ids=[reindex_catalog_task.taskUri])
        return True
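Together with the handler above, this closes the loop: the GraphQL resolver calls CatalogService.start_reindex_catalog, the persisted Task's action matches the 'ecs.reindex.catalog' handler path, and the Worker either runs the indexer inline (local and dkrcompose environments) or launches the ECS task that executes CatalogIndexerTask below.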
51 changes: 37 additions & 14 deletions backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -1,8 +1,10 @@
 import logging
 import os
 import sys
+from typing import List

 from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
+from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
 from dataall.base.db import get_engine
 from dataall.base.loader import load_modules, ImportMode
 from dataall.base.utils.alarm_service import AlarmService
@@ -14,23 +16,44 @@
log = logging.getLogger(__name__)


-def index_objects(engine):
-    try:
-        indexed_objects_counter = 0
-        with engine.scoped_session() as session:
-            for indexer in CatalogIndexer.all():
-                indexed_objects_counter += indexer.index(session)
-
-        log.info(f'Successfully indexed {indexed_objects_counter} objects')
-        return indexed_objects_counter
-    except Exception as e:
-        AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
-        raise e
+class CatalogIndexerTask:
+    """
+    This class is responsible for indexing objects in the catalog.
+    """
+
+    @classmethod
+    def index_objects(cls, engine, with_deletes='False'):
+        try:
+            indexed_object_uris = []
+            with engine.scoped_session() as session:
+                for indexer in CatalogIndexer.all():
+                    indexed_object_uris += indexer.index(session)
+
+            log.info(f'Successfully indexed {len(indexed_object_uris)} objects')
+
+            if with_deletes == 'True':
+                CatalogIndexerTask._delete_old_objects(indexed_object_uris)
+            return len(indexed_object_uris)
+        except Exception as e:
+            AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
+            raise e
+
+    @classmethod
+    def _delete_old_objects(cls, indexed_object_uris: List[str]) -> None:
+        # Search for documents in opensearch without an ID in the indexed_object_uris list
+        query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}}
+        # Delete All "Outdated" Objects from Index
+        docs = BaseIndexer.search(query)
+        for doc in docs.get('hits', {}).get('hits', []):
+            log.info(f'Deleting document {doc["_id"]}...')
+            BaseIndexer.delete_doc(doc_id=doc['_id'])
+
+        log.info(f'Deleted {len(docs.get("hits", {}).get("hits", []))} records')


 if __name__ == '__main__':
+    load_modules({ImportMode.CATALOG_INDEXER_TASK})
     ENVNAME = os.environ.get('envname', 'local')
     ENGINE = get_engine(envname=ENVNAME)

-    load_modules({ImportMode.CATALOG_INDEXER_TASK})
-    index_objects(engine=ENGINE)
+    with_deletes = os.environ.get('with_deletes', 'False')
+    CatalogIndexerTask.index_objects(engine=ENGINE, with_deletes=with_deletes)
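For intuition, _delete_old_objects inverts the set of URIs returned by the indexers: any document whose _id was not produced by this run is treated as stale and removed. A minimal sketch of the matching logic (URIs are illustrative; the real filtering happens inside OpenSearch via the must_not/terms query):

indexed_object_uris = ['uri-1', 'uri-2']      # produced by this run
ids_in_index = ['uri-1', 'uri-2', 'uri-3']    # documents currently in the index
stale = [i for i in ids_in_index if i not in indexed_object_uris]
# stale == ['uri-3'] -> deleted by _delete_old_objects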
18 changes: 18 additions & 0 deletions backend/dataall/modules/dashboards/__init__.py
@@ -75,3 +75,21 @@ def __init__(self):

         DashboardCatalogIndexer()
         log.info('Dashboard catalog indexer task has been loaded')


class DashboardAsyncHandlersModuleInterface(ModuleInterface):
    """Implements ModuleInterface for dashboard async lambda"""

    @staticmethod
    def is_supported(modes: Set[ImportMode]):
        return ImportMode.HANDLERS in modes

    @staticmethod
    def depends_on() -> List[Type['ModuleInterface']]:
        from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface

        return [CatalogAsyncHandlersModuleInterface]

    def __init__(self):
        pass
        log.info('S3 Dataset handlers have been imported')
backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py
@@ -1,5 +1,7 @@
 import logging

+from typing import List
+
 from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
 from dataall.modules.dashboards.db.dashboard_models import Dashboard
 from dataall.modules.dashboards.indexers.dashboard_indexer import DashboardIndexer
@@ -8,11 +10,14 @@


 class DashboardCatalogIndexer(CatalogIndexer):
-    def index(self, session) -> int:
-        all_dashboards: [Dashboard] = session.query(Dashboard).all()
+    def index(self, session) -> List[str]:
+        all_dashboards: List[Dashboard] = session.query(Dashboard).all()
+        all_dashboard_uris = []

         log.info(f'Found {len(all_dashboards)} dashboards')
         dashboard: Dashboard
         for dashboard in all_dashboards:
+            all_dashboard_uris.append(dashboard.dashboardUri)
             DashboardIndexer.upsert(session=session, dashboard_uri=dashboard.dashboardUri)

-        return len(all_dashboards)
+        return all_dashboard_uris
3 changes: 2 additions & 1 deletion backend/dataall/modules/s3_datasets/__init__.py
@@ -90,8 +90,9 @@ def is_supported(modes: Set[ImportMode]):
     @staticmethod
     def depends_on() -> List[Type['ModuleInterface']]:
         from dataall.modules.datasets_base import DatasetBaseModuleInterface
+        from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface

-        return [DatasetBaseModuleInterface]
+        return [DatasetBaseModuleInterface, CatalogAsyncHandlersModuleInterface]

     def __init__(self):
         import dataall.modules.s3_datasets.handlers
backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py
@@ -1,5 +1,6 @@
 import logging

+from typing import List
 from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.modules.s3_datasets.indexers.location_indexer import DatasetLocationIndexer
 from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
@@ -16,13 +17,18 @@ class DatasetCatalogIndexer(CatalogIndexer):
     Register automatically itself when CatalogIndexer instance is created
     """

-    def index(self, session) -> int:
-        all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session)
+    def index(self, session) -> List[str]:
+        all_datasets: List[S3Dataset] = DatasetRepository.list_all_active_datasets(session)
+        all_dataset_uris = []
         log.info(f'Found {len(all_datasets)} datasets')
-        indexed = 0
         for dataset in all_datasets:
             tables = DatasetTableIndexer.upsert_all(session, dataset.datasetUri)
+            all_dataset_uris += [table.tableUri for table in tables]
+
             folders = DatasetLocationIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
+            all_dataset_uris += [folder.locationUri for folder in folders]
+
             DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
-            indexed += len(tables) + len(folders) + 1
-        return indexed
+            all_dataset_uris.append(dataset.datasetUri)
+
+        return all_dataset_uris
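The list returned by index now mirrors the catalog's document-id space, one URI per table, per folder, and one for the dataset itself, which is exactly what the must_not/terms deletion query compares against. Illustratively, with hypothetical URIs:

# One dataset with two tables and one folder:
# index(session) -> ['table-1', 'table-2', 'folder-1', 'dataset-1']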
13 changes: 7 additions & 6 deletions backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
@@ -14,14 +14,15 @@ class DatasetIndexer(BaseIndexer):
     @classmethod
     def upsert(cls, session, dataset_uri: str):
         dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
-        env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
-        org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
-
-        count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
-        count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
-        count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')

         if dataset:
+            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+
+            count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
+            count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
+            count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')
+
             glossary = BaseIndexer._get_target_glossary_terms(session, dataset_uri)
             BaseIndexer._index(
                 doc_id=dataset_uri,
17 changes: 10 additions & 7 deletions backend/dataall/modules/s3_datasets/indexers/location_indexer.py
@@ -6,19 +6,18 @@
 from dataall.core.organizations.db.organization_repositories import OrganizationRepository
 from dataall.modules.s3_datasets.db.dataset_location_repositories import DatasetLocationRepository
 from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
-from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
 from dataall.modules.catalog.indexers.base_indexer import BaseIndexer


 class DatasetLocationIndexer(BaseIndexer):
     @classmethod
-    def upsert(cls, session, folder_uri: str):
+    def upsert(cls, session, folder_uri: str, dataset=None, env=None, org=None):
         folder = DatasetLocationRepository.get_location_by_uri(session, folder_uri)

         if folder:
-            dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri)
-            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
-            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+            dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri) if not dataset else dataset
+            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
+            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
             glossary = BaseIndexer._get_target_glossary_terms(session, folder_uri)

             BaseIndexer._index(
@@ -46,12 +45,16 @@ def upsert(cls, session, folder_uri: str):
                 'glossary': glossary,
             },
         )
-        DatasetIndexer.upsert(session=session, dataset_uri=folder.datasetUri)
         return folder

     @classmethod
     def upsert_all(cls, session, dataset_uri: str):
         folders = DatasetLocationRepository.get_dataset_folders(session, dataset_uri)
+        dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
+        env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+        org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
         for folder in folders:
-            DatasetLocationIndexer.upsert(session=session, folder_uri=folder.locationUri)
+            DatasetLocationIndexer.upsert(
+                session=session, folder_uri=folder.locationUri, dataset=dataset, env=env, org=org
+            )
         return folders
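With dataset, env, and org resolved once in upsert_all and threaded through each upsert call, re-indexing a dataset with N folders issues three lookups instead of 3N; DatasetTableIndexer below applies the same pattern. The per-folder DatasetIndexer.upsert call is also dropped, since DatasetCatalogIndexer now upserts the dataset document once per dataset.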
14 changes: 8 additions & 6 deletions backend/dataall/modules/s3_datasets/indexers/table_indexer.py
@@ -12,13 +12,13 @@

 class DatasetTableIndexer(BaseIndexer):
     @classmethod
-    def upsert(cls, session, table_uri: str):
+    def upsert(cls, session, table_uri: str, dataset=None, env=None, org=None):
         table = DatasetTableRepository.get_dataset_table_by_uri(session, table_uri)

         if table:
-            dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri)
-            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
-            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
+            dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri) if not dataset else dataset
+            env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
+            org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
             glossary = BaseIndexer._get_target_glossary_terms(session, table_uri)

             tags = table.tags if table.tags else []
@@ -48,14 +48,16 @@ def upsert(cls, session, table_uri: str):
                 'glossary': glossary,
             },
         )
-        DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri)
         return table

     @classmethod
     def upsert_all(cls, session, dataset_uri: str):
         tables = DatasetTableRepository.find_all_active_tables(session, dataset_uri)
+        dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
+        env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
+        org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
         for table in tables:
-            DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri)
+            DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri, dataset=dataset, env=env, org=org)
         return tables

     @classmethod