Skip to content

Commit

Permalink
add task_key property
Browse files Browse the repository at this point in the history
  • Loading branch information
rawwar committed Oct 17, 2024
1 parent b3b4850 commit 6b090b3
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions providers/src/airflow/providers/databricks/operators/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ def __init__(
self,
caller: str = "DatabricksTaskBaseOperator",
databricks_conn_id: str = "databricks_default",
databricks_task_key: str | None = None,
databricks_retry_args: dict[Any, Any] | None = None,
databricks_retry_delay: int = 1,
databricks_retry_limit: int = 3,
Expand All @@ -1006,6 +1007,7 @@ def __init__(
):
self.caller = caller
self.databricks_conn_id = databricks_conn_id
self._databricks_task_key = databricks_task_key
self.databricks_retry_args = databricks_retry_args
self.databricks_retry_delay = databricks_retry_delay
self.databricks_retry_limit = databricks_retry_limit
Expand Down Expand Up @@ -1034,6 +1036,26 @@ def __init__(
def _hook(self) -> DatabricksHook:
return self._get_hook(caller=self.caller)

@property
def databricks_task_key(self):
return self._databricks_task_key

@databricks_task_key.setter
def databricks_task_key(self, value: str):
if value is None or isinstance(value, str):
raise TypeError("task_key should be a string or None")
if value is None:
self._databricks_task_key = None
else:
if len(value) > 100:
self.log.warning(
"The task_key '%s' exceeds 100 characters and will be truncated by the Databricks API. "
"This will cause failure when trying to monitor the task. Using hash of the task_key instead.",
"hash is computed using hashlib.md5",
value,
)
self._databricks_task_key = value

def _get_hook(self, caller: str) -> DatabricksHook:
return DatabricksHook(
self.databricks_conn_id,
Expand Down

0 comments on commit 6b090b3

Please sign in to comment.