From 267e96a8c469393f20e06e9656527f475f29cbce Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Wed, 11 Dec 2024 13:57:46 +0100
Subject: [PATCH 01/11] Add way to disable Airflow plugin without a restart

---
 .../_datahub_listener_module.py                       | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
index 0e1ef69ebf18c..76a3895a9779e 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
@@ -1,3 +1,5 @@
+from airflow.models import Variable
+
 from datahub_airflow_plugin.datahub_listener import (
     get_airflow_plugin_listener,
     hookimpl,
@@ -14,16 +16,23 @@
     @hookimpl
     def on_task_instance_running(previous_state, task_instance, session):
         assert _listener
+        # This is a bit hacky way to provide a way to disable the listener
+        if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
+            return
         _listener.on_task_instance_running(previous_state, task_instance, session)
 
     @hookimpl
     def on_task_instance_success(previous_state, task_instance, session):
         assert _listener
+        if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
+            return
         _listener.on_task_instance_success(previous_state, task_instance, session)
 
     @hookimpl
     def on_task_instance_failed(previous_state, task_instance, session):
         assert _listener
+        if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
+            return
         _listener.on_task_instance_failed(previous_state, task_instance, session)
 
     if hasattr(_listener, "on_dag_run_running"):
@@ -31,4 +40,6 @@ def on_task_instance_failed(previous_state, task_instance, session):
         @hookimpl
         def on_dag_run_running(dag_run, msg):
             assert _listener
+            if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
+                return
             _listener.on_dag_run_running(dag_run, msg)

From 15118338e2b2d47b3f8c0783fa8b347fb4da0012 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Wed, 11 Dec 2024 22:47:16 +0100
Subject: [PATCH 02/11] Add kill-switch to plugin v2

---
 .../_datahub_listener_module.py               | 10 -------
 .../datahub_listener.py                       | 26 +++++++++++++++++++
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
index 76a3895a9779e..f691efabd1d25 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
@@ -1,5 +1,3 @@
-from airflow.models import Variable
-
 from datahub_airflow_plugin.datahub_listener import (
     get_airflow_plugin_listener,
     hookimpl,
@@ -17,22 +15,16 @@
     def on_task_instance_running(previous_state, task_instance, session):
         assert _listener
         # This is a bit hacky way to provide a way to disable the listener
-        if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
-            return
         _listener.on_task_instance_running(previous_state, task_instance, session)
 
     @hookimpl
     def on_task_instance_success(previous_state, task_instance, session):
         assert _listener
-        if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
-            return
         _listener.on_task_instance_success(previous_state, task_instance, session)
 
     @hookimpl
     def on_task_instance_failed(previous_state, task_instance, session):
         assert _listener
-        if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
-            return
         _listener.on_task_instance_failed(previous_state, task_instance, session)
 
     if hasattr(_listener, "on_dag_run_running"):
@@ -40,6 +32,4 @@ def on_task_instance_failed(previous_state, task_instance, session):
         @hookimpl
         def on_dag_run_running(dag_run, msg):
             assert _listener
-            if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
-                return
             _listener.on_dag_run_running(dag_run, msg)
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index e00cf51ea456c..94394a317e020 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -9,6 +9,7 @@
 
 import airflow
 import datahub.emitter.mce_builder as builder
+from airflow.models import Variable
 from airflow.models.serialized_dag import SerializedDagModel
 from datahub.api.entities.datajob import DataJob
 from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
@@ -50,6 +51,8 @@
     entities_to_dataset_urn_list,
 )
 
+KILL_SWITCH_VARIABLE_NAME = "datahub_airflow_plugin_disable_listener"
+
 _F = TypeVar("_F", bound=Callable[..., None])
 if TYPE_CHECKING:
     from airflow.datasets import Dataset
@@ -371,6 +374,9 @@ def on_task_instance_running(
         task_instance: "TaskInstance",
         session: "Session",  # This will always be QUEUED
     ) -> None:
+        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+            return
+
         self._set_log_level()
 
         # This if statement mirrors the logic in https://github.com/OpenLineage/OpenLineage/pull/508.
@@ -481,6 +487,8 @@ def on_task_instance_running(
     def on_task_instance_finish(
         self, task_instance: "TaskInstance", status: InstanceRunResult
     ) -> None:
+        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+            return
         dagrun: "DagRun" = task_instance.dag_run  # type: ignore[attr-defined]
 
         if self.config.render_templates:
@@ -540,6 +548,9 @@ def on_task_instance_finish(
     def on_task_instance_success(
         self, previous_state: None, task_instance: "TaskInstance", session: "Session"
     ) -> None:
+        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+            return
+
         self._set_log_level()
 
         logger.debug(
@@ -555,6 +566,9 @@ def on_task_instance_success(
     def on_task_instance_failed(
         self, previous_state: None, task_instance: "TaskInstance", session: "Session"
     ) -> None:
+        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+            return
+
         self._set_log_level()
 
         logger.debug(
@@ -568,6 +582,9 @@ def on_task_instance_failed(
         )
 
     def on_dag_start(self, dag_run: "DagRun") -> None:
+        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+            return
+
         dag = dag_run.dag
         if not dag:
             logger.warning(
@@ -695,6 +712,9 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
         @hookimpl
         @run_in_thread
         def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
+            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+                return
+
             self._set_log_level()
 
             logger.debug(
@@ -716,6 +736,9 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
         @hookimpl
         @run_in_thread
         def on_dataset_created(self, dataset: "Dataset") -> None:
+            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+                return
+
             self._set_log_level()
 
             logger.debug(
@@ -725,6 +748,9 @@ def on_dataset_created(self, dataset: "Dataset") -> None:
         @hookimpl
         @run_in_thread
         def on_dataset_changed(self, dataset: "Dataset") -> None:
+            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+                return
+
             self._set_log_level()
 
             logger.debug(

From 7d0b511f74f2f18f621b0b2d44e247a58f6d7478 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Wed, 11 Dec 2024 22:48:54 +0100
Subject: [PATCH 03/11] Remove unneeded comment

---
 .../src/datahub_airflow_plugin/_datahub_listener_module.py       | 1 -
 1 file changed, 1 deletion(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
index f691efabd1d25..0e1ef69ebf18c 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
@@ -14,7 +14,6 @@
     @hookimpl
     def on_task_instance_running(previous_state, task_instance, session):
         assert _listener
-        # This is a bit hacky way to provide a way to disable the listener
         _listener.on_task_instance_running(previous_state, task_instance, session)
 
     @hookimpl

From 2d62bffbbc4d725daeb7a7530c95a9254373a044 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Thu, 12 Dec 2024 09:50:40 +0100
Subject: [PATCH 04/11] Moving kill switch into run in thread

---
 .../datahub_listener.py                       | 30 ++++---------------
 1 file changed, 5 insertions(+), 25 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index 94394a317e020..ef654fb8c2a80 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -51,8 +51,6 @@
     entities_to_dataset_urn_list,
 )
 
-KILL_SWITCH_VARIABLE_NAME = "datahub_airflow_plugin_disable_listener"
-
 _F = TypeVar("_F", bound=Callable[..., None])
 if TYPE_CHECKING:
     from airflow.datasets import Dataset
@@ -81,6 +79,8 @@ def hookimpl(f: _F) -> _F:  # type: ignore[misc] # noqa: F811
 )
 _DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"
 
+KILL_SWITCH_VARIABLE_NAME = "datahub_airflow_plugin_disable_listener"
+
 
 def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
     # Using globals instead of functools.lru_cache to make testing easier.
@@ -124,6 +124,9 @@ def run_in_thread(f: _F) -> _F:
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
         try:
+            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+                logger.info("DataHub listener disabled by kill switch")
+                return cast(_F, wrapper)
             if _RUN_IN_THREAD:
                 # A poor-man's timeout mechanism.
                 # This ensures that we don't hang the task if the extractors
@@ -374,9 +377,6 @@ def on_task_instance_running(
         task_instance: "TaskInstance",
         session: "Session",  # This will always be QUEUED
     ) -> None:
-        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-            return
-
         self._set_log_level()
 
         # This if statement mirrors the logic in https://github.com/OpenLineage/OpenLineage/pull/508.
@@ -487,8 +487,6 @@ def on_task_instance_running(
     def on_task_instance_finish(
         self, task_instance: "TaskInstance", status: InstanceRunResult
     ) -> None:
-        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-            return
         dagrun: "DagRun" = task_instance.dag_run  # type: ignore[attr-defined]
 
         if self.config.render_templates:
@@ -548,9 +546,6 @@ def on_task_instance_finish(
     def on_task_instance_success(
         self, previous_state: None, task_instance: "TaskInstance", session: "Session"
     ) -> None:
-        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-            return
-
         self._set_log_level()
 
         logger.debug(
@@ -566,9 +561,6 @@ def on_task_instance_success(
     def on_task_instance_failed(
         self, previous_state: None, task_instance: "TaskInstance", session: "Session"
     ) -> None:
-        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-            return
-
         self._set_log_level()
 
         logger.debug(
@@ -582,9 +574,6 @@ def on_task_instance_failed(
         )
 
     def on_dag_start(self, dag_run: "DagRun") -> None:
-        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-            return
-
         dag = dag_run.dag
         if not dag:
             logger.warning(
@@ -712,9 +701,6 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
         @hookimpl
         @run_in_thread
         def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
-            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-                return
-
             self._set_log_level()
 
             logger.debug(
@@ -736,9 +722,6 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
         @hookimpl
         @run_in_thread
         def on_dataset_created(self, dataset: "Dataset") -> None:
-            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-                return
-
             self._set_log_level()
 
             logger.debug(
@@ -748,9 +731,6 @@ def on_dataset_created(self, dataset: "Dataset") -> None:
         @hookimpl
         @run_in_thread
         def on_dataset_changed(self, dataset: "Dataset") -> None:
-            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-                return
-
             self._set_log_level()
 
             logger.debug(

From cf78fa5fc3d92024bcbf9721394c8a05035148a4 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Thu, 12 Dec 2024 10:01:27 +0100
Subject: [PATCH 05/11] Add doc for kill-switch

---
 docs/lineage/airflow.md | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index 829c048a8f8e2..1178feaf0d851 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -80,6 +80,37 @@ enabled = True  # default
 | log_level                  | _no change_          | [debug] Set the log level for the plugin.                                                |
 | debug_emitter              | false                | [debug] If true, the plugin will log the emitted events.                                 |
 
+#### Disabling the DataHub Plugin v2
+
+There are two ways to disable the DataHub Plugin v2:
+
+##### 1. Disable via Configuration
+
+Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.
+
+```ini title="airflow.cfg"
+[datahub]
+enabled = False
+```
+
+##### 2. Disable via Airflow Variable (Kill-Switch)
+
+If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create and set the `datahub_airflow_plugin_disable_listener` Airflow variable to `true`. This ensures that the listener won't process anything.
+
+##### Command Line
+
+```shell
+airflow variables set datahub_airflow_plugin_disable_listener true
+```
+
+##### Airflow UI
+
+1. Go to Admin -> Variables.
+2. Click the "+" symbol to create a new variable.
+3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.
+
+This will immediately disable the plugin without requiring a restart.
+
 ## DataHub Plugin v1
 
 ### Installation

From b64bb22e2d937048b4f0784a064a821d354a2e81 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Thu, 12 Dec 2024 10:09:43 +0100
Subject: [PATCH 06/11] Fix mypy check

---
 .../src/datahub_airflow_plugin/_airflow_shims.py            | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
index c1e2dd4cc422d..fc8bbc5a233fb 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
@@ -1,4 +1,4 @@
-from typing import List
+from typing import List, Optional
 
 import airflow.version
 import packaging.version
@@ -40,7 +40,7 @@
 ) or PLUGGY_VERSION <= packaging.version.parse("1.0.0")
 
 
-def get_task_inlets(operator: "Operator") -> List:
+def get_task_inlets(operator: "Operator") -> Optional[List]:
     # From Airflow 2.4 _inlets is dropped and inlets used consistently. Earlier it was not the case, so we have to stick there to _inlets
     if hasattr(operator, "_inlets"):
         return operator._inlets  # type: ignore[attr-defined, union-attr]
@@ -49,7 +49,7 @@ def get_task_inlets(operator: "Operator") -> List:
     return operator.inlets
 
 
-def get_task_outlets(operator: "Operator") -> List:
+def get_task_outlets(operator: "Operator") -> Optional[List]:
     # From Airflow 2.4 _outlets is dropped and inlets used consistently. Earlier it was not the case, so we have to stick there to _outlets
     # We have to use _outlets because outlets is empty in Airflow < 2.4.0
     if hasattr(operator, "_outlets"):

From f5f47e3c02128f471b655800ae5f2a7e1349daa9 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Thu, 12 Dec 2024 10:25:49 +0100
Subject: [PATCH 07/11] Fix shims

---
 .../src/datahub_airflow_plugin/_airflow_shims.py            | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
index fc8bbc5a233fb..ed0416a854d9a 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
@@ -40,7 +40,7 @@
 ) or PLUGGY_VERSION <= packaging.version.parse("1.0.0")
 
 
-def get_task_inlets(operator: "Operator") -> Optional[List]:
+def get_task_inlets(operator: "Operator") -> List:
     # From Airflow 2.4 _inlets is dropped and inlets used consistently. Earlier it was not the case, so we have to stick there to _inlets
     if hasattr(operator, "_inlets"):
         return operator._inlets  # type: ignore[attr-defined, union-attr]
@@ -49,14 +49,14 @@ def get_task_inlets(operator: "Operator") -> Optional[List]:
     return operator.inlets
 
 
-def get_task_outlets(operator: "Operator") -> Optional[List]:
+def get_task_outlets(operator: "Operator") -> List:
     # From Airflow 2.4 _outlets is dropped and inlets used consistently. Earlier it was not the case, so we have to stick there to _outlets
     # We have to use _outlets because outlets is empty in Airflow < 2.4.0
     if hasattr(operator, "_outlets"):
         return operator._outlets  # type: ignore[attr-defined, union-attr]
     if hasattr(operator, "get_outlet_defs"):
         return operator.get_outlet_defs()
-    return operator.outlets
+    return operator.outlets if operator.outlets else []
 
 
 __all__ = [

From 27791f482f7ac8aa4e39d21129a2b217ae36f805 Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Thu, 12 Dec 2024 10:30:25 +0100
Subject: [PATCH 08/11] Remove unused import

---
 .../airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
index 211def1716863..d86a46e042e8f 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
@@ -1,4 +1,4 @@
-from typing import List, Optional
+from typing import List
 
 import airflow.version
 import packaging.version

From 5423fa36bd2c1e486175ea2e71871dd33ae04aca Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Thu, 12 Dec 2024 11:33:21 +0100
Subject: [PATCH 09/11] Move the kill switch out of run_in_thread, as it breaks
 the transaction

---
 .../datahub_listener.py                       | 24 ++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index 85938aa5b9433..4aa684e70dfdb 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -125,9 +125,6 @@ def run_in_thread(f: _F) -> _F:
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
         try:
-            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-                logger.info("DataHub listener disabled by kill switch")
-                return cast(_F, wrapper)
             if _RUN_IN_THREAD:
                 # A poor-man's timeout mechanism.
                 # This ensures that we don't hang the task if the extractors
@@ -370,6 +367,15 @@ def _extract_lineage(
                     redact_with_exclusions(v)
                 )
 
+    def check_kill_switch(self):
+        try:
+            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+                logger.info("DataHub listener disabled by kill switch")
+                return True
+        except Exception as e:
+            raise e
+        return False
+
     @hookimpl
     @run_in_thread
     def on_task_instance_running(
@@ -378,6 +384,8 @@ def on_task_instance_running(
         task_instance: "TaskInstance",
         session: "Session",  # This will always be QUEUED
     ) -> None:
+        if self.check_kill_switch():
+            return
         self._set_log_level()
 
         # This if statement mirrors the logic in https://github.com/OpenLineage/OpenLineage/pull/508.
@@ -488,6 +496,10 @@ def on_task_instance_running(
     def on_task_instance_finish(
         self, task_instance: "TaskInstance", status: InstanceRunResult
     ) -> None:
+        if self.check_kill_switch():
+            return
+        self._set_log_level()
+
         dagrun: "DagRun" = task_instance.dag_run  # type: ignore[attr-defined]
 
         if self.config.render_templates:
@@ -547,6 +559,9 @@ def on_task_instance_finish(
     def on_task_instance_success(
         self, previous_state: None, task_instance: "TaskInstance", session: "Session"
     ) -> None:
+        if self.check_kill_switch():
+            return
+
         self._set_log_level()
 
         logger.debug(
@@ -562,6 +577,9 @@ def on_task_instance_success(
     def on_task_instance_failed(
         self, previous_state: None, task_instance: "TaskInstance", session: "Session"
     ) -> None:
+        if self.check_kill_switch():
+            return
+
         self._set_log_level()
 
         logger.debug(

From 307321307357e0da7334386eea41161a475c15ec Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Fri, 13 Dec 2024 10:21:33 +0100
Subject: [PATCH 10/11] PR review fixes

---
 docs/lineage/airflow.md                       | 62 +++++++++----------
 .../datahub_listener.py                       | 19 +++---
 2 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index 7922f3e917075..13dd439de7fac 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -80,37 +80,6 @@ enabled = True  # default
 | log_level                  | _no change_          | [debug] Set the log level for the plugin.                                                |
 | debug_emitter              | false                | [debug] If true, the plugin will log the emitted events.                                 |
 
-#### Disabling the DataHub Plugin v2
-
-There are two ways to disable the DataHub Plugin v2:
-
-##### 1. Disable via Configuration
-
-Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.
-
-```ini title="airflow.cfg"
-[datahub]
-enabled = False
-```
-
-##### 2. Disable via Airflow Variable (Kill-Switch)
-
-If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create and set the `datahub_airflow_plugin_disable_listener` Airflow variable to `true`. This ensures that the listener won't process anything.
-
-##### Command Line
-
-```shell
-airflow variables set datahub_airflow_plugin_disable_listener true
-```
-
-##### Airflow UI
-
-1. Go to Admin -> Variables.
-2. Click the "+" symbol to create a new variable.
-3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.
-
-This will immediately disable the plugin without requiring a restart.
-
 ## DataHub Plugin v1
 
 ### Installation
@@ -370,6 +339,37 @@ TypeError: on_task_instance_success() missing 3 required positional arguments: '
 
 The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `pluggy>=1.2.0`. See this [PR](https://github.com/datahub-project/datahub/pull/9365) for details.
 
+### Disabling the DataHub Plugin v2
+
+There are two ways to disable the DataHub Plugin v2:
+
+#### 1. Disable via Configuration
+
+Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.
+
+```ini title="airflow.cfg"
+[datahub]
+enabled = False
+```
+
+#### 2. Disable via Airflow Variable (Kill-Switch)
+
+If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create and set the `datahub_airflow_plugin_disable_listener` Airflow variable to `true`. This ensures that the listener won't process anything.
+
+#### Command Line
+
+```shell
+airflow variables set datahub_airflow_plugin_disable_listener true
+```
+
+#### Airflow UI
+
+1. Go to Admin -> Variables.
+2. Click the "+" symbol to create a new variable.
+3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.
+
+This will immediately disable the plugin without requiring a restart.
+
 ## Compatibility
 
 We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index 4aa684e70dfdb..ed610f5ba8f7c 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -368,12 +368,9 @@ def _extract_lineage(
                 )
 
     def check_kill_switch(self):
-        try:
-            if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
-                logger.info("DataHub listener disabled by kill switch")
-                return True
-        except Exception as e:
-            raise e
+        if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
+            logger.debug("DataHub listener disabled by kill switch")
+            return True
         return False
 
     @hookimpl
@@ -468,6 +465,9 @@ def on_task_instance_running(
             f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}"
         )
 
+        self.materialize_iolets(datajob)
+
+    def materialize_iolets(self, datajob: DataJob):
         if self.config.materialize_iolets:
             for outlet in datajob.outlets:
                 reported_time: int = int(time.time() * 1000)
@@ -496,10 +496,6 @@ def on_task_instance_running(
     def on_task_instance_finish(
         self, task_instance: "TaskInstance", status: InstanceRunResult
     ) -> None:
-        if self.check_kill_switch():
-            return
-        self._set_log_level()
-
         dagrun: "DagRun" = task_instance.dag_run  # type: ignore[attr-defined]
 
         if self.config.render_templates:
@@ -720,6 +716,9 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
         @hookimpl
         @run_in_thread
         def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
+            if self.check_kill_switch():
+                return
+
             self._set_log_level()
 
             logger.debug(

From f49152d0ac923240ed79acd3d8e62e1260e0e82b Mon Sep 17 00:00:00 2001
From: treff7es <treff7es@gmail.com>
Date: Fri, 13 Dec 2024 13:40:15 +0100
Subject: [PATCH 11/11] Lint fix

---
 .../src/datahub_airflow_plugin/datahub_listener.py              | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index ed610f5ba8f7c..640991a90a1d2 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -467,7 +467,7 @@ def on_task_instance_running(
 
         self.materialize_iolets(datajob)
 
-    def materialize_iolets(self, datajob: DataJob):
+    def materialize_iolets(self, datajob: DataJob) -> None:
         if self.config.materialize_iolets:
             for outlet in datajob.outlets:
                 reported_time: int = int(time.time() * 1000)