Datasets modularization pt.4 (data-dot-all#441)

### Feature or Bugfix
- Refactoring

### Detail
Refactoring of DatasetTable:
Gets rid of the ElasticSearch connection that was established for every
request; the connection is now created lazily, on first use.
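
As an illustration of the pattern, here is a minimal sketch of a lazily connected indexer, assuming the existing `dataall.searchproxy.connect` helper and the `opensearchpy` client; `BaseIndexer`, its method names, and the `dataall-index` index name are illustrative rather than the exact identifiers introduced by this PR:

```python
import os

from opensearchpy import OpenSearch

from dataall.searchproxy import connect  # existing helper, previously called once per request


class BaseIndexer:
    """Base class for indexers, sharing one lazily created OpenSearch connection."""

    _es = None  # cached connection, created on first use

    @classmethod
    def es(cls) -> OpenSearch:
        # Connect only when an indexer first needs the cluster, so requests
        # that never touch the search index no longer pay the connection cost.
        if BaseIndexer._es is None:
            BaseIndexer._es = connect(envname=os.getenv('envname', 'local'))
        return BaseIndexer._es

    @classmethod
    def delete_doc(cls, doc_id: str) -> bool:
        # 'dataall-index' is an assumed index name, used here for illustration.
        cls.es().delete(index='dataall-index', id=doc_id, ignore=[404])
        return True


class DashboardIndexer(BaseIndexer):
    """Concrete indexers inherit the shared lazy connection."""
```

Resolvers can then call, e.g., `DashboardIndexer.delete_doc(doc_id=dashboardUri)` without threading an `es` client through the request context, which is what the diffs below do.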

### Relates
data-dot-all#412 and data-dot-all#295

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: dbalintx <132444646+dbalintx@users.noreply.github.com>
2 people authored and dlpzx committed May 25, 2023
1 parent 433b5af commit ecaaf6e
Showing 59 changed files with 350 additions and 359 deletions.
6 changes: 1 addition & 5 deletions backend/api_handler.py
@@ -15,7 +15,6 @@
from dataall.core.context import set_context, dispose_context, RequestContext
from dataall.db import init_permissions, get_engine, api, permissions
from dataall.modules.loader import load_modules, ImportMode
-from dataall.searchproxy import connect

logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
@@ -30,7 +29,6 @@
TYPE_DEFS = gql(SCHEMA.gql(with_directives=False))
ENVNAME = os.getenv('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)
-ES = connect(envname=ENVNAME)
Worker.queue = SqsQueue.send

init_permissions(ENGINE)
@@ -99,7 +97,6 @@ def handler(event, context):

log.info('Lambda Event %s', event)
log.debug('Env name %s', ENVNAME)
-log.debug('ElasticSearch %s', ES)
log.debug('Engine %s', ENGINE.engine.url)

if event['httpMethod'] == 'OPTIONS':
@@ -137,11 +134,10 @@ def handler(event, context):
print(f'Error managing groups due to: {e}')
groups = []

-set_context(RequestContext(ENGINE, username, groups, ES))
+set_context(RequestContext(ENGINE, username, groups))

app_context = {
'engine': ENGINE,
-'es': ES,
'username': username,
'groups': groups,
'schema': SCHEMA,
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/Dashboard/resolvers.py
@@ -311,7 +311,7 @@ def delete_dashboard(context: Context, source, dashboardUri: str = None):
data=None,
check_perm=True,
)
-indexers.delete_doc(es=context.es, doc_id=dashboardUri)
+DashboardIndexer.delete_doc(doc_id=dashboardUri)
return True


10 changes: 4 additions & 6 deletions backend/dataall/api/Objects/Dataset/resolvers.py
@@ -345,9 +345,7 @@ def sync_tables(context: Context, source, datasetUri: str = None):
indexers.upsert_dataset_tables(
session=session, es=context.es, datasetUri=dataset.datasetUri
)
-indexers.remove_deleted_tables(
-session=session, es=context.es, datasetUri=dataset.datasetUri
-)
+DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri)
return Dataset.paginated_dataset_tables(
session=session,
username=context.username,
@@ -574,13 +572,13 @@ def delete_dataset(

tables = [t.tableUri for t in Dataset.get_dataset_tables(session, datasetUri)]
for uri in tables:
-indexers.delete_doc(es=context.es, doc_id=uri)
+DatasetIndexer.delete_doc(doc_id=uri)

folders = [f.locationUri for f in Dataset.get_dataset_folders(session, datasetUri)]
for uri in folders:
-indexers.delete_doc(es=context.es, doc_id=uri)
+DatasetIndexer.delete_doc(doc_id=uri)

-indexers.delete_doc(es=context.es, doc_id=datasetUri)
+DatasetIndexer.delete_doc(doc_id=datasetUri)

Dataset.delete_dataset(
session=session,
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/Feed/registry.py
@@ -38,5 +38,4 @@ def types(cls):

FeedRegistry.register(FeedDefinition("Worksheet", models.Worksheet))
FeedRegistry.register(FeedDefinition("DataPipeline", models.DataPipeline))
FeedRegistry.register(FeedDefinition("DatasetTable", models.DatasetTable))
FeedRegistry.register(FeedDefinition("Dashboard", models.Dashboard))
8 changes: 3 additions & 5 deletions backend/dataall/api/Objects/Glossary/registry.py
@@ -1,7 +1,5 @@
-from dataclasses import dataclass, field
-from typing import Type, Dict, Optional, Protocol, Union, Callable, Any
-
-from opensearchpy import OpenSearch
+from dataclasses import dataclass
+from typing import Type, Dict, Optional, Protocol, Union

from dataall.api import gql
from dataall.api.gql.graphql_union_type import UnionTypeRegistry
@@ -56,7 +54,7 @@ def types(cls):
return [gql.Ref(definition.object_type) for definition in cls._DEFINITIONS.values()]

@classmethod
-def reindex(cls, session, es: OpenSearch, target_type: str, target_uri: str):
+def reindex(cls, session, target_type: str, target_uri: str):
definition = cls._DEFINITIONS[target_type]
if definition.reindexer:
definition.reindexer.upsert(session, target_uri)
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/Glossary/resolvers.py
@@ -458,7 +458,7 @@ def reindex(context, linkUri):
if not link:
return

-GlossaryRegistry.reindex(session, context.es, link.targetType, link.targetUri)
+GlossaryRegistry.reindex(session, link.targetType, link.targetUri)


def _target_model(target_type: str):
4 changes: 2 additions & 2 deletions backend/dataall/api/Objects/ShareObject/resolvers.py
@@ -7,7 +7,7 @@
from ....api.context import Context
from ....aws.handlers.service_handlers import Worker
from ....db import models
-from dataall.modules.datasets.db.models import DatasetStorageLocation
+from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable

log = logging.getLogger(__name__)

@@ -265,7 +265,7 @@ def resolve_dataset(context: Context, source: models.ShareObject, **kwargs):


def union_resolver(object, *_):
-if isinstance(object, models.DatasetTable):
+if isinstance(object, DatasetTable):
return 'DatasetTable'
elif isinstance(object, DatasetStorageLocation):
return 'DatasetStorageLocation'
4 changes: 2 additions & 2 deletions backend/dataall/api/Objects/Vote/resolvers.py
@@ -28,11 +28,11 @@ def upvote(context: Context, source, input=None):
data=input,
check_perm=True,
)
-reindex(session, context.es, vote)
+reindex(session, vote)
return vote


-def reindex(session, es, vote):
+def reindex(session, vote):
if vote.targetType == 'dataset':
DatasetIndexer.upsert(session=session, dataset_uri=vote.targetUri)
elif vote.targetType == 'dashboard':
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/__init__.py
@@ -17,7 +17,6 @@
DataPipeline,
Environment,
Activity,
-DatasetTable,
Dataset,
Group,
Principal,
2 changes: 0 additions & 2 deletions backend/dataall/api/context.py
@@ -2,11 +2,9 @@ class Context:
def __init__(
self,
engine=None,
-es=None,
username=None,
groups=None,
):
self.engine = engine
-self.es = es
self.username = username
self.groups = groups
3 changes: 2 additions & 1 deletion backend/dataall/aws/handlers/glue.py
@@ -6,6 +6,7 @@
from .sts import SessionHelper
from ... import db
from ...db import models
+from dataall.modules.datasets.db.models import DatasetTable

log = logging.getLogger('aws:glue')

@@ -523,7 +524,7 @@ def get_job_runs(engine, task: models.Task):

@staticmethod
def grant_principals_all_table_permissions(
-table: models.DatasetTable, principals: [str], client=None
+table: DatasetTable, principals: [str], client=None
):
"""
Update the table permissions on Lake Formation
3 changes: 2 additions & 1 deletion backend/dataall/aws/handlers/redshift.py
@@ -11,6 +11,7 @@
from ...db import models
# TODO should be migrated in the redshift module
from dataall.modules.datasets.services.dataset_table import DatasetTableService
+from dataall.modules.datasets.db.models import DatasetTable

log = logging.getLogger(__name__)

@@ -448,7 +449,7 @@ def copy_data(engine, task: models.Task):
session, task.payload['datasetUri']
)

-table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
+table: DatasetTable = DatasetTableService.get_dataset_table_by_uri(
session, task.payload['tableUri']
)

2 changes: 0 additions & 2 deletions backend/dataall/core/context.py
@@ -12,7 +12,6 @@

from dataall.db.connection import Engine
from threading import local
-import opensearchpy


_request_storage = local()
@@ -24,7 +23,6 @@ class RequestContext:
db_engine: Engine
username: str
groups: List[str]
-es_engine: opensearchpy.OpenSearch


def get_context() -> RequestContext:
33 changes: 17 additions & 16 deletions backend/dataall/db/api/dataset.py
@@ -16,9 +16,10 @@
from . import Organization
from .. import models, api, exceptions, permissions, paginate
from ..models.Enums import Language, ConfidentialityClassification
-from ...modules.datasets.db.dataset_repository import DatasetRepository
-from ...modules.datasets.services.dataset_location import DatasetLocationService
-from ...utils.naming_convention import (
+from dataall.modules.datasets.db.dataset_repository import DatasetRepository
+from dataall.modules.datasets.db.models import DatasetTable
+from dataall.modules.datasets.services.dataset_location import DatasetLocationService
+from dataall.utils.naming_convention import (
NamingConventionService,
NamingConventionPattern,
)
@@ -266,21 +267,21 @@ def paginated_dataset_tables(
session, username, groups, uri, data=None, check_perm=None
) -> dict:
query = (
-session.query(models.DatasetTable)
+session.query(DatasetTable)
.filter(
and_(
-models.DatasetTable.datasetUri == uri,
-models.DatasetTable.LastGlueTableStatus != 'Deleted',
+DatasetTable.datasetUri == uri,
+DatasetTable.LastGlueTableStatus != 'Deleted',
)
)
-.order_by(models.DatasetTable.created.desc())
+.order_by(DatasetTable.created.desc())
)
if data and data.get('term'):
query = query.filter(
or_(
*[
-models.DatasetTable.name.ilike('%' + data.get('term') + '%'),
-models.DatasetTable.GlueTableName.ilike(
+DatasetTable.name.ilike('%' + data.get('term') + '%'),
+DatasetTable.GlueTableName.ilike(
'%' + data.get('term') + '%'
),
]
@@ -379,7 +380,7 @@ def transfer_stewardship_to_new_stewards(session, dataset, new_stewards):
group=new_stewards,
permissions=permissions.DATASET_TABLE_READ,
resource_uri=tableUri,
-resource_type=models.DatasetTable.__name__,
+resource_type=DatasetTable.__name__,
)

dataset_shares = (
@@ -455,8 +456,8 @@ def update_glue_database_status(session, dataset_uri):
def get_dataset_tables(session, dataset_uri):
"""return the dataset tables"""
return (
-session.query(models.DatasetTable)
-.filter(models.DatasetTable.datasetUri == dataset_uri)
+session.query(DatasetTable)
+.filter(DatasetTable.datasetUri == dataset_uri)
.all()
)

@@ -585,10 +586,10 @@ def _delete_dataset_term_links(session, uri):
@staticmethod
def _delete_dataset_tables(session, dataset_uri) -> bool:
tables = (
-session.query(models.DatasetTable)
+session.query(DatasetTable)
.filter(
and_(
-models.DatasetTable.datasetUri == dataset_uri,
+DatasetTable.datasetUri == dataset_uri,
)
)
.all()
@@ -618,7 +619,7 @@ def get_dataset_by_bucket_name(session, bucket) -> [models.Dataset]:
@staticmethod
def count_dataset_tables(session, dataset_uri):
return (
-session.query(models.DatasetTable)
-.filter(models.DatasetTable.datasetUri == dataset_uri)
+session.query(DatasetTable)
+.filter(DatasetTable.datasetUri == dataset_uri)
.count()
)
26 changes: 13 additions & 13 deletions backend/dataall/db/api/share_object.py
@@ -10,7 +10,7 @@
from .. import api, utils
from .. import models, exceptions, permissions, paginate
from ..models.Enums import ShareObjectStatus, ShareItemStatus, ShareObjectActions, ShareItemActions, ShareableType, PrincipalType
-from dataall.modules.datasets.db.models import DatasetStorageLocation
+from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable

logger = logging.getLogger(__name__)

@@ -422,7 +422,7 @@ def create_share_object(
if itemType == ShareableType.StorageLocation.value:
item = session.query(DatasetStorageLocation).get(itemUri)
if itemType == ShareableType.Table.value:
-item = session.query(models.DatasetTable).get(itemUri)
+item = session.query(DatasetTable).get(itemUri)

share_item = (
session.query(models.ShareObjectItem)
@@ -605,7 +605,7 @@ def approve_share_object(
group=share.principalId,
permissions=permissions.DATASET_TABLE_READ,
resource_uri=table.itemUri,
-resource_type=models.DatasetTable.__name__,
+resource_type=DatasetTable.__name__,
)

api.Notification.notify_share_object_approval(session, username, dataset, share)
@@ -717,7 +717,7 @@ def get_share_item(
ShareObject.get_share_item_by_uri(session, data['shareItemUri']),
)
if share_item.itemType == ShareableType.Table.value:
-return session.query(models.DatasetTable).get(share_item.itemUri)
+return session.query(DatasetTable).get(share_item.itemUri)
if share_item.itemType == ShareableType.StorageLocation:
return session.Query(DatasetStorageLocation).get(share_item.itemUri)

@@ -762,7 +762,7 @@ def add_share_object_item(
Share_SM.update_state(session, share, new_share_state)

if itemType == ShareableType.Table.value:
-item: models.DatasetTable = session.query(models.DatasetTable).get(itemUri)
+item: DatasetTable = session.query(DatasetTable).get(itemUri)
if item and item.region != target_environment.region:
raise exceptions.UnauthorizedOperation(
action=permissions.ADD_ITEM,
@@ -944,10 +944,10 @@ def list_shareable_items(
# marking the table as part of the shareObject
tables = (
session.query(
-models.DatasetTable.tableUri.label('itemUri'),
+DatasetTable.tableUri.label('itemUri'),
func.coalesce('DatasetTable').label('itemType'),
-models.DatasetTable.GlueTableName.label('itemName'),
-models.DatasetTable.description.label('description'),
+DatasetTable.GlueTableName.label('itemName'),
+DatasetTable.description.label('description'),
models.ShareObjectItem.shareItemUri.label('shareItemUri'),
models.ShareObjectItem.status.label('status'),
case(
@@ -959,10 +959,10 @@
models.ShareObjectItem,
and_(
models.ShareObjectItem.shareUri == share.shareUri,
-models.DatasetTable.tableUri == models.ShareObjectItem.itemUri,
+DatasetTable.tableUri == models.ShareObjectItem.itemUri,
),
)
-.filter(models.DatasetTable.datasetUri == datasetUri)
+.filter(DatasetTable.datasetUri == datasetUri)
)
if data:
if data.get("isRevokable"):
@@ -1145,7 +1145,7 @@ def update_share_item_status_batch(
def find_share_item_by_table(
session,
share: models.ShareObject,
-table: models.DatasetTable,
+table: DatasetTable,
) -> models.ShareObjectItem:
share_item: models.ShareObjectItem = (
session.query(models.ShareObjectItem)
@@ -1247,10 +1247,10 @@ def get_share_data_items(session, share_uri, status):
raise exceptions.ObjectNotFound('Share', share_uri)

tables = (
-session.query(models.DatasetTable)
+session.query(DatasetTable)
.join(
models.ShareObjectItem,
-models.ShareObjectItem.itemUri == models.DatasetTable.tableUri,
+models.ShareObjectItem.itemUri == DatasetTable.tableUri,
)
.join(
models.ShareObject,
