modularization: Dataset Modularization pt.1 #413

Merged 32 commits on Apr 24, 2023

Commits (changes from all 32 commits)
3a5e0de  Initialization of dataset module  (nikpodsh, Apr 11, 2023)
a50a02f  Refactoring of datasets  (nikpodsh, Apr 11, 2023)
be14986  Refactoring of datasets  (nikpodsh, Apr 11, 2023)
06f82ad  Refactoring of datasets  (nikpodsh, Apr 11, 2023)
38145ae  Fixed leftover in loader  (nikpodsh, Apr 11, 2023)
f0e146a  Dataset refactoring  (nikpodsh, Apr 11, 2023)
b039163  Dataset refactoring  (nikpodsh, Apr 11, 2023)
b7922ed  Dataset refactoring  (nikpodsh, Apr 11, 2023)
1771bca  Notebooks doesn't require tasks  (nikpodsh, Apr 11, 2023)
3d1603f  Renamed tasks to handlers  (nikpodsh, Apr 11, 2023)
fb6b515  Dataset refactoring  (nikpodsh, Apr 11, 2023)
e3596a5  Dataset refactoring  (nikpodsh, Apr 11, 2023)
3af2ecf  Dataset refactoring  (nikpodsh, Apr 11, 2023)
1a063b2  Dataset refactoring  (nikpodsh, Apr 11, 2023)
b733714  Dataset refactoring  (nikpodsh, Apr 11, 2023)
2a4e2e0  Extracted feed registry  (nikpodsh, Apr 11, 2023)
c15d090  Extracted feed and glossary registry and created a model registry  (nikpodsh, Apr 11, 2023)
052a2b1  Dataset refactoring  (nikpodsh, Apr 12, 2023)
d984483  Fixed and unignored test_tables_sync  (nikpodsh, Apr 12, 2023)
dc0c935  Split model registry into feed and glossaries  (nikpodsh, Apr 12, 2023)
727e353  Abstraction for glossaries  (nikpodsh, Apr 12, 2023)
49fbb41  Fixed leftovers  (nikpodsh, Apr 12, 2023)
7d029e7  Datasets refactoring  (nikpodsh, Apr 13, 2023)
be527eb  Added runtime type registration for Union GraphQL type  (nikpodsh, Apr 13, 2023)
3daf2aa  Changed Feed type registration mechanism  (nikpodsh, Apr 13, 2023)
db3bfd3  Added TODO for future refactoring  (nikpodsh, Apr 13, 2023)
13b6e92  Added GlossaryRegistry for Union scheme  (nikpodsh, Apr 13, 2023)
144dfea  Changed import in redshift module  (nikpodsh, Apr 13, 2023)
d43b9b3  No need for Utils yet  (nikpodsh, Apr 13, 2023)
39b244c  Fixed linting  (nikpodsh, Apr 13, 2023)
ac71f59  Renamed method  (nikpodsh, Apr 20, 2023)
ec3228f  Added AWS for glue and lake formation clients  (nikpodsh, Apr 20, 2023)
2 changes: 1 addition & 1 deletion backend/aws_handler.py
@@ -14,7 +14,7 @@

 engine = get_engine(envname=ENVNAME)

-load_modules(modes=[ImportMode.TASKS])
+load_modules(modes=[ImportMode.HANDLERS])


 def handler(event, context=None):
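The only change here is the ImportMode.TASKS to ImportMode.HANDLERS switch, matching the "Renamed tasks to handlers" commit: the Lambda entry point now loads the module code registered for handlers. As a rough illustration of the mode-based loading idea (the names and wiring below are assumptions for the sketch, not the project's actual loader):

```python
# Minimal sketch of mode-based module loading (illustrative only).
from enum import Enum, auto
from importlib import import_module
from typing import List


class ImportMode(Enum):
    API = auto()
    CDK = auto()
    HANDLERS = auto()  # formerly TASKS; the rename is what this hunk reflects


# Which packages each entry point should import; hypothetical wiring for the sketch.
_MODULES_BY_MODE = {
    ImportMode.API: ["dataall.modules.datasets"],
    ImportMode.HANDLERS: ["dataall.modules.datasets"],
}


def load_modules(modes: List[ImportMode]) -> None:
    """Import every package registered for the requested modes."""
    for mode in modes:
        for package in _MODULES_BY_MODE.get(mode, []):
            import_module(package)
```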
3 changes: 2 additions & 1 deletion backend/dataall/api/Objects/DatasetProfiling/resolvers.py
@@ -6,6 +6,7 @@
 from ....aws.handlers.sts import SessionHelper
 from ....db import api, permissions, models
 from ....db.api import ResourcePolicy
+from dataall.modules.datasets.services.dataset_table import DatasetTableService

 log = logging.getLogger(__name__)

@@ -97,7 +98,7 @@ def get_last_table_profiling_run(context: Context, source, tableUri=None):

         if run:
             if not run.results:
-                table = api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
+                table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)
                 dataset = api.Dataset.get_dataset_by_uri(session, table.datasetUri)
                 environment = api.Environment.get_environment_by_uri(
                     session, dataset.environmentUri
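The resolver now delegates to DatasetTableService from the new dataall.modules.datasets package instead of db.api.DatasetTable. A sketch of the service shape these call sites imply (the body is hypothetical; the real class carries the logic moved out of db.api.DatasetTable):

```python
# Hypothetical sketch of dataall.modules.datasets.services.dataset_table (shape only).
from dataall.db import models


class DatasetTableService:
    """Dataset-table operations owned by the datasets module instead of db.api."""

    @staticmethod
    def get_dataset_table_by_uri(session, table_uri: str) -> models.DatasetTable:
        # Illustrative body: look up by primary key and fail loudly if missing.
        table = session.query(models.DatasetTable).get(table_uri)
        if not table:
            raise Exception(f'DatasetTable {table_uri} not found')
        return table
```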
25 changes: 13 additions & 12 deletions backend/dataall/api/Objects/DatasetTable/resolvers.py
@@ -13,13 +13,14 @@
 from ....db.api import ResourcePolicy, Glossary
 from ....searchproxy import indexers
 from ....utils import json_utils
+from dataall.modules.datasets.services.dataset_table import DatasetTableService

 log = logging.getLogger(__name__)


 def create_table(context, source, datasetUri: str = None, input: dict = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.create_dataset_table(
+        table = DatasetTableService.create_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -37,7 +38,7 @@ def list_dataset_tables(context, source, filter: dict = None):
     if not filter:
         filter = {}
     with context.engine.scoped_session() as session:
-        return db.api.DatasetTable.list_dataset_tables(
+        return DatasetTableService.list_dataset_tables(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -49,8 +50,8 @@

 def get_table(context, source: models.Dataset, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
-        return db.api.DatasetTable.get_dataset_table(
+        table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)
+        return DatasetTableService.get_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -64,14 +65,14 @@ def get_table(context, source: models.Dataset, tableUri: str = None):

 def update_table(context, source, tableUri: str = None, input: dict = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
+        table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)

         dataset = db.api.Dataset.get_dataset_by_uri(session, table.datasetUri)

         input['table'] = table
         input['tableUri'] = table.tableUri

-        db.api.DatasetTable.update_dataset_table(
+        DatasetTableService.update_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -85,8 +86,8 @@ def update_table(context, source, tableUri: str = None, input: dict = None):

 def delete_table(context, source, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
-        db.api.DatasetTable.delete_dataset_table(
+        table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)
+        DatasetTableService.delete_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -102,7 +103,7 @@ def delete_table(context, source, tableUri: str = None):

 def preview(context, source, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table: models.DatasetTable = db.api.DatasetTable.get_dataset_table_by_uri(
+        table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
             session, tableUri
         )
         dataset = db.api.Dataset.get_dataset_by_uri(session, table.datasetUri)
@@ -157,7 +158,7 @@ def get_glue_table_properties(context: Context, source: models.DatasetTable, **k
     if not source:
         return None
     with context.engine.scoped_session() as session:
-        table: models.DatasetTable = db.api.DatasetTable.get_dataset_table_by_uri(
+        table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
             session, source.tableUri
         )
         return json_utils.to_string(table.GlueTableProperties).replace('\\', ' ')
@@ -186,7 +187,7 @@ def resolve_glossary_terms(context: Context, source: models.DatasetTable, **kwar

 def publish_table_update(context: Context, source, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table: models.DatasetTable = db.api.DatasetTable.get_dataset_table_by_uri(
+        table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
             session, tableUri
         )
         ResourcePolicy.check_user_resource_permission(
@@ -235,7 +236,7 @@ def resolve_redshift_copy_location(

 def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str, envUri: str, filter: dict = None):
     with context.engine.scoped_session() as session:
-        return db.api.DatasetTable.get_dataset_tables_shared_with_env(
+        return DatasetTableService.get_dataset_tables_shared_with_env(
             session,
             envUri,
             datasetUri
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/DatasetTable/schema.py
@@ -1,4 +1,4 @@
-from ..DatasetTableColumn.resolvers import list_table_columns
+from dataall.modules.datasets.api.table_column.resolvers import list_table_columns
 from ... import gql
 from .resolvers import *
 from ...constants import GraphQLEnumMapper
43 changes: 43 additions & 0 deletions backend/dataall/api/Objects/Feed/registry.py
@@ -0,0 +1,43 @@
from dataclasses import dataclass
from typing import Type, Dict

from dataall.api import gql
from dataall.api.gql.graphql_union_type import UnionTypeRegistry
from dataall.db import Resource, models


@dataclass
class FeedDefinition:
    target_type: str
    model: Type[Resource]


class FeedRegistry(UnionTypeRegistry):
    """Registers models for different target types"""
    _DEFINITIONS: Dict[str, FeedDefinition] = {}

    @classmethod
    def register(cls, definition: FeedDefinition):
        cls._DEFINITIONS[definition.target_type] = definition

    @classmethod
    def find_model(cls, target_type: str):
        return cls._DEFINITIONS[target_type].model

    @classmethod
    def find_target(cls, obj: Resource):
        for target_type, definition in cls._DEFINITIONS.items():
            if isinstance(obj, definition.model):
                return target_type
        return None

    @classmethod
    def types(cls):
        return [gql.Ref(target_type) for target_type in cls._DEFINITIONS.keys()]


FeedRegistry.register(FeedDefinition("Worksheet", models.Worksheet))
FeedRegistry.register(FeedDefinition("DataPipeline", models.DataPipeline))
FeedRegistry.register(FeedDefinition("DatasetTable", models.DatasetTable))
FeedRegistry.register(FeedDefinition("DatasetStorageLocation", models.DatasetStorageLocation))
FeedRegistry.register(FeedDefinition("Dashboard", models.Dashboard))
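With this registry, Feed target types are contributed by registration instead of being hard-coded in the Feed resolvers. A short usage sketch follows; the DatasetTableColumn registration is an assumed example of how the datasets module would plug itself in, not part of this file:

```python
# Assumed usage sketch: a module registers its model, resolvers query the registry.
from dataall.api.Objects.Feed.registry import FeedRegistry, FeedDefinition
from dataall.db import models

# A module adds its own Feed target at import time (hypothetical registration).
FeedRegistry.register(FeedDefinition("DatasetTableColumn", models.DatasetTableColumn))

# Model instance -> GraphQL target type string (used by resolve_feed_target_type).
column = models.DatasetTableColumn()
assert FeedRegistry.find_target(column) == "DatasetTableColumn"

# Stored targetType string -> model class (used by resolve_target for the DB query).
assert FeedRegistry.find_model("DatasetTableColumn") is models.DatasetTableColumn
```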
32 changes: 5 additions & 27 deletions backend/dataall/api/Objects/Feed/resolvers.py
@@ -1,7 +1,8 @@
 from sqlalchemy import or_

-from ....api.context import Context
-from ....db import paginate, models
+from dataall.api.context import Context
+from dataall.db import paginate, models
+from dataall.api.Objects.Feed.registry import FeedRegistry


 class Feed:
@@ -19,37 +20,14 @@ def targetType(self):


 def resolve_feed_target_type(obj, *_):
-    if isinstance(obj, models.DatasetTableColumn):
-        return 'DatasetTableColumn'
-    elif isinstance(obj, models.Worksheet):
-        return 'Worksheet'
-    elif isinstance(obj, models.DataPipeline):
-        return 'DataPipeline'
-    elif isinstance(obj, models.DatasetTable):
-        return 'DatasetTable'
-    elif isinstance(obj, models.Dataset):
-        return 'Dataset'
-    elif isinstance(obj, models.DatasetStorageLocation):
-        return 'DatasetStorageLocation'
-    elif isinstance(obj, models.Dashboard):
-        return 'Dashboard'
-    else:
-        return None
+    return FeedRegistry.find_target(obj)


 def resolve_target(context: Context, source: Feed, **kwargs):
     if not source:
         return None
     with context.engine.scoped_session() as session:
-        model = {
-            'Dataset': models.Dataset,
-            'DatasetTable': models.DatasetTable,
-            'DatasetTableColumn': models.DatasetTableColumn,
-            'DatasetStorageLocation': models.DatasetStorageLocation,
-            'Dashboard': models.Dashboard,
-            'DataPipeline': models.DataPipeline,
-            'Worksheet': models.Worksheet,
-        }[source.targetType]
+        model = FeedRegistry.find_model(source.targetType)
         target = session.query(model).get(source.targetUri)
         return target

11 changes: 2 additions & 9 deletions backend/dataall/api/Objects/Feed/schema.py
@@ -1,18 +1,11 @@
 from ... import gql
 from .resolvers import *
+from dataall.api.Objects.Feed.registry import FeedRegistry


 FeedTarget = gql.Union(
     name='FeedTarget',
-    types=[
-        gql.Ref('Dataset'),
-        gql.Ref('DatasetTable'),
-        gql.Ref('DatasetTableColumn'),
-        gql.Ref('DatasetStorageLocation'),
-        gql.Ref('DataPipeline'),
-        gql.Ref('Worksheet'),
-        gql.Ref('Dashboard'),
-    ],
+    type_registry=FeedRegistry,
     resolver=resolve_feed_target_type,
 )

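The hard-coded types list gives way to type_registry, which is what the "Added runtime type registration for Union GraphQL type" commit refers to: the union's members are collected from the registry when the schema is generated, so modules loaded later can still contribute types. The sketch below shows the general idea; the actual gql.Union internals in data.all are assumed here, not quoted:

```python
# Conceptual sketch of a registry-backed GraphQL union (assumed gql internals).
from abc import ABC, abstractmethod
from typing import List, Optional


class UnionTypeRegistry(ABC):
    """Anything that can produce the union's member types on demand."""

    @classmethod
    @abstractmethod
    def types(cls) -> List[str]:
        ...


class Union:
    def __init__(self, name: str, types: Optional[List[str]] = None,
                 type_registry=None, resolver=None):
        self.name = name
        self._types = types or []
        self._type_registry = type_registry
        self.resolver = resolver

    def gql(self) -> str:
        # Member types are resolved at schema-generation time, so anything
        # registered by modules after this Union was declared is still included.
        members = self._type_registry.types() if self._type_registry else self._types
        return f"union {self.name} = {' | '.join(str(m) for m in members)}"


# FeedTarget = Union(name='FeedTarget', type_registry=FeedRegistry, resolver=...)
```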
3 changes: 2 additions & 1 deletion backend/dataall/api/Objects/Glossary/__init__.py
@@ -4,6 +4,7 @@
     mutations,
     resolvers,
     schema,
+    registry,
 )

-__all__ = ['resolvers', 'schema', 'input_types', 'queries', 'mutations']
+__all__ = ['registry', 'resolvers', 'schema', 'input_types', 'queries', 'mutations']
58 changes: 58 additions & 0 deletions backend/dataall/api/Objects/Glossary/registry.py
@@ -0,0 +1,58 @@
from dataclasses import dataclass
from typing import Type, Dict, Optional, Protocol, Union

from dataall.api import gql
from dataall.api.gql.graphql_union_type import UnionTypeRegistry
from dataall.db import Resource, models


class Identifiable(Protocol):
    def uri(self):
        ...


@dataclass
class GlossaryDefinition:
    """Glossary's definition used for registration references of other modules"""
    target_type: str
    object_type: str
    model: Union[Type[Resource], Identifiable]  # should be an intersection, but python typing doesn't have one yet

    def target_uri(self):
        return self.model.uri()


class GlossaryRegistry(UnionTypeRegistry):
    """Registry of glossary definition and API to retrieve data"""
    _DEFINITIONS: Dict[str, GlossaryDefinition] = {}

    @classmethod
    def register(cls, glossary: GlossaryDefinition) -> None:
        cls._DEFINITIONS[glossary.target_type] = glossary

    @classmethod
    def find_model(cls, target_type: str) -> Optional[Resource]:
        definition = cls._DEFINITIONS[target_type]
        return definition.model if definition is not None else None

    @classmethod
    def find_object_type(cls, model: Resource) -> Optional[str]:
        for _, definition in cls._DEFINITIONS.items():
            if isinstance(model, definition.model):
                return definition.object_type
        return None

    @classmethod
    def definitions(cls):
        return cls._DEFINITIONS.values()

    @classmethod
    def types(cls):
        return [gql.Ref(definition.object_type) for definition in cls._DEFINITIONS.values()]


GlossaryRegistry.register(GlossaryDefinition("DatasetTable", "DatasetTable", models.DatasetTable))
GlossaryRegistry.register(GlossaryDefinition("Folder", "DatasetStorageLocation", models.DatasetStorageLocation))
GlossaryRegistry.register(GlossaryDefinition("Dashboard", "Dashboard", models.Dashboard))
GlossaryRegistry.register(GlossaryDefinition("DatasetTable", "DatasetTable", models.DatasetTable))
GlossaryRegistry.register(GlossaryDefinition("Dataset", "Dataset", models.Dataset))
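Like the Feed registry, this keeps the core Glossary API unaware of which modules contribute glossary targets. A brief usage sketch built only on the methods defined above:

```python
# Minimal sketch: resolving glossary targets through the registry instead of maps.
from dataall.api.Objects.Glossary.registry import GlossaryRegistry
from dataall.db import models

# Stored target_type string -> model class for a DB query.
assert GlossaryRegistry.find_model("Folder") is models.DatasetStorageLocation

# Model instance -> GraphQL object type used in the glossary Union type.
location = models.DatasetStorageLocation()
assert GlossaryRegistry.find_object_type(location) == "DatasetStorageLocation"

# Union member types are derived from whatever has been registered.
members = GlossaryRegistry.types()  # e.g. [gql.Ref('DatasetTable'), gql.Ref('Dashboard'), ...]
```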