Skip to content

Commit

Permalink
modularization: Datapipelines modularization (#457)
Browse files Browse the repository at this point in the history
Modularization of data pipelines

Changes:
- Relevant files were moved from dataall/api/Objects and from dataall/db to
the newly created module
- Relevant permissions were extracted to the newly created module and are
now used with the new decorators
- Functions interacting with the DB were moved into a repository
- Extracted and moved Datapipelines-related code from the core CDK files to
the new module:
dataall/cdkproxy/cdk_cli_wrapper.py
dataall/cdkproxy/stacks/pipeline.py
dataall/cdkproxy/cdkpipeline/cdk_pipeline.py
service policies

- Extracted and moved Datapipelines-related code from the core AWS handlers
to the new module:
dataall/aws/handlers/stepfunction.py
dataall/aws/handlers/codecommit.py
dataall/aws/handlers/codepipeline.py
dataall/aws/handlers/glue.py

- Added a module interface for the new module
- Unit tests updated



By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
dbalintx authored Jun 27, 2023
1 parent f3bccc6 commit 79cd32e
Show file tree
Hide file tree
Showing 75 changed files with 1,468 additions and 1,391 deletions.
420 changes: 0 additions & 420 deletions backend/dataall/api/Objects/DataPipeline/resolvers.py

This file was deleted.

2 changes: 0 additions & 2 deletions backend/dataall/api/Objects/Environment/input_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
gql.Argument('AwsAccountId', gql.NonNullableType(gql.String)),
gql.Argument('region', gql.NonNullableType(gql.String)),
gql.Argument('dashboardsEnabled', type=gql.Boolean),
gql.Argument('pipelinesEnabled', type=gql.Boolean),
gql.Argument('warehousesEnabled', type=gql.Boolean),
gql.Argument('vpcId', gql.String),
gql.Argument('privateSubnetIds', gql.ArrayType(gql.String)),
Expand All @@ -52,7 +51,6 @@
gql.Argument('privateSubnetIds', gql.ArrayType(gql.String)),
gql.Argument('publicSubnetIds', gql.ArrayType(gql.String)),
gql.Argument('dashboardsEnabled', type=gql.Boolean),
gql.Argument('pipelinesEnabled', type=gql.Boolean),
gql.Argument('warehousesEnabled', type=gql.Boolean),
gql.Argument('resourcePrefix', gql.String),
gql.Argument('parameters', gql.ArrayType(ModifyEnvironmentParameterInput))
Expand Down
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/Environment/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
),
gql.Field('validated', type=gql.Boolean),
gql.Field('dashboardsEnabled', type=gql.Boolean),
gql.Field('pipelinesEnabled', type=gql.Boolean),
gql.Field('warehousesEnabled', type=gql.Boolean),
gql.Field('roleCreated', type=gql.Boolean),
gql.Field('isOrganizationDefaultEnvironment', type=gql.Boolean),
Expand Down
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/Feed/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,4 @@ def types(cls):
return [gql.Ref(target_type) for target_type in cls._DEFINITIONS.keys()]


FeedRegistry.register(FeedDefinition("DataPipeline", models.DataPipeline))
FeedRegistry.register(FeedDefinition("Dashboard", models.Dashboard))
20 changes: 0 additions & 20 deletions backend/dataall/api/Objects/Stack/stack_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,3 @@ def delete_stack(

Worker.queue(context.db_engine, [task.taskUri])
return True


def delete_repository(
target_uri, accountid, cdk_role_arn, region, repo_name
):
context = get_context()
with context.db_engine.scoped_session() as session:
task = models.Task(
targetUri=target_uri,
action='repo.datapipeline.delete',
payload={
'accountid': accountid,
'region': region,
'cdk_role_arn': cdk_role_arn,
'repo_name': repo_name,
},
)
session.add(task)
Worker.queue(context.db_engine, [task.taskUri])
return True
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from ...api.constants import GraphQLEnumMapper
from . import (
Permission,
DataPipeline,
Environment,
Activity,
Group,
Expand Down
6 changes: 0 additions & 6 deletions backend/dataall/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ class DashboardRole(GraphQLEnumMapper):
NoPermission = '000'


class DataPipelineRole(GraphQLEnumMapper):
Creator = '999'
Admin = '900'
NoPermission = '000'


class GlossaryRole(GraphQLEnumMapper):
# Permissions on a glossary
Admin = '900'
Expand Down
100 changes: 0 additions & 100 deletions backend/dataall/aws/handlers/codecommit.py

This file was deleted.

44 changes: 0 additions & 44 deletions backend/dataall/aws/handlers/codepipeline.py

This file was deleted.

32 changes: 17 additions & 15 deletions backend/dataall/aws/handlers/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

from botocore.exceptions import ClientError

from .service_handlers import Worker
from .sts import SessionHelper
from ...db import models

log = logging.getLogger('aws:glue')

Expand All @@ -14,17 +12,21 @@ def __init__(self):
pass

@staticmethod
@Worker.handler(path='glue.job.runs')
def get_job_runs(engine, task: models.Task):
with engine.scoped_session() as session:
Data_pipeline: models.DataPipeline = session.query(models.DataPipeline).get(
task.targetUri
def table_exists(**data):
accountid = data['accountid']
region = data.get('region', 'eu-west-1')
database = data.get('database', 'UndefinedDatabaseName')
table_name = data.get('tablename', 'UndefinedTableName')
try:
table = (
SessionHelper.remote_session(accountid)
.client('glue', region_name=region)
.get_table(
CatalogId=data['accountid'], DatabaseName=database, Name=table_name
)
)
aws = SessionHelper.remote_session(Data_pipeline.AwsAccountId)
glue_client = aws.client('glue', region_name=Data_pipeline.region)
try:
response = glue_client.get_job_runs(JobName=Data_pipeline.name)
except ClientError as e:
log.warning(f'Could not retrieve pipeline runs , {str(e)}')
return []
return response['JobRuns']
log.info(f'Glue table found: {data}')
return table
except ClientError:
log.info(f'Glue table not found: {data}')
return None
42 changes: 0 additions & 42 deletions backend/dataall/aws/handlers/stepfunction.py

This file was deleted.

Loading

0 comments on commit 79cd32e

Please sign in to comment.