fix(airflow): add dag AllowDenyPattern config #11472

Merged
3 changes: 2 additions & 1 deletion docs/lineage/airflow.md
@@ -132,7 +132,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
```

| Name | Default value | Description |
| -------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster | prod | The name of the Airflow cluster. |
@@ -144,6 +144,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| datajob_url_link | taskinstance | If set to taskinstance, the datajob URL links to the task instance page in Airflow; it can also be set to grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
| dag_filter_str | { "allow": [".*"] } | AllowDenyPattern value, as a JSON string, that filters which DAGs are ingested. |
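
For illustration, a minimal sketch of how the plugin interprets such a value; the `internal_.*` deny pattern and the DAG ids below are hypothetical examples, not defaults:

```python
from datahub.configuration.common import AllowDenyPattern

# Hypothetical dag_filter_str value, e.g. set in the [datahub] section of airflow.cfg
# or via the AIRFLOW__DATAHUB__DAG_FILTER_STR environment variable.
dag_filter_str = '{ "allow": [".*"], "deny": ["internal_.*"] }'

pattern = AllowDenyPattern.parse_raw(dag_filter_str)
assert pattern.allowed("sales_daily")           # lineage is emitted for this DAG
assert not pattern.allowed("internal_cleanup")  # this DAG is skipped by the plugin
```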

#### Validate that the plugin is working

@@ -3,7 +3,8 @@

import datahub.emitter.mce_builder as builder
from airflow.configuration import conf
from datahub.configuration.common import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from pydantic.fields import Field

if TYPE_CHECKING:
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -56,6 +57,11 @@ class DatahubLineageConfig(ConfigModel):

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

dag_filter_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for DAGs to ingest",
)

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -87,6 +93,9 @@ def get_lineage_config() -> DatahubLineageConfig:
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)
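    # The fallback mirrors AllowDenyPattern.allow_all(), so every DAG is ingested
    # unless dag_filter_str is set explicitly.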
dag_filter_pattern = AllowDenyPattern.parse_raw(
conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}')
)

return DatahubLineageConfig(
enabled=enabled,
@@ -102,4 +111,5 @@ def get_lineage_config() -> DatahubLineageConfig:
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
dag_filter_pattern=dag_filter_pattern,
)
@@ -383,9 +383,15 @@ def on_task_instance_running(
return

logger.debug(
f"DataHub listener got notification about task instance start for {task_instance.task_id}"
f"DataHub listener got notification about task instance start for {task_instance.task_id} of dag {task_instance.dag_run.dag_id}"
)

if not self.config.dag_filter_pattern.allowed(task_instance.dag_run.dag_id):
logger.debug(
f"DAG {task_instance.dag_run.dag_id} is not allowed by the pattern"
)
return

task_instance = _render_templates(task_instance)

# The type ignore is to placate mypy on Airflow 2.1.x.
@@ -490,6 +496,10 @@ def on_task_instance_finish(

dag: "DAG" = task.dag # type: ignore[assignment]

if not self.config.dag_filter_pattern.allowed(dag.dag_id):
logger.debug(f"DAG {dag.dag_id} is not allowed by the pattern")
return

datajob = AirflowGenerator.generate_datajob(
cluster=self.config.cluster,
task=task,
@@ -687,8 +697,12 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
f"DataHub listener got notification about dag run start for {dag_run.dag_id}"
)

self.on_dag_start(dag_run)
assert dag_run.dag_id
if not self.config.dag_filter_pattern.allowed(dag_run.dag_id):
logger.debug(f"DAG {dag_run.dag_id} is not allowed by the pattern")
return

self.on_dag_start(dag_run)
self.emitter.flush()

# TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow
@@ -0,0 +1,34 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_airflow_plugin.entities import Dataset, Urn

with DAG(
"dag_to_skip",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
task1 = BashOperator(
task_id="dag_to_skip_task_1",
dag=dag,
bash_command="echo 'dag_to_skip_task_1'",
inlets=[
Dataset(platform="snowflake", name="mydb.schema.tableA"),
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"),
],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)

task2 = BashOperator(
task_id="dag_to_skip_task_2",
dag=dag,
bash_command="echo 'dag_to_skip_task_2'",
)

task1 >> task2
@@ -33,6 +33,8 @@
DAGS_FOLDER = pathlib.Path(__file__).parent / "dags"
GOLDENS_FOLDER = pathlib.Path(__file__).parent / "goldens"

DAG_TO_SKIP_INGESTION = "dag_to_skip"


@dataclasses.dataclass
class AirflowInstance:
@@ -140,6 +142,7 @@ def _run_airflow(
# Configure the datahub plugin and have it write the MCPs to a file.
"AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True",
"AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name,
"AIRFLOW__DATAHUB__DAG_FILTER_STR": f'{{ "deny": ["{DAG_TO_SKIP_INGESTION}"] }}',
f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection(
conn_id="datahub_file_default",
conn_type="datahub-file",
@@ -276,6 +279,7 @@ class DagTestCase:
test_cases = [
DagTestCase("simple_dag"),
DagTestCase("basic_iolets"),
DagTestCase("dag_to_skip", v2_only=True),
DagTestCase("snowflake_operator", success=False, v2_only=True),
DagTestCase("sqlite_operator", v2_only=True),
DagTestCase("custom_operator_dag", v2_only=True),
@@ -373,20 +377,26 @@ def test_airflow_plugin(
print("Sleeping for a few seconds to let the plugin finish...")
time.sleep(10)

_sanitize_output_file(airflow_instance.metadata_file)

check_golden_file(
pytestconfig=pytestconfig,
output_path=airflow_instance.metadata_file,
golden_path=golden_path,
ignore_paths=[
# TODO: If we switched to Git urls, maybe we could get this to work consistently.
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
],
)
"""
we need to check that the golden file is missing / empty
when the dag_id is DAG_TO_SKIP_INGESTION
otherwise, this test doesn't actually do anything
dushayntAW marked this conversation as resolved.
Show resolved Hide resolved
"""
if dag_id != DAG_TO_SKIP_INGESTION:
    _sanitize_output_file(airflow_instance.metadata_file)

    check_golden_file(
        pytestconfig=pytestconfig,
        output_path=airflow_instance.metadata_file,
        golden_path=golden_path,
        ignore_paths=[
            # TODO: If we switched to Git urls, maybe we could get this to work consistently.
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
        ],
    )
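else:
    # Sketch of a stronger check for the denied DAG: the plugin should emit nothing,
    # so the metadata file should be missing or contain no MCPs. The exact "empty"
    # file representation ("" or "[]") is an assumption, not confirmed by this PR.
    assert (
        not airflow_instance.metadata_file.exists()
        or airflow_instance.metadata_file.read_text().strip() in ("", "[]")
    )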


def _sanitize_output_file(output_path: pathlib.Path) -> None: