Datasets modularization pt.2 (#432)
### Feature or Bugfix
- Refactoring

### Detail
Refactoring of `DatasetProfilingRun`: the model moves from `dataall.db.models` into `dataall.modules.datasets.db.models`, the profiling GraphQL API moves into `dataall.modules.datasets.api.profiling`, and call sites switch from `db.api.DatasetProfilingRun` to the new `DatasetProfilingService`. The `DatasetTableProfilingJob` model is removed as well. See the before/after sketch below.
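
For reviewers, the net effect at call sites is an import-path and name change; a minimal before/after sketch of the rename (paths taken from the diffs in this commit; `session` and `uri` are placeholder variables):

```python
# Before this PR: model and logic lived in the core db/api packages.
from dataall.db import api, models

run: models.DatasetProfilingRun = api.DatasetProfilingRun.get_profiling_run(
    session, profilingRunUri=uri
)

# After this PR: both live under the datasets module.
from dataall.modules.datasets.db.models import DatasetProfilingRun
from dataall.modules.datasets.services.dataset_profiling_service import (
    DatasetProfilingService,
)

run: DatasetProfilingRun = DatasetProfilingService.get_profiling_run(
    session=session, profilingRunUri=uri
)
```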

### Relates
- #295 and #412 

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
nikpodsh authored May 2, 2023
1 parent 3c4ab2d commit 74a249a
Showing 25 changed files with 195 additions and 194 deletions.
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/__init__.py
@@ -29,7 +29,6 @@
     Test,
     SagemakerStudio,
     RedshiftCluster,
-    DatasetProfiling,
     Glossary,
     AthenaQueryResult,
     Worksheet,
84 changes: 0 additions & 84 deletions backend/dataall/aws/handlers/glue.py
@@ -522,90 +522,6 @@ def get_job_runs(engine, task: models.Task):
             return []
         return response['JobRuns']
 
-    @staticmethod
-    @Worker.handler('glue.job.start_profiling_run')
-    def start_profiling_run(engine, task: models.Task):
-        with engine.scoped_session() as session:
-            profiling: models.DatasetProfilingRun = (
-                db.api.DatasetProfilingRun.get_profiling_run(
-                    session, profilingRunUri=task.targetUri
-                )
-            )
-            dataset: models.Dataset = session.query(models.Dataset).get(
-                profiling.datasetUri
-            )
-            run = Glue.run_job(
-                **{
-                    'accountid': dataset.AwsAccountId,
-                    'name': dataset.GlueProfilingJobName,
-                    'region': dataset.region,
-                    'arguments': (
-                        {'--table': profiling.GlueTableName}
-                        if profiling.GlueTableName
-                        else {}
-                    ),
-                }
-            )
-            db.api.DatasetProfilingRun.update_run(
-                session,
-                profilingRunUri=profiling.profilingRunUri,
-                GlueJobRunId=run['JobRunId'],
-            )
-            return run
-
-    @staticmethod
-    def run_job(**data):
-        accountid = data['accountid']
-        name = data['name']
-        try:
-            session = SessionHelper.remote_session(accountid=accountid)
-            client = session.client('glue', region_name=data.get('region', 'eu-west-1'))
-            response = client.start_job_run(
-                JobName=name, Arguments=data.get('arguments', {})
-            )
-            return response
-        except ClientError as e:
-            log.error(f'Failed to start profiling job {name} due to: {e}')
-            raise e
-
-    @staticmethod
-    @Worker.handler('glue.job.profiling_run_status')
-    def get_profiling_run(engine, task: models.Task):
-        with engine.scoped_session() as session:
-            profiling: models.DatasetProfilingRun = (
-                db.api.DatasetProfilingRun.get_profiling_run(
-                    session, profilingRunUri=task.targetUri
-                )
-            )
-            dataset: models.Dataset = session.query(models.Dataset).get(
-                profiling.datasetUri
-            )
-            glue_run = Glue.get_job_run(
-                **{
-                    'accountid': dataset.AwsAccountId,
-                    'name': dataset.GlueProfilingJobName,
-                    'region': dataset.region,
-                    'run_id': profiling.GlueJobRunId,
-                }
-            )
-            profiling.status = glue_run['JobRun']['JobRunState']
-            session.commit()
-            return profiling.status
-
-    @staticmethod
-    def get_job_run(**data):
-        accountid = data['accountid']
-        name = data['name']
-        run_id = data['run_id']
-        try:
-            session = SessionHelper.remote_session(accountid=accountid)
-            client = session.client('glue', region_name=data.get('region', 'eu-west-1'))
-            response = client.get_job_run(JobName=name, RunId=run_id)
-            return response
-        except ClientError as e:
-            log.error(f'Failed to get job run {run_id} due to: {e}')
-            raise e
-
     @staticmethod
     def grant_principals_all_table_permissions(
         table: models.DatasetTable, principals: [str], client=None
1 change: 0 additions & 1 deletion backend/dataall/db/api/__init__.py
@@ -13,7 +13,6 @@
 from .share_object import ShareObject, ShareObjectSM, ShareItemSM
 from .dataset import Dataset
 from .dataset_location import DatasetStorageLocation
-from .dataset_profiling_run import DatasetProfilingRun
 from .notification import Notification
 from .redshift_cluster import RedshiftCluster
 from .vpc import Vpc
20 changes: 0 additions & 20 deletions backend/dataall/db/models/DatasetProfilingRun.py

This file was deleted.

18 changes: 0 additions & 18 deletions backend/dataall/db/models/DatasetTableProfilingJob.py

This file was deleted.

2 changes: 0 additions & 2 deletions backend/dataall/db/models/__init__.py
@@ -5,11 +5,9 @@
 from .DashboardShare import DashboardShare
 from .DashboardShare import DashboardShareStatus
 from .Dataset import Dataset
-from .DatasetProfilingRun import DatasetProfilingRun
 from .DatasetQualityRule import DatasetQualityRule
 from .DatasetStorageLocation import DatasetStorageLocation
 from .DatasetTable import DatasetTable
-from .DatasetTableProfilingJob import DatasetTableProfilingJob
 from .Environment import Environment
 from .EnvironmentGroup import EnvironmentGroup
 from .FeedMessage import FeedMessage
2 changes: 1 addition & 1 deletion backend/dataall/modules/datasets/__init__.py
@@ -4,7 +4,7 @@

 from dataall.api.Objects.Feed.registry import FeedRegistry, FeedDefinition
 from dataall.api.Objects.Glossary.registry import GlossaryRegistry, GlossaryDefinition
-from dataall.modules.datasets.db.table_column_model import DatasetTableColumn
+from dataall.modules.datasets.db.models import DatasetTableColumn
 from dataall.modules.loader import ModuleInterface, ImportMode
 
 log = logging.getLogger(__name__)
5 changes: 3 additions & 2 deletions backend/dataall/modules/datasets/api/__init__.py
@@ -1,6 +1,7 @@
"""The GraphQL schema of datasets and related functionality"""
from dataall.modules.datasets.api import (
table_column
table_column,
profiling
)

__all__ = ["table_column"]
__all__ = ["table_column", "profiling"]
@@ -1,4 +1,4 @@
-from . import (
+from dataall.modules.datasets.api.profiling import (
     input_types,
     mutations,
     queries,
@@ -1,4 +1,4 @@
-from ... import gql
+from dataall.api import gql
 
 StartDatasetProfilingRunInput = gql.InputType(
     name='StartDatasetProfilingRunInput',
@@ -1,5 +1,8 @@
-from ... import gql
-from .resolvers import *
+from dataall.api import gql
+from dataall.modules.datasets.api.profiling.resolvers import (
+    start_profiling_run,
+    update_profiling_run_results
+)
 
 startDatasetProfilingRun = gql.MutationField(
     name='startDatasetProfilingRun',
@@ -1,5 +1,10 @@
-from ... import gql
-from .resolvers import *
+from dataall.api import gql
+from dataall.modules.datasets.api.profiling.resolvers import (
+    get_profiling_run,
+    list_profiling_runs,
+    list_table_profiling_runs,
+    get_last_table_profiling_run
+)
 
 
 getDatasetProfilingRun = gql.QueryField(
@@ -1,17 +1,19 @@
 import json
 import logging
 
-from ....api.context import Context
-from ....aws.handlers.service_handlers import Worker
-from ....aws.handlers.sts import SessionHelper
-from ....db import api, permissions, models
-from ....db.api import ResourcePolicy
+from dataall.api.context import Context
+from dataall.aws.handlers.service_handlers import Worker
+from dataall.aws.handlers.sts import SessionHelper
+from dataall.db import api, permissions, models
+from dataall.db.api import ResourcePolicy
 from dataall.modules.datasets.services.dataset_table import DatasetTableService
+from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService
+from dataall.modules.datasets.db.models import DatasetProfilingRun
 
 log = logging.getLogger(__name__)
 
 
-def resolve_dataset(context, source: models.DatasetProfilingRun):
+def resolve_dataset(context, source: DatasetProfilingRun):
     if not source:
         return None
     with context.engine.scoped_session() as session:
@@ -32,7 +34,7 @@ def start_profiling_run(context: Context, source, input: dict = None):
         )
         dataset = api.Dataset.get_dataset_by_uri(session, input['datasetUri'])
 
-        run = api.DatasetProfilingRun.start_profiling(
+        run = DatasetProfilingService.start_profiling(
             session=session,
             datasetUri=dataset.datasetUri,
             tableUri=input.get('tableUri'),
@@ -49,7 +51,7 @@ def start_profiling_run(context: Context, source, input: dict = None):
         return run
 
 
-def get_profiling_run_status(context: Context, source: models.DatasetProfilingRun):
+def get_profiling_run_status(context: Context, source: DatasetProfilingRun):
     if not source:
         return None
     with context.engine.scoped_session() as session:
@@ -61,7 +63,7 @@ def get_profiling_run_status(context: Context, source: models.DatasetProfilingRun):
     return source.status
 
 
-def get_profiling_results(context: Context, source: models.DatasetProfilingRun):
+def get_profiling_results(context: Context, source: DatasetProfilingRun):
     if not source or source.results == {}:
         return None
     else:
@@ -70,28 +72,28 @@ def get_profiling_results(context: Context, source: models.DatasetProfilingRun):

 def update_profiling_run_results(context: Context, source, profilingRunUri, results):
     with context.engine.scoped_session() as session:
-        run = api.DatasetProfilingRun.update_run(
+        run = DatasetProfilingService.update_run(
             session=session, profilingRunUri=profilingRunUri, results=results
         )
         return run
 
 
 def list_profiling_runs(context: Context, source, datasetUri=None):
     with context.engine.scoped_session() as session:
-        return api.DatasetProfilingRun.list_profiling_runs(session, datasetUri)
+        return DatasetProfilingService.list_profiling_runs(session, datasetUri)
 
 
 def get_profiling_run(context: Context, source, profilingRunUri=None):
     with context.engine.scoped_session() as session:
-        return api.DatasetProfilingRun.get_profiling_run(
+        return DatasetProfilingService.get_profiling_run(
             session=session, profilingRunUri=profilingRunUri
         )
 
 
 def get_last_table_profiling_run(context: Context, source, tableUri=None):
     with context.engine.scoped_session() as session:
-        run: models.DatasetProfilingRun = (
-            api.DatasetProfilingRun.get_table_last_profiling_run(
+        run: DatasetProfilingRun = (
+            DatasetProfilingService.get_table_last_profiling_run(
                 session=session, tableUri=tableUri
             )
         )
@@ -112,7 +114,7 @@ def get_last_table_profiling_run(context: Context, source, tableUri=None):

         if not run.results:
             run_with_results = (
-                api.DatasetProfilingRun.get_table_last_profiling_run_with_results(
+                DatasetProfilingService.get_table_last_profiling_run_with_results(
                     session=session, tableUri=tableUri
                 )
             )
@@ -143,6 +145,6 @@ def get_profiling_results_from_s3(environment, dataset, table, run):

 def list_table_profiling_runs(context: Context, source, tableUri=None):
     with context.engine.scoped_session() as session:
-        return api.DatasetProfilingRun.list_table_profiling_runs(
+        return DatasetProfilingService.list_table_profiling_runs(
             session=session, tableUri=tableUri, filter={}
         )
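
Taken together, the call sites above imply roughly this surface for the new `DatasetProfilingService` (a sketch inferred from the resolver and removed-handler calls in this diff, not the service's actual source; parameters the diff truncates, such as the tail of `start_profiling`, are assumptions):

```python
# Inferred interface only: assembled from the call sites in this commit,
# with bodies elided. Keyword names beyond those visible here are guesses.
class DatasetProfilingService:
    @staticmethod
    def start_profiling(session, datasetUri, tableUri=None, GlueTableName=None): ...

    @staticmethod
    def update_run(session, profilingRunUri=None, GlueJobRunId=None, results=None): ...

    @staticmethod
    def get_profiling_run(session, profilingRunUri=None): ...

    @staticmethod
    def list_profiling_runs(session, datasetUri): ...

    @staticmethod
    def list_table_profiling_runs(session, tableUri=None, filter=None): ...

    @staticmethod
    def get_table_last_profiling_run(session, tableUri=None): ...

    @staticmethod
    def get_table_last_profiling_run_with_results(session, tableUri=None): ...
```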
@@ -1,5 +1,5 @@
-from ... import gql
-from .resolvers import (
+from dataall.api import gql
+from dataall.modules.datasets.api.profiling.resolvers import (
     resolve_dataset,
     get_profiling_run_status,
     get_profiling_results,
@@ -6,7 +6,7 @@
 from dataall.db import paginate, permissions, models
 from dataall.db.api import ResourcePolicy
 from dataall.modules.datasets.services.dataset_table import DatasetTableService
-from dataall.modules.datasets.db.table_column_model import DatasetTableColumn
+from dataall.modules.datasets.db.models import DatasetTableColumn
 
 
 def list_table_columns(
@@ -1,7 +1,6 @@
 from sqlalchemy import Column, String
-
-from dataall.db import Base
-from dataall.db import Resource, utils
+from sqlalchemy.dialects.postgresql import JSON
+from dataall.db import Base, Resource, utils
 
 
 class DatasetTableColumn(Resource, Base):
@@ -21,3 +20,19 @@ class DatasetTableColumn(Resource, Base):

     def uri(self):
         return self.columnUri
+
+
+class DatasetProfilingRun(Resource, Base):
+    __tablename__ = 'dataset_profiling_run'
+    profilingRunUri = Column(
+        String, primary_key=True, default=utils.uuid('profilingrun')
+    )
+    datasetUri = Column(String, nullable=False)
+    GlueJobName = Column(String)
+    GlueJobRunId = Column(String)
+    GlueTriggerSchedule = Column(String)
+    GlueTriggerName = Column(String)
+    GlueTableName = Column(String)
+    AwsAccountId = Column(String)
+    results = Column(JSON, default={})
+    status = Column(String, default='Created')
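
The relocated model itself is plain SQLAlchemy, so it can be exercised directly; a minimal sketch (the values and the `session` object are illustrative, not from this commit):

```python
# Illustrative usage of the relocated model; all values are made up.
from dataall.modules.datasets.db.models import DatasetProfilingRun

run = DatasetProfilingRun(
    datasetUri='dhu5k7x1',        # required: nullable=False
    GlueTableName='orders',       # optional: restrict profiling to one table
    AwsAccountId='111122223333',  # hypothetical account id
)
session.add(run)   # 'session' is an open SQLAlchemy session
session.flush()    # Python-side column defaults fire on insert
assert run.status == 'Created'
# profilingRunUri is generated by the utils.uuid('profilingrun') default
```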
@@ -5,7 +5,7 @@
 from dataall.aws.handlers.service_handlers import Worker
 from dataall.modules.datasets.aws.glue_table_client import GlueTableClient
 from dataall.modules.datasets.aws.lf_table_client import LakeFormationTableClient
-from dataall.modules.datasets.db.table_column_model import DatasetTableColumn
+from dataall.modules.datasets.db.models import DatasetTableColumn
 from dataall.modules.datasets.services.dataset_table import DatasetTableService
 
 log = logging.getLogger(__name__)
