Skip to content

Commit

Permalink
Merge pull request #203 from atlanhq/DVX-118
Browse files Browse the repository at this point in the history
DVX-118: Add support for workflow packages
  • Loading branch information
ErnestoLoma authored Dec 28, 2023
2 parents e18a88f + 0632150 commit 1e234a6
Show file tree
Hide file tree
Showing 28 changed files with 3,166 additions and 20 deletions.
6 changes: 5 additions & 1 deletion pyatlan/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,17 @@
INDEX_SEARCH = API(INDEX_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.ATLAS)
# Workflow index-search endpoints (served by Heracles).
WORKFLOW_INDEX_API = "workflows/indexsearch"
WORKFLOW_INDEX_RUN_API = "runs/indexsearch"
# NOTE(review): WORKFLOW_RUN_API is re-assigned below ("workflows?submit=true"),
# so this earlier binding is shadowed — it looks like leftover from the rename
# to WORKFLOW_RERUN_API; confirm and remove if so.
WORKFLOW_RUN_API = "workflows/submit"
WORKFLOW_INDEX_SEARCH = API(
    WORKFLOW_INDEX_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
WORKFLOW_INDEX_RUN_SEARCH = API(
    WORKFLOW_INDEX_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
# Re-submit an existing workflow (used by WorkflowClient.rerun).
WORKFLOW_RERUN_API = "workflows/submit"
WORKFLOW_RERUN = API(
    WORKFLOW_RERUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
# Create and immediately submit a brand-new workflow (used by WorkflowClient.run).
WORKFLOW_RUN_API = "workflows?submit=true"
WORKFLOW_RUN = API(
    WORKFLOW_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
Expand Down
27 changes: 26 additions & 1 deletion pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from pyatlan.client.constants import (
WORKFLOW_INDEX_RUN_SEARCH,
WORKFLOW_INDEX_SEARCH,
WORKFLOW_RERUN,
WORKFLOW_RUN,
)
from pyatlan.errors import ErrorCode
from pyatlan.model.enums import AtlanWorkflowPhase, WorkflowPackage
from pyatlan.model.search import Bool, NestedQuery, Prefix, Query, Term
from pyatlan.model.workflow import (
ReRunRequest,
Workflow,
WorkflowResponse,
WorkflowRunResponse,
WorkflowSearchRequest,
Expand Down Expand Up @@ -139,11 +141,34 @@ def rerun(
namespace=detail.metadata.namespace, resource_name=detail.metadata.name
)
raw_json = self._client._call_api(
WORKFLOW_RUN,
WORKFLOW_RERUN,
request_obj=request,
)
return WorkflowRunResponse(**raw_json)

def run(self, workflow: Workflow) -> WorkflowResponse:
    """
    Run the Atlan workflow with a specific configuration.

    Note: this should only be used to create the workflow for the first time.
    Each invocation creates a new connection and new assets within that
    connection, so running it repeatedly with the same configuration may lead
    to duplicate assets. Prefer "rerun()" to re-execute an existing workflow.

    :param workflow: the workflow to run
    :returns: details of the workflow run
    :raises ErrorCode.INVALID_PARAMETER_TYPE: if 'workflow' is not an instance of Workflow
    """
    # Guard clause: reject anything that is not a Workflow up front.
    if not isinstance(workflow, Workflow):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "workflow", "Workflow"
        )
    response = self._client._call_api(WORKFLOW_RUN, request_obj=workflow)
    return WorkflowResponse(**response)

def monitor(
self, workflow_response: WorkflowResponse, logger: Optional[Logger] = None
) -> Optional[AtlanWorkflowPhase]:
Expand Down
8 changes: 4 additions & 4 deletions pyatlan/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

E = TypeVar("E", bound="AtlanError")
RAISE_GITHUB_ISSUE = (
"Please raise an issue on the Java SDK GitHub repository providing context in which this error "
"occurred."
"Please raise an issue on the Python SDK GitHub "
"repository providing context in which this error occurred."
)


Expand Down Expand Up @@ -182,14 +182,14 @@ class ErrorCode(Enum):
400,
"ATLAN-PYTHON-400-013",
"Lineage was retrieved using hideProces=False. We do not provide a graph view in this case.",
"Retry your request for lineage setting hideProcess=true.",
"Retry your request for lineage setting hideProcess=True.",
InvalidRequestError,
)
UNABLE_TO_TRANSLATE_FILTERS = (
400,
"ATLAN-PYTHON-400-014",
"Unable to translate the provided include/exclude asset filters into JSON.",
"Verify the filters you provided. If the problem persists, please raise an issue on the Java SDK GitHub "
"Verify the filters you provided. If the problem persists, please raise an issue on the Python SDK GitHub "
"repository providing context in which this error occurred.",
InvalidRequestError,
)
Expand Down
5 changes: 3 additions & 2 deletions pyatlan/model/enums.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
from datetime import datetime
from enum import Enum

from pyatlan import utils


class AdminOperationType(str, Enum):
CREATE = "CREATE"
Expand Down Expand Up @@ -143,7 +144,7 @@ def __new__(
return obj

def to_qualified_name(self):
    """Return a default qualified name: ``default/<connector>/<epoch-seconds>``."""
    epoch_seconds = int(utils.get_epoch_timestamp())
    return "/".join(("default", self.value, str(epoch_seconds)))

SNOWFLAKE = ("snowflake", AtlanConnectionCategory.WAREHOUSE)
TABLEAU = ("tableau", AtlanConnectionCategory.BI)
Expand Down
Empty file.
Empty file.
111 changes: 111 additions & 0 deletions pyatlan/model/packages/base/crawler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from json import dumps
from typing import Any, Dict, Optional

from pyatlan import utils
from pyatlan.errors import ErrorCode
from pyatlan.model.assets import Connection
from pyatlan.model.packages.base.package import AbstractPackage


class AbstractCrawler(AbstractPackage):
    """
    Abstract base class for crawler packages.

    :param connection_name: name for the connection
    :param connection_type: type of connector for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False)
    :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False)
    :param row_limit: maximum number of rows that can be returned by a query
    :param source_logo: logo to use for the source
    :raises AtlanError: if there is not at least one role,
        group, or user defined as an admin (or any of them are invalid)
    """

    def __init__(
        self,
        connection_name: str,
        connection_type: str,
        admin_roles: Optional[list[str]],
        admin_groups: Optional[list[str]],
        admin_users: Optional[list[str]],
        allow_query: bool = False,
        allow_query_preview: bool = False,
        row_limit: int = 0,
        source_logo: str = "",
    ):
        # Filled in by concrete crawler subclasses before to_workflow().
        self._parameters: list = []
        self._credentials_body: dict = {}
        # Captured once at construction; presumably used by subclasses to build
        # unique, time-stamped names for the run — TODO confirm against callers.
        self._epoch = int(utils.get_epoch_timestamp())
        self._connection_name = connection_name
        self._connection_type = connection_type
        self._admin_roles = admin_roles
        self._admin_groups = admin_groups
        self._admin_users = admin_users
        self._allow_query = allow_query
        self._allow_query_preview = allow_query_preview
        self._row_limit = row_limit
        self._source_logo = source_logo

    def _get_connection(self) -> Connection:
        """
        Build a connection using the provided parameters,
        which will be the target for the package to crawl assets.

        :returns: a Connection carrying the crawler's query/preview settings,
            row limit, logo, and a placeholder credential GUID
        """
        connection = Connection.create(
            name=self._connection_name,
            connector_type=self._connection_type,
            admin_roles=self._admin_roles,
            admin_groups=self._admin_groups,
            admin_users=self._admin_users,
        )
        connection.allow_query = self._allow_query
        connection.allow_query_preview = self._allow_query_preview
        connection.row_limit = self._row_limit
        # Template placeholder, replaced server-side with the real credential GUID.
        connection.default_credential_guid = "{{credentialGuid}}"
        connection.source_logo = self._source_logo
        connection.is_discoverable = True
        connection.is_editable = False
        return connection

    @staticmethod
    def build_hierarchical_filter(raw_filter: Optional[dict]) -> str:
        """
        Build an exact match filter from the provided map of databases and schemas.

        :param raw_filter: map keyed by database name with each value being a list of schemas
        :returns: an exact-match filter map string, usable in crawlers include / exclude
            filters (empty string when no filter is provided)
        :raises InvalidRequestError: in the unlikely event the provided filter cannot be translated
        """
        to_include: dict[str, Any] = {}
        if not raw_filter:
            return ""
        try:
            for db_name, schemas in raw_filter.items():
                # Anchor names with ^...$ so matching is exact, not substring.
                to_include[f"^{db_name}$"] = [f"^{schema}$" for schema in schemas]
            return dumps(to_include)
        except (AttributeError, TypeError) as err:
            # Chain the original error so the root cause stays visible.
            raise ErrorCode.UNABLE_TO_TRANSLATE_FILTERS.exception_with_parameters() from err

    @staticmethod
    def build_flat_filter(raw_filter: Optional[list]) -> str:
        """
        Build a filter from the provided list of object names / IDs.

        :param raw_filter: list of objects for the filter
        :returns: a filter map string, usable in crawlers include / exclude filters
            (empty string when no filter is provided)
        :raises InvalidRequestError: in the unlikely event the provided filter cannot be translated
        """
        to_include: dict[str, Any] = {}
        if not raw_filter:
            return ""
        try:
            # Each entry maps to an empty object in the filter payload.
            for entry in raw_filter:
                to_include[entry] = {}
            return dumps(to_include)
        except (AttributeError, TypeError) as err:
            raise ErrorCode.UNABLE_TO_TRANSLATE_FILTERS.exception_with_parameters() from err
64 changes: 64 additions & 0 deletions pyatlan/model/packages/base/package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from pyatlan.model.workflow import (
PackageParameter,
Workflow,
WorkflowDAG,
WorkflowMetadata,
WorkflowParameters,
WorkflowSpec,
WorkflowTask,
WorkflowTemplate,
WorkflowTemplateRef,
)


class AbstractPackage:
    """
    Abstract base class for workflow packages.
    """

    _PACKAGE_NAME: str = ""
    _PACKAGE_PREFIX: str = ""

    def _get_metadata(self) -> WorkflowMetadata:
        # Concrete packages must supply their own workflow metadata.
        raise NotImplementedError

    def to_workflow(self) -> Workflow:
        """Convert this package into a submittable Workflow object."""
        metadata = self._get_metadata()
        # A single DAG task that invokes the package's cluster-scoped template.
        run_task = WorkflowTask(
            name="run",
            arguments=WorkflowParameters(
                parameters=self._parameters  # type: ignore
            ),
            template_ref=WorkflowTemplateRef(
                name=self._PACKAGE_PREFIX,
                template="main",
                cluster_scope=True,
            ),
        )
        main_template = WorkflowTemplate(
            name="main",
            dag=WorkflowDAG(tasks=[run_task]),
        )
        spec = WorkflowSpec(
            entrypoint="main",
            templates=[main_template],
            workflow_metadata=WorkflowMetadata(
                annotations={"package.argoproj.io/name": self._PACKAGE_NAME}
            ),
        )
        credential_payload = [
            PackageParameter(
                parameter="credentialGuid",
                type="credential",
                body=self._credentials_body,  # type: ignore
            )
        ]
        return Workflow(
            metadata=metadata,
            spec=spec,
            payload=credential_payload if self._credentials_body else [],  # type: ignore
        )
Loading

0 comments on commit 1e234a6

Please sign in to comment.