
Add DatabricksJobsCreateOperator #32221

Closed
wants to merge 27 commits into from

Conversation

stikkireddy
Contributor

Add the DatabricksJobsCreateOperator for use cases where the DatabricksSubmitRunOperator is insufficient.
closes: #29733

Continuation of @kyle-winkelman's work in #29790.
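For context, the operator targets the Jobs API 2.1 `create` endpoint. A minimal sketch of how such an operator's named parameters could merge into the `json` payload it sends - the `build_create_job_payload` helper and all field values below are illustrative, not part of this PR:

```python
from __future__ import annotations


def build_create_job_payload(
    json: dict | None = None,
    name: str | None = None,
    tags: dict[str, str] | None = None,
    tasks: list[dict] | None = None,
) -> dict:
    # Start from a copy of the base `json` payload; explicitly named
    # parameters take precedence over keys already present in `json`.
    payload = dict(json or {})
    if name is not None:
        payload["name"] = name
    if tags is not None:
        payload["tags"] = tags
    if tasks is not None:
        payload["tasks"] = tasks
    return payload


# Hypothetical job spec, following the public Jobs API 2.1 field names.
payload = build_create_job_payload(
    json={"max_concurrent_runs": 1},
    name="nightly-etl",
    tags={"team": "data"},
    tasks=[{"task_key": "ingest", "notebook_task": {"notebook_path": "/Shared/etl"}}],
)
```

The resulting dict is what an operator like this would hand to the Databricks hook for the `jobs/create` call.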

@boring-cyborg

boring-cyborg bot commented Jun 28, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide, and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@stikkireddy stikkireddy marked this pull request as ready for review June 28, 2023 01:02
@eladkal eladkal changed the title Provider Databricks add jobs create operator Add DatabricksJobsCreateOperator Jun 28, 2023
@potiuk
Member

potiuk commented Jun 28, 2023

cc: @alexott and team?

@stikkireddy
Contributor Author

stikkireddy commented Jun 28, 2023

@potiuk any suggestions on why the "Validate provider.yaml files" check might be failing? I do not see any errors that stand out.

I think I figured it out.

@potiuk
Member

potiuk commented Jun 28, 2023

@potiuk any suggestions on why the "Validate provider.yaml files" check might be failing? I do not see any errors that stand out.

I think I figured it out.

let's see. approved the run

airflow/providers/databricks/hooks/databricks.py (outdated)
airflow/providers/databricks/operators/databricks.py (outdated)
For more information about templating see :ref:`concepts:jinja-templating`.
:param name: An optional name for the job.
:param tags: A map of tags associated with the job.
:param tasks: A list of task specifications to be executed by this job.
Member

I notice the type is added at the end of the docstring. I don't see this pattern in other databricks operators. Do we need to keep it as we have already annotated it in the __init__ method?

Contributor

Other operators don't use data classes, that's why there are no types there.

Member

Yep, but I thought the type annotation was enough? Not a blocker, though.

airflow/providers/databricks/operators/databricks.py (outdated)
Contributor

@alexott left a comment

Let's rename the functions, and check regarding access_control_list - I already asked in the engineering channel.

airflow/providers/databricks/hooks/databricks.py (outdated)
airflow/providers/databricks/hooks/databricks.py (outdated)
airflow/providers/databricks/operators/databricks.py (outdated)
Contributor

@alexott left a comment

lgtm

@potiuk
Member

potiuk commented Jul 30, 2023

Needs tests fixing.

def __init__(
self,
*,
json: Any | None = None,
Member

nitpick: Should we change the type to dict instead?

@stikkireddy
Contributor Author

Hey @potiuk, is anything holding this up from being merged? Is it just the type hint comment?

@potiuk
Member

potiuk commented Aug 9, 2023

Hey @potiuk, is anything holding this up from being merged? Is it just the type hint comment?

Mostly just the lack of a ping, after you fixed the tests, to re-run the workflows, I think. Generally, if you see that your PR needs attention, just comment on it. Otherwise it might very easily be missed that you fixed the tests. It's up to the author to bring the PR to the attention of those who might review it - while you have one PR to care about, some reviewers likely have 40-50 to look at.

@stikkireddy
Contributor Author

@potiuk I updated the branch a little while ago; please let me know if this works.

@potiuk
Member

potiuk commented Aug 13, 2023

@potiuk I updated the branch a little while ago; please let me know if this works.

Some other PR got merged in the meantime - you need to rebase and resolve the conflicts. Can you also please respond to all the comments in this PR and confirm that they've been addressed? There were lots of comments from @alexott and I am not sure if all of them have been addressed.

Contributor

@alexott left a comment

lgtm


@stikkireddy
Contributor Author

@potiuk based on @alexott's comment, is this possible to push through? I also resolved all the issues and made the suggested changes.

@potiuk
Member

potiuk commented Aug 19, 2023

@potiuk based on @alexott's comment, is this possible to push through? I also resolved all the issues and made the suggested changes.

Let's see if our CI agrees to merge it. I just approved it for running - let me know if/when it succeeds (or fails) @stikkireddy

@stikkireddy
Contributor Author

@potiuk seems like it failed due to static checks. I will take a look at the type hints and ping you again once they are resolved.

if name is not None:
self.json["name"] = name
if tags is not None:
self.json["tags"] = tags
Contributor

Since json is a templated field, attempting to modify it in this way will fail/not work as expected if the input arg is a string (e.g. "{{ var.json....}}" or an XComArg (meaning it's an output of a previous task). Template fields are not rendered until just before the execute method is called.

It would be best to move modifying json (and generally any template field) to the execute method instead.
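A minimal sketch of the suggested pattern - keep `json` untouched in `__init__` and merge the overrides only in `execute`, by which point Airflow would have rendered the template fields into plain dicts. The class below is a hypothetical stand-in without Airflow's base classes, and a real operator would pass the merged dict to the Databricks hook rather than return it:

```python
from __future__ import annotations


class CreateJobsSketch:
    """Illustrative stand-in (no Airflow base class) for deferring
    template-field mutation from __init__ to execute."""

    template_fields = ("json",)

    def __init__(self, *, json=None, name=None, tags=None):
        # Do NOT mutate `json` here: at parse time it may still be a
        # Jinja template string or an XComArg, not a dict.
        self.json = json
        self._name = name
        self._tags = tags

    def execute(self, context=None) -> dict:
        # In a real operator, Airflow has rendered template_fields before
        # calling execute, so self.json is now a plain dict and safe to copy.
        merged = dict(self.json or {})
        if self._name is not None:
            merged["name"] = self._name
        if self._tags is not None:
            merged["tags"] = self._tags
        return merged  # a real operator would POST this via the hook


op = CreateJobsSketch(
    json={"max_concurrent_runs": 1}, name="some_name", tags={"key3": "value3"}
)
result = op.execute()
```

Because `__init__` only stores the arguments, constructing the operator with a string or XComArg `json` no longer raises at DAG parse time.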


Contributor

To illustrate the point, let's use this example DAG where we define the json arg in a previous task and use its output:

from __future__ import annotations

from pendulum import datetime
from typing import TYPE_CHECKING, Sequence

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class DatabricksCreateJobsOperator(BaseOperator):
    template_fields: Sequence[str] = ("json", "databricks_conn_id")

    def __init__(
        self,
        *,
        json: dict | None = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.json = json or {}
        if name is not None:
            self.json["name"] = name
        if tags is not None:
            self.json["tags"] = tags

    def execute(self, context: Context) -> None:
        pass

@dag(start_date=datetime(2023, 1, 1), schedule=None)
def derived_template_fields():
    @task
    def push_json() -> dict[str, str]:
        return {"key1": "val1", "key2": "val2"}

    json = push_json()

    DatabricksCreateJobsOperator(
        task_id="create_job_w_json", json=json, name="some_name", tags={"key3": "value3"}
    )


derived_template_fields()

DAG parsing fails with:

Running: airflow dags reserialize
[2023-08-31T14:29:57.796+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:29:57.816+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:29:58.533+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:29:58.615+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 50, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 41, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'PlainXComArg' object does not support item assignment

Even if we change the json arg assignment to use the classic XCom Jinja template approach (i.e. json = "{{ ti.xcom_pull(task_ids='push_json') }}"), the DAG fails to parse:

Running: airflow dags reserialize
[2023-08-31T14:32:01.553+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:32:01.574+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:32:02.341+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:32:02.415+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 51, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 42, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'str' object does not support item assignment

Perhaps it's possible users haven't had a use case for predefining the json arg from a previous task, Airflow Variable, DAG Param, etc. (accessed via Jinja templates).

Seems like there is now some movement on addressing #29069 to prevent this in the future too.

Contributor

@josh-fell well spotted, that's a relevant concern! Since there is a separate ticket to address this issue, would it be okay if we consider this change outside of the scope of the current PR?

Contributor

@josh-fell Oct 10, 2023

Totally fair. All of the Databricks operators would need to be cleaned up, but I also haven't heard of anyone running into this issue within their use cases.

I can log a separate issue and maybe we can get some folks to contribute! It's a solid "good first issue".

Contributor

That's great, thanks a lot, @josh-fell! Please share the ticket once you create it - it's valuable and relevant.

Contributor

Sorry for the mighty delay here, just got around to logging the issue.

@tatiana
Contributor

tatiana commented Oct 10, 2023

Hey @stikkireddy, you've come a long way on this - is there any chance you could resolve the conflicts and get the CI tests to pass? It feels like we're super close to merging this feature.


@@ -299,6 +299,7 @@
"aiohttp>=3.6.3, <4",
"apache-airflow-providers-common-sql>=1.5.0",
"apache-airflow>=2.4.0",
"databricks-sdk>=0.1.11, <1.0.0",
Member

Is there a reason we need/want to pin it to <1.0.0?

@Lee-W Lee-W self-requested a review October 11, 2023 02:35
Member

@Lee-W left a comment

Left a few questions but they're non-blockers. Also we might need to fix the CI failures.

@@ -60,11 +60,13 @@ dependencies:
# The 2.9.1 (to be released soon) already contains the fix
- databricks-sql-connector>=2.0.0, <3.0.0, !=2.9.0
- aiohttp>=3.6.3, <4
- databricks-sdk==0.10.0
Contributor

Why limit it to a specific version? We rarely do this.

I also don't understand: is this a dependency or an extra dependency?

- name: sdk
description: Install Databricks SDK
dependencies:
- databricks-sdk==0.10.0
Contributor

same question

@pankajastro
Member

Wondering why the "Build Images / Build CI images" job is getting cancelled; it looks like it is persisting on this PR.

@potiuk
Member

potiuk commented Oct 15, 2023

Should be rebased now. The issue should be fixed with 3 PRs I did during the last few days.


This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 26, 2023
@github-actions github-actions bot closed this Dec 31, 2023
Successfully merging this pull request may close these issues.

Databricks create/reset then run-now
10 participants