
Dag tag renaming (from lowercase to uppercase) causes dag-processor failure #56940

@yanshil

Description


Apache Airflow version

3.1.0

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

It seems that renaming a dag's tag (changing only its case) crashes the Airflow dag processor.

Originally I had a dag A with tags=['dangerous']. Today I wrote another dag B and tagged it DANGEROUS. To make the tags uniform, I changed the tags in A to tags=['DANGEROUS'], and then found that my dag processor had crashed with the following message:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 7, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py", line 53, in dag_processor
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py", line 56, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 368, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 397, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 272, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 361, in _run_parsing_loop
    self._collect_results()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 827, in _collect_results
    self._file_stats[file] = process_parse_results(
                             ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 1155, in process_parse_results
    update_dag_parsing_results_in_db(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 372, in update_dag_parsing_results_in_db
    for attempt in run_with_db_retries(logger=log):
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 445, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 382, in update_dag_parsing_results_in_db
    SerializedDAG.bulk_write_to_db(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2867, in bulk_write_to_db
    orm_dags = dag_op.add_dags(session=session)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 450, in add_dags
    orm_dags = self.find_orm_dags(session=session)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 447, in find_orm_dags
    return {dm.dag_id: dm for dm in session.scalars(stmt).unique()}
                                    ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1778, in scalars
    return self.execute(
           ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 724, in _connection_for_bind
    self._assert_active()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (MySQLdb.IntegrityError) (1062, "Duplicate entry 'DANGEROUS-modify_dr_conf' for key 'dag_tag.PRIMARY'")
[SQL: INSERT INTO dag_tag (name, dag_id) VALUES (%s, %s)]
[parameters: ('DANGEROUS', 'modify_dr_conf')]
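Per the error message, the failing constraint is the primary key of dag_tag, which covers the tag name and the dag_id. A likely explanation (an assumption, not confirmed in this report) is that the name column uses a case-insensitive MySQL collation, such as MySQL's default `_ci` collations, so 'DANGEROUS' counts as a duplicate of the existing 'dangerous' row for the same dag. The sketch below emulates this with SQLite's `COLLATE NOCASE`; the function name and the simplified table are hypothetical and are not Airflow's real schema.

```python
import sqlite3


def try_case_only_rename(case_insensitive: bool = True) -> str:
    """Emulate the dag_tag collision in an in-memory SQLite database.

    Assumption: MySQL's case-insensitive collation is approximated with
    SQLite's COLLATE NOCASE. This simplified table is hypothetical and
    not Airflow's actual dag_tag definition.
    """
    collation = " COLLATE NOCASE" if case_insensitive else ""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            f"CREATE TABLE dag_tag (name TEXT{collation}, dag_id TEXT,"
            " PRIMARY KEY (name, dag_id))"
        )
        # Row left over from the original dag A definition.
        conn.execute(
            "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)",
            ("dangerous", "modify_dr_conf"),
        )
        try:
            # The renamed tag is inserted while the old row still exists.
            conn.execute(
                "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)",
                ("DANGEROUS", "modify_dr_conf"),
            )
        except sqlite3.IntegrityError as exc:
            return f"IntegrityError: {exc}"
        return "inserted"
    finally:
        conn.close()
```

With `case_insensitive=False` the second insert succeeds, which suggests the crash depends on the column's collation rather than on the tag value itself.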

I had to rename the tag in dag A back to lowercase to get the dag-processor running again. But now dag A has dangerous and dag B has DANGEROUS, and I can't rename DANGEROUS to dangerous in dag B either, because I expect dag B would hit the same crash.


I also tried the following workaround, which did not work:

  1. Delete dag A (with the original lowercase dangerous tag) in the UI.
  2. Wait for the deletion to finish and the dag to disappear.
  3. Upload the new dag A with the new UPPERCASE DANGEROUS tag.
  4. Result: the UI still shows the old lowercase tag, and the dag-processor crashes.

My current workaround is to give dag A a completely different dag_id, so that it is treated as a brand-new dag and the problem goes away.

What you think should happen instead?

No response

How to reproduce

I suspect that simply renaming a lowercase tag to uppercase is enough to reproduce the problem, but here is my full list of steps anyway.

  1. I created dag A with tags=['dangerous']:
from airflow.sdk import dag

@dag(
    dag_id="modify_dr_conf",
    tags=["dangerous", "maintenance", "database"],
)
def modify_dr_conf(): ...

modify_dr_conf()
  2. I created dag B with tags=['DANGEROUS']:
from airflow.sdk import dag

@dag(
    dag_id="airflow_clean_db",
    tags=["DANGEROUS", "maintenance", "database"],
)
def airflow_clean_db(): ...

airflow_clean_db()
  3. I renamed the tag in dag A to DANGEROUS, so it now reads:
from airflow.sdk import dag

@dag(
    dag_id="modify_dr_conf",
    tags=["DANGEROUS", "maintenance", "database"],
)
def modify_dr_conf(): ...

modify_dr_conf()
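In these steps dag A's tags change from ["dangerous", ...] to ["DANGEROUS", ...]. A case-sensitive comparison of the old and new tag sets sees one tag to delete and one to insert. If the INSERT reaches the database before the DELETE (SQLAlchemy's flush often orders inserts before deletes for the same table, but this has not been verified against Airflow's code), a case-insensitive primary key rejects it. The sketch below is hypothetical and not Airflow's implementation: both function names are made up, and it only contrasts a naive diff with one possible alternative that treats case-only changes as renames.

```python
def naive_tag_diff(old: set[str], new: set[str]) -> tuple[set[str], set[str]]:
    """Case-sensitive diff: returns (tags to delete, tags to insert)."""
    return old - new, new - old


def case_aware_tag_diff(
    old: set[str], new: set[str]
) -> tuple[set[str], set[str], dict[str, str]]:
    """Split changes into deletes, inserts and case-only renames.

    A tag whose case-folded form appears on both sides is reported as a
    rename (old spelling -> new spelling), so it could be updated in place
    instead of inserted next to a row the database considers equal.
    Assumption: casefold() approximates the database collation, and each
    side holds at most one spelling per folded tag.
    """
    old_by_key = {t.casefold(): t for t in old}
    new_by_key = {t.casefold(): t for t in new}
    renames = {
        old_by_key[k]: new_by_key[k]
        for k in old_by_key.keys() & new_by_key.keys()
        if old_by_key[k] != new_by_key[k]
    }
    deletes = {old_by_key[k] for k in old_by_key.keys() - new_by_key.keys()}
    inserts = {new_by_key[k] for k in new_by_key.keys() - old_by_key.keys()}
    return deletes, inserts, renames
```

For the tags above, `naive_tag_diff` plans to delete {"dangerous"} and insert {"DANGEROUS"}, while `case_aware_tag_diff` reports no deletes or inserts and a single rename {"dangerous": "DANGEROUS"}. Updating the existing row's name, or flushing the delete before the insert, are two ways such a plan could avoid the duplicate-key error.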

Operating System

Helm Chart 1.18.0 with Docker Image based on apache/airflow:3.1.0-python3.11

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Helm Chart 1.18.0

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
