Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/airflow): Add way to disable Airflow plugin without a restart #12098

Merged
merged 12 commits into from
Dec 16, 2024
31 changes: 31 additions & 0 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,37 @@ enabled = True # default
| log_level | _no change_ | [debug] Set the log level for the plugin. |
| debug_emitter | false | [debug] If true, the plugin will log the emitted events. |

#### Disabling the DataHub Plugin v2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move these docs down to the "Debugging" section


There are two ways to disable the DataHub Plugin v2:

##### 1. Disable via Configuration

Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.

```ini title="airflow.cfg"
[datahub]
enabled = False
```

##### 2. Disable via Airflow Variable (Kill-Switch)

If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create and set the `datahub_airflow_plugin_disable_listener` Airflow variable to `true`. This ensures that the listener won't process anything.

##### Command Line

```shell
airflow variables set datahub_airflow_plugin_disable_listener true
```

##### Airflow UI

1. Go to Admin -> Variables.
2. Click the "+" symbol to create a new variable.
3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.

This will immediately disable the plugin without requiring a restart.

## DataHub Plugin v1

### Installation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import airflow
import datahub.emitter.mce_builder as builder
from airflow.models import Variable
from airflow.models.serialized_dag import SerializedDagModel
from datahub.api.entities.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
Expand Down Expand Up @@ -78,6 +79,8 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
)
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"

KILL_SWITCH_VARIABLE_NAME = "datahub_airflow_plugin_disable_listener"


def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
# Using globals instead of functools.lru_cache to make testing easier.
Expand Down Expand Up @@ -364,6 +367,15 @@ def _extract_lineage(
redact_with_exclusions(v)
)

def check_kill_switch(self):
try:
if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
logger.info("DataHub listener disabled by kill switch")
return True
except Exception as e:
raise e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the point of this try catch?

return False

@hookimpl
@run_in_thread
def on_task_instance_running(
Expand All @@ -372,6 +384,8 @@ def on_task_instance_running(
task_instance: "TaskInstance",
session: "Session", # This will always be QUEUED
) -> None:
if self.check_kill_switch():
return
self._set_log_level()

# This if statement mirrors the logic in https://github.com/OpenLineage/OpenLineage/pull/508.
Expand Down Expand Up @@ -482,6 +496,10 @@ def on_task_instance_running(
def on_task_instance_finish(
self, task_instance: "TaskInstance", status: InstanceRunResult
) -> None:
if self.check_kill_switch():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is called by the success and failure hooks - so it doesn't need this

return
self._set_log_level()

dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]

if self.config.render_templates:
Expand Down Expand Up @@ -541,6 +559,9 @@ def on_task_instance_finish(
def on_task_instance_success(
self, previous_state: None, task_instance: "TaskInstance", session: "Session"
) -> None:
if self.check_kill_switch():
return

self._set_log_level()

logger.debug(
Expand All @@ -556,6 +577,9 @@ def on_task_instance_success(
def on_task_instance_failed(
self, previous_state: None, task_instance: "TaskInstance", session: "Session"
) -> None:
if self.check_kill_switch():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to add these to the on_dag_... methods

return

self._set_log_level()

logger.debug(
Expand Down
Loading