From ecaaf6e7ede1ac1ca88f33b09f3ab56ea20b588a Mon Sep 17 00:00:00 2001 From: nikpodsh <124577300+nikpodsh@users.noreply.github.com> Date: Thu, 4 May 2023 14:40:49 +0200 Subject: [PATCH] Datasets modularization pt.4 (#441) ### Feature or Bugfix - Refactoring ### Detail Refactoring of DatasetTable: got rid of the ElasticSearch connection that was opened for every request and introduced a lazy way to establish the connection. ### Relates #412 and #295 By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --------- Co-authored-by: dbalintx <132444646+dbalintx@users.noreply.github.com> --- backend/api_handler.py | 6 +-- .../api/Objects/Dashboard/resolvers.py | 2 +- .../dataall/api/Objects/Dataset/resolvers.py | 10 ++-- backend/dataall/api/Objects/Feed/registry.py | 1 - .../dataall/api/Objects/Glossary/registry.py | 8 ++- .../dataall/api/Objects/Glossary/resolvers.py | 2 +- .../api/Objects/ShareObject/resolvers.py | 4 +- backend/dataall/api/Objects/Vote/resolvers.py | 4 +- backend/dataall/api/Objects/__init__.py | 1 - backend/dataall/api/context.py | 2 - backend/dataall/aws/handlers/glue.py | 3 +- backend/dataall/aws/handlers/redshift.py | 3 +- backend/dataall/core/context.py | 2 - backend/dataall/db/api/dataset.py | 33 ++++++------ backend/dataall/db/api/share_object.py | 26 +++++----- backend/dataall/db/models/DatasetTable.py | 32 ------------ backend/dataall/db/models/__init__.py | 1 - backend/dataall/modules/datasets/__init__.py | 5 +- .../dataall/modules/datasets/api/__init__.py | 5 +- .../api/storage_location/resolvers.py | 2 +- .../datasets/api/table}/__init__.py | 2 +- .../datasets/api/table}/input_types.py | 4 +- .../datasets/api/table}/mutations.py | 11 ++-- .../datasets/api/table}/queries.py | 12 +++-- .../datasets/api/table}/resolvers.py | 36 ++++++------- .../datasets/api/table}/schema.py | 12 +++-- .../datasets/api/table_column/resolvers.py | 8 +-- backend/dataall/modules/datasets/db/models.py | 30 ++++++++++- .../datasets/handlers/glue_column_handler.py | 12 ++--- .../datasets/indexers/table_indexer.py | 47 +++++++++++------ .../services/dataset_profiling_service.py | 28 +++++------ .../services/dataset_share_service.py | 18 +++---- .../datasets/services/dataset_table.py | 50 +++++++++---------- .../datasets/tasks/subscription_service.py | 10 ++-- .../modules/datasets/tasks/tables_syncer.py | 15 +++--- backend/dataall/searchproxy/base_indexer.py | 6 +++ backend/dataall/searchproxy/indexers.py | 23 --------- .../dataall/tasks/bucket_policy_updater.py | 20 ++++---- backend/dataall/tasks/catalog_indexer.py | 17 ++----- .../share_managers/lf_share_manager.py | 23 +++++---- .../lf_process_cross_account_share.py | 5 +- .../lf_process_same_account_share.py | 7 +-- backend/dataall/utils/alarm_service.py | 5 +- backend/local_graphql_server.py | 3 +- ...215e_backfill_dataset_table_permissions.py | 5 +- tests/api/conftest.py | 10 ++-- tests/api/test_dataset.py | 4 +- tests/api/test_dataset_profiling.py | 12 ++--- tests/api/test_dataset_table.py | 34 ++++++------- tests/api/test_glossary.py | 6 +-- tests/api/test_share.py | 11 ++-- tests/cdkproxy/conftest.py | 5 +- tests/searchproxy/test_indexers.py | 5 +- tests/tasks/conftest.py | 8 +-- tests/tasks/test_catalog_indexer.py | 3 +- tests/tasks/test_lf_share_manager.py | 35 ++++++------- tests/tasks/test_policies.py | 3 +- tests/tasks/test_subscriptions.py | 3 +- tests/tasks/test_tables_sync.py | 9 ++-- 59 files changed, 350 insertions(+), 359 deletions(-) delete mode 100644
backend/dataall/db/models/DatasetTable.py rename backend/dataall/{api/Objects/DatasetTable => modules/datasets/api/table}/__init__.py (75%) rename backend/dataall/{api/Objects/DatasetTable => modules/datasets/api/table}/input_types.py (93%) rename backend/dataall/{api/Objects/DatasetTable => modules/datasets/api/table}/mutations.py (82%) rename backend/dataall/{api/Objects/DatasetTable => modules/datasets/api/table}/queries.py (81%) rename backend/dataall/{api/Objects/DatasetTable => modules/datasets/api/table}/resolvers.py (88%) rename backend/dataall/{api/Objects/DatasetTable => modules/datasets/api/table}/schema.py (94%) diff --git a/backend/api_handler.py b/backend/api_handler.py index 890235347..714e107b2 100644 --- a/backend/api_handler.py +++ b/backend/api_handler.py @@ -15,7 +15,6 @@ from dataall.core.context import set_context, dispose_context, RequestContext from dataall.db import init_permissions, get_engine, api, permissions from dataall.modules.loader import load_modules, ImportMode -from dataall.searchproxy import connect logger = logging.getLogger() logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO')) @@ -30,7 +29,6 @@ TYPE_DEFS = gql(SCHEMA.gql(with_directives=False)) ENVNAME = os.getenv('envname', 'local') ENGINE = get_engine(envname=ENVNAME) -ES = connect(envname=ENVNAME) Worker.queue = SqsQueue.send init_permissions(ENGINE) @@ -99,7 +97,6 @@ def handler(event, context): log.info('Lambda Event %s', event) log.debug('Env name %s', ENVNAME) - log.debug('ElasticSearch %s', ES) log.debug('Engine %s', ENGINE.engine.url) if event['httpMethod'] == 'OPTIONS': @@ -137,11 +134,10 @@ def handler(event, context): print(f'Error managing groups due to: {e}') groups = [] - set_context(RequestContext(ENGINE, username, groups, ES)) + set_context(RequestContext(ENGINE, username, groups)) app_context = { 'engine': ENGINE, - 'es': ES, 'username': username, 'groups': groups, 'schema': SCHEMA, diff --git a/backend/dataall/api/Objects/Dashboard/resolvers.py b/backend/dataall/api/Objects/Dashboard/resolvers.py index 84a2a1bcc..94372f5d1 100644 --- a/backend/dataall/api/Objects/Dashboard/resolvers.py +++ b/backend/dataall/api/Objects/Dashboard/resolvers.py @@ -311,7 +311,7 @@ def delete_dashboard(context: Context, source, dashboardUri: str = None): data=None, check_perm=True, ) - indexers.delete_doc(es=context.es, doc_id=dashboardUri) + DashboardIndexer.delete_doc(doc_id=dashboardUri) return True diff --git a/backend/dataall/api/Objects/Dataset/resolvers.py b/backend/dataall/api/Objects/Dataset/resolvers.py index f01a376dc..a1d16cfae 100644 --- a/backend/dataall/api/Objects/Dataset/resolvers.py +++ b/backend/dataall/api/Objects/Dataset/resolvers.py @@ -345,9 +345,7 @@ def sync_tables(context: Context, source, datasetUri: str = None): indexers.upsert_dataset_tables( session=session, es=context.es, datasetUri=dataset.datasetUri ) - indexers.remove_deleted_tables( - session=session, es=context.es, datasetUri=dataset.datasetUri - ) + DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri) return Dataset.paginated_dataset_tables( session=session, username=context.username, @@ -574,13 +572,13 @@ def delete_dataset( tables = [t.tableUri for t in Dataset.get_dataset_tables(session, datasetUri)] for uri in tables: - indexers.delete_doc(es=context.es, doc_id=uri) + DatasetIndexer.delete_doc(doc_id=uri) folders = [f.locationUri for f in Dataset.get_dataset_folders(session, datasetUri)] for uri in folders: - indexers.delete_doc(es=context.es, doc_id=uri) + 
DatasetIndexer.delete_doc(doc_id=uri) - indexers.delete_doc(es=context.es, doc_id=datasetUri) + DatasetIndexer.delete_doc(doc_id=datasetUri) Dataset.delete_dataset( session=session, diff --git a/backend/dataall/api/Objects/Feed/registry.py b/backend/dataall/api/Objects/Feed/registry.py index 6a01a488a..4fedd252a 100644 --- a/backend/dataall/api/Objects/Feed/registry.py +++ b/backend/dataall/api/Objects/Feed/registry.py @@ -38,5 +38,4 @@ def types(cls): FeedRegistry.register(FeedDefinition("Worksheet", models.Worksheet)) FeedRegistry.register(FeedDefinition("DataPipeline", models.DataPipeline)) -FeedRegistry.register(FeedDefinition("DatasetTable", models.DatasetTable)) FeedRegistry.register(FeedDefinition("Dashboard", models.Dashboard)) diff --git a/backend/dataall/api/Objects/Glossary/registry.py b/backend/dataall/api/Objects/Glossary/registry.py index fb7e6edf7..494d5d502 100644 --- a/backend/dataall/api/Objects/Glossary/registry.py +++ b/backend/dataall/api/Objects/Glossary/registry.py @@ -1,7 +1,5 @@ -from dataclasses import dataclass, field -from typing import Type, Dict, Optional, Protocol, Union, Callable, Any - -from opensearchpy import OpenSearch +from dataclasses import dataclass +from typing import Type, Dict, Optional, Protocol, Union from dataall.api import gql from dataall.api.gql.graphql_union_type import UnionTypeRegistry @@ -56,7 +54,7 @@ def types(cls): return [gql.Ref(definition.object_type) for definition in cls._DEFINITIONS.values()] @classmethod - def reindex(cls, session, es: OpenSearch, target_type: str, target_uri: str): + def reindex(cls, session, target_type: str, target_uri: str): definition = cls._DEFINITIONS[target_type] if definition.reindexer: definition.reindexer.upsert(session, target_uri) diff --git a/backend/dataall/api/Objects/Glossary/resolvers.py b/backend/dataall/api/Objects/Glossary/resolvers.py index fdc4c3eea..42fae88ce 100644 --- a/backend/dataall/api/Objects/Glossary/resolvers.py +++ b/backend/dataall/api/Objects/Glossary/resolvers.py @@ -458,7 +458,7 @@ def reindex(context, linkUri): if not link: return - GlossaryRegistry.reindex(session, context.es, link.targetType, link.targetUri) + GlossaryRegistry.reindex(session, link.targetType, link.targetUri) def _target_model(target_type: str): diff --git a/backend/dataall/api/Objects/ShareObject/resolvers.py b/backend/dataall/api/Objects/ShareObject/resolvers.py index 16e4e1353..49f20fc17 100644 --- a/backend/dataall/api/Objects/ShareObject/resolvers.py +++ b/backend/dataall/api/Objects/ShareObject/resolvers.py @@ -7,7 +7,7 @@ from ....api.context import Context from ....aws.handlers.service_handlers import Worker from ....db import models -from dataall.modules.datasets.db.models import DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable log = logging.getLogger(__name__) @@ -265,7 +265,7 @@ def resolve_dataset(context: Context, source: models.ShareObject, **kwargs): def union_resolver(object, *_): - if isinstance(object, models.DatasetTable): + if isinstance(object, DatasetTable): return 'DatasetTable' elif isinstance(object, DatasetStorageLocation): return 'DatasetStorageLocation' diff --git a/backend/dataall/api/Objects/Vote/resolvers.py b/backend/dataall/api/Objects/Vote/resolvers.py index 42f5c20f5..d9f739872 100644 --- a/backend/dataall/api/Objects/Vote/resolvers.py +++ b/backend/dataall/api/Objects/Vote/resolvers.py @@ -28,11 +28,11 @@ def upvote(context: Context, source, input=None): data=input, check_perm=True, ) - reindex(session, 
context.es, vote) + reindex(session, vote) return vote -def reindex(session, es, vote): +def reindex(session, vote): if vote.targetType == 'dataset': DatasetIndexer.upsert(session=session, dataset_uri=vote.targetUri) elif vote.targetType == 'dashboard': diff --git a/backend/dataall/api/Objects/__init__.py b/backend/dataall/api/Objects/__init__.py index 7c064fb1f..5cc73fbdf 100644 --- a/backend/dataall/api/Objects/__init__.py +++ b/backend/dataall/api/Objects/__init__.py @@ -17,7 +17,6 @@ DataPipeline, Environment, Activity, - DatasetTable, Dataset, Group, Principal, diff --git a/backend/dataall/api/context.py b/backend/dataall/api/context.py index a210dc0a1..238627a81 100644 --- a/backend/dataall/api/context.py +++ b/backend/dataall/api/context.py @@ -2,11 +2,9 @@ class Context: def __init__( self, engine=None, - es=None, username=None, groups=None, ): self.engine = engine - self.es = es self.username = username self.groups = groups diff --git a/backend/dataall/aws/handlers/glue.py b/backend/dataall/aws/handlers/glue.py index 86f39161f..1a3a4e123 100644 --- a/backend/dataall/aws/handlers/glue.py +++ b/backend/dataall/aws/handlers/glue.py @@ -6,6 +6,7 @@ from .sts import SessionHelper from ... import db from ...db import models +from dataall.modules.datasets.db.models import DatasetTable log = logging.getLogger('aws:glue') @@ -523,7 +524,7 @@ def get_job_runs(engine, task: models.Task): @staticmethod def grant_principals_all_table_permissions( - table: models.DatasetTable, principals: [str], client=None + table: DatasetTable, principals: [str], client=None ): """ Update the table permissions on Lake Formation diff --git a/backend/dataall/aws/handlers/redshift.py b/backend/dataall/aws/handlers/redshift.py index c186d5df7..1fe6c738c 100644 --- a/backend/dataall/aws/handlers/redshift.py +++ b/backend/dataall/aws/handlers/redshift.py @@ -11,6 +11,7 @@ from ...db import models # TODO should be migrated in the redshift module from dataall.modules.datasets.services.dataset_table import DatasetTableService +from dataall.modules.datasets.db.models import DatasetTable log = logging.getLogger(__name__) @@ -448,7 +449,7 @@ def copy_data(engine, task: models.Task): session, task.payload['datasetUri'] ) - table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri( + table: DatasetTable = DatasetTableService.get_dataset_table_by_uri( session, task.payload['tableUri'] ) diff --git a/backend/dataall/core/context.py b/backend/dataall/core/context.py index dcf594896..a6cc2d4ba 100644 --- a/backend/dataall/core/context.py +++ b/backend/dataall/core/context.py @@ -12,7 +12,6 @@ from dataall.db.connection import Engine from threading import local -import opensearchpy _request_storage = local() @@ -24,7 +23,6 @@ class RequestContext: db_engine: Engine username: str groups: List[str] - es_engine: opensearchpy.OpenSearch def get_context() -> RequestContext: diff --git a/backend/dataall/db/api/dataset.py b/backend/dataall/db/api/dataset.py index cd2c6ff6f..8e822d2af 100644 --- a/backend/dataall/db/api/dataset.py +++ b/backend/dataall/db/api/dataset.py @@ -16,9 +16,10 @@ from . import Organization from .. 
import models, api, exceptions, permissions, paginate from ..models.Enums import Language, ConfidentialityClassification -from ...modules.datasets.db.dataset_repository import DatasetRepository -from ...modules.datasets.services.dataset_location import DatasetLocationService -from ...utils.naming_convention import ( +from dataall.modules.datasets.db.dataset_repository import DatasetRepository +from dataall.modules.datasets.db.models import DatasetTable +from dataall.modules.datasets.services.dataset_location import DatasetLocationService +from dataall.utils.naming_convention import ( NamingConventionService, NamingConventionPattern, ) @@ -266,21 +267,21 @@ def paginated_dataset_tables( session, username, groups, uri, data=None, check_perm=None ) -> dict: query = ( - session.query(models.DatasetTable) + session.query(DatasetTable) .filter( and_( - models.DatasetTable.datasetUri == uri, - models.DatasetTable.LastGlueTableStatus != 'Deleted', + DatasetTable.datasetUri == uri, + DatasetTable.LastGlueTableStatus != 'Deleted', ) ) - .order_by(models.DatasetTable.created.desc()) + .order_by(DatasetTable.created.desc()) ) if data and data.get('term'): query = query.filter( or_( *[ - models.DatasetTable.name.ilike('%' + data.get('term') + '%'), - models.DatasetTable.GlueTableName.ilike( + DatasetTable.name.ilike('%' + data.get('term') + '%'), + DatasetTable.GlueTableName.ilike( '%' + data.get('term') + '%' ), ] @@ -379,7 +380,7 @@ def transfer_stewardship_to_new_stewards(session, dataset, new_stewards): group=new_stewards, permissions=permissions.DATASET_TABLE_READ, resource_uri=tableUri, - resource_type=models.DatasetTable.__name__, + resource_type=DatasetTable.__name__, ) dataset_shares = ( @@ -455,8 +456,8 @@ def update_glue_database_status(session, dataset_uri): def get_dataset_tables(session, dataset_uri): """return the dataset tables""" return ( - session.query(models.DatasetTable) - .filter(models.DatasetTable.datasetUri == dataset_uri) + session.query(DatasetTable) + .filter(DatasetTable.datasetUri == dataset_uri) .all() ) @@ -585,10 +586,10 @@ def _delete_dataset_term_links(session, uri): @staticmethod def _delete_dataset_tables(session, dataset_uri) -> bool: tables = ( - session.query(models.DatasetTable) + session.query(DatasetTable) .filter( and_( - models.DatasetTable.datasetUri == dataset_uri, + DatasetTable.datasetUri == dataset_uri, ) ) .all() @@ -618,7 +619,7 @@ def get_dataset_by_bucket_name(session, bucket) -> [models.Dataset]: @staticmethod def count_dataset_tables(session, dataset_uri): return ( - session.query(models.DatasetTable) - .filter(models.DatasetTable.datasetUri == dataset_uri) + session.query(DatasetTable) + .filter(DatasetTable.datasetUri == dataset_uri) .count() ) diff --git a/backend/dataall/db/api/share_object.py b/backend/dataall/db/api/share_object.py index bd0215190..4fddda5e9 100644 --- a/backend/dataall/db/api/share_object.py +++ b/backend/dataall/db/api/share_object.py @@ -10,7 +10,7 @@ from .. import api, utils from .. 
import models, exceptions, permissions, paginate from ..models.Enums import ShareObjectStatus, ShareItemStatus, ShareObjectActions, ShareItemActions, ShareableType, PrincipalType -from dataall.modules.datasets.db.models import DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable logger = logging.getLogger(__name__) @@ -422,7 +422,7 @@ def create_share_object( if itemType == ShareableType.StorageLocation.value: item = session.query(DatasetStorageLocation).get(itemUri) if itemType == ShareableType.Table.value: - item = session.query(models.DatasetTable).get(itemUri) + item = session.query(DatasetTable).get(itemUri) share_item = ( session.query(models.ShareObjectItem) @@ -605,7 +605,7 @@ def approve_share_object( group=share.principalId, permissions=permissions.DATASET_TABLE_READ, resource_uri=table.itemUri, - resource_type=models.DatasetTable.__name__, + resource_type=DatasetTable.__name__, ) api.Notification.notify_share_object_approval(session, username, dataset, share) @@ -717,7 +717,7 @@ def get_share_item( ShareObject.get_share_item_by_uri(session, data['shareItemUri']), ) if share_item.itemType == ShareableType.Table.value: - return session.query(models.DatasetTable).get(share_item.itemUri) + return session.query(DatasetTable).get(share_item.itemUri) if share_item.itemType == ShareableType.StorageLocation: return session.Query(DatasetStorageLocation).get(share_item.itemUri) @@ -762,7 +762,7 @@ def add_share_object_item( Share_SM.update_state(session, share, new_share_state) if itemType == ShareableType.Table.value: - item: models.DatasetTable = session.query(models.DatasetTable).get(itemUri) + item: DatasetTable = session.query(DatasetTable).get(itemUri) if item and item.region != target_environment.region: raise exceptions.UnauthorizedOperation( action=permissions.ADD_ITEM, @@ -944,10 +944,10 @@ def list_shareable_items( # marking the table as part of the shareObject tables = ( session.query( - models.DatasetTable.tableUri.label('itemUri'), + DatasetTable.tableUri.label('itemUri'), func.coalesce('DatasetTable').label('itemType'), - models.DatasetTable.GlueTableName.label('itemName'), - models.DatasetTable.description.label('description'), + DatasetTable.GlueTableName.label('itemName'), + DatasetTable.description.label('description'), models.ShareObjectItem.shareItemUri.label('shareItemUri'), models.ShareObjectItem.status.label('status'), case( @@ -959,10 +959,10 @@ def list_shareable_items( models.ShareObjectItem, and_( models.ShareObjectItem.shareUri == share.shareUri, - models.DatasetTable.tableUri == models.ShareObjectItem.itemUri, + DatasetTable.tableUri == models.ShareObjectItem.itemUri, ), ) - .filter(models.DatasetTable.datasetUri == datasetUri) + .filter(DatasetTable.datasetUri == datasetUri) ) if data: if data.get("isRevokable"): @@ -1145,7 +1145,7 @@ def update_share_item_status_batch( def find_share_item_by_table( session, share: models.ShareObject, - table: models.DatasetTable, + table: DatasetTable, ) -> models.ShareObjectItem: share_item: models.ShareObjectItem = ( session.query(models.ShareObjectItem) @@ -1247,10 +1247,10 @@ def get_share_data_items(session, share_uri, status): raise exceptions.ObjectNotFound('Share', share_uri) tables = ( - session.query(models.DatasetTable) + session.query(DatasetTable) .join( models.ShareObjectItem, - models.ShareObjectItem.itemUri == models.DatasetTable.tableUri, + models.ShareObjectItem.itemUri == DatasetTable.tableUri, ) .join( models.ShareObject, diff --git 
a/backend/dataall/db/models/DatasetTable.py b/backend/dataall/db/models/DatasetTable.py deleted file mode 100644 index e97174167..000000000 --- a/backend/dataall/db/models/DatasetTable.py +++ /dev/null @@ -1,32 +0,0 @@ -from sqlalchemy import Column, String, Text -from sqlalchemy.dialects import postgresql -from sqlalchemy.orm import query_expression - -from .. import Base -from .. import Resource, utils - - -class DatasetTable(Resource, Base): - __tablename__ = 'dataset_table' - datasetUri = Column(String, nullable=False) - tableUri = Column(String, primary_key=True, default=utils.uuid('table')) - AWSAccountId = Column(String, nullable=False) - S3BucketName = Column(String, nullable=False) - S3Prefix = Column(String, nullable=False) - GlueDatabaseName = Column(String, nullable=False) - GlueTableName = Column(String, nullable=False) - GlueTableConfig = Column(Text) - GlueTableProperties = Column(postgresql.JSON, default={}) - LastGlueTableStatus = Column(String, default='InSync') - region = Column(String, default='eu-west-1') - # LastGeneratedPreviewDate= Column(DateTime, default=None) - confidentiality = Column(String, nullable=True) - userRoleForTable = query_expression() - projectPermission = query_expression() - redshiftClusterPermission = query_expression() - stage = Column(String, default='RAW') - topics = Column(postgresql.ARRAY(String), nullable=True) - confidentiality = Column(String, nullable=False, default='C1') - - def uri(self): - return self.tableUri diff --git a/backend/dataall/db/models/__init__.py b/backend/dataall/db/models/__init__.py index c288527cf..123547f8c 100644 --- a/backend/dataall/db/models/__init__.py +++ b/backend/dataall/db/models/__init__.py @@ -6,7 +6,6 @@ from .DashboardShare import DashboardShareStatus from .Dataset import Dataset from .DatasetQualityRule import DatasetQualityRule -from .DatasetTable import DatasetTable from .Environment import Environment from .EnvironmentGroup import EnvironmentGroup from .FeedMessage import FeedMessage diff --git a/backend/dataall/modules/datasets/__init__.py b/backend/dataall/modules/datasets/__init__.py index e02a9d9bf..4f8964016 100644 --- a/backend/dataall/modules/datasets/__init__.py +++ b/backend/dataall/modules/datasets/__init__.py @@ -3,7 +3,7 @@ from typing import List from dataall.db import models -from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetStorageLocation, DatasetTable from dataall.modules.datasets.indexers.dataset_indexer import DatasetIndexer from dataall.modules.datasets.indexers.location_indexer import DatasetLocationIndexer from dataall.modules.datasets.indexers.table_indexer import DatasetTableIndexer @@ -27,6 +27,7 @@ def __init__(self): FeedRegistry.register(FeedDefinition("DatasetTableColumn", DatasetTableColumn)) FeedRegistry.register(FeedDefinition("DatasetStorageLocation", DatasetStorageLocation)) + FeedRegistry.register(FeedDefinition("DatasetTable", DatasetTable)) GlossaryRegistry.register(GlossaryDefinition("Column", "DatasetTableColumn", DatasetTableColumn)) GlossaryRegistry.register(GlossaryDefinition( @@ -46,7 +47,7 @@ def __init__(self): GlossaryRegistry.register(GlossaryDefinition( target_type="DatasetTable", object_type="DatasetTable", - model=models.DatasetTable, + model=DatasetTable, reindexer=DatasetTableIndexer )) diff --git a/backend/dataall/modules/datasets/api/__init__.py b/backend/dataall/modules/datasets/api/__init__.py index 4c279340e..7fe2d06a1 100644 --- 
a/backend/dataall/modules/datasets/api/__init__.py +++ b/backend/dataall/modules/datasets/api/__init__.py @@ -2,7 +2,8 @@ from dataall.modules.datasets.api import ( table_column, profiling, - storage_location + storage_location, + table ) -__all__ = ["table_column", "profiling", "storage_location"] +__all__ = ["table_column", "profiling", "storage_location", "table"] diff --git a/backend/dataall/modules/datasets/api/storage_location/resolvers.py b/backend/dataall/modules/datasets/api/storage_location/resolvers.py index 09cf4b14a..6f8d82e43 100644 --- a/backend/dataall/modules/datasets/api/storage_location/resolvers.py +++ b/backend/dataall/modules/datasets/api/storage_location/resolvers.py @@ -88,7 +88,7 @@ def remove_storage_location(context, source, locationUri: str = None): data={'locationUri': location.locationUri}, check_perm=True, ) - indexers.delete_doc(es=context.es, doc_id=location.locationUri) + DatasetLocationIndexer.delete_doc(doc_id=location.locationUri) return True diff --git a/backend/dataall/api/Objects/DatasetTable/__init__.py b/backend/dataall/modules/datasets/api/table/__init__.py similarity index 75% rename from backend/dataall/api/Objects/DatasetTable/__init__.py rename to backend/dataall/modules/datasets/api/table/__init__.py index dfa46b264..3aaba05cf 100644 --- a/backend/dataall/api/Objects/DatasetTable/__init__.py +++ b/backend/dataall/modules/datasets/api/table/__init__.py @@ -1,4 +1,4 @@ -from . import ( +from dataall.modules.datasets.api.table import ( input_types, mutations, queries, diff --git a/backend/dataall/api/Objects/DatasetTable/input_types.py b/backend/dataall/modules/datasets/api/table/input_types.py similarity index 93% rename from backend/dataall/api/Objects/DatasetTable/input_types.py rename to backend/dataall/modules/datasets/api/table/input_types.py index a5bd07998..2e6649515 100644 --- a/backend/dataall/api/Objects/DatasetTable/input_types.py +++ b/backend/dataall/modules/datasets/api/table/input_types.py @@ -1,5 +1,5 @@ -from ... import gql -from ....api.constants import SortDirection, GraphQLEnumMapper +from dataall.api import gql +from dataall.api.constants import SortDirection, GraphQLEnumMapper NewDatasetTableInput = gql.InputType( diff --git a/backend/dataall/api/Objects/DatasetTable/mutations.py b/backend/dataall/modules/datasets/api/table/mutations.py similarity index 82% rename from backend/dataall/api/Objects/DatasetTable/mutations.py rename to backend/dataall/modules/datasets/api/table/mutations.py index 532605cff..7a26a6c15 100644 --- a/backend/dataall/api/Objects/DatasetTable/mutations.py +++ b/backend/dataall/modules/datasets/api/table/mutations.py @@ -1,9 +1,14 @@ -from ... import gql -from .input_types import ( +from dataall.api import gql +from dataall.modules.datasets.api.table.input_types import ( ModifyDatasetTableInput, NewDatasetTableInput, ) -from .resolvers import * +from dataall.modules.datasets.api.table.resolvers import ( + create_table, + update_table, + delete_table, + publish_table_update +) createDatasetTable = gql.MutationField( name='createDatasetTable', diff --git a/backend/dataall/api/Objects/DatasetTable/queries.py b/backend/dataall/modules/datasets/api/table/queries.py similarity index 81% rename from backend/dataall/api/Objects/DatasetTable/queries.py rename to backend/dataall/modules/datasets/api/table/queries.py index 8f7809e62..a6d8d48cf 100644 --- a/backend/dataall/api/Objects/DatasetTable/queries.py +++ b/backend/dataall/modules/datasets/api/table/queries.py @@ -1,7 +1,11 @@ -from ... 
import gql -from .input_types import DatasetTableFilter -from .resolvers import * -from .schema import ( +from dataall.api import gql +from dataall.modules.datasets.api.table.input_types import DatasetTableFilter +from dataall.modules.datasets.api.table.resolvers import ( + get_table, + list_shared_tables_by_env_dataset, + preview +) +from dataall.modules.datasets.api.table.schema import ( DatasetTable, DatasetTableSearchResult, ) diff --git a/backend/dataall/api/Objects/DatasetTable/resolvers.py b/backend/dataall/modules/datasets/api/table/resolvers.py similarity index 88% rename from backend/dataall/api/Objects/DatasetTable/resolvers.py rename to backend/dataall/modules/datasets/api/table/resolvers.py index 7df3d8cba..a34dfa9c5 100644 --- a/backend/dataall/api/Objects/DatasetTable/resolvers.py +++ b/backend/dataall/modules/datasets/api/table/resolvers.py @@ -4,15 +4,15 @@ from botocore.exceptions import ClientError from pyathena import connect -from .... import db -from ..Dataset.resolvers import get_dataset -from ....api.context import Context -from ....aws.handlers.service_handlers import Worker -from ....aws.handlers.sts import SessionHelper -from ....db import permissions, models -from ....db.api import ResourcePolicy, Glossary -from ....searchproxy import indexers -from ....utils import json_utils +from dataall import db +from dataall.api.Objects.Dataset.resolvers import get_dataset +from dataall.api.context import Context +from dataall.aws.handlers.service_handlers import Worker +from dataall.aws.handlers.sts import SessionHelper +from dataall.db import permissions, models +from dataall.db.api import ResourcePolicy, Glossary +from dataall.modules.datasets.db.models import DatasetTable +from dataall.utils import json_utils from dataall.modules.datasets.indexers.table_indexer import DatasetTableIndexer from dataall.modules.datasets.services.dataset_table import DatasetTableService @@ -98,13 +98,13 @@ def delete_table(context, source, tableUri: str = None): }, check_perm=True, ) - indexers.delete_doc(es=context.es, doc_id=tableUri) + DatasetTableIndexer.delete_doc(doc_id=tableUri) return True def preview(context, source, tableUri: str = None): with context.engine.scoped_session() as session: - table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri( + table: DatasetTable = DatasetTableService.get_dataset_table_by_uri( session, tableUri ) dataset = db.api.Dataset.get_dataset_by_uri(session, table.datasetUri) @@ -155,17 +155,17 @@ def preview(context, source, tableUri: str = None): return {'rows': rows, 'fields': fields} -def get_glue_table_properties(context: Context, source: models.DatasetTable, **kwargs): +def get_glue_table_properties(context: Context, source: DatasetTable, **kwargs): if not source: return None with context.engine.scoped_session() as session: - table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri( + table: DatasetTable = DatasetTableService.get_dataset_table_by_uri( session, source.tableUri ) return json_utils.to_string(table.GlueTableProperties).replace('\\', ' ') -def resolve_dataset(context, source: models.DatasetTable, **kwargs): +def resolve_dataset(context, source: DatasetTable, **kwargs): if not source: return None with context.engine.scoped_session() as session: @@ -177,7 +177,7 @@ def resolve_dataset(context, source: models.DatasetTable, **kwargs): return dataset_with_role -def resolve_glossary_terms(context: Context, source: models.DatasetTable, **kwargs): +def resolve_glossary_terms(context: Context, source: 
DatasetTable, **kwargs): if not source: return None with context.engine.scoped_session() as session: @@ -188,7 +188,7 @@ def resolve_glossary_terms(context: Context, source: models.DatasetTable, **kwar def publish_table_update(context: Context, source, tableUri: str = None): with context.engine.scoped_session() as session: - table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri( + table: DatasetTable = DatasetTableService.get_dataset_table_by_uri( session, tableUri ) ResourcePolicy.check_user_resource_permission( @@ -217,7 +217,7 @@ def publish_table_update(context: Context, source, tableUri: str = None): return True -def resolve_redshift_copy_schema(context, source: models.DatasetTable, clusterUri: str): +def resolve_redshift_copy_schema(context, source: DatasetTable, clusterUri: str): if not source: return None with context.engine.scoped_session() as session: @@ -227,7 +227,7 @@ def resolve_redshift_copy_schema(context, source: models.DatasetTable, clusterUr def resolve_redshift_copy_location( - context, source: models.DatasetTable, clusterUri: str + context, source: DatasetTable, clusterUri: str ): with context.engine.scoped_session() as session: return db.api.RedshiftCluster.get_cluster_dataset_table( diff --git a/backend/dataall/api/Objects/DatasetTable/schema.py b/backend/dataall/modules/datasets/api/table/schema.py similarity index 94% rename from backend/dataall/api/Objects/DatasetTable/schema.py rename to backend/dataall/modules/datasets/api/table/schema.py index 74d413818..666bf7e35 100644 --- a/backend/dataall/api/Objects/DatasetTable/schema.py +++ b/backend/dataall/modules/datasets/api/table/schema.py @@ -1,7 +1,13 @@ from dataall.modules.datasets.api.table_column.resolvers import list_table_columns -from ... import gql -from .resolvers import * -from ...constants import GraphQLEnumMapper +from dataall.api import gql +from dataall.modules.datasets.api.table.resolvers import ( + resolve_dataset, + get_glue_table_properties, + resolve_redshift_copy_location, + resolve_glossary_terms, + resolve_redshift_copy_schema +) +from dataall.api.constants import GraphQLEnumMapper TablePermission = gql.ObjectType( name='TablePermission', diff --git a/backend/dataall/modules/datasets/api/table_column/resolvers.py b/backend/dataall/modules/datasets/api/table_column/resolvers.py index 8e78a042e..b27a99dd3 100644 --- a/backend/dataall/modules/datasets/api/table_column/resolvers.py +++ b/backend/dataall/modules/datasets/api/table_column/resolvers.py @@ -6,12 +6,12 @@ from dataall.db import paginate, permissions, models from dataall.db.api import ResourcePolicy from dataall.modules.datasets.services.dataset_table import DatasetTableService -from dataall.modules.datasets.db.models import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetTable def list_table_columns( context: Context, - source: models.DatasetTable, + source: DatasetTable, tableUri: str = None, filter: dict = None, ): @@ -46,7 +46,7 @@ def list_table_columns( def sync_table_columns(context: Context, source, tableUri: str = None): with context.engine.scoped_session() as session: - table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri( + table: DatasetTable = DatasetTableService.get_dataset_table_by_uri( session, tableUri ) ResourcePolicy.check_user_resource_permission( @@ -81,7 +81,7 @@ def update_table_column( ).get(columnUri) if not column: raise db.exceptions.ObjectNotFound('Column', columnUri) - table: models.DatasetTable = 
DatasetTableService.get_dataset_table_by_uri( + table: DatasetTable = DatasetTableService.get_dataset_table_by_uri( session, column.tableUri ) ResourcePolicy.check_user_resource_permission( diff --git a/backend/dataall/modules/datasets/db/models.py b/backend/dataall/modules/datasets/db/models.py index 5ca58a7d4..adfb40514 100644 --- a/backend/dataall/modules/datasets/db/models.py +++ b/backend/dataall/modules/datasets/db/models.py @@ -1,5 +1,5 @@ -from sqlalchemy import Boolean, Column, String -from sqlalchemy.dialects.postgresql import JSON +from sqlalchemy import Boolean, Column, String, Text +from sqlalchemy.dialects.postgresql import JSON, ARRAY from sqlalchemy.orm import query_expression from dataall.db import Base, Resource, utils @@ -55,3 +55,29 @@ class DatasetStorageLocation(Resource, Base): def uri(self): return self.locationUri + + +class DatasetTable(Resource, Base): + __tablename__ = 'dataset_table' + datasetUri = Column(String, nullable=False) + tableUri = Column(String, primary_key=True, default=utils.uuid('table')) + AWSAccountId = Column(String, nullable=False) + S3BucketName = Column(String, nullable=False) + S3Prefix = Column(String, nullable=False) + GlueDatabaseName = Column(String, nullable=False) + GlueTableName = Column(String, nullable=False) + GlueTableConfig = Column(Text) + GlueTableProperties = Column(JSON, default={}) + LastGlueTableStatus = Column(String, default='InSync') + region = Column(String, default='eu-west-1') + # LastGeneratedPreviewDate= Column(DateTime, default=None) + confidentiality = Column(String, nullable=True) + userRoleForTable = query_expression() + projectPermission = query_expression() + redshiftClusterPermission = query_expression() + stage = Column(String, default='RAW') + topics = Column(ARRAY(String), nullable=True) + confidentiality = Column(String, nullable=False, default='C1') + + def uri(self): + return self.tableUri diff --git a/backend/dataall/modules/datasets/handlers/glue_column_handler.py b/backend/dataall/modules/datasets/handlers/glue_column_handler.py index ea2fb82b4..6882d4e12 100644 --- a/backend/dataall/modules/datasets/handlers/glue_column_handler.py +++ b/backend/dataall/modules/datasets/handlers/glue_column_handler.py @@ -5,7 +5,7 @@ from dataall.aws.handlers.service_handlers import Worker from dataall.modules.datasets.aws.glue_table_client import GlueTableClient from dataall.modules.datasets.aws.lf_table_client import LakeFormationTableClient -from dataall.modules.datasets.db.models import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetTable from dataall.modules.datasets.services.dataset_table import DatasetTableService log = logging.getLogger(__name__) @@ -18,7 +18,7 @@ class DatasetColumnGlueHandler: @Worker.handler('glue.table.columns') def get_table_columns(engine, task: models.Task): with engine.scoped_session() as session: - dataset_table: models.DatasetTable = session.query(models.DatasetTable).get( + dataset_table: DatasetTable = session.query(DatasetTable).get( task.targetUri ) aws = SessionHelper.remote_session(dataset_table.AWSAccountId) @@ -33,12 +33,8 @@ def get_table_columns(engine, task: models.Task): @Worker.handler('glue.table.update_column') def update_table_columns(engine, task: models.Task): with engine.scoped_session() as session: - column: DatasetTableColumn = session.query( - DatasetTableColumn - ).get(task.targetUri) - table: models.DatasetTable = session.query(models.DatasetTable).get( - column.tableUri - ) + column: DatasetTableColumn = 
session.query(DatasetTableColumn).get(task.targetUri) + table: DatasetTable = session.query(DatasetTable).get(column.tableUri) aws_session = SessionHelper.remote_session(table.AWSAccountId) diff --git a/backend/dataall/modules/datasets/indexers/table_indexer.py b/backend/dataall/modules/datasets/indexers/table_indexer.py index fec9e4f7c..fca50b43e 100644 --- a/backend/dataall/modules/datasets/indexers/table_indexer.py +++ b/backend/dataall/modules/datasets/indexers/table_indexer.py @@ -2,6 +2,7 @@ from operator import and_ from dataall.db import models +from dataall.modules.datasets.db.models import DatasetTable from dataall.modules.datasets.indexers.dataset_indexer import DatasetIndexer from dataall.searchproxy.base_indexer import BaseIndexer @@ -12,14 +13,14 @@ class DatasetTableIndexer(BaseIndexer): def upsert(cls, session, table_uri: str): table = ( session.query( - models.DatasetTable.datasetUri.label('datasetUri'), - models.DatasetTable.tableUri.label('uri'), - models.DatasetTable.name.label('name'), - models.DatasetTable.owner.label('owner'), - models.DatasetTable.label.label('label'), - models.DatasetTable.description.label('description'), + DatasetTable.datasetUri.label('datasetUri'), + DatasetTable.tableUri.label('uri'), + DatasetTable.name.label('name'), + DatasetTable.owner.label('owner'), + DatasetTable.label.label('label'), + DatasetTable.description.label('description'), models.Dataset.confidentiality.label('classification'), - models.DatasetTable.tags.label('tags'), + DatasetTable.tags.label('tags'), models.Dataset.topics.label('topics'), models.Dataset.region.label('region'), models.Organization.organizationUri.label('orgUri'), @@ -29,13 +30,13 @@ def upsert(cls, session, table_uri: str): models.Dataset.SamlAdminGroupName.label('admins'), models.Dataset.GlueDatabaseName.label('database'), models.Dataset.S3BucketName.label('source'), - models.DatasetTable.created, - models.DatasetTable.updated, - models.DatasetTable.deleted, + DatasetTable.created, + DatasetTable.updated, + DatasetTable.deleted, ) .join( models.Dataset, - models.Dataset.datasetUri == models.DatasetTable.datasetUri, + models.Dataset.datasetUri == DatasetTable.datasetUri, ) .join( models.Organization, @@ -45,7 +46,7 @@ def upsert(cls, session, table_uri: str): models.Environment, models.Dataset.environmentUri == models.Environment.environmentUri, ) - .filter(models.DatasetTable.tableUri == table_uri) + .filter(DatasetTable.tableUri == table_uri) .first() ) @@ -84,11 +85,11 @@ def upsert(cls, session, table_uri: str): @classmethod def upsert_all(cls, session, dataset_uri: str): tables = ( - session.query(models.DatasetTable) + session.query(DatasetTable) .filter( and_( - models.DatasetTable.datasetUri == dataset_uri, - models.DatasetTable.LastGlueTableStatus != 'Deleted', + DatasetTable.datasetUri == dataset_uri, + DatasetTable.LastGlueTableStatus != 'Deleted', ) ) .all() @@ -96,3 +97,19 @@ def upsert_all(cls, session, dataset_uri: str): for table in tables: DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri) return tables + + @classmethod + def remove_all_deleted(cls, session, dataset_uri: str): + tables = ( + session.query(DatasetTable) + .filter( + and_( + DatasetTable.datasetUri == dataset_uri, + DatasetTable.LastGlueTableStatus == 'Deleted', + ) + ) + .all() + ) + for table in tables: + cls.delete_doc(doc_id=table.tableUri) + return tables diff --git a/backend/dataall/modules/datasets/services/dataset_profiling_service.py 
b/backend/dataall/modules/datasets/services/dataset_profiling_service.py index 5b6ca8d41..01bc3dc57 100644 --- a/backend/dataall/modules/datasets/services/dataset_profiling_service.py +++ b/backend/dataall/modules/datasets/services/dataset_profiling_service.py @@ -2,7 +2,7 @@ from dataall.db import paginate, models from dataall.db.exceptions import ObjectNotFound -from dataall.modules.datasets.db.models import DatasetProfilingRun +from dataall.modules.datasets.db.models import DatasetProfilingRun, DatasetTable class DatasetProfilingService: @@ -18,7 +18,7 @@ def start_profiling( raise ObjectNotFound('Dataset', datasetUri) if tableUri and not GlueTableName: - table: models.DatasetTable = session.query(models.DatasetTable).get( + table: DatasetTable = session.query(DatasetTable).get( tableUri ) if not table: @@ -105,13 +105,13 @@ def list_table_profiling_runs(session, tableUri, filter): q = ( session.query(DatasetProfilingRun) .join( - models.DatasetTable, - models.DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, + DatasetTable, + DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, ) .filter( and_( - models.DatasetTable.tableUri == tableUri, - models.DatasetTable.GlueTableName + DatasetTable.tableUri == tableUri, + DatasetTable.GlueTableName == DatasetProfilingRun.GlueTableName, ) ) @@ -126,12 +126,12 @@ def get_table_last_profiling_run(session, tableUri): return ( session.query(DatasetProfilingRun) .join( - models.DatasetTable, - models.DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, + DatasetTable, + DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, ) - .filter(models.DatasetTable.tableUri == tableUri) + .filter(DatasetTable.tableUri == tableUri) .filter( - models.DatasetTable.GlueTableName + DatasetTable.GlueTableName == DatasetProfilingRun.GlueTableName ) .order_by(DatasetProfilingRun.created.desc()) @@ -143,12 +143,12 @@ def get_table_last_profiling_run_with_results(session, tableUri): return ( session.query(DatasetProfilingRun) .join( - models.DatasetTable, - models.DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, + DatasetTable, + DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, ) - .filter(models.DatasetTable.tableUri == tableUri) + .filter(DatasetTable.tableUri == tableUri) .filter( - models.DatasetTable.GlueTableName + DatasetTable.GlueTableName == DatasetProfilingRun.GlueTableName ) .filter(DatasetProfilingRun.results.isnot(None)) diff --git a/backend/dataall/modules/datasets/services/dataset_share_service.py b/backend/dataall/modules/datasets/services/dataset_share_service.py index 9ca84a1cf..3503e86fe 100644 --- a/backend/dataall/modules/datasets/services/dataset_share_service.py +++ b/backend/dataall/modules/datasets/services/dataset_share_service.py @@ -8,7 +8,7 @@ from dataall.db import models, permissions from dataall.db.api import has_resource_perm, ShareItemSM from dataall.db.paginator import paginate -from dataall.modules.datasets.db.models import DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable class DatasetShareService: @@ -41,9 +41,9 @@ def paginated_shared_with_environment_datasets( models.ShareObjectItem.itemType == ShareableType.Table.value, func.concat( - models.DatasetTable.GlueDatabaseName, + DatasetTable.GlueDatabaseName, '.', - models.DatasetTable.GlueTableName, + DatasetTable.GlueTableName, ), ), ( @@ -73,8 +73,8 @@ def paginated_shared_with_environment_datasets( == models.Environment.organizationUri, ) .outerjoin( - models.DatasetTable, - 
models.ShareObjectItem.itemUri == models.DatasetTable.tableUri, + DatasetTable, + models.ShareObjectItem.itemUri == DatasetTable.tableUri, ) .outerjoin( DatasetStorageLocation, @@ -137,9 +137,9 @@ def paginated_shared_with_environment_group_datasets( models.ShareObjectItem.itemType == ShareableType.Table.value, func.concat( - models.DatasetTable.GlueDatabaseName, + DatasetTable.GlueDatabaseName, '.', - models.DatasetTable.GlueTableName, + DatasetTable.GlueTableName, ), ), ( @@ -169,8 +169,8 @@ def paginated_shared_with_environment_group_datasets( == models.Environment.organizationUri, ) .outerjoin( - models.DatasetTable, - models.ShareObjectItem.itemUri == models.DatasetTable.tableUri, + DatasetTable, + models.ShareObjectItem.itemUri == DatasetTable.tableUri, ) .outerjoin( DatasetStorageLocation, diff --git a/backend/dataall/modules/datasets/services/dataset_table.py b/backend/dataall/modules/datasets/services/dataset_table.py index 1c28469f6..7dc02edca 100644 --- a/backend/dataall/modules/datasets/services/dataset_table.py +++ b/backend/dataall/modules/datasets/services/dataset_table.py @@ -6,7 +6,7 @@ from dataall.db.api import has_tenant_perm, has_resource_perm, Glossary, ResourcePolicy, Environment from dataall.db.models import Dataset from dataall.utils import json_utils -from dataall.modules.datasets.db.models import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetTable logger = logging.getLogger(__name__) @@ -22,14 +22,14 @@ def create_dataset_table( uri: str, data: dict = None, check_perm: bool = False, - ) -> models.DatasetTable: + ) -> DatasetTable: dataset = api.Dataset.get_dataset_by_uri(session, uri) exists = ( - session.query(models.DatasetTable) + session.query(DatasetTable) .filter( and_( - models.DatasetTable.datasetUri == uri, - models.DatasetTable.GlueTableName == data['name'], + DatasetTable.datasetUri == uri, + DatasetTable.GlueTableName == data['name'], ) ) .count() @@ -41,7 +41,7 @@ def create_dataset_table( message=f'table: {data["name"]} already exist on dataset {uri}', ) - table = models.DatasetTable( + table = DatasetTable( datasetUri=uri, label=data['name'], name=data['name'], @@ -72,7 +72,7 @@ def create_dataset_table( group=group, permissions=permissions.DATASET_TABLE_READ, resource_uri=table.tableUri, - resource_type=models.DatasetTable.__name__, + resource_type=DatasetTable.__name__, ) return table @@ -87,13 +87,13 @@ def list_dataset_tables( check_perm: bool = False, ) -> dict: query = ( - session.query(models.DatasetTable) - .filter(models.DatasetTable.datasetUri == uri) - .order_by(models.DatasetTable.created.desc()) + session.query(DatasetTable) + .filter(DatasetTable.datasetUri == uri) + .order_by(DatasetTable.created.desc()) ) if data.get('term'): term = data.get('term') - query = query.filter(models.DatasetTable.label.ilike('%' + term + '%')) + query = query.filter(DatasetTable.label.ilike('%' + term + '%')) return paginate( query, page=data.get('page', 1), page_size=data.get('pageSize', 10) ).to_dict() @@ -107,7 +107,7 @@ def get_dataset_table( uri: str, data: dict = None, check_perm: bool = False, - ) -> models.DatasetTable: + ) -> DatasetTable: return DatasetTableService.get_dataset_table_by_uri(session, data['tableUri']) @staticmethod @@ -183,10 +183,10 @@ def query_dataset_tables_shared_with_env( """ share_item_shared_states = api.ShareItemSM.get_share_item_shared_states() env_tables_shared = ( - session.query(models.DatasetTable) # all tables + session.query(DatasetTable) # all tables .join( 
models.ShareObjectItem, # found in ShareObjectItem - models.ShareObjectItem.itemUri == models.DatasetTable.tableUri, + models.ShareObjectItem.itemUri == DatasetTable.tableUri, ) .join( models.ShareObject, # jump to share object @@ -218,7 +218,7 @@ def get_dataset_tables_shared_with_env( @staticmethod def get_dataset_table_by_uri(session, table_uri): - table: models.DatasetTable = session.query(models.DatasetTable).get(table_uri) + table: DatasetTable = session.query(DatasetTable).get(table_uri) if not table: raise exceptions.ObjectNotFound('DatasetTable', table_uri) return table @@ -229,8 +229,8 @@ def sync_existing_tables(session, datasetUri, glue_tables=None): dataset: Dataset = session.query(Dataset).get(datasetUri) if dataset: existing_tables = ( - session.query(models.DatasetTable) - .filter(models.DatasetTable.datasetUri == datasetUri) + session.query(DatasetTable) + .filter(DatasetTable.datasetUri == datasetUri) .all() ) existing_table_names = [e.GlueTableName for e in existing_tables] @@ -245,7 +245,7 @@ def sync_existing_tables(session, datasetUri, glue_tables=None): logger.info( f'Storing new table: {table} for dataset db {dataset.GlueDatabaseName}' ) - updated_table = models.DatasetTable( + updated_table = DatasetTable( datasetUri=dataset.datasetUri, label=table['Name'], name=table['Name'], @@ -272,13 +272,13 @@ def sync_existing_tables(session, datasetUri, glue_tables=None): group=group, permissions=permissions.DATASET_TABLE_READ, resource_uri=updated_table.tableUri, - resource_type=models.DatasetTable.__name__, + resource_type=DatasetTable.__name__, ) else: logger.info( f'Updating table: {table} for dataset db {dataset.GlueDatabaseName}' ) - updated_table: models.DatasetTable = ( + updated_table: DatasetTable = ( existing_dataset_tables_map.get(table['Name']) ) updated_table.GlueTableProperties = json_utils.to_json( @@ -345,13 +345,13 @@ def delete_all_table_columns(session, dataset_table): @staticmethod def get_table_by_s3_prefix(session, s3_prefix, accountid, region): - table: models.DatasetTable = ( - session.query(models.DatasetTable) + table: DatasetTable = ( + session.query(DatasetTable) .filter( and_( - models.DatasetTable.S3Prefix.startswith(s3_prefix), - models.DatasetTable.AWSAccountId == accountid, - models.DatasetTable.region == region, + DatasetTable.S3Prefix.startswith(s3_prefix), + DatasetTable.AWSAccountId == accountid, + DatasetTable.region == region, ) ) .first() diff --git a/backend/dataall/modules/datasets/tasks/subscription_service.py b/backend/dataall/modules/datasets/tasks/subscription_service.py index 901865812..ae1c522e0 100644 --- a/backend/dataall/modules/datasets/tasks/subscription_service.py +++ b/backend/dataall/modules/datasets/tasks/subscription_service.py @@ -17,7 +17,7 @@ from dataall.utils import json_utils from dataall.modules.datasets.services.dataset_table import DatasetTableService from dataall.modules.datasets.services.dataset_location import DatasetLocationService -from dataall.modules.datasets.db.models import DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable root = logging.getLogger() root.setLevel(logging.INFO) @@ -68,7 +68,7 @@ def notify_consumers(engine, messages): @staticmethod def publish_table_update_message(engine, message): with engine.scoped_session() as session: - table: models.DatasetTable = DatasetTableService.get_table_by_s3_prefix( + table: DatasetTable = DatasetTableService.get_table_by_s3_prefix( session, message.get('prefix'), message.get('accountid'), @@ 
-139,7 +139,7 @@ def publish_location_update_message(session, message): @staticmethod def store_dataquality_results(session, message): - table: models.DatasetTable = DatasetTableService.get_table_by_s3_prefix( + table: DatasetTable = DatasetTableService.get_table_by_s3_prefix( session, message.get('prefix'), message.get('accountid'), @@ -207,7 +207,7 @@ def set_columns_type(quality_results, message): @staticmethod def publish_sns_message( - engine, message, dataset, share_items, prefix, table: models.DatasetTable = None + engine, message, dataset, share_items, prefix, table: DatasetTable = None ): with engine.scoped_session() as session: for item in share_items: @@ -290,7 +290,7 @@ def redshift_copy( message, dataset: models.Dataset, environment: models.Environment, - table: models.DatasetTable, + table: DatasetTable, ): log.info( f'Redshift copy starting ' diff --git a/backend/dataall/modules/datasets/tasks/tables_syncer.py b/backend/dataall/modules/datasets/tasks/tables_syncer.py index 6669c215d..a758eb48a 100644 --- a/backend/dataall/modules/datasets/tasks/tables_syncer.py +++ b/backend/dataall/modules/datasets/tasks/tables_syncer.py @@ -8,9 +8,8 @@ from dataall.aws.handlers.sts import SessionHelper from dataall.db import get_engine from dataall.db import models +from dataall.modules.datasets.db.models import DatasetTable from dataall.modules.datasets.indexers.table_indexer import DatasetTableIndexer -from dataall.searchproxy import indexers -from dataall.searchproxy.connect import connect from dataall.utils.alarm_service import AlarmService from dataall.modules.datasets.services.dataset_table import DatasetTableService @@ -21,7 +20,7 @@ log = logging.getLogger(__name__) -def sync_tables(engine, es=None): +def sync_tables(engine): with engine.scoped_session() as session: processed_tables = [] all_datasets: [models.Dataset] = db.api.Dataset.list_all_active_datasets( @@ -68,8 +67,8 @@ def sync_tables(engine, es=None): ) tables = ( - session.query(models.DatasetTable) - .filter(models.DatasetTable.datasetUri == dataset.datasetUri) + session.query(DatasetTable) + .filter(DatasetTable.datasetUri == dataset.datasetUri) .all() ) @@ -87,8 +86,7 @@ def sync_tables(engine, es=None): processed_tables.extend(tables) - if es: - DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri) + DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri) except Exception as e: log.error( f'Failed to sync tables for dataset ' @@ -112,5 +110,4 @@ def is_assumable_pivot_role(env: models.Environment): if __name__ == '__main__': ENVNAME = os.environ.get('envname', 'local') ENGINE = get_engine(envname=ENVNAME) - ES = connect(envname=ENVNAME) - sync_tables(engine=ENGINE, es=ES) + sync_tables(engine=ENGINE) diff --git a/backend/dataall/searchproxy/base_indexer.py b/backend/dataall/searchproxy/base_indexer.py index fd0cb5e0e..4f1f75322 100644 --- a/backend/dataall/searchproxy/base_indexer.py +++ b/backend/dataall/searchproxy/base_indexer.py @@ -33,6 +33,12 @@ def es(cls): def upsert(session, target_id): raise NotImplementedError("Method upsert is not implemented") + @classmethod + def delete_doc(cls, doc_id): + es = cls.es() + es.delete(index=cls._INDEX, id=doc_id, ignore=[400, 404]) + return True + @classmethod def _index(cls, doc_id, doc): es = cls.es() diff --git a/backend/dataall/searchproxy/indexers.py b/backend/dataall/searchproxy/indexers.py index ce4145fc5..4655de65a 100644 --- a/backend/dataall/searchproxy/indexers.py +++ b/backend/dataall/searchproxy/indexers.py @@ -1,7 +1,5 @@ 
import logging -from sqlalchemy import and_ - from .. import db from ..db import models from dataall.searchproxy.base_indexer import BaseIndexer @@ -71,24 +69,3 @@ def upsert(cls, session, dashboard_uri: str): }, ) return dashboard - - -def remove_deleted_tables(session, es, datasetUri: str): - tables = ( - session.query(models.DatasetTable) - .filter( - and_( - models.DatasetTable.datasetUri == datasetUri, - models.DatasetTable.LastGlueTableStatus == 'Deleted', - ) - ) - .all() - ) - for table in tables: - delete_doc(es, doc_id=table.tableUri) - return tables - - -def delete_doc(es, doc_id, index='dataall-index'): - es.delete(index=index, id=doc_id, ignore=[400, 404]) - return True diff --git a/backend/dataall/tasks/bucket_policy_updater.py b/backend/dataall/tasks/bucket_policy_updater.py index 9932f53ae..6cb4c51ea 100644 --- a/backend/dataall/tasks/bucket_policy_updater.py +++ b/backend/dataall/tasks/bucket_policy_updater.py @@ -9,8 +9,8 @@ from ..aws.handlers.sts import SessionHelper from ..db import get_engine -from ..db import models, api -from dataall.modules.datasets.db.models import DatasetStorageLocation +from ..db import models +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable root = logging.getLogger() root.setLevel(logging.INFO) @@ -168,18 +168,18 @@ def get_shared_tables(self, dataset) -> typing.List[models.ShareObjectItem]: with self.engine.scoped_session() as session: tables = ( session.query( - models.DatasetTable.GlueDatabaseName.label('GlueDatabaseName'), - models.DatasetTable.GlueTableName.label('GlueTableName'), - models.DatasetTable.S3Prefix.label('S3Prefix'), - models.DatasetTable.AWSAccountId.label('SourceAwsAccountId'), - models.DatasetTable.region.label('SourceRegion'), + DatasetTable.GlueDatabaseName.label('GlueDatabaseName'), + DatasetTable.GlueTableName.label('GlueTableName'), + DatasetTable.S3Prefix.label('S3Prefix'), + DatasetTable.AWSAccountId.label('SourceAwsAccountId'), + DatasetTable.region.label('SourceRegion'), models.Environment.AwsAccountId.label('TargetAwsAccountId'), models.Environment.region.label('TargetRegion'), ) .join( models.ShareObjectItem, and_( - models.ShareObjectItem.itemUri == models.DatasetTable.tableUri + models.ShareObjectItem.itemUri == DatasetTable.tableUri ), ) .join( @@ -193,8 +193,8 @@ def get_shared_tables(self, dataset) -> typing.List[models.ShareObjectItem]: ) .filter( and_( - models.DatasetTable.datasetUri == dataset.datasetUri, - models.DatasetTable.deleted.is_(None), + DatasetTable.datasetUri == dataset.datasetUri, + DatasetTable.deleted.is_(None), models.ShareObjectItem.status == models.Enums.ShareObjectStatus.Approved.value, ) diff --git a/backend/dataall/tasks/catalog_indexer.py b/backend/dataall/tasks/catalog_indexer.py index 5d32800c7..945bdd214 100644 --- a/backend/dataall/tasks/catalog_indexer.py +++ b/backend/dataall/tasks/catalog_indexer.py @@ -5,13 +5,9 @@ from dataall.modules.datasets.indexers.location_indexer import DatasetLocationIndexer from dataall.modules.datasets.indexers.table_indexer import DatasetTableIndexer from .. 
import db -from ..db import get_engine, exceptions -from ..db import models +from dataall.db import get_engine, models from dataall.searchproxy.indexers import DashboardIndexer -from ..searchproxy.connect import ( - connect, -) -from ..utils.alarm_service import AlarmService +from dataall.utils.alarm_service import AlarmService root = logging.getLogger() root.setLevel(logging.INFO) @@ -20,12 +16,8 @@ log = logging.getLogger(__name__) -def index_objects(engine, es): +def index_objects(engine): try: - if not es: - raise exceptions.AWSResourceNotFound( - action='CATALOG_INDEXER_TASK', message='ES configuration not found' - ) indexed_objects_counter = 0 with engine.scoped_session() as session: @@ -58,5 +50,4 @@ def index_objects(engine, es): if __name__ == '__main__': ENVNAME = os.environ.get('envname', 'local') ENGINE = get_engine(envname=ENVNAME) - ES = connect(envname=ENVNAME) - index_objects(engine=ENGINE, es=ES) + index_objects(engine=ENGINE) diff --git a/backend/dataall/tasks/data_sharing/share_managers/lf_share_manager.py b/backend/dataall/tasks/data_sharing/share_managers/lf_share_manager.py index 22bba64ca..4411cc824 100644 --- a/backend/dataall/tasks/data_sharing/share_managers/lf_share_manager.py +++ b/backend/dataall/tasks/data_sharing/share_managers/lf_share_manager.py @@ -11,7 +11,8 @@ from ....aws.handlers.sts import SessionHelper from ....aws.handlers.ram import Ram from ....db import api, exceptions, models -from ....utils.alarm_service import AlarmService +from dataall.modules.datasets.db.models import DatasetTable +from dataall.utils.alarm_service import AlarmService logger = logging.getLogger(__name__) @@ -22,8 +23,8 @@ def __init__( session, dataset: models.Dataset, share: models.ShareObject, - shared_tables: [models.DatasetTable], - revoked_tables: [models.DatasetTable], + shared_tables: [DatasetTable], + revoked_tables: [DatasetTable], source_environment: models.Environment, target_environment: models.Environment, env_group: models.EnvironmentGroup, @@ -83,7 +84,7 @@ def build_shared_db_name(self) -> str: """ return (self.dataset.GlueDatabaseName + '_shared_' + self.share.shareUri)[:254] - def build_share_data(self, table: models.DatasetTable) -> dict: + def build_share_data(self, table: DatasetTable) -> dict: """ Build aws dict for boto3 operations on Glue and LF from share data Parameters @@ -111,7 +112,7 @@ def build_share_data(self, table: models.DatasetTable) -> dict: return data def check_share_item_exists_on_glue_catalog( - self, share_item: models.ShareObjectItem, table: models.DatasetTable + self, share_item: models.ShareObjectItem, table: DatasetTable ) -> None: """ Checks if a table in the share request @@ -272,12 +273,12 @@ def create_resource_link(cls, **data) -> dict: ) raise e - def revoke_table_resource_link_access(self, table: models.DatasetTable, principals: [str]): + def revoke_table_resource_link_access(self, table: DatasetTable, principals: [str]): """ Revokes access to glue table resource link Parameters ---------- - table : models.DatasetTable + table : DatasetTable principals: List of strings. 
IAM role arn and Quicksight groups Returns @@ -333,7 +334,7 @@ def revoke_source_table_access(self, table, principals: [str]): Revokes access to the source glue table Parameters ---------- - table : models.DatasetTable + table : DatasetTable Returns ------- @@ -367,7 +368,7 @@ def revoke_source_table_access(self, table, principals: [str]): ) return True - def delete_resource_link_table(self, table: models.DatasetTable): + def delete_resource_link_table(self, table: DatasetTable): logger.info(f'Deleting shared table {table.GlueTableName}') if not Glue.table_exists( @@ -503,7 +504,7 @@ def delete_ram_resource_shares(self, resource_arn: str) -> [dict]: def handle_share_failure( self, - table: models.DatasetTable, + table: DatasetTable, share_item: models.ShareObjectItem, error: Exception, ) -> bool: @@ -533,7 +534,7 @@ def handle_share_failure( def handle_revoke_failure( self, - table: models.DatasetTable, + table: DatasetTable, share_item: models.ShareObjectItem, error: Exception, ) -> bool: diff --git a/backend/dataall/tasks/data_sharing/share_processors/lf_process_cross_account_share.py b/backend/dataall/tasks/data_sharing/share_processors/lf_process_cross_account_share.py index ffdf7d487..dfceec978 100644 --- a/backend/dataall/tasks/data_sharing/share_processors/lf_process_cross_account_share.py +++ b/backend/dataall/tasks/data_sharing/share_processors/lf_process_cross_account_share.py @@ -4,6 +4,7 @@ from ..share_managers import LFShareManager from ....aws.handlers.ram import Ram from ....db import models, api +from dataall.modules.datasets.db.models import DatasetTable log = logging.getLogger(__name__) @@ -14,8 +15,8 @@ def __init__( session, dataset: models.Dataset, share: models.ShareObject, - shared_tables: [models.DatasetTable], - revoked_tables: [models.DatasetTable], + shared_tables: [DatasetTable], + revoked_tables: [DatasetTable], source_environment: models.Environment, target_environment: models.Environment, env_group: models.EnvironmentGroup, diff --git a/backend/dataall/tasks/data_sharing/share_processors/lf_process_same_account_share.py b/backend/dataall/tasks/data_sharing/share_processors/lf_process_same_account_share.py index 4b5ad4096..3ea939b4f 100644 --- a/backend/dataall/tasks/data_sharing/share_processors/lf_process_same_account_share.py +++ b/backend/dataall/tasks/data_sharing/share_processors/lf_process_same_account_share.py @@ -1,7 +1,8 @@ import logging from ..share_managers import LFShareManager -from ....db import models, api +from dataall.db import models, api +from dataall.modules.datasets.db.models import DatasetTable log = logging.getLogger(__name__) @@ -12,8 +13,8 @@ def __init__( session, dataset: models.Dataset, share: models.ShareObject, - shared_tables: [models.DatasetTable], - revoked_tables: [models.DatasetTable], + shared_tables: [DatasetTable], + revoked_tables: [DatasetTable], source_environment: models.Environment, target_environment: models.Environment, env_group: models.EnvironmentGroup, diff --git a/backend/dataall/utils/alarm_service.py b/backend/dataall/utils/alarm_service.py index b414e1ed0..a1d0a6d5b 100644 --- a/backend/dataall/utils/alarm_service.py +++ b/backend/dataall/utils/alarm_service.py @@ -11,6 +11,7 @@ from ..aws.handlers.sts import SessionHelper from ..db import models +from dataall.modules.datasets.db.models import DatasetTable logger = logging.getLogger(__name__) @@ -42,7 +43,7 @@ def trigger_stack_deployment_failure_alarm(self, stack: models.Stack): def trigger_table_sharing_failure_alarm( self, - table: models.DatasetTable, 
+ table: DatasetTable, share: models.ShareObject, target_environment: models.Environment, ): @@ -74,7 +75,7 @@ def trigger_table_sharing_failure_alarm( def trigger_revoke_table_sharing_failure_alarm( self, - table: models.DatasetTable, + table: DatasetTable, share: models.ShareObject, target_environment: models.Environment, ): diff --git a/backend/local_graphql_server.py b/backend/local_graphql_server.py index 44f79a087..98e99cd73 100644 --- a/backend/local_graphql_server.py +++ b/backend/local_graphql_server.py @@ -86,12 +86,11 @@ def request_context(headers, mock=False): tenant_name='dataall', ) - set_context(RequestContext(engine, username, groups, es)) + set_context(RequestContext(engine, username, groups)) # TODO: remove when the migration to a new RequestContext API is complete. Used only for backward compatibility context = Context( engine=engine, - es=es, schema=schema, username=username, groups=groups, diff --git a/backend/migrations/versions/d05f9a5b215e_backfill_dataset_table_permissions.py b/backend/migrations/versions/d05f9a5b215e_backfill_dataset_table_permissions.py index d75e7d6cc..32ca6abe0 100644 --- a/backend/migrations/versions/d05f9a5b215e_backfill_dataset_table_permissions.py +++ b/backend/migrations/versions/d05f9a5b215e_backfill_dataset_table_permissions.py @@ -6,7 +6,6 @@ """ from alembic import op -import sqlalchemy as sa from sqlalchemy import orm, Column, String, Text, DateTime, and_ from sqlalchemy.orm import query_expression from sqlalchemy.dialects import postgresql @@ -95,7 +94,7 @@ def upgrade(): resource_uri=table.tableUri, group=group, permissions=permissions.DATASET_TABLE_READ, - resource_type=models.DatasetTable.__name__, + resource_type=DatasetTable.__name__, ) print('dataset table permissions updated successfully for owners/stewards') except Exception as e: @@ -120,7 +119,7 @@ def upgrade(): group=share.principalId, permissions=permissions.DATASET_TABLE_READ, resource_uri=shared_table.itemUri, - resource_type=models.DatasetTable.__name__, + resource_type=DatasetTable.__name__, ) print('dataset table permissions updated for all shared tables') except Exception as e: diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 99ab10e89..91a94db91 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -2,7 +2,7 @@ from .client import * from dataall.db import models from dataall.api import constants -from dataall.modules.datasets.db.models import DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable @pytest.fixture(scope='module', autouse=True) @@ -45,7 +45,7 @@ def patch_es(module_mocker): return_value={} ) module_mocker.patch('dataall.searchproxy.indexers.DashboardIndexer.upsert', return_value={}) - module_mocker.patch('dataall.searchproxy.indexers.delete_doc', return_value={}) + module_mocker.patch('dataall.searchproxy.base_indexer.BaseIndexer.delete_doc', return_value={}) @pytest.fixture(scope='module', autouse=True) @@ -511,7 +511,7 @@ def factory( def share_item(db): def factory( share: models.ShareObject, - table: models.DatasetTable, + table: DatasetTable, status: str ) -> models.ShareObjectItem: with db.scoped_session() as session: @@ -559,12 +559,12 @@ def factory(dataset: models.Dataset, name, username) -> DatasetStorageLocation: def table(db): cache = {} - def factory(dataset: models.Dataset, name, username) -> models.DatasetTable: + def factory(dataset: models.Dataset, name, username) -> DatasetTable: key = f'{dataset.datasetUri}-{name}' if cache.get(key): return 
cache.get(key) with db.scoped_session() as session: - table = models.DatasetTable( + table = DatasetTable( name=name, label=name, owner=username, diff --git a/tests/api/test_dataset.py b/tests/api/test_dataset.py index 0efe04e0a..ea6580d80 100644 --- a/tests/api/test_dataset.py +++ b/tests/api/test_dataset.py @@ -3,7 +3,7 @@ import pytest import dataall -from dataall.modules.datasets.db.models import DatasetStorageLocation +from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable @pytest.fixture(scope='module', autouse=True) @@ -223,7 +223,7 @@ def test_add_tables(table, dataset1, db): table(dataset=dataset1, name=f'table{i+1}', username=dataset1.owner) with db.scoped_session() as session: - nb = session.query(dataall.db.models.DatasetTable).count() + nb = session.query(DatasetTable).count() assert nb == 10 diff --git a/tests/api/test_dataset_profiling.py b/tests/api/test_dataset_profiling.py index cd325f26d..d50267085 100644 --- a/tests/api/test_dataset_profiling.py +++ b/tests/api/test_dataset_profiling.py @@ -2,7 +2,7 @@ import pytest import dataall -from dataall.modules.datasets.db.models import DatasetProfilingRun +from dataall.modules.datasets.db.models import DatasetProfilingRun, DatasetTable @pytest.fixture(scope='module', autouse=True) @@ -29,7 +29,7 @@ def test_add_tables(table, dataset1, db): table(dataset=dataset1, name=f'table{i+1}', username=dataset1.owner) with db.scoped_session() as session: - nb = session.query(dataall.db.models.DatasetTable).count() + nb = session.query(DatasetTable).count() assert nb == 10 @@ -137,8 +137,8 @@ def test_get_table_profiling_run( table = table(dataset=dataset1, name='table1', username=dataset1.owner) with db.scoped_session() as session: table = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.GlueTableName == 'table1') + session.query(DatasetTable) + .filter(DatasetTable.GlueTableName == 'table1') .first() ) response = client.query( @@ -174,8 +174,8 @@ def test_list_table_profiling_runs( table1000 = table(dataset=dataset1, name='table1000', username=dataset1.owner) with db.scoped_session() as session: table = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.GlueTableName == 'table1') + session.query(DatasetTable) + .filter(DatasetTable.GlueTableName == 'table1') .first() ) module_mocker.patch( diff --git a/tests/api/test_dataset_table.py b/tests/api/test_dataset_table.py index 13447fa7f..71e739633 100644 --- a/tests/api/test_dataset_table.py +++ b/tests/api/test_dataset_table.py @@ -4,7 +4,7 @@ import dataall from dataall.modules.datasets.services.dataset_table import DatasetTableService -from dataall.modules.datasets.db.models import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetTable @pytest.fixture(scope='module', autouse=True) @@ -72,7 +72,7 @@ def test_add_tables(table, dataset1, db): table(dataset=dataset1, name=f'table{i+1}', username=dataset1.owner) with db.scoped_session() as session: - nb = session.query(dataall.db.models.DatasetTable).count() + nb = session.query(DatasetTable).count() assert nb == 10 @@ -105,8 +105,8 @@ def test_update_table(client, env1, table, dataset1, db, user, group): def test_add_columns(table, dataset1, db): with db.scoped_session() as session: table = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.name == 'table1') + session.query(DatasetTable) + .filter(DatasetTable.name == 'table1') .first() ) table_col = 
DatasetTableColumn( @@ -178,8 +178,8 @@ def test_list_dataset_tables(client, dataset1): def test_update_dataset_table_column(client, table, dataset1, db): with db.scoped_session() as session: table = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.name == 'table1') + session.query(DatasetTable) + .filter(DatasetTable.name == 'table1') .first() ) column = ( @@ -227,8 +227,8 @@ def test_update_dataset_table_column(client, table, dataset1, db): def test_sync_tables_and_columns(client, table, dataset1, db): with db.scoped_session() as session: table = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.name == 'table1') + session.query(DatasetTable) + .filter(DatasetTable.name == 'table1') .first() ) column = ( @@ -288,9 +288,9 @@ def test_sync_tables_and_columns(client, table, dataset1, db): ] assert DatasetTableService.sync_existing_tables(session, dataset1.datasetUri, glue_tables) - new_table: dataall.db.models.DatasetTable = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.name == 'new_table') + new_table: DatasetTable = ( + session.query(DatasetTable) + .filter(DatasetTable.name == 'new_table') .first() ) assert new_table @@ -305,9 +305,9 @@ def test_sync_tables_and_columns(client, table, dataset1, db): assert columns[0].columnType == 'column' assert columns[1].columnType == 'partition_0' - existing_table: dataall.db.models.DatasetTable = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.name == 'table1') + existing_table: DatasetTable = ( + session.query(DatasetTable) + .filter(DatasetTable.name == 'table1') .first() ) assert existing_table @@ -322,9 +322,9 @@ def test_sync_tables_and_columns(client, table, dataset1, db): assert columns[0].columnType == 'column' assert columns[1].columnType == 'partition_0' - deleted_table: dataall.db.models.DatasetTable = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.name == 'table2') + deleted_table: DatasetTable = ( + session.query(DatasetTable) + .filter(DatasetTable.name == 'table2') .first() ) assert deleted_table.LastGlueTableStatus == 'Deleted' diff --git a/tests/api/test_glossary.py b/tests/api/test_glossary.py index e12eb9e71..50ecf0ec9 100644 --- a/tests/api/test_glossary.py +++ b/tests/api/test_glossary.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import List from dataall.db import models -from dataall.modules.datasets.db.models import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn, DatasetTable import pytest @@ -28,9 +28,9 @@ def _dataset(db, _env, _org, group, user, dataset) -> models.Dataset: @pytest.fixture(scope='module', autouse=True) -def _table(db, _dataset) -> models.DatasetTable: +def _table(db, _dataset) -> DatasetTable: with db.scoped_session() as session: - t = models.DatasetTable( + t = DatasetTable( datasetUri=_dataset.datasetUri, label='table', AWSAccountId=_dataset.AwsAccountId, diff --git a/tests/api/test_share.py b/tests/api/test_share.py index 58309aa01..d951a15f8 100644 --- a/tests/api/test_share.py +++ b/tests/api/test_share.py @@ -3,6 +3,7 @@ import pytest import dataall +from dataall.modules.datasets.db.models import DatasetTable def random_table_name(): @@ -64,7 +65,7 @@ def tables1(table: typing.Callable, dataset1: dataall.db.models.Dataset): @pytest.fixture(scope="module", autouse=True) def table1(table: typing.Callable, dataset1: 
dataall.db.models.Dataset,
-           user: dataall.db.models.User) -> dataall.db.models.DatasetTable:
+           user: dataall.db.models.User) -> DatasetTable:
     yield table(
         dataset=dataset1,
         name="table1",
@@ -112,7 +113,7 @@ def tables2(table, dataset2):
 
 @pytest.fixture(scope="module", autouse=True)
 def table2(table: typing.Callable, dataset2: dataall.db.models.Dataset,
-           user2: dataall.db.models.User) -> dataall.db.models.DatasetTable:
+           user2: dataall.db.models.User) -> DatasetTable:
     yield table(
         dataset=dataset2,
         name="table2",
@@ -195,7 +196,7 @@ def share1_draft(
 def share1_item_pa(
     share_item: typing.Callable,
     share1_draft: dataall.db.models.ShareObject,
-    table1: dataall.db.models.DatasetTable
+    table1: DatasetTable
 ) -> dataall.db.models.ShareObjectItem:
     # Cleaned up with share1_draft
     yield share_item(
@@ -270,7 +271,7 @@ def share2_submitted(
 def share2_item_pa(
     share_item: typing.Callable,
     share2_submitted: dataall.db.models.ShareObject,
-    table1: dataall.db.models.DatasetTable
+    table1: DatasetTable
 ) -> dataall.db.models.ShareObjectItem:
     # Cleaned up with share2
     yield share_item(
@@ -345,7 +346,7 @@ def share3_processed(
 def share3_item_shared(
     share_item: typing.Callable,
     share3_processed: dataall.db.models.ShareObject,
-    table1: dataall.db.models.DatasetTable
+    table1: DatasetTable
 ) -> dataall.db.models.ShareObjectItem:
     # Cleaned up with share3
     yield share_item(
diff --git a/tests/cdkproxy/conftest.py b/tests/cdkproxy/conftest.py
index c83d0028b..c223f4a37 100644
--- a/tests/cdkproxy/conftest.py
+++ b/tests/cdkproxy/conftest.py
@@ -1,6 +1,7 @@
 import pytest
 
 from dataall.db import models, api
+from dataall.modules.datasets.db.models import DatasetTable
 
 
 @pytest.fixture(scope='module', autouse=True)
@@ -121,9 +122,9 @@ def dataset(db, env: models.Environment) -> models.Dataset:
 
 
 @pytest.fixture(scope='module', autouse=True)
-def table(db, dataset: models.Dataset) -> models.DatasetTable:
+def table(db, dataset: models.Dataset) -> DatasetTable:
     with db.scoped_session() as session:
-        table = models.DatasetTable(
+        table = DatasetTable(
             label='thistable',
             owner='me',
             datasetUri=dataset.datasetUri,
diff --git a/tests/searchproxy/test_indexers.py b/tests/searchproxy/test_indexers.py
index fd31506f1..dfdf4a8f0 100644
--- a/tests/searchproxy/test_indexers.py
+++ b/tests/searchproxy/test_indexers.py
@@ -5,8 +5,7 @@
 import dataall
 from dataall.modules.datasets.indexers.location_indexer import DatasetLocationIndexer
 from dataall.modules.datasets.indexers.table_indexer import DatasetTableIndexer
-from dataall.searchproxy import indexers
-from dataall.modules.datasets.db.models import DatasetStorageLocation
+from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable
 from dataall.modules.datasets.indexers.dataset_indexer import DatasetIndexer
 
 
@@ -74,7 +73,7 @@ def dataset(org, env, db):
 @pytest.fixture(scope='module', autouse=True)
 def table(org, env, db, dataset):
     with db.scoped_session() as session:
-        table = dataall.db.models.DatasetTable(
+        table = DatasetTable(
             datasetUri=dataset.datasetUri,
             AWSAccountId='12345678901',
             S3Prefix='S3prefix',
diff --git a/tests/tasks/conftest.py b/tests/tasks/conftest.py
index 7e6f0d71a..267d3ef73 100644
--- a/tests/tasks/conftest.py
+++ b/tests/tasks/conftest.py
@@ -2,7 +2,7 @@
 
 from dataall.db import models
 from dataall.api import constants
-from dataall.modules.datasets.db.models import DatasetStorageLocation
+from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable
 
 
 @pytest.fixture(scope="module")
@@ -148,10 +148,10 @@
def factory(dataset: models.Dataset, label: str) -> DatasetStorageLocation: @pytest.fixture(scope='module') def table(db): - def factory(dataset: models.Dataset, label: str) -> models.DatasetTable: + def factory(dataset: models.Dataset, label: str) -> DatasetTable: with db.scoped_session() as session: - table = models.DatasetTable( + table = DatasetTable( name=label, label=label, owner=dataset.owner, @@ -218,7 +218,7 @@ def factory( def share_item_table(db): def factory( share: models.ShareObject, - table: models.DatasetTable, + table: DatasetTable, status: str, ) -> models.ShareObjectItem: with db.scoped_session() as session: diff --git a/tests/tasks/test_catalog_indexer.py b/tests/tasks/test_catalog_indexer.py index 31b0f14d4..8da53e3d2 100644 --- a/tests/tasks/test_catalog_indexer.py +++ b/tests/tasks/test_catalog_indexer.py @@ -1,5 +1,6 @@ import pytest import dataall +from dataall.modules.datasets.db.models import DatasetTable @pytest.fixture(scope='module', autouse=True) @@ -65,7 +66,7 @@ def sync_dataset(org, env, db): @pytest.fixture(scope='module', autouse=True) def table(org, env, db, sync_dataset): with db.scoped_session() as session: - table = dataall.db.models.DatasetTable( + table = DatasetTable( datasetUri=sync_dataset.datasetUri, AWSAccountId='12345678901', S3Prefix='S3prefix', diff --git a/tests/tasks/test_lf_share_manager.py b/tests/tasks/test_lf_share_manager.py index bee190258..1ff99ba43 100644 --- a/tests/tasks/test_lf_share_manager.py +++ b/tests/tasks/test_lf_share_manager.py @@ -10,6 +10,7 @@ from dataall.db import models from dataall.api import constants +from dataall.modules.datasets.db.models import DatasetTable from dataall.tasks.data_sharing.share_processors.lf_process_cross_account_share import ProcessLFCrossAccountShare from dataall.tasks.data_sharing.share_processors.lf_process_same_account_share import ProcessLFSameAccountShare @@ -94,7 +95,7 @@ def dataset1(dataset: Callable, org1: models.Organization, source_environment: m @pytest.fixture(scope="module") -def table1(table: Callable, dataset1: models.Dataset) -> models.DatasetTable: +def table1(table: Callable, dataset1: models.Dataset) -> DatasetTable: yield table( dataset=dataset1, label="table1" @@ -102,7 +103,7 @@ def table1(table: Callable, dataset1: models.Dataset) -> models.DatasetTable: @pytest.fixture(scope="module") -def table2(table: Callable, dataset1: models.Dataset) -> models.DatasetTable: +def table2(table: Callable, dataset1: models.Dataset) -> DatasetTable: yield table( dataset=dataset1, label="table2" @@ -133,7 +134,7 @@ def share_cross_account( @pytest.fixture(scope="module") def share_item_same_account(share_item_table: Callable, share_same_account: models.ShareObject, - table1: models.DatasetTable) -> models.ShareObjectItem: + table1: DatasetTable) -> models.ShareObjectItem: yield share_item_table( share=share_same_account, table=table1, @@ -142,7 +143,7 @@ def share_item_same_account(share_item_table: Callable, share_same_account: mode @pytest.fixture(scope="module") def revoke_item_same_account(share_item_table: Callable, share_same_account: models.ShareObject, - table2: models.DatasetTable) -> models.ShareObjectItem: + table2: DatasetTable) -> models.ShareObjectItem: yield share_item_table( share=share_same_account, table=table2, @@ -151,7 +152,7 @@ def revoke_item_same_account(share_item_table: Callable, share_same_account: mod @pytest.fixture(scope="module") def share_item_cross_account(share_item_table: Callable, share_cross_account: models.ShareObject, - table1: 
models.DatasetTable) -> models.ShareObjectItem: + table1: DatasetTable) -> models.ShareObjectItem: yield share_item_table( share=share_cross_account, table=table1, @@ -160,7 +161,7 @@ def share_item_cross_account(share_item_table: Callable, share_cross_account: mo @pytest.fixture(scope="module") def revoke_item_cross_account(share_item_table: Callable, share_cross_account: models.ShareObject, - table2: models.DatasetTable) -> models.ShareObjectItem: + table2: DatasetTable) -> models.ShareObjectItem: yield share_item_table( share=share_cross_account, table=table2, @@ -294,7 +295,7 @@ def test_check_share_item_exists_on_glue_catalog( db, processor_same_account: ProcessLFSameAccountShare, processor_cross_account: ProcessLFCrossAccountShare, - table1: models.DatasetTable, + table1: DatasetTable, share_item_same_account: models.ShareObjectItem, share_item_cross_account: models.ShareObjectItem, mocker, @@ -332,7 +333,7 @@ def test_build_share_data( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table1: models.DatasetTable, + table1: DatasetTable, ): data_same_account = { 'source': { @@ -380,7 +381,7 @@ def test_create_resource_link( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table1: models.DatasetTable, + table1: DatasetTable, mocker, ): sts_mock = mocker.patch( @@ -463,7 +464,7 @@ def test_revoke_table_resource_link_access( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table2: models.DatasetTable, + table2: DatasetTable, mocker, ): glue_mock = mocker.patch( @@ -511,7 +512,7 @@ def test_revoke_source_table_access( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table2: models.DatasetTable, + table2: DatasetTable, mocker, ): glue_mock = mocker.patch( @@ -554,7 +555,7 @@ def test_delete_resource_link_table( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table2: models.DatasetTable, + table2: DatasetTable, mocker, ): glue_mock = mocker.patch( @@ -596,7 +597,7 @@ def test_delete_shared_database( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table1: models.DatasetTable, + table1: DatasetTable, mocker, ): glue_mock = mocker.patch( @@ -625,8 +626,8 @@ def test_revoke_external_account_access_on_source_account( source_environment: models.Environment, target_environment: models.Environment, dataset1: models.Dataset, - table1: models.DatasetTable, - table2: models.DatasetTable, + table1: DatasetTable, + table2: DatasetTable, mocker, ): lf_mock = mocker.patch( @@ -649,7 +650,7 @@ def test_handle_share_failure( processor_cross_account: ProcessLFCrossAccountShare, share_item_same_account: models.ShareObjectItem, share_item_cross_account: models.ShareObjectItem, - table1: models.DatasetTable, + table1: DatasetTable, mocker, ): @@ -678,7 +679,7 @@ def test_handle_revoke_failure( processor_cross_account: ProcessLFCrossAccountShare, revoke_item_same_account: models.ShareObjectItem, revoke_item_cross_account: models.ShareObjectItem, - table1: models.DatasetTable, + table1: DatasetTable, mocker, ): # Given diff --git a/tests/tasks/test_policies.py b/tests/tasks/test_policies.py index d51cc2ac7..ca8c259c6 100644 --- a/tests/tasks/test_policies.py +++ b/tests/tasks/test_policies.py @@ -1,4 +1,5 @@ from dataall.api.constants import OrganisationUserRole 
+from dataall.modules.datasets.db.models import DatasetTable from dataall.tasks.bucket_policy_updater import BucketPoliciesUpdater import pytest import dataall @@ -68,7 +69,7 @@ def sync_dataset(org, env, db): @pytest.fixture(scope='module', autouse=True) def table(org, env, db, sync_dataset): with db.scoped_session() as session: - table = dataall.db.models.DatasetTable( + table = DatasetTable( datasetUri=sync_dataset.datasetUri, AWSAccountId='12345678901', S3Prefix='S3prefix', diff --git a/tests/tasks/test_subscriptions.py b/tests/tasks/test_subscriptions.py index 874b8ccab..61c70d174 100644 --- a/tests/tasks/test_subscriptions.py +++ b/tests/tasks/test_subscriptions.py @@ -2,6 +2,7 @@ import dataall from dataall.api.constants import OrganisationUserRole +from dataall.modules.datasets.db.models import DatasetTable @pytest.fixture(scope='module') @@ -93,7 +94,7 @@ def share( ): with db.scoped_session() as session: - table = dataall.db.models.DatasetTable( + table = DatasetTable( label='foo', name='foo', owner='alice', diff --git a/tests/tasks/test_tables_sync.py b/tests/tasks/test_tables_sync.py index 9d8282e65..ff6f8271e 100644 --- a/tests/tasks/test_tables_sync.py +++ b/tests/tasks/test_tables_sync.py @@ -1,6 +1,7 @@ import pytest import dataall from dataall.api.constants import OrganisationUserRole +from dataall.modules.datasets.db.models import DatasetTable @pytest.fixture(scope='module', autouse=True) @@ -76,7 +77,7 @@ def sync_dataset(org, env, db): @pytest.fixture(scope='module', autouse=True) def table(org, env, db, sync_dataset): with db.scoped_session() as session: - table = dataall.db.models.DatasetTable( + table = DatasetTable( datasetUri=sync_dataset.datasetUri, AWSAccountId='12345678901', S3Prefix='S3prefix', @@ -163,9 +164,9 @@ def test_tables_sync(db, org, env, sync_dataset, table, mocker): processed_tables = dataall.modules.datasets.tasks.tables_syncer.sync_tables(engine=db) assert len(processed_tables) == 2 with db.scoped_session() as session: - saved_table: dataall.db.models.DatasetTable = ( - session.query(dataall.db.models.DatasetTable) - .filter(dataall.db.models.DatasetTable.GlueTableName == 'table1') + saved_table: DatasetTable = ( + session.query(DatasetTable) + .filter(DatasetTable.GlueTableName == 'table1') .first() ) assert saved_table