Merged
112 changes: 112 additions & 0 deletions RELEASE_NOTES.rst
@@ -24,6 +24,118 @@

.. towncrier release notes start

Airflow 3.0.2 (2025-06-10)
--------------------------

Significant Changes
^^^^^^^^^^^^^^^^^^^

No significant changes.

Bug Fixes
"""""""""

- Fix memory leak in dag-processor (#50558)
- Add back invalid inlet and outlet check before running tasks (#50773)
- Implement slice on LazyXComSequence to allow filtering items from a mapped task (#50117)
- Fix execution API server URL handling for relative paths in KE (#51183)
- Add log lookup exception for Empty operator subtypes (#50325)
- Increase the max zoom on the graph view so small dags are easier to see on large monitors (#50772)
- Fix timezone selection and dashboard layout (#50463)
- Bugfix: Creating a backfill for one dag no longer affects other dags (#50577)
- Fix next asset schedule and dag card UX (#50271)
- Add bundle path to sys.path in task runner (#51318)
- Add bundle root to sys.path in dag processor (#50385)
- Prevent CPU spike in task supervisor when heartbeat timeout exceeded (#51023)
- Fix Airflow Connection Form widget error (#51168)
- Add backwards compatibility shim and deprecation warning for EmailOperator (#51004)
- Handle ``SIGSEGV`` signals during DAG file imports (#51171)
- Fix deferred task resumption in ``dag.test()`` (#51182)
- Fix get-dags query to avoid join explosion (#50984)
- Bugfix: Populate logical date in Context vars (#50898)
- Mask variable values in task logs only if the variable key is sensitive (#50775)
- Mask secrets when retrieving variables from secrets backend (#50895)
- Fix deserialization when retrieving variables from a secrets backend (#50889)
- Fix XCom deserialization for mapped tasks with custom backend (#50687)
- Support macros defined via plugins in Airflow 3 (#50642)
- Fix Pydantic ``ForwardRef`` error by reordering discriminated union definitions (#50688)
- Add backwards compatibility shim for ``BaseNotifier`` (#50340)
- Use latest bundle version when clearing / re-running dag (#50040)
- Handle ``upstream_mapped_index`` when xcom access is needed (#50641)
- Remove unnecessary breaking flag in config command (#50781)
- Do not flood worker logs with secrets backend loading logs (#50581)
- Persist table sorting preferences across sessions using local storage (#50720)
- Fix ``patch_task_instance`` API endpoint to support task instance summaries and task groups (#50550)
- Fix bulk API schemas to improve OpenAPI compatibility and client generation (#50852)
- Fix variable API endpoints to support keys containing slashes (#50841)
- Restore backward compatibility for the ``/run`` API endpoint for older Task SDK clients
- Fix dropdown overflow and error text styling in ``FlexibleForm`` component (#50845)
- Correct DAG tag rendering to display ``+1 more`` when tags exceed the display limit by one (#50669)
- Fix permission check on the ui config endpoint (#50608)
- Fix ``default_args`` handling in operator ``.partial()`` to prevent ``TypeError`` when unused keys are present (#50525)
- DAG Processor: Fix index to sort by last parsing duration (#50388)
- UI: Fix border overlap issue in the Events page (#50453)
- Fix ``airflow tasks clear`` command (#49631)
- Restored support for ``--local`` flag in ``dag list`` and ``dag list-import-errors`` CLI commands (#49380)
- CLI: Exclude example dags when a bundle is passed (#50401)
- Fix CLI export to handle stdout without file descriptors (#50328)
- Fix ``DagProcessor`` stats log to show the correct parse duration (#50316)
- Fix OpenAPI schema for ``get_log`` API (#50547)
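The ``LazyXComSequence`` slicing fix above (#50117) can be pictured with a minimal stand-alone sketch — not Airflow's actual implementation — of the underlying idea: resolve a slice against the sequence's known length and fetch only the selected map indexes, instead of materializing every mapped-task result.

```python
# Minimal sketch (not Airflow's real LazyXComSequence) of slice support
# on a lazy sequence: only the indexes selected by the slice are fetched.
class LazyXComSequence:
    """Wraps a per-index fetcher, e.g. one DB lookup per map index."""

    def __init__(self, fetch_one, length):
        self._fetch_one = fetch_one
        self._length = length

    def __len__(self):
        return self._length

    def __getitem__(self, key):
        if isinstance(key, slice):
            # slice.indices clamps start/stop/step to the known length,
            # so we touch only the items the caller asked for.
            return [self._fetch_one(i) for i in range(*key.indices(self._length))]
        if key < 0:
            key += self._length
        if not 0 <= key < self._length:
            raise IndexError(key)
        return self._fetch_one(key)


# Pretend each mapped task instance pushed i * i to XCom.
results = LazyXComSequence(lambda i: i * i, length=5)
print(results[1:4])  # fetches only indexes 1, 2, 3 -> [1, 4, 9]
```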

Miscellaneous
"""""""""""""

- UI: Implement navigation on bar click (#50416)
- UI: Always Show Trends count in Dag Overview (#50183)
- UI: Add basic JSON check to variable value
- Remove filtering by last dag run state in patch dags endpoint (#51347)
- Ensure that both public and ui dags endpoints map to DagService (#51226)
- Refresh Dag details page on new run (#51173)
- Log fallback to None when no XCom value is found (#51285)
- Move ``example_dags`` in standard provider to ``example_dags`` in sources (#51275)
- Bring back "standard" example dags to the ``airflow-core`` package (#51192)
- Faster note on grid endpoint (#51247)
- Port ``task.test`` to Task SDK (#50827)
- Port ``dag.test`` to Task SDK (#50300,#50419)
- Port ``ti.run`` to Task SDK execution path (#50141)
- Support running ``airflow dags test`` from local files (#50420)
- Move macros to task SDK ``execution_time`` module (#50940)
- Add a link to the Airflow logo in Nav (#50304)
- UI: Bump minor and patch package json dependencies (#50298)
- Add a direct link to the latest DAG run in the DAG header (#51119,#51148)
- Fetch only the most recent ``dagrun`` value for list display (#50834)
- Move ``secret_key`` config to ``api`` section (#50839)
- Move various ``webserver`` configs to ``fab`` provider (#50774,#50269,#50208,#50896)
- Make ``dag_run`` nullable in Details page (#50719)
- Rename Operation IDs for task instance endpoints to include map indexes (#49608)
- Update default sort for connections and dags (#50600)
- Raise exception if downgrade can't proceed due to no ``ab_user`` table (#50343)
- Enable JSON serialization for variables created via the bulk API (#51057)
- Always display the backfill option in the UI; enable it only for DAGs with a defined schedule (#50969)
- Optimize DAG header to fetch only the most recent DAG run for improved performance (#50767)
- Add ``owner_links`` field to ``DAGDetailsResponse`` for enhanced owner metadata in the API (#50557)
- UI: Move map index column to be in line with other columns when viewing a summary of mapped tasks (#50302)
- Separate configurations for colorized and json logs in Task SDK / Celery Executor (#51082)
- Enhance task log viewer with virtualized rendering for improved performance on large logs (#50746)

Doc Only Changes
""""""""""""""""

- Add dates for Limited Maintenance & EOL for Airflow 2.x (#50794)
- Add Apache Airflow setup instructions for Apple Silicon (#50179)
- Update recommendation for upgrade path to airflow 3 (#50318)
- Add "disappearing DAGs" section on FAQ doc (#49987)
- Update Airflow 3 migration guide with step about custom operators (#50871, #50948)
- Docs ``assets.rst``: use ``AssetAlias`` for alias in ``Metadata`` example (#50768)
- Do not use outdated ``schedule_interval`` in tutorial dags (#50947)
- Docs: Add Airflow Version in Page Title (#50358)
- Fix callbacks docs (#50377)
- Update operator extra links doc for AF3 (#50197)
- Prune release notes (#50860)
- Fix types in config templates reference (#50792)
- Fix wrong import for ``PythonOperator`` in tutorial dag (#50962)
- Better structure of extras documentation (#50495)

Airflow 3.0.1 (2025-05-12)
--------------------------

2 changes: 1 addition & 1 deletion dev/breeze/README.md
@@ -135,6 +135,6 @@ PLEASE DO NOT MODIFY THE HASH BELOW! IT IS AUTOMATICALLY UPDATED BY PRE-COMMIT.

---------------------------------------------------------------------------------------------------------

Package config hash: fcc9ad87e1172ef881f3584a066215dde4f617fca6220e81c84f16f94d061231868a754ad9992630e81202a8972df18f64cabf943236c68512c2d99c18085ae7
Package config hash: 19b7a69c4b7ef23d1c665286fd7ca1a1d8c28fa9ba8523da6c3e215d8cd7c4bc0406186898a90c92d8e9f527bc8fa8d5c6407f914d7674e59e4981bb3c795e8c

---------------------------------------------------------------------------------------------------------
7 changes: 5 additions & 2 deletions dev/breeze/pyproject.toml
@@ -65,7 +65,7 @@ dependencies = [
"psutil>=5.9.6",
"pygithub>=2.1.1",
"pytest-xdist>=3.3.1",
"pytest>=8.2,<9",
"pytest>=8.3.3",
"pyyaml>=6.0.2",
"requests>=2.31.0",
"restructuredtext-lint>=1.4.0",
@@ -74,7 +74,10 @@ dependencies = [
"tabulate>=0.9.0",
"tomli>=2.0.1; python_version < '3.11'",
"twine>=4.0.2",
"tqdm>=4.67.1"
"tqdm>=4.67.1",
"boto3>=1.34.90",
"awswrangler>=3.11.0",
"semver>=3.0.4"
]

[project.scripts]
2 changes: 1 addition & 1 deletion devel-common/pyproject.toml
@@ -137,7 +137,7 @@ dependencies = [
"pytest-timeouts>=1.2.1",
"pytest-unordered>=0.6.1",
"pytest-xdist>=3.5.0",
"pytest>=8.3.3,<9",
"pytest>=8.3.3",
]
"sentry" = [
"blinker>=1.7.0",
@@ -45,7 +45,6 @@ def aws_region():
return AWS_REGION


@mock_aws
@pytest.fixture
def patch_hook(monkeypatch, aws_region):
"""Patch hook object by dummy boto3 Batch client."""
@@ -59,6 +58,7 @@ def test_batch_waiters(aws_region):
assert isinstance(batch_waiters, BatchWaitersHook)


@mock_aws
class TestBatchWaiters:
@pytest.fixture(autouse=True)
def setup_tests(self, patch_hook):
@@ -215,6 +215,7 @@ def test_wait_for_job_raises_for_waiter_error(self):
assert mock_waiter.wait.call_count == 1


@mock_aws
class TestBatchJobWaiters:
"""Test default waiters."""

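The hunks above move ``@mock_aws`` off the ``patch_hook`` fixture and onto the test classes. A plausible reason, sketched here with a stand-in decorator rather than moto itself, is that a decorator only wraps the function it decorates: applied to a fixture, the mock is active during fixture setup but can already be torn down by the time the test body runs, whereas a class-level decorator covers every test method.

```python
# Sketch (using a toy stand-in, not moto's real mock_aws) of why
# decorating a fixture is fragile: the mocked scope ends when the
# decorated function returns, before any test body executes.
import contextlib

active = []  # tracks whether the fake mock is currently in effect

@contextlib.contextmanager
def fake_mock_aws():
    active.append(True)
    try:
        yield
    finally:
        active.pop()

def as_decorator(func):
    # Emulates using the mock as a plain function decorator.
    def wrapper(*args, **kwargs):
        with fake_mock_aws():
            return func(*args, **kwargs)
    return wrapper

@as_decorator
def fixture_setup():
    # The mock is active here, during fixture setup ...
    return len(active)

during_setup = fixture_setup()   # -> 1: mock active inside the fixture
after_setup = len(active)        # -> 0: gone before any test body runs
print(during_setup, after_setup)
```

Applying the decorator at class level instead keeps the mocked scope open around each test method, which is what the diff does with ``@mock_aws`` on ``TestBatchWaiters`` and ``TestBatchJobWaiters``.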
@@ -181,43 +181,43 @@ def test_event_to_str(self):

@pytest.mark.db_test
class TestCloudwatchTaskHandler:
@conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
@pytest.fixture(autouse=True)
def setup_tests(self, create_log_template, tmp_path_factory, session):
self.remote_log_group = "log_group_name"
self.region_name = "us-west-2"
self.local_log_location = str(tmp_path_factory.mktemp("local-cloudwatch-log-location"))
if AIRFLOW_V_3_0_PLUS:
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
else:
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
self.cloudwatch_task_handler = CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
)

date = datetime(2020, 1, 1)
dag_id = "dag_for_testing_cloudwatch_task_handler"
task_id = "task_for_testing_cloudwatch_log_handler"
self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date)
task = EmptyOperator(task_id=task_id, dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="scheduled",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="scheduled",
def setup(self, create_log_template, tmp_path_factory, session):
with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
self.remote_log_group = "log_group_name"
self.region_name = "us-west-2"
self.local_log_location = str(tmp_path_factory.mktemp("local-cloudwatch-log-location"))
if AIRFLOW_V_3_0_PLUS:
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
else:
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
self.cloudwatch_task_handler = CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)

date = datetime(2020, 1, 1)
dag_id = "dag_for_testing_cloudwatch_task_handler"
task_id = "task_for_testing_cloudwatch_log_handler"
self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date)
task = EmptyOperator(task_id=task_id, dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="scheduled",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="scheduled",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)

self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
self.ti.dag_run = dag_run
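The same pattern change appears here and in the S3 tests below: ``conf_vars`` is applied as a context manager inside the fixture body instead of decorating the fixture. A toy version — not Airflow's real ``conf_vars`` — shows the effect: the override is unambiguously active while the handler is constructed and reads its config, then restored afterwards.

```python
# Toy sketch (not Airflow's real conf_vars) of the context-manager form
# the diff adopts: override config around handler construction, restore
# the previous values on exit.
import contextlib

CONF = {("logging", "remote_log_conn_id"): None}

@contextlib.contextmanager
def conf_vars(overrides):
    saved = {key: CONF.get(key) for key in overrides}
    CONF.update(overrides)
    try:
        yield
    finally:
        CONF.update(saved)  # restore whatever was there before

def build_handler():
    # Stands in for CloudwatchTaskHandler, which reads config when built.
    return {"conn_id": CONF[("logging", "remote_log_conn_id")]}

with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
    handler = build_handler()

print(handler["conn_id"])                        # captured under override
print(CONF[("logging", "remote_log_conn_id")])   # restored afterwards
```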
@@ -46,39 +46,39 @@ def s3mock():

@pytest.mark.db_test
class TestS3RemoteLogIO:
@conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
@pytest.fixture(autouse=True)
def setup_tests(self, create_log_template, tmp_path_factory, session):
self.remote_log_base = "s3://bucket/remote/log/location"
self.remote_log_location = "s3://bucket/remote/log/location/1.log"
self.remote_log_key = "remote/log/location/1.log"
self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
create_log_template("{try_number}.log")
self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
# Verify the hook now with the config override
self.subject = self.s3_task_handler.io
assert self.subject.hook is not None

date = datetime(2016, 1, 1)
self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="manual",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="manual",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)
with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
self.remote_log_base = "s3://bucket/remote/log/location"
self.remote_log_location = "s3://bucket/remote/log/location/1.log"
self.remote_log_key = "remote/log/location/1.log"
self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
create_log_template("{try_number}.log")
self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
# Verify the hook now with the config override
self.subject = self.s3_task_handler.io
assert self.subject.hook is not None

date = datetime(2016, 1, 1)
self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="manual",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="manual",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)

self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
self.ti.dag_run = dag_run
4 changes: 2 additions & 2 deletions reproducible_build.yaml
@@ -1,2 +1,2 @@
release-notes-hash: 27a6ab5414dce2dd3883fc6a32e67985
source-date-epoch: 1748945953
release-notes-hash: 4c5bee8104d689b9597bf30ff371fe8a
source-date-epoch: 1748953361
2 changes: 1 addition & 1 deletion task-sdk/pyproject.toml
@@ -58,7 +58,7 @@ dependencies = [
'pendulum>=3.0.0,<4.0;python_version>="3.12"',
"python-dateutil>=2.7.0",
"psutil>=6.1.0",
"structlog>=25.2.0",
"structlog>=25.4.0",
"retryhttp>=1.2.0,!=1.3.0",
]

4 changes: 4 additions & 0 deletions task-sdk/tests/task_sdk/definitions/test_xcom_arg.py
@@ -136,6 +136,8 @@ def c_to_none(v):
"exc_value": "expand_kwargs() expects a list[dict], not list[None]",
"frames": mock.ANY,
"is_cause": False,
"is_group": False,
"exceptions": [],
"syntax_error": None,
}
],
@@ -180,6 +182,8 @@ def does_not_work_with_c(v):
"exc_value": "nope",
"frames": mock.ANY,
"is_cause": False,
"is_group": False,
"exceptions": [],
"syntax_error": None,
}
],