Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Apr 9, 2025

Why?

Variable.set is a common pattern that is used and in the past it would use the database and session to do so. Adding a similar mechanism in the task sdk execution time variable is needed to be able to set variables using the new task sdk API server mechanism.

What?

I have more of less tried to port the behaviour from here: https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/variable.py#L183-L215

And this also includes porting the write conflict check. I am handling thus for the worker side.

This means that if we are trying to write a variable to the metadata DB but same variable exists in the secrets backend, this will throw a warning.

Testing

  1. From the task level:

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.models.variable import Variable


class CustomOperator(BaseOperator):
    def execute(self, context):
        Variable.set(key="set_key", value="set_value", description="This is being set from task sdk")


with DAG("example_variable_set_sdk", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="set_var")

image

image

  1. At the top level of the dag

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.sdk import Variable

Variable.set(key="set_key_top_level", value="set_value_top_level", description="This is being set from task sdk but at top level")

class CustomOperator(BaseOperator):
    def execute(self, context):
        print("Inside operator")


with DAG("example_variable_set_sdk_top_level", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="set_var")

image

Testing with a workers secret backend defined

Setting up a workers secrets backend as LocalFilesystemBackend with this env:

export AIRFLOW__WORKERS__SECRETS_BACKEND="airflow.secrets.local_filesystem.LocalFilesystemBackend"
export AIRFLOW__WORKERS__SECRETS_BACKEND_KWARGS='{"connections_file_path": "/files/conn.json", "variables_file_path": "/files/var.json"}'

var.json:

{
    "var_a": "var_a_value"
}

Verified the setup:

root@765fc4803c7c:/opt/airflow# airflow config get-value workers secrets_backend
airflow.secrets.local_filesystem.LocalFilesystemBackend
root@765fc4803c7c:/opt/airflow# airflow config get-value workers secrets_backend_kwargs
{"connections_file_path": "/files/conn.json", "variables_file_path": "/files/var.json"

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable

def get_var_for_secrets_backend():
    variable = Variable.get("var_a")
    print("The variable from secrets backend is", variable)

def set_var_for_secrets_backend():
    Variable.set(key="var_a", value="trying to override")


with DAG("set_var_for_secrets_backend_dag", schedule=None, catchup=False) as dag:
    getter = PythonOperator(
        task_id="get_var_for_secrets_backend",
        python_callable=get_var_for_secrets_backend,
    )

    setter = PythonOperator(
        task_id="set_var_for_secrets_backend",
        python_callable=set_var_for_secrets_backend,
    )


    getter >> setter

This dag first prints the value of the variable present in the secrets backend.

Task1:
image

Task 2 trying to override it:
image

However the DB has it:
image


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh
Copy link
Contributor Author

Draft at this stage but working on fixing the tests and testing various things.

@amoghrajesh amoghrajesh merged commit 458ac63 into apache:main Apr 10, 2025
67 checks passed
@amoghrajesh amoghrajesh deleted the variable-set-tasksdk branch April 10, 2025 07:26
@jscheffl
Copy link
Contributor

Just tested again and as noted in #47920 I still see my integration test DAG failing :-(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants