From 74a249aa03608c18ead0b0dd730a932a531f1d29 Mon Sep 17 00:00:00 2001 From: nikpodsh <124577300+nikpodsh@users.noreply.github.com> Date: Tue, 2 May 2023 16:13:45 +0200 Subject: [PATCH] Datasets modularization pt.2 (#432) ### Feature or Bugfix - Refactoring ### Detail Refactoring of DatasetProfilingRun ### Relates - #295 and #412 By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- backend/dataall/api/Objects/__init__.py | 1 - backend/dataall/aws/handlers/glue.py | 84 ---------------- backend/dataall/db/api/__init__.py | 1 - .../dataall/db/models/DatasetProfilingRun.py | 20 ---- .../db/models/DatasetTableProfilingJob.py | 18 ---- backend/dataall/db/models/__init__.py | 2 - backend/dataall/modules/datasets/__init__.py | 2 +- .../dataall/modules/datasets/api/__init__.py | 5 +- .../datasets/api/profiling}/__init__.py | 2 +- .../datasets/api/profiling}/input_types.py | 2 +- .../datasets/api/profiling}/mutations.py | 7 +- .../datasets/api/profiling}/queries.py | 9 +- .../datasets/api/profiling}/resolvers.py | 34 ++++--- .../datasets/api/profiling}/schema.py | 4 +- .../datasets/api/table_column/resolvers.py | 2 +- .../db/{table_column_model.py => models.py} | 21 +++- .../datasets/handlers/glue_column_handler.py | 2 +- .../handlers/glue_profiling_handler.py | 98 +++++++++++++++++++ .../services/dataset_profiling_service.py} | 55 ++++++----- .../datasets/services/dataset_table.py | 2 +- .../datasets/tasks/subscription_service.py | 3 +- tests/api/test_dataset_profiling.py | 9 +- tests/api/test_dataset_table.py | 2 +- tests/api/test_glossary.py | 2 +- tests/modules/datasets/test_dataset_feed.py | 2 +- 25 files changed, 195 insertions(+), 194 deletions(-) delete mode 100644 backend/dataall/db/models/DatasetProfilingRun.py delete mode 100644 backend/dataall/db/models/DatasetTableProfilingJob.py rename backend/dataall/{api/Objects/DatasetProfiling => modules/datasets/api/profiling}/__init__.py (73%) rename backend/dataall/{api/Objects/DatasetProfiling => modules/datasets/api/profiling}/input_types.py (95%) rename backend/dataall/{api/Objects/DatasetProfiling => modules/datasets/api/profiling}/mutations.py (80%) rename backend/dataall/{api/Objects/DatasetProfiling => modules/datasets/api/profiling}/queries.py (84%) rename backend/dataall/{api/Objects/DatasetProfiling => modules/datasets/api/profiling}/resolvers.py (80%) rename backend/dataall/{api/Objects/DatasetProfiling => modules/datasets/api/profiling}/schema.py (94%) rename backend/dataall/modules/datasets/db/{table_column_model.py => models.py} (54%) create mode 100644 backend/dataall/modules/datasets/handlers/glue_profiling_handler.py rename backend/dataall/{db/api/dataset_profiling_run.py => modules/datasets/services/dataset_profiling_service.py} (69%) diff --git a/backend/dataall/api/Objects/__init__.py b/backend/dataall/api/Objects/__init__.py index 43d5e0833..80b91358a 100644 --- a/backend/dataall/api/Objects/__init__.py +++ b/backend/dataall/api/Objects/__init__.py @@ -29,7 +29,6 @@ Test, SagemakerStudio, RedshiftCluster, - DatasetProfiling, Glossary, AthenaQueryResult, Worksheet, diff --git a/backend/dataall/aws/handlers/glue.py b/backend/dataall/aws/handlers/glue.py index e05ce4c54..e76fd4e63 100644 --- a/backend/dataall/aws/handlers/glue.py +++ b/backend/dataall/aws/handlers/glue.py @@ -522,90 +522,6 @@ def get_job_runs(engine, task: models.Task): return [] return response['JobRuns'] - @staticmethod - @Worker.handler('glue.job.start_profiling_run') - def 
start_profiling_run(engine, task: models.Task): - with engine.scoped_session() as session: - profiling: models.DatasetProfilingRun = ( - db.api.DatasetProfilingRun.get_profiling_run( - session, profilingRunUri=task.targetUri - ) - ) - dataset: models.Dataset = session.query(models.Dataset).get( - profiling.datasetUri - ) - run = Glue.run_job( - **{ - 'accountid': dataset.AwsAccountId, - 'name': dataset.GlueProfilingJobName, - 'region': dataset.region, - 'arguments': ( - {'--table': profiling.GlueTableName} - if profiling.GlueTableName - else {} - ), - } - ) - db.api.DatasetProfilingRun.update_run( - session, - profilingRunUri=profiling.profilingRunUri, - GlueJobRunId=run['JobRunId'], - ) - return run - - @staticmethod - def run_job(**data): - accountid = data['accountid'] - name = data['name'] - try: - session = SessionHelper.remote_session(accountid=accountid) - client = session.client('glue', region_name=data.get('region', 'eu-west-1')) - response = client.start_job_run( - JobName=name, Arguments=data.get('arguments', {}) - ) - return response - except ClientError as e: - log.error(f'Failed to start profiling job {name} due to: {e}') - raise e - - @staticmethod - @Worker.handler('glue.job.profiling_run_status') - def get_profiling_run(engine, task: models.Task): - with engine.scoped_session() as session: - profiling: models.DatasetProfilingRun = ( - db.api.DatasetProfilingRun.get_profiling_run( - session, profilingRunUri=task.targetUri - ) - ) - dataset: models.Dataset = session.query(models.Dataset).get( - profiling.datasetUri - ) - glue_run = Glue.get_job_run( - **{ - 'accountid': dataset.AwsAccountId, - 'name': dataset.GlueProfilingJobName, - 'region': dataset.region, - 'run_id': profiling.GlueJobRunId, - } - ) - profiling.status = glue_run['JobRun']['JobRunState'] - session.commit() - return profiling.status - - @staticmethod - def get_job_run(**data): - accountid = data['accountid'] - name = data['name'] - run_id = data['run_id'] - try: - session = SessionHelper.remote_session(accountid=accountid) - client = session.client('glue', region_name=data.get('region', 'eu-west-1')) - response = client.get_job_run(JobName=name, RunId=run_id) - return response - except ClientError as e: - log.error(f'Failed to get job run {run_id} due to: {e}') - raise e - @staticmethod def grant_principals_all_table_permissions( table: models.DatasetTable, principals: [str], client=None diff --git a/backend/dataall/db/api/__init__.py b/backend/dataall/db/api/__init__.py index 19138f7d7..a5f11d2c7 100644 --- a/backend/dataall/db/api/__init__.py +++ b/backend/dataall/db/api/__init__.py @@ -13,7 +13,6 @@ from .share_object import ShareObject, ShareObjectSM, ShareItemSM from .dataset import Dataset from .dataset_location import DatasetStorageLocation -from .dataset_profiling_run import DatasetProfilingRun from .notification import Notification from .redshift_cluster import RedshiftCluster from .vpc import Vpc diff --git a/backend/dataall/db/models/DatasetProfilingRun.py b/backend/dataall/db/models/DatasetProfilingRun.py deleted file mode 100644 index b4996db64..000000000 --- a/backend/dataall/db/models/DatasetProfilingRun.py +++ /dev/null @@ -1,20 +0,0 @@ -from sqlalchemy import Column, String -from sqlalchemy.dialects.postgresql import JSON - -from .. 
import Base, Resource, utils - - -class DatasetProfilingRun(Resource, Base): - __tablename__ = 'dataset_profiling_run' - profilingRunUri = Column( - String, primary_key=True, default=utils.uuid('profilingrun') - ) - datasetUri = Column(String, nullable=False) - GlueJobName = Column(String) - GlueJobRunId = Column(String) - GlueTriggerSchedule = Column(String) - GlueTriggerName = Column(String) - GlueTableName = Column(String) - AwsAccountId = Column(String) - results = Column(JSON, default={}) - status = Column(String, default='Created') diff --git a/backend/dataall/db/models/DatasetTableProfilingJob.py b/backend/dataall/db/models/DatasetTableProfilingJob.py deleted file mode 100644 index ea0fedbf0..000000000 --- a/backend/dataall/db/models/DatasetTableProfilingJob.py +++ /dev/null @@ -1,18 +0,0 @@ -from sqlalchemy import Column, String -from sqlalchemy.orm import query_expression - -from .. import Base -from .. import Resource, utils - - -class DatasetTableProfilingJob(Resource, Base): - __tablename__ = 'dataset_table_profiling_job' - tableUri = Column(String, nullable=False) - jobUri = Column(String, primary_key=True, default=utils.uuid('profilingjob')) - AWSAccountId = Column(String, nullable=False) - RunCommandId = Column(String, nullable=True) - GlueDatabaseName = Column(String, nullable=False) - GlueTableName = Column(String, nullable=False) - region = Column(String, default='eu-west-1') - status = Column(String, default='') - userRoleForJob = query_expression() diff --git a/backend/dataall/db/models/__init__.py b/backend/dataall/db/models/__init__.py index 1ab4134b3..0af480d79 100644 --- a/backend/dataall/db/models/__init__.py +++ b/backend/dataall/db/models/__init__.py @@ -5,11 +5,9 @@ from .DashboardShare import DashboardShare from .DashboardShare import DashboardShareStatus from .Dataset import Dataset -from .DatasetProfilingRun import DatasetProfilingRun from .DatasetQualityRule import DatasetQualityRule from .DatasetStorageLocation import DatasetStorageLocation from .DatasetTable import DatasetTable -from .DatasetTableProfilingJob import DatasetTableProfilingJob from .Environment import Environment from .EnvironmentGroup import EnvironmentGroup from .FeedMessage import FeedMessage diff --git a/backend/dataall/modules/datasets/__init__.py b/backend/dataall/modules/datasets/__init__.py index de1963bd2..4620495fe 100644 --- a/backend/dataall/modules/datasets/__init__.py +++ b/backend/dataall/modules/datasets/__init__.py @@ -4,7 +4,7 @@ from dataall.api.Objects.Feed.registry import FeedRegistry, FeedDefinition from dataall.api.Objects.Glossary.registry import GlossaryRegistry, GlossaryDefinition -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn from dataall.modules.loader import ModuleInterface, ImportMode log = logging.getLogger(__name__) diff --git a/backend/dataall/modules/datasets/api/__init__.py b/backend/dataall/modules/datasets/api/__init__.py index 538df0734..6bb6f8ab0 100644 --- a/backend/dataall/modules/datasets/api/__init__.py +++ b/backend/dataall/modules/datasets/api/__init__.py @@ -1,6 +1,7 @@ """The GraphQL schema of datasets and related functionality""" from dataall.modules.datasets.api import ( - table_column + table_column, + profiling ) -__all__ = ["table_column"] +__all__ = ["table_column", "profiling"] diff --git a/backend/dataall/api/Objects/DatasetProfiling/__init__.py b/backend/dataall/modules/datasets/api/profiling/__init__.py similarity index 73% rename from 
backend/dataall/api/Objects/DatasetProfiling/__init__.py rename to backend/dataall/modules/datasets/api/profiling/__init__.py index dfa46b264..4c5b6c491 100644 --- a/backend/dataall/api/Objects/DatasetProfiling/__init__.py +++ b/backend/dataall/modules/datasets/api/profiling/__init__.py @@ -1,4 +1,4 @@ -from . import ( +from dataall.modules.datasets.api.profiling import ( input_types, mutations, queries, diff --git a/backend/dataall/api/Objects/DatasetProfiling/input_types.py b/backend/dataall/modules/datasets/api/profiling/input_types.py similarity index 95% rename from backend/dataall/api/Objects/DatasetProfiling/input_types.py rename to backend/dataall/modules/datasets/api/profiling/input_types.py index deb1739c5..e8e89fb16 100644 --- a/backend/dataall/api/Objects/DatasetProfiling/input_types.py +++ b/backend/dataall/modules/datasets/api/profiling/input_types.py @@ -1,4 +1,4 @@ -from ... import gql +from dataall.api import gql StartDatasetProfilingRunInput = gql.InputType( name='StartDatasetProfilingRunInput', diff --git a/backend/dataall/api/Objects/DatasetProfiling/mutations.py b/backend/dataall/modules/datasets/api/profiling/mutations.py similarity index 80% rename from backend/dataall/api/Objects/DatasetProfiling/mutations.py rename to backend/dataall/modules/datasets/api/profiling/mutations.py index 5876c81a7..e4bcd62cc 100644 --- a/backend/dataall/api/Objects/DatasetProfiling/mutations.py +++ b/backend/dataall/modules/datasets/api/profiling/mutations.py @@ -1,5 +1,8 @@ -from ... import gql -from .resolvers import * +from dataall.api import gql +from dataall.modules.datasets.api.profiling.resolvers import ( + start_profiling_run, + update_profiling_run_results +) startDatasetProfilingRun = gql.MutationField( name='startDatasetProfilingRun', diff --git a/backend/dataall/api/Objects/DatasetProfiling/queries.py b/backend/dataall/modules/datasets/api/profiling/queries.py similarity index 84% rename from backend/dataall/api/Objects/DatasetProfiling/queries.py rename to backend/dataall/modules/datasets/api/profiling/queries.py index 9ab3eb2bb..8d2fbb25c 100644 --- a/backend/dataall/api/Objects/DatasetProfiling/queries.py +++ b/backend/dataall/modules/datasets/api/profiling/queries.py @@ -1,5 +1,10 @@ -from ... 
import gql -from .resolvers import * +from dataall.api import gql +from dataall.modules.datasets.api.profiling.resolvers import ( + get_profiling_run, + list_profiling_runs, + list_table_profiling_runs, + get_last_table_profiling_run +) getDatasetProfilingRun = gql.QueryField( diff --git a/backend/dataall/api/Objects/DatasetProfiling/resolvers.py b/backend/dataall/modules/datasets/api/profiling/resolvers.py similarity index 80% rename from backend/dataall/api/Objects/DatasetProfiling/resolvers.py rename to backend/dataall/modules/datasets/api/profiling/resolvers.py index 4b4684019..62ff64942 100644 --- a/backend/dataall/api/Objects/DatasetProfiling/resolvers.py +++ b/backend/dataall/modules/datasets/api/profiling/resolvers.py @@ -1,17 +1,19 @@ import json import logging -from ....api.context import Context -from ....aws.handlers.service_handlers import Worker -from ....aws.handlers.sts import SessionHelper -from ....db import api, permissions, models -from ....db.api import ResourcePolicy +from dataall.api.context import Context +from dataall.aws.handlers.service_handlers import Worker +from dataall.aws.handlers.sts import SessionHelper +from dataall.db import api, permissions, models +from dataall.db.api import ResourcePolicy from dataall.modules.datasets.services.dataset_table import DatasetTableService +from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService +from dataall.modules.datasets.db.models import DatasetProfilingRun log = logging.getLogger(__name__) -def resolve_dataset(context, source: models.DatasetProfilingRun): +def resolve_dataset(context, source: DatasetProfilingRun): if not source: return None with context.engine.scoped_session() as session: @@ -32,7 +34,7 @@ def start_profiling_run(context: Context, source, input: dict = None): ) dataset = api.Dataset.get_dataset_by_uri(session, input['datasetUri']) - run = api.DatasetProfilingRun.start_profiling( + run = DatasetProfilingService.start_profiling( session=session, datasetUri=dataset.datasetUri, tableUri=input.get('tableUri'), @@ -49,7 +51,7 @@ def start_profiling_run(context: Context, source, input: dict = None): return run -def get_profiling_run_status(context: Context, source: models.DatasetProfilingRun): +def get_profiling_run_status(context: Context, source: DatasetProfilingRun): if not source: return None with context.engine.scoped_session() as session: @@ -61,7 +63,7 @@ def get_profiling_run_status(context: Context, source: models.DatasetProfilingRu return source.status -def get_profiling_results(context: Context, source: models.DatasetProfilingRun): +def get_profiling_results(context: Context, source: DatasetProfilingRun): if not source or source.results == {}: return None else: @@ -70,7 +72,7 @@ def get_profiling_results(context: Context, source: models.DatasetProfilingRun): def update_profiling_run_results(context: Context, source, profilingRunUri, results): with context.engine.scoped_session() as session: - run = api.DatasetProfilingRun.update_run( + run = DatasetProfilingService.update_run( session=session, profilingRunUri=profilingRunUri, results=results ) return run @@ -78,20 +80,20 @@ def update_profiling_run_results(context: Context, source, profilingRunUri, resu def list_profiling_runs(context: Context, source, datasetUri=None): with context.engine.scoped_session() as session: - return api.DatasetProfilingRun.list_profiling_runs(session, datasetUri) + return DatasetProfilingService.list_profiling_runs(session, datasetUri) def get_profiling_run(context: Context, 
source, profilingRunUri=None): with context.engine.scoped_session() as session: - return api.DatasetProfilingRun.get_profiling_run( + return DatasetProfilingService.get_profiling_run( session=session, profilingRunUri=profilingRunUri ) def get_last_table_profiling_run(context: Context, source, tableUri=None): with context.engine.scoped_session() as session: - run: models.DatasetProfilingRun = ( - api.DatasetProfilingRun.get_table_last_profiling_run( + run: DatasetProfilingRun = ( + DatasetProfilingService.get_table_last_profiling_run( session=session, tableUri=tableUri ) ) @@ -112,7 +114,7 @@ def get_last_table_profiling_run(context: Context, source, tableUri=None): if not run.results: run_with_results = ( - api.DatasetProfilingRun.get_table_last_profiling_run_with_results( + DatasetProfilingService.get_table_last_profiling_run_with_results( session=session, tableUri=tableUri ) ) @@ -143,6 +145,6 @@ def get_profiling_results_from_s3(environment, dataset, table, run): def list_table_profiling_runs(context: Context, source, tableUri=None): with context.engine.scoped_session() as session: - return api.DatasetProfilingRun.list_table_profiling_runs( + return DatasetProfilingService.list_table_profiling_runs( session=session, tableUri=tableUri, filter={} ) diff --git a/backend/dataall/api/Objects/DatasetProfiling/schema.py b/backend/dataall/modules/datasets/api/profiling/schema.py similarity index 94% rename from backend/dataall/api/Objects/DatasetProfiling/schema.py rename to backend/dataall/modules/datasets/api/profiling/schema.py index f6fe9c575..6babb61b3 100644 --- a/backend/dataall/api/Objects/DatasetProfiling/schema.py +++ b/backend/dataall/modules/datasets/api/profiling/schema.py @@ -1,5 +1,5 @@ -from ... import gql -from .resolvers import ( +from dataall.api import gql +from dataall.modules.datasets.api.profiling.resolvers import ( resolve_dataset, get_profiling_run_status, get_profiling_results, diff --git a/backend/dataall/modules/datasets/api/table_column/resolvers.py b/backend/dataall/modules/datasets/api/table_column/resolvers.py index b958f2f7a..8e78a042e 100644 --- a/backend/dataall/modules/datasets/api/table_column/resolvers.py +++ b/backend/dataall/modules/datasets/api/table_column/resolvers.py @@ -6,7 +6,7 @@ from dataall.db import paginate, permissions, models from dataall.db.api import ResourcePolicy from dataall.modules.datasets.services.dataset_table import DatasetTableService -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn def list_table_columns( diff --git a/backend/dataall/modules/datasets/db/table_column_model.py b/backend/dataall/modules/datasets/db/models.py similarity index 54% rename from backend/dataall/modules/datasets/db/table_column_model.py rename to backend/dataall/modules/datasets/db/models.py index 05bc26058..1ba60bea1 100644 --- a/backend/dataall/modules/datasets/db/table_column_model.py +++ b/backend/dataall/modules/datasets/db/models.py @@ -1,7 +1,6 @@ from sqlalchemy import Column, String - -from dataall.db import Base -from dataall.db import Resource, utils +from sqlalchemy.dialects.postgresql import JSON +from dataall.db import Base, Resource, utils class DatasetTableColumn(Resource, Base): @@ -21,3 +20,19 @@ class DatasetTableColumn(Resource, Base): def uri(self): return self.columnUri + + +class DatasetProfilingRun(Resource, Base): + __tablename__ = 'dataset_profiling_run' + profilingRunUri = Column( + String, primary_key=True, 
default=utils.uuid('profilingrun') + ) + datasetUri = Column(String, nullable=False) + GlueJobName = Column(String) + GlueJobRunId = Column(String) + GlueTriggerSchedule = Column(String) + GlueTriggerName = Column(String) + GlueTableName = Column(String) + AwsAccountId = Column(String) + results = Column(JSON, default={}) + status = Column(String, default='Created') diff --git a/backend/dataall/modules/datasets/handlers/glue_column_handler.py b/backend/dataall/modules/datasets/handlers/glue_column_handler.py index 9d97470ff..ea2fb82b4 100644 --- a/backend/dataall/modules/datasets/handlers/glue_column_handler.py +++ b/backend/dataall/modules/datasets/handlers/glue_column_handler.py @@ -5,7 +5,7 @@ from dataall.aws.handlers.service_handlers import Worker from dataall.modules.datasets.aws.glue_table_client import GlueTableClient from dataall.modules.datasets.aws.lf_table_client import LakeFormationTableClient -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn from dataall.modules.datasets.services.dataset_table import DatasetTableService log = logging.getLogger(__name__) diff --git a/backend/dataall/modules/datasets/handlers/glue_profiling_handler.py b/backend/dataall/modules/datasets/handlers/glue_profiling_handler.py new file mode 100644 index 000000000..d15607733 --- /dev/null +++ b/backend/dataall/modules/datasets/handlers/glue_profiling_handler.py @@ -0,0 +1,98 @@ +import logging +from botocore.exceptions import ClientError + +from dataall.aws.handlers.service_handlers import Worker +from dataall.aws.handlers.sts import SessionHelper +from dataall.db import models +from dataall.modules.datasets.db.models import DatasetProfilingRun +from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService + +log = logging.getLogger(__name__) + + +class DatasetProfilingGlueHandler: + """A handler for dataset profiling""" + + @staticmethod + @Worker.handler('glue.job.profiling_run_status') + def get_profiling_run(engine, task: models.Task): + with engine.scoped_session() as session: + profiling: DatasetProfilingRun = ( + DatasetProfilingService.get_profiling_run( + session, profilingRunUri=task.targetUri + ) + ) + dataset: models.Dataset = session.query(models.Dataset).get( + profiling.datasetUri + ) + glue_run = DatasetProfilingGlueHandler.get_job_run( + **{ + 'accountid': dataset.AwsAccountId, + 'name': dataset.GlueProfilingJobName, + 'region': dataset.region, + 'run_id': profiling.GlueJobRunId, + } + ) + profiling.status = glue_run['JobRun']['JobRunState'] + session.commit() + return profiling.status + + @staticmethod + @Worker.handler('glue.job.start_profiling_run') + def start_profiling_run(engine, task: models.Task): + with engine.scoped_session() as session: + profiling: DatasetProfilingRun = ( + DatasetProfilingService.get_profiling_run( + session, profilingRunUri=task.targetUri + ) + ) + dataset: models.Dataset = session.query(models.Dataset).get( + profiling.datasetUri + ) + run = DatasetProfilingGlueHandler.run_job( + **{ + 'accountid': dataset.AwsAccountId, + 'name': dataset.GlueProfilingJobName, + 'region': dataset.region, + 'arguments': ( + {'--table': profiling.GlueTableName} + if profiling.GlueTableName + else {} + ), + } + ) + DatasetProfilingService.update_run( + session, + profilingRunUri=profiling.profilingRunUri, + GlueJobRunId=run['JobRunId'], + ) + return run + + @staticmethod + def get_job_run(**data): + accountid = data['accountid'] + name = 
data['name'] + run_id = data['run_id'] + try: + session = SessionHelper.remote_session(accountid=accountid) + client = session.client('glue', region_name=data.get('region', 'eu-west-1')) + response = client.get_job_run(JobName=name, RunId=run_id) + return response + except ClientError as e: + log.error(f'Failed to get job run {run_id} due to: {e}') + raise e + + @staticmethod + def run_job(**data): + accountid = data['accountid'] + name = data['name'] + try: + session = SessionHelper.remote_session(accountid=accountid) + client = session.client('glue', region_name=data.get('region', 'eu-west-1')) + response = client.start_job_run( + JobName=name, Arguments=data.get('arguments', {}) + ) + return response + except ClientError as e: + log.error(f'Failed to start profiling job {name} due to: {e}') + raise e diff --git a/backend/dataall/db/api/dataset_profiling_run.py b/backend/dataall/modules/datasets/services/dataset_profiling_service.py similarity index 69% rename from backend/dataall/db/api/dataset_profiling_run.py rename to backend/dataall/modules/datasets/services/dataset_profiling_service.py index f1552bc81..5b6ca8d41 100644 --- a/backend/dataall/db/api/dataset_profiling_run.py +++ b/backend/dataall/modules/datasets/services/dataset_profiling_service.py @@ -1,10 +1,11 @@ from sqlalchemy import and_ -from .. import paginate, models -from ..exceptions import ObjectNotFound +from dataall.db import paginate, models +from dataall.db.exceptions import ObjectNotFound +from dataall.modules.datasets.db.models import DatasetProfilingRun -class DatasetProfilingRun: +class DatasetProfilingService: def __init__(self): pass @@ -30,7 +31,7 @@ def start_profiling( if not environment: raise ObjectNotFound('Environment', dataset.environmentUri) - run = models.DatasetProfilingRun( + run = DatasetProfilingRun( datasetUri=dataset.datasetUri, status='RUNNING', AwsAccountId=environment.AwsAccountId, @@ -55,7 +56,7 @@ def update_run( GlueJobRunState=None, results=None, ): - run = DatasetProfilingRun.get_profiling_run( + run = DatasetProfilingService.get_profiling_run( session, profilingRunUri=profilingRunUri, GlueJobRunId=GlueJobRunId ) if GlueJobRunId: @@ -72,14 +73,14 @@ def get_profiling_run( session, profilingRunUri=None, GlueJobRunId=None, GlueTableName=None ): if profilingRunUri: - run: models.DatasetProfilingRun = session.query( - models.DatasetProfilingRun + run: DatasetProfilingRun = session.query( + DatasetProfilingRun ).get(profilingRunUri) else: - run: models.DatasetProfilingRun = ( - session.query(models.DatasetProfilingRun) - .filter(models.DatasetProfilingRun.GlueJobRunId == GlueJobRunId) - .filter(models.DatasetProfilingRun.GlueTableName == GlueTableName) + run: DatasetProfilingRun = ( + session.query(DatasetProfilingRun) + .filter(DatasetProfilingRun.GlueJobRunId == GlueJobRunId) + .filter(DatasetProfilingRun.GlueTableName == GlueTableName) .first() ) return run @@ -89,9 +90,9 @@ def list_profiling_runs(session, datasetUri, filter: dict = None): if not filter: filter = {} q = ( - session.query(models.DatasetProfilingRun) - .filter(models.DatasetProfilingRun.datasetUri == datasetUri) - .order_by(models.DatasetProfilingRun.created.desc()) + session.query(DatasetProfilingRun) + .filter(DatasetProfilingRun.datasetUri == datasetUri) + .order_by(DatasetProfilingRun.created.desc()) ) return paginate( q, page=filter.get('page', 1), page_size=filter.get('pageSize', 20) @@ -102,19 +103,19 @@ def list_table_profiling_runs(session, tableUri, filter): if not filter: filter = {} q = ( - 
session.query(models.DatasetProfilingRun) + session.query(DatasetProfilingRun) .join( models.DatasetTable, - models.DatasetTable.datasetUri == models.DatasetProfilingRun.datasetUri, + models.DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, ) .filter( and_( models.DatasetTable.tableUri == tableUri, models.DatasetTable.GlueTableName - == models.DatasetProfilingRun.GlueTableName, + == DatasetProfilingRun.GlueTableName, ) ) - .order_by(models.DatasetProfilingRun.created.desc()) + .order_by(DatasetProfilingRun.created.desc()) ) return paginate( q, page=filter.get('page', 1), page_size=filter.get('pageSize', 20) @@ -123,34 +124,34 @@ def list_table_profiling_runs(session, tableUri, filter): @staticmethod def get_table_last_profiling_run(session, tableUri): return ( - session.query(models.DatasetProfilingRun) + session.query(DatasetProfilingRun) .join( models.DatasetTable, - models.DatasetTable.datasetUri == models.DatasetProfilingRun.datasetUri, + models.DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, ) .filter(models.DatasetTable.tableUri == tableUri) .filter( models.DatasetTable.GlueTableName - == models.DatasetProfilingRun.GlueTableName + == DatasetProfilingRun.GlueTableName ) - .order_by(models.DatasetProfilingRun.created.desc()) + .order_by(DatasetProfilingRun.created.desc()) .first() ) @staticmethod def get_table_last_profiling_run_with_results(session, tableUri): return ( - session.query(models.DatasetProfilingRun) + session.query(DatasetProfilingRun) .join( models.DatasetTable, - models.DatasetTable.datasetUri == models.DatasetProfilingRun.datasetUri, + models.DatasetTable.datasetUri == DatasetProfilingRun.datasetUri, ) .filter(models.DatasetTable.tableUri == tableUri) .filter( models.DatasetTable.GlueTableName - == models.DatasetProfilingRun.GlueTableName + == DatasetProfilingRun.GlueTableName ) - .filter(models.DatasetProfilingRun.results.isnot(None)) - .order_by(models.DatasetProfilingRun.created.desc()) + .filter(DatasetProfilingRun.results.isnot(None)) + .order_by(DatasetProfilingRun.created.desc()) .first() ) diff --git a/backend/dataall/modules/datasets/services/dataset_table.py b/backend/dataall/modules/datasets/services/dataset_table.py index 23b510083..1c28469f6 100644 --- a/backend/dataall/modules/datasets/services/dataset_table.py +++ b/backend/dataall/modules/datasets/services/dataset_table.py @@ -6,7 +6,7 @@ from dataall.db.api import has_tenant_perm, has_resource_perm, Glossary, ResourcePolicy, Environment from dataall.db.models import Dataset from dataall.utils import json_utils -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn logger = logging.getLogger(__name__) diff --git a/backend/dataall/modules/datasets/tasks/subscription_service.py b/backend/dataall/modules/datasets/tasks/subscription_service.py index 8674f903a..74f84d7c9 100644 --- a/backend/dataall/modules/datasets/tasks/subscription_service.py +++ b/backend/dataall/modules/datasets/tasks/subscription_service.py @@ -12,6 +12,7 @@ from dataall.aws.handlers.sqs import SqsQueue from dataall.db import get_engine from dataall.db import models +from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService from dataall.tasks.subscriptions import poll_queues from dataall.utils import json_utils from dataall.modules.datasets.services.dataset_table import DatasetTableService @@ -143,7 +144,7 @@ def store_dataquality_results(session, message): message.get('region'), ) - run = 
db.api.DatasetProfilingRun.start_profiling( + run = DatasetProfilingService.start_profiling( session=session, datasetUri=table.datasetUri, GlueTableName=table.GlueTableName, diff --git a/tests/api/test_dataset_profiling.py b/tests/api/test_dataset_profiling.py index c5bed6d1e..8d708e94d 100644 --- a/tests/api/test_dataset_profiling.py +++ b/tests/api/test_dataset_profiling.py @@ -2,6 +2,7 @@ import pytest import dataall +from dataall.modules.datasets.db.models import DatasetProfilingRun @pytest.fixture(scope='module', autouse=True) @@ -39,7 +40,7 @@ def test_add_tables(table, dataset1, db): def update_runs(db, runs): with db.scoped_session() as session: for run in runs: - run = session.query(dataall.db.models.DatasetProfilingRun).get( + run = session.query(DatasetProfilingRun).get( run['profilingRunUri'] ) run.status = 'SUCCEEDED' @@ -70,7 +71,7 @@ def test_start_profiling(org1, env1, dataset1, client, module_mocker, db, user, profiling = response.data.startDatasetProfilingRun assert profiling.profilingRunUri with db.scoped_session() as session: - profiling = session.query(dataall.db.models.DatasetProfilingRun).get( + profiling = session.query(DatasetProfilingRun).get( profiling.profilingRunUri ) profiling.GlueJobRunId = 'jr_111111111111' @@ -129,7 +130,7 @@ def test_get_table_profiling_run( client, dataset1, env1, module_mocker, table, db, group ): module_mocker.patch( - 'dataall.api.Objects.DatasetProfiling.resolvers.get_profiling_results_from_s3', + 'dataall.modules.datasets.api.profiling.resolvers.get_profiling_results_from_s3', return_value='{"results": "yes"}', ) runs = list_profiling_runs(client, dataset1, group) @@ -169,7 +170,7 @@ def test_list_table_profiling_runs( client, dataset1, env1, module_mocker, table, db, group ): module_mocker.patch( - 'dataall.api.Objects.DatasetProfiling.resolvers.get_profiling_results_from_s3', + 'dataall.modules.datasets.api.profiling.resolvers.get_profiling_results_from_s3', return_value='{"results": "yes"}', ) module_mocker.patch('requests.post', return_value=True) diff --git a/tests/api/test_dataset_table.py b/tests/api/test_dataset_table.py index c285aa5f7..56f8806c5 100644 --- a/tests/api/test_dataset_table.py +++ b/tests/api/test_dataset_table.py @@ -4,7 +4,7 @@ import dataall from dataall.modules.datasets.services.dataset_table import DatasetTableService -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn @pytest.fixture(scope='module', autouse=True) diff --git a/tests/api/test_glossary.py b/tests/api/test_glossary.py index 987ccc1a8..bb7f34516 100644 --- a/tests/api/test_glossary.py +++ b/tests/api/test_glossary.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import List from dataall.db import models -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn import pytest diff --git a/tests/modules/datasets/test_dataset_feed.py b/tests/modules/datasets/test_dataset_feed.py index db5ff43e2..06ffdc8ed 100644 --- a/tests/modules/datasets/test_dataset_feed.py +++ b/tests/modules/datasets/test_dataset_feed.py @@ -1,6 +1,6 @@ from dataall.api.Objects.Feed.registry import FeedRegistry -from dataall.modules.datasets.db.table_column_model import DatasetTableColumn +from dataall.modules.datasets.db.models import DatasetTableColumn def test_dataset_registered():
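
### Notes for downstream code

`DatasetProfilingRun` and its service layer now live in the datasets module:

- `dataall.db.models.DatasetProfilingRun` → `dataall.modules.datasets.db.models.DatasetProfilingRun`
- `dataall.db.api.DatasetProfilingRun` → `DatasetProfilingService` in `dataall.modules.datasets.services.dataset_profiling_service`
- `dataall.modules.datasets.db.table_column_model` → `dataall.modules.datasets.db.models` (the module now holds both `DatasetTableColumn` and `DatasetProfilingRun`)
- The Glue profiling handlers moved from `dataall.aws.handlers.glue.Glue` to `DatasetProfilingGlueHandler` in `dataall.modules.datasets.handlers.glue_profiling_handler`; the `@Worker.handler` task names (`glue.job.start_profiling_run`, `glue.job.profiling_run_status`) are unchanged, so already-queued tasks keep resolving.

A minimal sketch of the new call path, assuming an open scoped session; the `latest_profiling_status` helper is hypothetical, while the import paths and the `DatasetProfilingService.get_profiling_run` signature come from this diff:

```python
from dataall.modules.datasets.db.models import DatasetProfilingRun
from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService


def latest_profiling_status(session, profiling_run_uri: str) -> str:
    """Hypothetical helper: look up a profiling run through the relocated service."""
    run: DatasetProfilingRun = DatasetProfilingService.get_profiling_run(
        session, profilingRunUri=profiling_run_uri
    )
    # 'status' defaults to 'Created' on the model and is updated to the Glue
    # JobRunState by the 'glue.job.profiling_run_status' handler.
    return run.status
```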