Dag Processor crashes during DAG import #53759

@sm684

Description

Apache Airflow version

3.0.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

After updating Airflow from 3.0.2 to 3.0.3, the Dag Processor crashes during DAG import.

[2025-07-25T12:20:25.457+0000] {manager.py:531} INFO - Not time to refresh bundle dags-folder
[2025-07-25T12:20:26.056+0000] {dag.py:1622} INFO - Sync 1 DAGs
[2025-07-25T12:20:26.068+0000] {dag.py:2236} INFO - Setting next_dagrun for dag_test_Kafka to None, run_after=None
[2025-07-25T12:20:26.130+0000] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type json
LINE 1: UPDATE serialized_dag SET data='{"__version": 2, "dag": {"ti...
^
DETAIL: Token "NaN" is invalid.
CONTEXT: JSON data, line 1: ...DLE FROM AR_MEARCH_TEMP);", "pyfunction": NaN...

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 269, in run
return self._run_parsing_loop()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 358, in _run_parsing_loop
self._collect_results()
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 101, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 816, in _collect_results
self._file_stats[file] = process_parse_results(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1147, in process_parse_results
update_dag_parsing_results_in_db(
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/collection.py", line 387, in update_dag_parsing_results_in_db
session.flush()
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
_emit_update_statements(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
c = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type json
LINE 1: UPDATE serialized_dag SET data='{"__version": 2, "dag": {"ti...
^
DETAIL: Token "NaN" is invalid.
CONTEXT: JSON data, line 1: ...DLE FROM AR_MEARCH_TEMP);", "pyfunction": NaN...

[SQL: UPDATE serialized_dag SET data=%(data)s, last_updated=%(last_updated)s, dag_hash=%(dag_hash)s WHERE serialized_dag.id = %(serialized_dag_id)s]
[parameters: {'data': '{"__version": 2, "dag": {"timezone": "UTC", "disable_bundle_versioning": false, "edge_info": {}, "default_args": {"__var": {"owner": "airflow", "depe ... (49653 characters truncated) ... tion", "schema": {"__var": {"type": "string", "title": "Subsidiary ID were tables will be copied from", "enum": ["15", "25"]}, "__type": "dict"}}]]}}', 'last_updated': datetime.datetime(2025, 7, 25, 12, 20, 26, 126182, tzinfo=Timezone('UTC')), 'dag_hash': '6eff31cfb5f35e87f41456650758ab1c', 'serialized_dag_id': UUID('01984175-37ea-7111-9667-0be114c0b564')}]
(Background on this error at: https://sqlalche.me/e/14/9h9h)
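The invalid token comes from Python's default JSON encoder: `json.dumps` emits the bare token `NaN` for `float("nan")` (because `allow_nan=True` by default), and that token is not valid JSON per RFC 8259, so Postgres's `json` column type rejects the UPDATE. A minimal stdlib-only sketch of the behavior, independent of Airflow:

```python
import json

# Python's json module serializes float("nan") as the bare token NaN
# by default; strict JSON parsers -- and Postgres's json/jsonb types --
# reject that token.
payload = {"pyfunction": float("nan")}

print(json.dumps(payload))  # → {"pyfunction": NaN}

# With allow_nan=False the encoder raises ValueError instead, which is
# the point where the bad value could be caught before hitting the DB:
try:
    json.dumps(payload, allow_nan=False)
except ValueError as exc:
    print(f"rejected: {exc}")
```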

What you think should happen instead?

The Dag Processor should not fail: the failing DAG should be highlighted under DAG import errors, and the Dag Processor should continue processing the remaining DAGs.
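One way to turn this into a per-DAG import error rather than a processor crash would be to serialize with strict JSON before flushing to the database, so the bad value fails fast inside the per-file parse step. This is only a sketch of that idea, not Airflow's actual code; `validate_serialized_dag` is a hypothetical helper:

```python
import json


def validate_serialized_dag(data: dict) -> str:
    """Serialize DAG data with strict JSON (hypothetical helper).

    allow_nan=False makes the encoder raise ValueError on NaN/Infinity,
    so a DAG carrying such a value can be reported as an import error
    instead of crashing the whole Dag Processor when Postgres rejects
    the NaN token.
    """
    try:
        return json.dumps(data, allow_nan=False)
    except ValueError as exc:
        raise ValueError(f"DAG is not JSON-serializable: {exc}") from exc
```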

How to reproduce

See attached DAG

dag_test_Kafka.txt

sql_4_all_group.csv

Operating System

AlmaLinux 9.6 (Sage Margay)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==9.9.0
apache-airflow-providers-celery==3.12.1
apache-airflow-providers-cncf-kubernetes==10.6.1
apache-airflow-providers-common-compat==1.7.2
apache-airflow-providers-common-io==1.6.1
apache-airflow-providers-common-messaging==1.0.4
apache-airflow-providers-common-sql==1.27.3
apache-airflow-providers-docker==4.4.1
apache-airflow-providers-elasticsearch==6.3.1
apache-airflow-providers-fab==2.3.0
apache-airflow-providers-ftp==3.13.1
apache-airflow-providers-git==0.0.4
apache-airflow-providers-google==16.1.0
apache-airflow-providers-grpc==3.8.1
apache-airflow-providers-hashicorp==4.3.1
apache-airflow-providers-http==5.3.2
apache-airflow-providers-microsoft-azure==12.5.0
apache-airflow-providers-mysql==6.3.2
apache-airflow-providers-odbc==4.10.1
apache-airflow-providers-openlineage==2.5.0
apache-airflow-providers-postgres==6.2.1
apache-airflow-providers-redis==4.1.1
apache-airflow-providers-sendgrid==4.1.2
apache-airflow-providers-sftp==5.3.2
apache-airflow-providers-slack==9.1.2
apache-airflow-providers-smtp==2.1.1
apache-airflow-providers-snowflake==6.5.0
apache-airflow-providers-ssh==4.1.1
apache-airflow-providers-standard==1.4.1

Deployment

Docker-Compose

Deployment details

No response

Anything else?

Every time

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
