Skip to content

Commit

Permalink
Add git_source to DatabricksSubmitRunOperator (#23620)
Browse files Browse the repository at this point in the history
The existing `DatabricksSubmitRunOperator` is extended with the support for the `git_source` parameter which allows users to run notebook tasks from files committed to git repositories.

If specified, any notebook task that is part of the payload will clone the repository and check out the commit, tag, or the tip of the specified branch. This is an alternative to dev repos ([docs](https://docs.databricks.com/repos/index.html)) where the checkout/update would have to be triggered manually.

Public documentation for the feature available here: https://docs.databricks.com/dev-tools/api/latest/jobs.html (NB: as noted in the docs, the feature is currently in public preview).
  • Loading branch information
akolar-db authored May 13, 2022
1 parent 2111d73 commit d0a5b3a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
8 changes: 8 additions & 0 deletions airflow/providers/databricks/operators/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ class DatabricksSubmitRunOperator(BaseOperator):
might be a floating point number).
:param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
:param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
:param git_source: Optional specification of a remote git repository from which
supported task types are retrieved.
.. seealso::
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
"""

# Used in airflow.models.BaseOperator
Expand Down Expand Up @@ -316,6 +321,7 @@ def __init__(
idempotency_token: Optional[str] = None,
access_control_list: Optional[List[Dict[str, str]]] = None,
wait_for_termination: bool = True,
git_source: Optional[Dict[str, str]] = None,
**kwargs,
) -> None:
"""Creates a new ``DatabricksSubmitRunOperator``."""
Expand Down Expand Up @@ -355,6 +361,8 @@ def __init__(
self.json['idempotency_token'] = idempotency_token
if access_control_list is not None:
self.json['access_control_list'] = access_control_list
if git_source is not None:
self.json['git_source'] = git_source

self.json = _deep_string_coerce(self.json)
# This variable will be used in case our task gets killed.
Expand Down
18 changes: 18 additions & 0 deletions tests/providers/databricks/operators/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ def test_init_with_templating(self):
)
assert expected == op.json

def test_init_with_git_source(self):
json = {'new_cluster': NEW_CLUSTER, 'notebook_task': NOTEBOOK_TASK, 'run_name': RUN_NAME}
git_source = {
'git_url': 'https://github.com/apache/airflow',
'git_provider': 'github',
'git_branch': 'main',
}
op = DatabricksSubmitRunOperator(task_id=TASK_ID, git_source=git_source, json=json)
expected = databricks_operator._deep_string_coerce(
{
'new_cluster': NEW_CLUSTER,
'notebook_task': NOTEBOOK_TASK,
'run_name': RUN_NAME,
'git_source': git_source,
}
)
assert expected == op.json

def test_init_with_bad_type(self):
json = {'test': datetime.now()}
# Looks a bit weird since we have to escape regex reserved symbols.
Expand Down

0 comments on commit d0a5b3a

Please sign in to comment.