@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Jan 6, 2025

closes: #45421

Summary of changes

  1. Added a minimal user-facing Variable definition for DAG authors to use in DAG files.
  2. Added logic to get Variables from the task context, in both "value" and "json" form:
  • "value" is the raw form.
  • "json" is the deserialised JSON form. We are trying to keep the contract between the SDK and the API server simple: they exchange only strings, and the responsibility for serialising and deserialising lies with the client, before the value is sent to the Task SDK, not with the API server. This will also enable multi-language support.
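
To illustrate the "value" versus "json" split described above, here is a minimal, self-contained sketch. It is not the Task SDK implementation: `SketchVariableAccessor`, `STORE`, and `fetch_from_store` are hypothetical names, and the in-memory dict merely stands in for whatever returns the Variable as a string. The point is only that the backend hands back strings and the JSON decoding happens on the client side.

```python
from __future__ import annotations

import json
from typing import Any, Callable


class SketchVariableAccessor:
    """Hypothetical accessor: attribute access is turned into a key lookup."""

    def __init__(self, fetch: Callable[[str], str], deserialize_json: bool) -> None:
        self._fetch = fetch
        self._deserialize_json = deserialize_json

    def __getattr__(self, key: str) -> Any:
        # Only called for attributes that are not found normally.
        if key.startswith("_"):
            raise AttributeError(key)
        raw = self._fetch(key)  # the backend only ever returns strings
        # Deserialisation is done here, on the client side.
        return json.loads(raw) if self._deserialize_json else raw


# Stand-in for the string-only backend (assumption for this sketch).
STORE = {
    "hi_message": "hello_world",
    "json_var": '{"key1": "value1", "enabled": true, "threshold": 42}',
}


def fetch_from_store(key: str) -> str:
    return STORE[key]


# Same shape as context["var"] in the DAG: one accessor per format.
var = {
    "value": SketchVariableAccessor(fetch_from_store, deserialize_json=False),
    "json": SketchVariableAccessor(fetch_from_store, deserialize_json=True),
}

raw_message = var["value"].hi_message  # raw string
parsed = var["json"].json_var          # dict decoded from the JSON string
print(raw_message)
print(parsed["threshold"])
```

Both accessors share the same string-returning fetch function; only the client-side decoding step differs.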

Object Glossary

- VariableResponse is auto-generated and tightly coupled with the API schema.
- VariableResult is runtime-specific and meant for internal communication between the Supervisor and the Task Runner.
- The Variable class is where the public-facing, user-relevant aspects are exposed; internal details stay hidden.
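
The layering in the glossary can be sketched roughly as follows. These dataclasses are stand-ins, not the real Airflow classes: apart from `key`, `value`, and `description` on `Variable` (which appear in the logs below), every field and helper here (`from_response`, `to_public`) is an assumption made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class VariableResponse:
    """Stand-in for the auto-generated API model (fields assumed)."""

    key: str
    value: Optional[str]


@dataclass(frozen=True)
class VariableResult:
    """Stand-in for the Supervisor <-> Task Runner message (fields assumed)."""

    key: str
    value: Optional[str]

    @classmethod
    def from_response(cls, response: VariableResponse) -> "VariableResult":
        # Hypothetical conversion: copy only what the runtime needs.
        return cls(key=response.key, value=response.value)


@dataclass
class Variable:
    """Stand-in for the public, user-facing definition."""

    key: str
    value: Any = None
    description: Optional[str] = None


def to_public(result: VariableResult) -> Variable:
    # Hypothetical helper: hide the internal message type from DAG authors.
    return Variable(key=result.key, value=result.value)


api_model = VariableResponse(key="hi_message", value="hello_world")
internal = VariableResult.from_response(api_model)
public = to_public(internal)
print(public)
```

Keeping the three types separate means the API schema can change without touching what DAG authors import.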

Testing

DAG:

from __future__ import annotations

import os

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag


class CustomOperator(BaseOperator):
    def execute(self, context):
        # Set the test Variables through AIRFLOW_VAR_<KEY> environment variables.
        os.environ["AIRFLOW_VAR_HI_MESSAGE"] = "hello_world"
        os.environ["AIRFLOW_VAR_JSON_VAR"] = "{\r\n  \"key1\": \"value1\",\r\n  \"key2\": \"value2\",\r\n  \"enabled\": true,\r\n  \"threshold\": 42\r\n}"
        task_id = context["task_instance"].task_id
        print(f"Hello World {task_id}!")
        print(context)
        # Raw form of the Variable.
        print(context["var"]["value"].hi_message)
        # Deserialised JSON form of the Variable.
        print(context["var"]["json"].json_var)


@dag()
def var_from_context():
    CustomOperator(task_id="hello")


var_from_context()

This DAG tests both scenarios: reading a regular value from the context and reading a JSON value.

Case 1: Variables found

Logs:

2c27ff9a5949
2025-01-06 12:37:34.612546 [info     ] Filling up the DagBag from /files/dags/var_from_context.py [airflow.models.dagbag.DagBag]
2025-01-06 12:37:34.613168 [debug    ] Importing /files/dags/var_from_context.py [airflow.models.dagbag.DagBag]
2025-01-06 12:37:34.616934 [debug    ] Loaded DAG <DAG: var_from_context> [airflow.models.dagbag.DagBag]
2025-01-06 12:37:34.617131 [debug    ] DAG file parsed                [task] file=/files/dags/var_from_context.py
2025-01-06 12:37:34.641905 [warning  ] CustomOperator.execute cannot be called outside TaskInstance! [airflow.task.operators.unusual_prefix_c6632fd34e048ff55a9057c21ee5a54c16b99828_var_from_context.CustomOperator]
2025-01-06 12:37:34.642284 [info     ] Hello World hello!             [task] chan=stdout
2025-01-06 12:37:34.642393 [info     ] {'dag': <DAG: var_from_context>, 'inlets': [], 'map_index_template': None, 'outlets': [], 'run_id': 'manual__2025-01-06T12:37:34.037857+00:00', 'task': <Task(CustomOperator): hello>, 'task_instance': RuntimeTaskInstance(id=UUID('01943b9e-dadf-72eb-a47b-6c36e0fea3b7'), task_id='hello', dag_id='var_from_context', run_id='manual__2025-01-06T12:37:34.037857+00:00', try_number=1, map_index=-1, task=<Task(CustomOperator): hello>), 'ti': RuntimeTaskInstance(id=UUID('01943b9e-dadf-72eb-a47b-6c36e0fea3b7'), task_id='hello', dag_id='var_from_context', run_id='manual__2025-01-06T12:37:34.037857+00:00', try_number=1, map_index=-1, task=<Task(CustomOperator): hello>), 'var': {'json': <VariableAccessor (dynamic access)>, 'value': <VariableAccessor (dynamic access)>}, 'conn': <ConnectionAccessor (dynamic access)>, 'dag_run': DagRun(dag_id='var_from_context', run_id='manual__2025-01-06T12:37:34.037857+00:00', logical_date=datetime.datetime(2025, 1, 6, 12, 37, 34, 37857, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 1, 6, 12, 37, 34, 37857, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 1, 6, 12, 37, 34, 37857, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 1, 6, 12, 37, 34, 492777, tzinfo=TzInfo(UTC)), end_date=None, run_type=<DagRunType.MANUAL: 'manual'>, conf={}), 'data_interval_end': datetime.datetime(2025, 1, 6, 12, 37, 34, 37857, tzinfo=TzInfo(UTC)), 'data_interval_start': datetime.datetime(2025, 1, 6, 12, 37, 34, 37857, tzinfo=TzInfo(UTC)), 'logical_date': datetime.datetime(2025, 1, 6, 12, 37, 34, 37857, tzinfo=TzInfo(UTC)), 'ds': '2025-01-06', 'ds_nodash': '20250106', 'task_instance_key_str': 'var_from_context__hello__20250106', 'ts': '2025-01-06T12:37:34.037857+00:00', 'ts_nodash': '20250106T123734', 'ts_nodash_with_tz': '20250106T123734.037857+0000'} [task] chan=stdout
2025-01-06 12:37:34.642268 [debug    ] Sending request                [task] json={"key":"hi_message","type":"GetVariable"}

2025-01-06 12:37:34.648998 [info     ] Variable(key='hi_message', value='hello_world', description=None) [task] chan=stdout
2025-01-06 12:37:34.649000 [debug    ] Sending request                [task] json={"key":"json_var","type":"GetVariable"}

2025-01-06 12:37:34.652506 [warning  ] Pydantic serializer warnings:
  Expected `str` but got `dict` with value `{'api_key': '12345', 'region': 'us-east-1'}` - serialized value may not be as expected [py.warnings] category=UserWarning filename=/usr/local/lib/python3.9/site-packages/pydantic/main.py lineno=426
2025-01-06 12:37:34.652586 [debug    ] Sending request                [task] json={"state":"success","end_date":"2025-01-06T12:37:34.652550Z","type":"TaskState"}

2025-01-06 12:37:34.652845 [info     ] Variable(key='json_var', value={'api_key': '12345', 'region': 'us-east-1'}, description=None) [task] chan=stdout

Case 2: Variable not found

Logs:

2c27ff9a5949
2025-01-06 12:34:57.712062 [info     ] Filling up the DagBag from /files/dags/var_from_context.py [airflow.models.dagbag.DagBag]
2025-01-06 12:34:57.712579 [debug    ] Importing /files/dags/var_from_context.py [airflow.models.dagbag.DagBag]
2025-01-06 12:34:57.717120 [debug    ] Loaded DAG <DAG: var_from_context> [airflow.models.dagbag.DagBag]
2025-01-06 12:34:57.717434 [debug    ] DAG file parsed                [task] file=/files/dags/var_from_context.py
2025-01-06 12:34:57.743903 [warning  ] CustomOperator.execute cannot be called outside TaskInstance! [airflow.task.operators.unusual_prefix_c6632fd34e048ff55a9057c21ee5a54c16b99828_var_from_context.CustomOperator]
2025-01-06 12:34:57.744163 [info     ] Hello World hello!             [task] chan=stdout
2025-01-06 12:34:57.744270 [info     ] {'dag': <DAG: var_from_context>, 'inlets': [], 'map_index_template': None, 'outlets': [], 'run_id': 'manual__2025-01-06T12:34:56.856856+00:00', 'task': <Task(CustomOperator): hello>, 'task_instance': RuntimeTaskInstance(id=UUID('01943b9c-74e4-7b93-ba7e-7401dde98944'), task_id='hello', dag_id='var_from_context', run_id='manual__2025-01-06T12:34:56.856856+00:00', try_number=1, map_index=-1, task=<Task(CustomOperator): hello>), 'ti': RuntimeTaskInstance(id=UUID('01943b9c-74e4-7b93-ba7e-7401dde98944'), task_id='hello', dag_id='var_from_context', run_id='manual__2025-01-06T12:34:56.856856+00:00', try_number=1, map_index=-1, task=<Task(CustomOperator): hello>), 'var': {'json': <VariableAccessor (dynamic access)>, 'value': <VariableAccessor (dynamic access)>}, 'conn': <ConnectionAccessor (dynamic access)>, 'dag_run': DagRun(dag_id='var_from_context', run_id='manual__2025-01-06T12:34:56.856856+00:00', logical_date=datetime.datetime(2025, 1, 6, 12, 34, 56, 856856, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 1, 6, 12, 34, 56, 856856, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 1, 6, 12, 34, 56, 856856, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 1, 6, 12, 34, 57, 589490, tzinfo=TzInfo(UTC)), end_date=None, run_type=<DagRunType.MANUAL: 'manual'>, conf={}), 'data_interval_end': datetime.datetime(2025, 1, 6, 12, 34, 56, 856856, tzinfo=TzInfo(UTC)), 'data_interval_start': datetime.datetime(2025, 1, 6, 12, 34, 56, 856856, tzinfo=TzInfo(UTC)), 'logical_date': datetime.datetime(2025, 1, 6, 12, 34, 56, 856856, tzinfo=TzInfo(UTC)), 'ds': '2025-01-06', 'ds_nodash': '20250106', 'task_instance_key_str': 'var_from_context__hello__20250106', 'ts': '2025-01-06T12:34:56.856856+00:00', 'ts_nodash': '20250106T123456', 'ts_nodash_with_tz': '20250106T123456.856856+0000'} [task] chan=stdout
2025-01-06 12:34:57.744177 [debug    ] Sending request                [task] json={"key":"hi_message","type":"GetVariable"}

2025-01-06 12:34:57.749149 [debug    ] Sending request                [task] json={"state":"failed","end_date":"2025-01-06T12:34:57.749108Z","type":"TaskState"}
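
The failure path in Case 2 can be pictured with the sketch below. The control flow is an assumption, since this description does not show how the Task SDK handles a missing Variable; `VariableNotFound`, `get_variable`, and `run_task` are hypothetical names. Only the shape of the final message (`state`, `end_date`, `type`) mirrors the `TaskState` request in the logs above.

```python
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Callable, Mapping


class VariableNotFound(KeyError):
    """Hypothetical error raised when a Variable key cannot be resolved."""


def get_variable(store: Mapping[str, str], key: str) -> str:
    # Stand-in for the GetVariable round trip; a plain mapping is assumed.
    try:
        return store[key]
    except KeyError:
        raise VariableNotFound(key) from None


def run_task(execute: Callable[[], None], send: Callable[[str], None]) -> str:
    """Run the task body and report a terminal state, whatever happens."""
    try:
        execute()
        state = "success"
    except Exception:
        # Any error in the task body, including a missing Variable,
        # ends the task in the failed state.
        state = "failed"
    end_date = datetime.now(timezone.utc).isoformat()
    send(json.dumps({"state": state, "end_date": end_date, "type": "TaskState"}))
    return state


sent: list[str] = []
missing_state = run_task(lambda: get_variable({}, "hi_message"), sent.append)
found_state = run_task(
    lambda: print(get_variable({"hi_message": "hello_world"}, "hi_message")),
    sent.append,
)
print(missing_state, found_state)
```

In this sketch the missing key surfaces as an exception in the task body, and the runner turns that into a single terminal state message.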

TODO:

  • Writing Variables from the Task SDK

Member

@kaxil kaxil left a comment

Few nits but lgtm

@amoghrajesh
Contributor Author

Unrelated failure. Merging.

@amoghrajesh amoghrajesh closed this Jan 6, 2025
@amoghrajesh amoghrajesh reopened this Jan 6, 2025
@amoghrajesh amoghrajesh merged commit a6da8df into apache:main Jan 7, 2025
44 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-variables-from-context branch January 7, 2025 06:05
HariGS-DB pushed a commit to HariGS-DB/airflow that referenced this pull request Jan 16, 2025
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
Development

Successfully merging this pull request may close these issues.

Allow retrieving Variable from Task Context

2 participants