Skip to content

Commit

Permalink
reindex docs feat
Browse files Browse the repository at this point in the history
  • Loading branch information
noah-paige committed Jun 28, 2024
2 parents be6d131 + 4a25243 commit 5d7414d
Show file tree
Hide file tree
Showing 24 changed files with 355 additions and 49 deletions.
11 changes: 11 additions & 0 deletions backend/dataall/modules/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ def __init__(self):
from dataall.modules.catalog import tasks


class CatalogAsyncHandlersModuleInterface(ModuleInterface):
"""Implements ModuleInterface for datapipelines async lambda"""

@staticmethod
def is_supported(modes: Set[ImportMode]):
return ImportMode.HANDLERS in modes

def __init__(self):
import dataall.modules.catalog.handlers


class CatalogApiModuleInterface(ModuleInterface):
"""
Implements ModuleInterface for catalog code in GraphQl lambda.
Expand Down
8 changes: 8 additions & 0 deletions backend/dataall/modules/catalog/api/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
create_term,
approve_term_association,
dismiss_term_association,
start_reindex_catalog,
)


Expand Down Expand Up @@ -107,3 +108,10 @@
resolver=dismiss_term_association,
args=[gql.Argument(name='linkUri', type=gql.NonNullableType(gql.String))],
)

startReindexCatalog = gql.MutationField(
name='startReindexCatalog',
args=[gql.Argument(name='handleDeletes', type=gql.NonNullableType(gql.Boolean))],
type=gql.Boolean,
resolver=start_reindex_catalog,
)
5 changes: 5 additions & 0 deletions backend/dataall/modules/catalog/api/resolvers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataall.modules.catalog.api.enums import GlossaryRole
from dataall.modules.catalog.services.glossaries_service import GlossariesService
from dataall.modules.catalog.services.catalog_service import CatalogService
from dataall.base.api.context import Context
from dataall.modules.catalog.db.glossary_models import TermLink, GlossaryNode
from dataall.base.db import exceptions
Expand Down Expand Up @@ -157,3 +158,7 @@ def search_glossary(context: Context, source, filter: dict = None):
if not filter:
filter = {}
return GlossariesService.search_glossary_terms(data=filter)


def start_reindex_catalog(context: Context, source, handleDeletes: bool):
return CatalogService.start_reindex_catalog(with_deletes=handleDeletes)
3 changes: 3 additions & 0 deletions backend/dataall/modules/catalog/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from dataall.modules.catalog.handlers import ecs_catalog_handlers

__all__ = ['ecs_catalog_handlers']
27 changes: 27 additions & 0 deletions backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logging
import os

from dataall.core.tasks.service_handlers import Worker
from dataall.core.stacks.aws.ecs import Ecs
from dataall.core.tasks.db.task_models import Task
from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask

log = logging.getLogger(__name__)


class EcsCatalogIndexHandler:
@staticmethod
@Worker.handler(path='ecs.reindex.catalog')
def run_ecs_reindex_catalog_task(engine, task: Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
CatalogIndexerTask.index_objects(engine, str(task.payload.get('with_deletes', False)))
else:
ecs_task_arn = Ecs.run_ecs_task(
task_definition_param='ecs/task_def_arn/share_management',
container_name_param='ecs/container/share_management',
context=[
{'name': 'with_deletes', 'value': str(task.payload.get('with_deletes', False))},
],
)
return {'task_arn': ecs_task_arn}
11 changes: 11 additions & 0 deletions backend/dataall/modules/catalog/indexers/base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ def _index(cls, doc_id, doc):
log.error(f'ES config is missing doc {doc} for id {doc_id} was not indexed')
return False

@classmethod
def search(cls, query):
es = cls.es()
if es:
res = es.search(index=cls._INDEX, body=query)
log.info(f'Search query {query} returned {res["hits"]["total"]["value"]} records')
return res
else:
log.error(f'ES config is missing, search query {query} failed')
return {}

@staticmethod
def _get_target_glossary_terms(session, target_uri):
q = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def __init__(self):
def all():
return CatalogIndexer._INDEXERS

def index(self, session) -> int:
def index(self, session) -> List[str]:
raise NotImplementedError('index is not implemented')
37 changes: 37 additions & 0 deletions backend/dataall/modules/catalog/services/catalog_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging

from dataall.base.context import get_context
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService

from dataall.modules.catalog.db.glossary_repositories import GlossaryRepository
from dataall.modules.catalog.db.glossary_models import GlossaryNode
from dataall.modules.catalog.services.glossaries_permissions import MANAGE_GLOSSARIES
from dataall.modules.catalog.indexers.registry import GlossaryRegistry
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyValidationService
from dataall.core.tasks.db.task_models import Task
from dataall.core.tasks.service_handlers import Worker


logger = logging.getLogger(__name__)


def _session():
return get_context().db_engine.scoped_session()


class CatalogService:
@staticmethod
def start_reindex_catalog(with_deletes: bool) -> bool:
context = get_context()
groups = context.groups if context.groups is not None else []
if not TenantPolicyValidationService.is_tenant_admin(groups):
raise Exception('Only data.all admin group members can start re-index catalog task')

with context.db_engine.scoped_session() as session:
reindex_catalog_task: Task = Task(
action='ecs.reindex.catalog', targetUri='ALL', payload={'with_deletes': with_deletes}
)
session.add(reindex_catalog_task)

Worker.queue(engine=context.db_engine, task_ids=[reindex_catalog_task.taskUri])
return True
51 changes: 37 additions & 14 deletions backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import os
import sys
from typing import List

from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
from dataall.modules.catalog.indexers.base_indexer import BaseIndexer
from dataall.base.db import get_engine
from dataall.base.loader import load_modules, ImportMode
from dataall.base.utils.alarm_service import AlarmService
Expand All @@ -14,23 +16,44 @@
log = logging.getLogger(__name__)


def index_objects(engine):
try:
indexed_objects_counter = 0
with engine.scoped_session() as session:
for indexer in CatalogIndexer.all():
indexed_objects_counter += indexer.index(session)
class CatalogIndexerTask:
"""
This class is responsible for indexing objects in the catalog.
"""

log.info(f'Successfully indexed {indexed_objects_counter} objects')
return indexed_objects_counter
except Exception as e:
AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
raise e
@classmethod
def index_objects(cls, engine, with_deletes='False'):
try:
indexed_object_uris = []
with engine.scoped_session() as session:
for indexer in CatalogIndexer.all():
indexed_object_uris += indexer.index(session)

log.info(f'Successfully indexed {len(indexed_object_uris)} objects')

if with_deletes == 'True':
CatalogIndexerTask._delete_old_objects(indexed_object_uris)
return len(indexed_object_uris)
except Exception as e:
AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
raise e

@classmethod
def _delete_old_objects(cls, indexed_object_uris: List[str]) -> None:
# Search for documents in opensearch without an ID in the indexed_object_uris list
query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}}
# Delete All "Outdated" Objects from Index
docs = BaseIndexer.search(query)
for doc in docs.get('hits', {}).get('hits', []):
log.info(f'Deleting document {doc["_id"]}...')
BaseIndexer.delete_doc(doc_id=doc['_id'])

log.info(f'Deleted {len(docs.get("hits", {}).get("hits", []))} records')


if __name__ == '__main__':
load_modules({ImportMode.CATALOG_INDEXER_TASK})
ENVNAME = os.environ.get('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)

load_modules({ImportMode.CATALOG_INDEXER_TASK})
index_objects(engine=ENGINE)
with_deletes = os.environ.get('with_deletes', 'False')
CatalogIndexerTask.index_objects(engine=ENGINE, with_deletes=with_deletes)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

from typing import List

from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer
from dataall.modules.dashboards.db.dashboard_models import Dashboard
from dataall.modules.dashboards.indexers.dashboard_indexer import DashboardIndexer
Expand All @@ -8,11 +10,14 @@


class DashboardCatalogIndexer(CatalogIndexer):
def index(self, session) -> int:
all_dashboards: [Dashboard] = session.query(Dashboard).all()
def index(self, session) -> List[str]:
all_dashboards: List[Dashboard] = session.query(Dashboard).all()
all_dashboard_uris = []

log.info(f'Found {len(all_dashboards)} dashboards')
dashboard: Dashboard
for dashboard in all_dashboards:
all_dashboard_uris.append(dashboard.dashboardUri)
DashboardIndexer.upsert(session=session, dashboard_uri=dashboard.dashboardUri)

return len(all_dashboards)
return all_dashboard_uris
3 changes: 2 additions & 1 deletion backend/dataall/modules/s3_datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ def is_supported(modes: Set[ImportMode]):
@staticmethod
def depends_on() -> List[Type['ModuleInterface']]:
from dataall.modules.datasets_base import DatasetBaseModuleInterface
from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface

return [DatasetBaseModuleInterface]
return [DatasetBaseModuleInterface, CatalogAsyncHandlersModuleInterface]

def __init__(self):
import dataall.modules.s3_datasets.handlers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from typing import List
from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.modules.s3_datasets.indexers.location_indexer import DatasetLocationIndexer
from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer
Expand All @@ -16,13 +17,18 @@ class DatasetCatalogIndexer(CatalogIndexer):
Register automatically itself when CatalogIndexer instance is created
"""

def index(self, session) -> int:
all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session)
def index(self, session) -> List[str]:
all_datasets: List[S3Dataset] = DatasetRepository.list_all_active_datasets(session)
all_dataset_uris = []
log.info(f'Found {len(all_datasets)} datasets')
indexed = 0
for dataset in all_datasets:
tables = DatasetTableIndexer.upsert_all(session, dataset.datasetUri)
all_dataset_uris += [table.tableUri for table in tables]

folders = DatasetLocationIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
all_dataset_uris += [folder.locationUri for folder in folders]

DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
indexed += len(tables) + len(folders)
return indexed + len(all_datasets)
all_dataset_uris.append(dataset.datasetUri)

return all_dataset_uris
13 changes: 7 additions & 6 deletions backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ class DatasetIndexer(BaseIndexer):
@classmethod
def upsert(cls, session, dataset_uri: str):
dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)

count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')

if dataset:
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)

count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri)
count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri)
count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset')

glossary = BaseIndexer._get_target_glossary_terms(session, dataset_uri)
BaseIndexer._index(
doc_id=dataset_uri,
Expand Down
17 changes: 10 additions & 7 deletions backend/dataall/modules/s3_datasets/indexers/location_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
from dataall.core.organizations.db.organization_repositories import OrganizationRepository
from dataall.modules.s3_datasets.db.dataset_location_repositories import DatasetLocationRepository
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer
from dataall.modules.catalog.indexers.base_indexer import BaseIndexer


class DatasetLocationIndexer(BaseIndexer):
@classmethod
def upsert(cls, session, folder_uri: str):
def upsert(cls, session, folder_uri: str, dataset=None, env=None, org=None):
folder = DatasetLocationRepository.get_location_by_uri(session, folder_uri)

if folder:
dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri)
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri) if not dataset else dataset
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
glossary = BaseIndexer._get_target_glossary_terms(session, folder_uri)

BaseIndexer._index(
Expand Down Expand Up @@ -46,12 +45,16 @@ def upsert(cls, session, folder_uri: str):
'glossary': glossary,
},
)
DatasetIndexer.upsert(session=session, dataset_uri=folder.datasetUri)
return folder

@classmethod
def upsert_all(cls, session, dataset_uri: str):
folders = DatasetLocationRepository.get_dataset_folders(session, dataset_uri)
dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
for folder in folders:
DatasetLocationIndexer.upsert(session=session, folder_uri=folder.locationUri)
DatasetLocationIndexer.upsert(
session=session, folder_uri=folder.locationUri, dataset=dataset, env=env, org=org
)
return folders
14 changes: 8 additions & 6 deletions backend/dataall/modules/s3_datasets/indexers/table_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

class DatasetTableIndexer(BaseIndexer):
@classmethod
def upsert(cls, session, table_uri: str):
def upsert(cls, session, table_uri: str, dataset=None, env=None, org=None):
table = DatasetTableRepository.get_dataset_table_by_uri(session, table_uri)

if table:
dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri)
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri) if not dataset else dataset
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org
glossary = BaseIndexer._get_target_glossary_terms(session, table_uri)

tags = table.tags if table.tags else []
Expand Down Expand Up @@ -48,14 +48,16 @@ def upsert(cls, session, table_uri: str):
'glossary': glossary,
},
)
DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri)
return table

@classmethod
def upsert_all(cls, session, dataset_uri: str):
tables = DatasetTableRepository.find_all_active_tables(session, dataset_uri)
dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri)
for table in tables:
DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri)
DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri, dataset=dataset, env=env, org=org)
return tables

@classmethod
Expand Down
Loading

0 comments on commit 5d7414d

Please sign in to comment.