modularization: Datasets modularization pt.2 #432

Merged

39 commits
3a5e0de Initialization of dataset module (nikpodsh, Apr 11, 2023)
a50a02f Refactoring of datasets (nikpodsh, Apr 11, 2023)
be14986 Refactoring of datasets (nikpodsh, Apr 11, 2023)
06f82ad Refactoring of datasets (nikpodsh, Apr 11, 2023)
38145ae Fixed leftover in loader (nikpodsh, Apr 11, 2023)
f0e146a Dataset refactoring (nikpodsh, Apr 11, 2023)
b039163 Dataset refactoring (nikpodsh, Apr 11, 2023)
b7922ed Dataset refactoring (nikpodsh, Apr 11, 2023)
1771bca Notebooks doesn't require tasks (nikpodsh, Apr 11, 2023)
3d1603f Renamed tasks to handlers (nikpodsh, Apr 11, 2023)
fb6b515 Dataset refactoring (nikpodsh, Apr 11, 2023)
e3596a5 Dataset refactoring (nikpodsh, Apr 11, 2023)
3af2ecf Dataset refactoring (nikpodsh, Apr 11, 2023)
1a063b2 Dataset refactoring (nikpodsh, Apr 11, 2023)
b733714 Dataset refactoring (nikpodsh, Apr 11, 2023)
2a4e2e0 Extracted feed registry (nikpodsh, Apr 11, 2023)
c15d090 Extracted feed and glossary registry and created a model registry (nikpodsh, Apr 11, 2023)
052a2b1 Dataset refactoring (nikpodsh, Apr 12, 2023)
d984483 Fixed and unignored test_tables_sync (nikpodsh, Apr 12, 2023)
dc0c935 Split model registry into feed and glossaries (nikpodsh, Apr 12, 2023)
727e353 Abstraction for glossaries (nikpodsh, Apr 12, 2023)
49fbb41 Fixed leftovers (nikpodsh, Apr 12, 2023)
7d029e7 Datasets refactoring (nikpodsh, Apr 13, 2023)
be527eb Added runtime type registration for Union GraphQL type (nikpodsh, Apr 13, 2023)
3daf2aa Changed Feed type registration mechanism (nikpodsh, Apr 13, 2023)
db3bfd3 Added TODO for future refactoring (nikpodsh, Apr 13, 2023)
13b6e92 Added GlossaryRegistry for Union scheme (nikpodsh, Apr 13, 2023)
144dfea Changed import in redshift module (nikpodsh, Apr 13, 2023)
d43b9b3 No need for Utils yet (nikpodsh, Apr 13, 2023)
39b244c Fixed linting (nikpodsh, Apr 13, 2023)
cb3800a Datasets refactoring (nikpodsh, Apr 14, 2023)
dd8e597 Datasets refactoring (nikpodsh, Apr 14, 2023)
8ca7bea Datasets refactoring (nikpodsh, Apr 14, 2023)
e36ab3b Datasets refactoring (nikpodsh, Apr 14, 2023)
31720c2 Datasets refactoring (nikpodsh, Apr 14, 2023)
8a907df Datasets refactoring (nikpodsh, Apr 14, 2023)
561da72 Datasets refactoring (nikpodsh, Apr 14, 2023)
47a38cc Datasets refactoring (nikpodsh, Apr 17, 2023)
2ac3ae7 Resolved code conflict (nikpodsh, Apr 24, 2023)
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/__init__.py
@@ -29,7 +29,6 @@
Test,
SagemakerStudio,
RedshiftCluster,
DatasetProfiling,
Glossary,
AthenaQueryResult,
Worksheet,
84 changes: 0 additions & 84 deletions backend/dataall/aws/handlers/glue.py
@@ -522,90 +522,6 @@ def get_job_runs(engine, task: models.Task):
return []
return response['JobRuns']

@staticmethod
@Worker.handler('glue.job.start_profiling_run')
def start_profiling_run(engine, task: models.Task):
with engine.scoped_session() as session:
profiling: models.DatasetProfilingRun = (
db.api.DatasetProfilingRun.get_profiling_run(
session, profilingRunUri=task.targetUri
)
)
dataset: models.Dataset = session.query(models.Dataset).get(
profiling.datasetUri
)
run = Glue.run_job(
**{
'accountid': dataset.AwsAccountId,
'name': dataset.GlueProfilingJobName,
'region': dataset.region,
'arguments': (
{'--table': profiling.GlueTableName}
if profiling.GlueTableName
else {}
),
}
)
db.api.DatasetProfilingRun.update_run(
session,
profilingRunUri=profiling.profilingRunUri,
GlueJobRunId=run['JobRunId'],
)
return run

@staticmethod
def run_job(**data):
accountid = data['accountid']
name = data['name']
try:
session = SessionHelper.remote_session(accountid=accountid)
client = session.client('glue', region_name=data.get('region', 'eu-west-1'))
response = client.start_job_run(
JobName=name, Arguments=data.get('arguments', {})
)
return response
except ClientError as e:
log.error(f'Failed to start profiling job {name} due to: {e}')
raise e

@staticmethod
@Worker.handler('glue.job.profiling_run_status')
def get_profiling_run(engine, task: models.Task):
with engine.scoped_session() as session:
profiling: models.DatasetProfilingRun = (
db.api.DatasetProfilingRun.get_profiling_run(
session, profilingRunUri=task.targetUri
)
)
dataset: models.Dataset = session.query(models.Dataset).get(
profiling.datasetUri
)
glue_run = Glue.get_job_run(
**{
'accountid': dataset.AwsAccountId,
'name': dataset.GlueProfilingJobName,
'region': dataset.region,
'run_id': profiling.GlueJobRunId,
}
)
profiling.status = glue_run['JobRun']['JobRunState']
session.commit()
return profiling.status

@staticmethod
def get_job_run(**data):
accountid = data['accountid']
name = data['name']
run_id = data['run_id']
try:
session = SessionHelper.remote_session(accountid=accountid)
client = session.client('glue', region_name=data.get('region', 'eu-west-1'))
response = client.get_job_run(JobName=name, RunId=run_id)
return response
except ClientError as e:
log.error(f'Failed to get job run {run_id} due to: {e}')
raise e

@staticmethod
def grant_principals_all_table_permissions(
table: models.DatasetTable, principals: [str], client=None
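The profiling handlers deleted above are not re-added inside glue.py; presumably they are (or will be) re-registered from within the datasets module as part of this modularization. Below is a minimal sketch of what such a module-local handler could look like, reusing the @Worker.handler registration pattern from the removed code and the DatasetProfilingService calls visible later in this PR. The file path, class name, and service wiring are assumptions, not part of this diff.

```python
# Hypothetical location: backend/dataall/modules/datasets/handlers/glue_profiling_handler.py
import logging

from botocore.exceptions import ClientError

from dataall.aws.handlers.service_handlers import Worker
from dataall.aws.handlers.sts import SessionHelper
from dataall.db import models
from dataall.modules.datasets.db.models import DatasetProfilingRun
from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService

log = logging.getLogger(__name__)


class DatasetProfilingGlueHandler:
    """Glue profiling-run handler registered from the datasets module (sketch)."""

    @staticmethod
    @Worker.handler('glue.job.start_profiling_run')
    def start_profiling_run(engine, task: models.Task):
        with engine.scoped_session() as session:
            profiling: DatasetProfilingRun = DatasetProfilingService.get_profiling_run(
                session=session, profilingRunUri=task.targetUri
            )
            dataset: models.Dataset = session.query(models.Dataset).get(profiling.datasetUri)

            # Start the Glue profiling job in the dataset's account and region,
            # mirroring the logic removed from glue.py above.
            aws_session = SessionHelper.remote_session(accountid=dataset.AwsAccountId)
            client = aws_session.client('glue', region_name=dataset.region)
            try:
                run = client.start_job_run(
                    JobName=dataset.GlueProfilingJobName,
                    Arguments=(
                        {'--table': profiling.GlueTableName}
                        if profiling.GlueTableName
                        else {}
                    ),
                )
            except ClientError as e:
                log.error(f'Failed to start profiling job {dataset.GlueProfilingJobName} due to: {e}')
                raise

            # Persist the Glue run id on the profiling record (service method assumed).
            DatasetProfilingService.update_run(
                session=session,
                profilingRunUri=profiling.profilingRunUri,
                GlueJobRunId=run['JobRunId'],
            )
            return run
```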
1 change: 0 additions & 1 deletion backend/dataall/db/api/__init__.py
@@ -13,7 +13,6 @@
from .share_object import ShareObject, ShareObjectSM, ShareItemSM
from .dataset import Dataset
from .dataset_location import DatasetStorageLocation
from .dataset_profiling_run import DatasetProfilingRun
from .notification import Notification
from .redshift_cluster import RedshiftCluster
from .vpc import Vpc
20 changes: 0 additions & 20 deletions backend/dataall/db/models/DatasetProfilingRun.py

This file was deleted.

18 changes: 0 additions & 18 deletions backend/dataall/db/models/DatasetTableProfilingJob.py

This file was deleted.

2 changes: 0 additions & 2 deletions backend/dataall/db/models/__init__.py
@@ -5,11 +5,9 @@
from .DashboardShare import DashboardShare
from .DashboardShare import DashboardShareStatus
from .Dataset import Dataset
from .DatasetProfilingRun import DatasetProfilingRun
from .DatasetQualityRule import DatasetQualityRule
from .DatasetStorageLocation import DatasetStorageLocation
from .DatasetTable import DatasetTable
from .DatasetTableProfilingJob import DatasetTableProfilingJob
from .Environment import Environment
from .EnvironmentGroup import EnvironmentGroup
from .FeedMessage import FeedMessage
2 changes: 1 addition & 1 deletion backend/dataall/modules/datasets/__init__.py
@@ -4,7 +4,7 @@

from dataall.api.Objects.Feed.registry import FeedRegistry, FeedDefinition
from dataall.api.Objects.Glossary.registry import GlossaryRegistry, GlossaryDefinition
from dataall.modules.datasets.db.table_column_model import DatasetTableColumn
from dataall.modules.datasets.db.models import DatasetTableColumn
from dataall.modules.loader import ModuleInterface, ImportMode

log = logging.getLogger(__name__)
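The module's __init__.py imports FeedRegistry/FeedDefinition and GlossaryRegistry/GlossaryDefinition (see the commits "Extracted feed registry" and "Added GlossaryRegistry for Union scheme"), but the registration code itself is outside this excerpt. A minimal sketch of how the module interface might register DatasetTableColumn with those registries follows; the is_supported() hook and the exact FeedDefinition/GlossaryDefinition argument lists are assumptions, not shown in this diff.

```python
# Hypothetical sketch of the registration done in backend/dataall/modules/datasets/__init__.py
import logging

from dataall.api.Objects.Feed.registry import FeedRegistry, FeedDefinition
from dataall.api.Objects.Glossary.registry import GlossaryRegistry, GlossaryDefinition
from dataall.modules.datasets.db.models import DatasetTableColumn
from dataall.modules.loader import ModuleInterface, ImportMode

log = logging.getLogger(__name__)


class DatasetApiModuleInterface(ModuleInterface):
    """Loads the dataset GraphQL schema when the API import mode is requested (sketch)."""

    @classmethod
    def is_supported(cls, modes) -> bool:
        # Assumed loader hook: activate this interface only for API deployments.
        return ImportMode.API in modes

    def __init__(self):
        # Importing the api package registers its queries, mutations and resolvers.
        import dataall.modules.datasets.api  # noqa: F401

        # Make DatasetTableColumn usable as a Feed target and a Glossary target;
        # the argument names below are assumptions about the registry definitions.
        FeedRegistry.register(FeedDefinition('DatasetTableColumn', DatasetTableColumn))
        GlossaryRegistry.register(
            GlossaryDefinition('Column', 'DatasetTableColumn', DatasetTableColumn)
        )
        log.info('API of datasets has been imported')
```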
5 changes: 3 additions & 2 deletions backend/dataall/modules/datasets/api/__init__.py
@@ -1,6 +1,7 @@
"""The GraphQL schema of datasets and related functionality"""
from dataall.modules.datasets.api import (
table_column
Contributor:
Why did we change the naming convention? The api graphql Objects were capitalized before, right?

Contributor (Author):
That's correct, and if you think it should remain the existing way, please let me know.
I thought this modularization was a good opportunity to start following the Python convention (underscores instead of capitalization).
But as I said, if you feel it's not correct (or capitalization is a unique style of data.all :)), I will change it back.

table_column,
profiling
)

__all__ = ["table_column"]
__all__ = ["table_column", "profiling"]
@@ -1,4 +1,4 @@
from . import (
from dataall.modules.datasets.api.profiling import (
input_types,
mutations,
queries,
@@ -1,4 +1,4 @@
from ... import gql
from dataall.api import gql

StartDatasetProfilingRunInput = gql.InputType(
name='StartDatasetProfilingRunInput',
@@ -1,5 +1,8 @@
from ... import gql
from .resolvers import *
from dataall.api import gql
from dataall.modules.datasets.api.profiling.resolvers import (
start_profiling_run,
update_profiling_run_results
)

startDatasetProfilingRun = gql.MutationField(
name='startDatasetProfilingRun',
@@ -1,5 +1,10 @@
from ... import gql
from .resolvers import *
from dataall.api import gql
from dataall.modules.datasets.api.profiling.resolvers import (
get_profiling_run,
list_profiling_runs,
list_table_profiling_runs,
get_last_table_profiling_run
)


getDatasetProfilingRun = gql.QueryField(
@@ -1,17 +1,19 @@
import json
import logging

from ....api.context import Context
from ....aws.handlers.service_handlers import Worker
from ....aws.handlers.sts import SessionHelper
from ....db import api, permissions, models
from ....db.api import ResourcePolicy
from dataall.api.context import Context
from dataall.aws.handlers.service_handlers import Worker
from dataall.aws.handlers.sts import SessionHelper
from dataall.db import api, permissions, models
from dataall.db.api import ResourcePolicy
from dataall.modules.datasets.services.dataset_table import DatasetTableService
from dataall.modules.datasets.services.dataset_profiling_service import DatasetProfilingService
from dataall.modules.datasets.db.models import DatasetProfilingRun

log = logging.getLogger(__name__)


def resolve_dataset(context, source: models.DatasetProfilingRun):
def resolve_dataset(context, source: DatasetProfilingRun):
if not source:
return None
with context.engine.scoped_session() as session:
@@ -32,7 +34,7 @@ def start_profiling_run(context: Context, source, input: dict = None):
)
dataset = api.Dataset.get_dataset_by_uri(session, input['datasetUri'])

run = api.DatasetProfilingRun.start_profiling(
run = DatasetProfilingService.start_profiling(
session=session,
datasetUri=dataset.datasetUri,
tableUri=input.get('tableUri'),
@@ -49,7 +51,7 @@ def start_profiling_run(context: Context, source, input: dict = None):
return run


def get_profiling_run_status(context: Context, source: models.DatasetProfilingRun):
def get_profiling_run_status(context: Context, source: DatasetProfilingRun):
if not source:
return None
with context.engine.scoped_session() as session:
@@ -61,7 +63,7 @@ def get_profiling_run_status(context: Context, source: models.DatasetProfilingRun):
return source.status


def get_profiling_results(context: Context, source: models.DatasetProfilingRun):
def get_profiling_results(context: Context, source: DatasetProfilingRun):
if not source or source.results == {}:
return None
else:
@@ -70,28 +72,28 @@ def get_profiling_results(context: Context, source: models.DatasetProfilingRun):

def update_profiling_run_results(context: Context, source, profilingRunUri, results):
with context.engine.scoped_session() as session:
run = api.DatasetProfilingRun.update_run(
run = DatasetProfilingService.update_run(
session=session, profilingRunUri=profilingRunUri, results=results
)
return run


def list_profiling_runs(context: Context, source, datasetUri=None):
with context.engine.scoped_session() as session:
return api.DatasetProfilingRun.list_profiling_runs(session, datasetUri)
return DatasetProfilingService.list_profiling_runs(session, datasetUri)


def get_profiling_run(context: Context, source, profilingRunUri=None):
with context.engine.scoped_session() as session:
return api.DatasetProfilingRun.get_profiling_run(
return DatasetProfilingService.get_profiling_run(
session=session, profilingRunUri=profilingRunUri
)


def get_last_table_profiling_run(context: Context, source, tableUri=None):
with context.engine.scoped_session() as session:
run: models.DatasetProfilingRun = (
api.DatasetProfilingRun.get_table_last_profiling_run(
run: DatasetProfilingRun = (
DatasetProfilingService.get_table_last_profiling_run(
session=session, tableUri=tableUri
)
)
Expand All @@ -112,7 +114,7 @@ def get_last_table_profiling_run(context: Context, source, tableUri=None):

if not run.results:
run_with_results = (
api.DatasetProfilingRun.get_table_last_profiling_run_with_results(
DatasetProfilingService.get_table_last_profiling_run_with_results(
session=session, tableUri=tableUri
)
)
@@ -143,6 +145,6 @@ def get_profiling_results_from_s3(environment, dataset, table, run):

def list_table_profiling_runs(context: Context, source, tableUri=None):
with context.engine.scoped_session() as session:
return api.DatasetProfilingRun.list_table_profiling_runs(
return DatasetProfilingService.list_table_profiling_runs(
session=session, tableUri=tableUri, filter={}
)
@@ -1,5 +1,5 @@
from ... import gql
from .resolvers import (
from dataall.api import gql
from dataall.modules.datasets.api.profiling.resolvers import (
resolve_dataset,
get_profiling_run_status,
get_profiling_results,
@@ -6,7 +6,7 @@
from dataall.db import paginate, permissions, models
from dataall.db.api import ResourcePolicy
from dataall.modules.datasets.services.dataset_table import DatasetTableService
from dataall.modules.datasets.db.table_column_model import DatasetTableColumn
from dataall.modules.datasets.db.models import DatasetTableColumn


def list_table_columns(
@@ -1,7 +1,6 @@
from sqlalchemy import Column, String

from dataall.db import Base
from dataall.db import Resource, utils
from sqlalchemy.dialects.postgresql import JSON
from dataall.db import Base, Resource, utils


class DatasetTableColumn(Resource, Base):
@@ -21,3 +20,19 @@ class DatasetTableColumn(Resource, Base):

def uri(self):
return self.columnUri


class DatasetProfilingRun(Resource, Base):
__tablename__ = 'dataset_profiling_run'
profilingRunUri = Column(
String, primary_key=True, default=utils.uuid('profilingrun')
)
datasetUri = Column(String, nullable=False)
GlueJobName = Column(String)
GlueJobRunId = Column(String)
GlueTriggerSchedule = Column(String)
GlueTriggerName = Column(String)
GlueTableName = Column(String)
AwsAccountId = Column(String)
results = Column(JSON, default={})
status = Column(String, default='Created')
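The resolvers above now call DatasetProfilingService instead of db.api.DatasetProfilingRun, but the service itself is outside this excerpt. Below is a minimal sketch of a few of its methods over the relocated DatasetProfilingRun model; the signatures follow the call sites in the resolvers, while the bodies are assumptions.

```python
# Hypothetical sketch of dataall/modules/datasets/services/dataset_profiling_service.py;
# only the call signatures are taken from the resolvers in this PR, the bodies are assumptions.
from dataall.modules.datasets.db.models import DatasetProfilingRun


class DatasetProfilingService:
    @staticmethod
    def start_profiling(session, datasetUri, tableUri=None, GlueTableName=None):
        # tableUri would presumably be resolved to a GlueTableName before storing;
        # that lookup is omitted here.
        run = DatasetProfilingRun(
            datasetUri=datasetUri,
            GlueTableName=GlueTableName,
            status='Created',
        )
        session.add(run)
        session.commit()
        return run

    @staticmethod
    def get_profiling_run(session, profilingRunUri=None, GlueJobRunId=None):
        # Look up a run either by its uri or by the Glue job run id.
        query = session.query(DatasetProfilingRun)
        if profilingRunUri:
            query = query.filter(DatasetProfilingRun.profilingRunUri == profilingRunUri)
        else:
            query = query.filter(DatasetProfilingRun.GlueJobRunId == GlueJobRunId)
        return query.first()

    @staticmethod
    def update_run(session, profilingRunUri=None, GlueJobRunId=None, results=None):
        run = DatasetProfilingService.get_profiling_run(
            session, profilingRunUri=profilingRunUri, GlueJobRunId=GlueJobRunId
        )
        if GlueJobRunId:
            run.GlueJobRunId = GlueJobRunId
        if results:
            run.results = results
        session.commit()
        return run
```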
@@ -5,7 +5,7 @@
from dataall.aws.handlers.service_handlers import Worker
from dataall.modules.datasets.aws.glue_table_client import GlueTableClient
from dataall.modules.datasets.aws.lf_table_client import LakeFormationTableClient
from dataall.modules.datasets.db.table_column_model import DatasetTableColumn
from dataall.modules.datasets.db.models import DatasetTableColumn
from dataall.modules.datasets.services.dataset_table import DatasetTableService

log = logging.getLogger(__name__)