
max_active_tis_per_dag not working in Deferrable Operator #44196

Open
1 of 2 tasks
ewelinastr opened this issue Nov 19, 2024 · 1 comment
Labels
area:async-operators AIP-40: Deferrable ("Async") Operators area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet provider:databricks

Comments

@ewelinastr

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

When using DatabricksRunNowOperator with deferrable=True, the max_active_tis_per_dag option does not limit the number of concurrently running tasks.

What you think should happen instead?

When max_active_tis_per_dag is set, Airflow should ensure that
(number of running and queued task instances) <= max_active_tis_per_dag
regardless of whether the operator runs in deferrable or non-deferrable mode.
However, the limit currently takes effect only in non-deferrable mode; with the deferrable setting, the number of concurrently running tasks is not limited.
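A minimal, simplified model can make the expected rule concrete. It assumes, as the formula above states, that only running and queued instances occupy a slot; the state names and the helpers active_count, can_start and simulate are hypothetical, and this is not Airflow's scheduler code. Under that assumption, an instance that has already deferred no longer counts, so the next mapped instance is admitted, which matches the behaviour reported here.

```python
from collections import Counter

# Hypothetical state names for a simplified model; not Airflow's scheduler code.
RUNNING, QUEUED, DEFERRED, SCHEDULED = "running", "queued", "deferred", "scheduled"


def active_count(states, counted=(RUNNING, QUEUED)):
    """Count task instances whose state occupies a concurrency slot."""
    tally = Counter(states)
    return sum(tally[s] for s in counted)


def can_start(states, limit, counted=(RUNNING, QUEUED)):
    """Admission check: start another instance only if the active count is below the limit."""
    return active_count(states, counted) < limit


def simulate(n_instances, limit, counted):
    """Collapse the scheduler's repeated loops into one pass over the mapped instances.

    Each admitted instance starts and immediately defers (e.g. after submitting
    an external job run). Returns the final list of states.
    """
    states = [SCHEDULED] * n_instances
    for i in range(n_instances):
        if can_start(states, limit, counted):
            states[i] = RUNNING
            # The operator submits its external job, then hands off to a trigger.
            states[i] = DEFERRED
    return states


only_running_queued = simulate(6, limit=1, counted=(RUNNING, QUEUED))
with_deferred = simulate(6, limit=1, counted=(RUNNING, QUEUED, DEFERRED))
print(only_running_queued.count(DEFERRED))  # 6
print(with_deferred.count(DEFERRED))        # 1
```

If deferred instances are not counted, all six instances end up in flight at once even with a limit of 1; counting them as well keeps five instances waiting.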

How to reproduce

I believe this affects all deferrable operators, but I have only tried DatabricksRunNowOperator.
To reproduce, use a deferrable operator with dynamically mapped tasks and max_active_tis_per_dag set to 1.
With DatabricksRunNowOperator, many task runs are started at the same time (both in Airflow and in Databricks).
You can use the following code for the DAG:

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# Placeholder: the original DEFAULT_ARGS were not included in the report.
DEFAULT_ARGS = {}

with DAG(
    dag_id="something",
    description="something",
    default_args=DEFAULT_ARGS,
    start_date=pendulum.datetime(2024, 11, 19, 17, 0, 0, tz="UTC"),
) as dag:

    @task
    def prepare_dbx_params() -> list[dict[str, dict[str, str]]]:
        """Build one set of Databricks job parameters per mapped task instance."""
        some_list = ["aa", "bb", "cc", "dd", "ee", "ff"]
        dbx_params = [
            {
                "python_named_params": {
                    "DBX_ARG1": something,
                    "DBX_ARG2": something[0],
                }
            }
            for something in some_list
        ]
        return dbx_params

    dbx_input_params = prepare_dbx_params()

    # Six mapped instances are created; max_active_tis_per_dag=1 should let only one run at a time.
    trigger_dbx_job = DatabricksRunNowOperator.partial(
        task_id="trigger-dbx-job-dag",
        job_name="<NAME OF JOB>",
        databricks_conn_id="<NAME OF DBX CONNECTION>",
        deferrable=True,
        max_active_tis_per_dag=1,
    ).expand_kwargs(dbx_input_params)
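To show how many mapped instances the DAG above produces and what each one receives, here is a plain-Python model of the fan-out. It assumes that expand_kwargs creates one mapped instance per dict in the upstream list and combines that dict with the shared partial() arguments; the helper expand_kwargs_model is hypothetical and only mimics the resulting keyword arguments, it does not call Airflow. Each of the six instances would trigger its own Databricks run, which is why max_active_tis_per_dag=1 is expected to serialize them.

```python
def prepare_dbx_params():
    """Same list-building logic as the prepare_dbx_params task, without the @task decorator."""
    some_list = ["aa", "bb", "cc", "dd", "ee", "ff"]
    return [
        {"python_named_params": {"DBX_ARG1": s, "DBX_ARG2": s[0]}}
        for s in some_list
    ]


def expand_kwargs_model(partial_kwargs, mapped_kwargs):
    """Hypothetical model: one kwargs dict per mapped instance (shared args + per-instance args)."""
    return [{**partial_kwargs, **kw} for kw in mapped_kwargs]


partial_kwargs = {
    "task_id": "trigger-dbx-job-dag",
    "deferrable": True,
    "max_active_tis_per_dag": 1,
}

instances = expand_kwargs_model(partial_kwargs, prepare_dbx_params())
for map_index, kwargs in enumerate(instances):
    print(map_index, kwargs["python_named_params"])
# 0 {'DBX_ARG1': 'aa', 'DBX_ARG2': 'a'}
# 1 {'DBX_ARG1': 'bb', 'DBX_ARG2': 'b'}
# 2 {'DBX_ARG1': 'cc', 'DBX_ARG2': 'c'}
# 3 {'DBX_ARG1': 'dd', 'DBX_ARG2': 'd'}
# 4 {'DBX_ARG1': 'ee', 'DBX_ARG2': 'e'}
# 5 {'DBX_ARG1': 'ff', 'DBX_ARG2': 'f'}
```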

Operating System

macOS

Versions of Apache Airflow Providers

apache_airflow-2.7.3
apache_airflow_providers_databricks-4.6.0

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@ewelinastr ewelinastr added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Nov 19, 2024

boring-cyborg bot commented Nov 19, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@dosubot dosubot bot added area:async-operators AIP-40: Deferrable ("Async") Operators provider:databricks labels Nov 19, 2024
Projects
None yet
Development

No branches or pull requests

1 participant