Skip to content

Commit

Permalink
Issue 997-01 (open-metadata#2476)
Browse files Browse the repository at this point in the history
* Fix linting

* Fixed pyformat errors

* Address comments from PR review

* Added back phony in makefile

* Added back comment
  • Loading branch information
TeddyCr authored Jan 28, 2022
1 parent 10a94b2 commit 4f3e330
Show file tree
Hide file tree
Showing 29 changed files with 885 additions and 136 deletions.
3 changes: 3 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ disable=no-name-in-module

[TYPECHECK]
ignored-classes=optparse.Values,thread._local,_thread._local,SQLAlchemyHelper,FieldInfo

[FORMAT]
max-line-length=88
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ precommit_install: ## Install the project's precommit hooks from .pre-commit-co
@echo "Make sure to first run install_test first"
pre-commit install

## Python Checkstyle
.PHONY: lint
lint: ## Run pylint on the Python sources to analyze the codebase
find $(PY_SOURCE) -path $(PY_SOURCE)/metadata/generated -prune -false -o -type f -name "*.py" | xargs pylint
lint: ## Run pylint on the Python sources to analyze the codebase
find $(PY_SOURCE) -path $(PY_SOURCE)/metadata/generated -prune -false -o -type f -name "*.py" | xargs pylint --ignore-paths=$(PY_SOURCE)/metadata_server/

.PHONY: py_format
py_format: ## Run black and isort to format the Python codebase
Expand Down
8 changes: 6 additions & 2 deletions ingestion/src/airflow_provider_openmetadata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
Airflow backend lineage module
"""

import metadata


def get_provider_config():
"""
Get provider configuration
Returns
dict:
"""
return {
"name": "OpenMetadata",
"description": "OpenMetadata <https://open-metadata.org/>",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ def lineage_callback(context: Dict[str, str]) -> None:
)

except Exception as exc: # pylint: disable=broad-except
logging.error(f"Lineage Callback exception {exc}")
logging.error("Lineage Callback exception %s", exc)
10 changes: 10 additions & 0 deletions ingestion/src/airflow_provider_openmetadata/lineage/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@


class OpenMetadataLineageConfig(ConfigModel):
"""
Base class for OpenMetada lineage config
Attributes
airflow_service_name (str): name of the service
api_endpoint (str): the endpoint for the API
auth_provider_type (str):
secret_key (str):
"""

airflow_service_name: str = "airflow"
api_endpoint: str = "http://localhost:8585"
auth_provider_type: str = "no-auth"
Expand Down
103 changes: 71 additions & 32 deletions ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,75 @@
OpenMetadata Airflow Lineage Backend
"""

import ast
import json
import os
import traceback
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Union
from typing import TYPE_CHECKING, Dict, List, Optional

from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend

from airflow_provider_openmetadata.lineage.config import (
OpenMetadataLineageConfig,
get_lineage_config,
get_metadata_config,
)
from airflow_provider_openmetadata.lineage.utils import (
ALLOWED_FLOW_KEYS,
ALLOWED_TASK_KEYS,
create_pipeline_entity,
get_or_create_pipeline_service,
get_properties,
get_xlets,
is_airflow_version_1,
parse_lineage_to_openmetadata,
)
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createPipeline import (
CreatePipelineEntityRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceEntityRequest,
)
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.pipelineService import (
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import convert_epoch_to_iso

if TYPE_CHECKING:
from airflow import DAG
from airflow.models.baseoperator import BaseOperator


allowed_task_keys = [
"_downstream_task_ids",
"_inlets",
"_outlets",
"_task_type",
"_task_module",
"depends_on_past",
"email",
"label",
"execution_timeout",
"end_date",
"start_date",
"sla",
"sql",
"task_id",
"trigger_rule",
"wait_for_downstream",
]
allowed_flow_keys = [
"_access_control",
"_concurrency",
"_default_view",
"catchup",
"fileloc",
"is_paused_upon_creation",
"start_date",
"tags",
"timezone",
]


# pylint: disable=import-outside-toplevel, unused-import
def is_airflow_version_1() -> bool:
"""
Manage airflow submodule import based airflow version
Returns
bool
"""
try:
from airflow.hooks.base import BaseHook

return False
except ModuleNotFoundError:
from airflow.hooks.base_hook import BaseHook

return True


# pylint: disable=too-few-public-methods
class OpenMetadataLineageBackend(LineageBackend):
"""
Sends lineage data from tasks to OpenMetadata.
Expand All @@ -75,20 +95,39 @@ class OpenMetadataLineageBackend(LineageBackend):
auth_provider_type = no-auth # use google here if you are
configuring google as SSO
secret_key = google-client-secret-key # it needs to be configured
only if you are using google as SSO
only if you are using google as SSO the one configured in openMetadata
openmetadata_api_endpoint = http://localhost:8585
auth_provider_type = no-auth # use google here if you are configuring google as SSO
secret_key = google-client-secret-key # it needs to be configured
only if you are using google as SSO
"""

def __init__(self) -> None:
"""
Instantiate a superclass object and run lineage config function
"""
super().__init__()
_ = get_lineage_config()

# pylint: disable=protected-access
@staticmethod
def send_lineage(
operator: "BaseOperator",
inlets: Optional[List] = None,
outlets: Optional[List] = None,
context: Dict = None,
) -> None:
"""
Send lineage to OpenMetadata
Args
operator (BaseOperator):
inlets (Optional[List]):
outlets (Optional[List]):
context (Dict):
Returns
None
"""

try:
config = get_lineage_config()
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/airflow_provider_openmetadata/lineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def get_xlets(
return None


# pylint: disable=too-many-arguments
def iso_dag_start_date(props: Dict[str, Any]) -> Optional[str]:
"""
Given a properties dict, return the start_date
Expand Down Expand Up @@ -229,6 +230,7 @@ def create_pipeline_entity(
return client.create_or_update(create_pipeline)


# pylint: disable=too-many-arguments,too-many-locals
def parse_lineage_to_openmetadata(
config: OpenMetadataLineageConfig,
context: Dict,
Expand Down
22 changes: 19 additions & 3 deletions ingestion/src/metadata/ingestion/ometa/auth_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,29 @@
from metadata.config.common import ConfigModel


@dataclass # type: ignore[misc]
@dataclass(init=False) # type: ignore[misc]
class AuthenticationProvider(metaclass=ABCMeta):
"""
Interface definition for an Authentification provider
"""

@classmethod
@abstractmethod
def create(cls, config: ConfigModel) -> "AuthenticationProvider":
pass
"""
Create authentication
Arguments:
config (ConfigModel): configuration
Returns:
AuthenticationProvider
"""

@abstractmethod
def auth_token(self) -> str:
pass
"""
Authentication token
Returns:
str
"""
Loading

0 comments on commit 4f3e330

Please sign in to comment.