Revert "Add DatabricksWorkflowPlugin (#40153)"
This reverts commit 22ec726.
jscheffl committed Jul 11, 2024
1 parent 7d7a4cd commit 9c1e786
Showing 14 changed files with 5 additions and 884 deletions.
32 changes: 3 additions & 29 deletions airflow/providers/databricks/operators/databricks.py
@@ -35,10 +35,6 @@
     DatabricksWorkflowTaskGroup,
     WorkflowRunMetadata,
 )
-from airflow.providers.databricks.plugins.databricks_workflow import (
-    WorkflowJobRepairSingleTaskLink,
-    WorkflowJobRunLink,
-)
 from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger
 from airflow.providers.databricks.utils.databricks import _normalise_json_content, validate_trigger_event

@@ -962,15 +958,6 @@ def __init__(
 
         super().__init__(**kwargs)
 
-        if self._databricks_workflow_task_group is not None:
-            self.operator_extra_links = (
-                WorkflowJobRunLink(),
-                WorkflowJobRepairSingleTaskLink(),
-            )
-        else:
-            # Databricks does not support repair for non-workflow tasks, hence do not show the repair link.
-            self.operator_extra_links = (DatabricksJobRunLink(),)
-
     @cached_property
     def _hook(self) -> DatabricksHook:
         return self._get_hook(caller=self.caller)
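
The block removed above wired up Airflow's "operator extra links" feature: the UI renders one extra button per link object listed on the operator. For orientation, a minimal, self-contained sketch of that mechanism, with all class and link names invented for illustration:

```python
# Minimal sketch of Airflow operator extra links (illustrative names only).
from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink


class ExampleRunPageLink(BaseOperatorLink):
    """Adds an extra button to the task instance view in the Airflow UI."""

    name = "Example Run Page"  # label of the rendered button

    def get_link(self, operator, *, ti_key) -> str:
        # A real link would resolve a run URL (e.g. from XCom); a fixed
        # placeholder keeps this sketch self-contained.
        return "https://example.invalid/run"


class ExampleOperator(BaseOperator):
    # Airflow reads this attribute to decide which buttons to render. The
    # reverted __init__ logic set it per instance, which is how workflow
    # tasks and plain tasks could expose different links.
    operator_extra_links = (ExampleRunPageLink(),)
```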
@@ -1029,17 +1016,12 @@ def _get_run_json(self) -> dict[str, Any]:
             raise ValueError("Must specify either existing_cluster_id or new_cluster.")
         return run_json
 
-    def _launch_job(self, context: Context | None = None) -> int:
+    def _launch_job(self) -> int:
         """Launch the job on Databricks."""
         run_json = self._get_run_json()
         self.databricks_run_id = self._hook.submit_run(run_json)
         url = self._hook.get_run_page_url(self.databricks_run_id)
         self.log.info("Check the job run in Databricks: %s", url)
 
-        if self.do_xcom_push and context is not None:
-            context["ti"].xcom_push(key=XCOM_RUN_ID_KEY, value=self.databricks_run_id)
-            context["ti"].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=url)
-
         return self.databricks_run_id
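
The XCom pushes deleted here were what made the run id and run-page URL visible outside the worker process; `XCOM_RUN_ID_KEY` and `XCOM_RUN_PAGE_URL_KEY` are the provider's constants for those keys. Extra links are rendered by the webserver, which cannot see the operator's in-memory state, so a link callback has to read everything back from XCom. A rough sketch of that read side, with the key name and class invented for illustration:

```python
# Sketch of the webserver-side consumer of the pushed values (illustrative
# key and class names; not the provider's actual link implementation).
from airflow.models import XCom
from airflow.models.baseoperatorlink import BaseOperatorLink


class RunPageLinkSketch(BaseOperatorLink):
    name = "See Job Run"

    def get_link(self, operator, *, ti_key) -> str:
        # ti_key identifies the task instance whose execute() pushed the URL.
        url = XCom.get_value(ti_key=ti_key, key="run_page_url")
        return url or ""
```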

def _handle_terminal_run_state(self, run_state: RunState) -> None:
Expand All @@ -1058,15 +1040,7 @@ def _get_current_databricks_task(self) -> dict[str, Any]:
"""Retrieve the Databricks task corresponding to the current Airflow task."""
if self.databricks_run_id is None:
raise ValueError("Databricks job not yet launched. Please run launch_notebook_job first.")
tasks = self._hook.get_run(self.databricks_run_id)["tasks"]

# Because the task_key remains the same across multiple runs, and the Databricks API does not return
# tasks sorted by their attempts/start time, we sort the tasks by start time. This ensures that we
# map the latest attempt (whose status is to be monitored) of the task run to the task_key while
# building the {task_key: task} map below.
sorted_task_runs = sorted(tasks, key=lambda x: x["start_time"])

return {task["task_key"]: task for task in sorted_task_runs}[
return {task["task_key"]: task for task in self._hook.get_run(self.databricks_run_id)["tasks"]}[
self._get_databricks_task_id(self.task_id)
]
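The sorting logic removed above is subtle enough to deserve a standalone illustration: a dict comprehension keeps the last value seen for each key, so sorting the runs by `start_time` first guarantees that the newest attempt of a repaired task wins. With made-up run data:

```python
# Made-up data illustrating why the reverted code sorted by start_time:
# dict comprehensions keep the LAST value per key, so pre-sorting maps each
# task_key to its most recent attempt.
runs = [
    {"task_key": "notebook_a", "start_time": 200, "state": "RUNNING"},  # repair attempt
    {"task_key": "notebook_a", "start_time": 100, "state": "FAILED"},   # original attempt
]

latest = {r["task_key"]: r for r in sorted(runs, key=lambda r: r["start_time"])}
assert latest["notebook_a"]["state"] == "RUNNING"  # newest attempt wins
```

Without the sort (the behavior this revert returns to), whichever attempt the Databricks API happens to list last would win instead.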

@@ -1151,7 +1125,7 @@ def execute(self, context: Context) -> None:
             self.databricks_run_id = workflow_run_metadata.run_id
             self.databricks_conn_id = workflow_run_metadata.conn_id
         else:
-            self._launch_job(context=context)
+            self._launch_job()
         if self.wait_for_termination:
             self.monitor_databricks_job()

5 changes: 0 additions & 5 deletions airflow/providers/databricks/operators/databricks_workflow.py
@@ -28,10 +28,6 @@
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState
-from airflow.providers.databricks.plugins.databricks_workflow import (
-    WorkflowJobRepairAllFailedLink,
-    WorkflowJobRunLink,
-)
 from airflow.utils.task_group import TaskGroup
 
 if TYPE_CHECKING:

@@ -92,7 +88,6 @@ class _CreateDatabricksWorkflowOperator(BaseOperator):
     populated after instantiation using the `add_task` method.
     """
 
-    operator_extra_links = (WorkflowJobRunLink(), WorkflowJobRepairAllFailedLink())
     template_fields = ("notebook_params",)
     caller = "_CreateDatabricksWorkflowOperator"
16 changes: 0 additions & 16 deletions airflow/providers/databricks/plugins/__init__.py

This file was deleted.
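
This deleted `plugins` package is where the workflow link classes were registered with Airflow's plugin system, which is what lets the webserver deserialize and render the links. A minimal sketch of that registration pattern, with all names invented for illustration:

```python
# Minimal sketch of registering operator extra links through an Airflow
# plugin (illustrative names; not the reverted plugin's actual contents).
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.plugins_manager import AirflowPlugin


class _WorkflowRunLinkSketch(BaseOperatorLink):
    name = "See Workflow Job Run"

    def get_link(self, operator, *, ti_key) -> str:
        return ""  # a real link would resolve the Databricks run-page URL


class DatabricksWorkflowPluginSketch(AirflowPlugin):
    name = "databricks_workflow_sketch"
    operator_extra_links = [_WorkflowRunLinkSketch()]
```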

(Diffs for the remaining 11 changed files are not shown here.)
