
Conversation

@dheerajturaga
Member

The command column in the edge_job table is hitting the character limit of 1000 during retries of decent-sized BashOperator commands. This is causing the EdgeExecutor to crash and exit, leaving the scheduler in a bad state. This PR bumps the character limit up to 2048.

Here's a snippet of the error from the scheduler logs:

[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]

(Background on this error at: https://sqlalche.me/e/14/9h9h)
[2025-06-14T02:26:26.528+0000] {edge_executor.py:343} INFO - Shutting down EdgeExecutor
[2025-06-14T02:26:26.528+0000] {scheduler_job_runner.py:1031} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
psycopg2.errors.StringDataRightTruncation: value too long for type character varying(1000)
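
The change itself is just widening the command column on the edge_job table. A minimal sketch of what that amounts to, assuming a SQLAlchemy String column (the column and table names come from the error above; this is an illustrative stand-in, not the provider's actual model code):

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class EdgeJobSketch(Base):  # illustrative stand-in, not the provider's EdgeJobModel
    __tablename__ = "edge_job_sketch"
    id = Column(Integer, primary_key=True)
    # before: Column(String(1000)) -- serialized workloads longer than 1000
    # characters fail with StringDataRightTruncation on insert
    command = Column(String(2048))  # after: headroom for larger serialized workloads

For an existing database the column type also has to be widened (conceptually an ALTER TABLE on edge_job.command); how that is wired up for the provider is outside this sketch.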

@dheerajturaga dheerajturaga requested a review from jscheffl as a code owner June 14, 2025 03:13
@boring-cyborg boring-cyborg bot added area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3 labels Jun 14, 2025
Contributor

@jscheffl jscheffl left a comment


Thanks for raising the PR! In my cases the command length was always sufficient.
I am not sure: if 1000 is not enough, will 2048 be sufficient in all cases?

@ashb or @amoghrajesh do you know how long an airflow.executors.ExecuteTask JSON object can get, in the retry case or with what is coming? Is 2048 safe?

@amoghrajesh
Contributor

@jscheffl @dheerajturaga from what I see in the edge executor interface, the edge command is nothing but a model dump of the workload: https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py#L169

I tried a small experiment: created a simple DAG and ran it a couple of times with retries defined.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator


def flaky_task(**context):
    # Force a ZeroDivisionError so the task fails and goes up_for_retry.
    return 1 // 0

with DAG(
    dag_id="retry_example_dag",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    retrying_task = PythonOperator(
        task_id="flaky_task",
        python_callable=flaky_task,
        retries=5,
        retry_delay=timedelta(seconds=5),
    )

(I made some tweaks so it prints the size of workload.model_dump_json(), which is what becomes the command here.)
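
Roughly, the tweak just logs len(workload.model_dump_json()). A self-contained toy version of that measurement, where ToyWorkload is a made-up stand-in for the real ExecuteTask object:

import logging

from pydantic import BaseModel

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("toy")


class ToyWorkload(BaseModel):
    # Made-up stand-in for ExecuteTask: any pydantic model serializes the same
    # way, so the stored command size is simply len(model_dump_json()).
    dag_id: str
    task_id: str
    run_id: str


workload = ToyWorkload(
    dag_id="retry_example_dag",
    task_id="flaky_task",
    run_id="manual__2025-06-14T12:50:54.260989+00:00",
)
log.info("len of workload is: %d", len(workload.model_dump_json()))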

2025-06-14 12:50:54.751778 [info     ] Task execute_workload[a7517e3a-e357-4bf4-8519-841aae0fa0aa] received [celery.worker.strategy]
2025-06-14 12:50:54.897616 [info     ] [a7517e3a-e357-4bf4-8519-841aae0fa0aa] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-34bf-70b1-84cc-284ae6aff602'), task_id='flaky_task', dag_id='retry_example_dag', run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=None) dag_rel_path=PurePosixPath('retry-with-delay.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=1.log' type='ExecuteTask', len of workload is: 903 [airflow.providers.celery.executors.celery_executor_utils]
2025-06-14 12:50:54.915904 [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['LocalFilesystemBackend', 'EnvironmentVariablesBackend'] count=2
2025-06-14 12:50:55.545694 [info     ] Task finished                  [supervisor] duration=0.6320609190006508 exit_code=0 final_state=up_for_retry
2025-06-14 12:50:55.550448 [info     ] Task execute_workload[a7517e3a-e357-4bf4-8519-841aae0fa0aa] succeeded in 0.7965272139990702s: None [celery.app.trace]
^L[2025-06-14 12:50:57 +0000] [665] [INFO] Handling signal: winch
2025-06-14 12:51:01.275889 [info     ] Task execute_workload[241e8614-d7dd-4de9-b901-32195c238f65] received [celery.worker.strategy]
2025-06-14 12:51:01.280838 [info     ] [241e8614-d7dd-4de9-b901-32195c238f65] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-39b3-754c-ae51-ce764bb98068'), task_id='flaky_task', dag_id='retry_example_dag', run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=datetime.datetime(2025, 6, 14, 12, 50, 54, 624148, tzinfo=TzInfo(UTC))) dag_rel_path=PurePosixPath('retry-with-delay.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=2.log' type='ExecuteTask', len of workload is: 928 [airflow.providers.celery.executors.celery_executor_utils]
2025-06-14 12:51:01.295521 [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['LocalFilesystemBackend', 'EnvironmentVariablesBackend'] count=2
2025-06-14 12:51:01.453121 [info     ] Task finished                  [supervisor] duration=0.15831252200587187 exit_code=0 final_state=up_for_retry
2025-06-14 12:51:01.458458 [info     ] Task execute_workload[241e8614-d7dd-4de9-b901-32195c238f65] succeeded in 0.18147331300133374s: None [celery.app.trace]
2025-06-14 12:51:06.603739 [info     ] Task execute_workload[7b287860-ad2c-4fc9-9a03-90f429aa3666] received [celery.worker.strategy]
2025-06-14 12:51:06.609338 [info     ] [7b287860-ad2c-4fc9-9a03-90f429aa3666] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-50c7-7f57-a3da-d326eefbde78'), task_id='flaky_task', dag_id='retry_example_dag', run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=3, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=datetime.datetime(2025, 6, 14, 12, 51, 1, 270429, tzinfo=TzInfo(UTC))) dag_rel_path=PurePosixPath('retry-with-delay.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=3.log' type='ExecuteTask', len of workload is: 928 [airflow.providers.celery.executors.celery_executor_utils]
2025-06-14 12:51:06.624493 [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['LocalFilesystemBackend', 'EnvironmentVariablesBackend'] count=2
2025-06-14 12:51:06.758592 [info     ] Task finished                  [supervisor] duration=0.13502606399561046 exit_code=0 final_state=up_for_retry
2025-06-14 12:51:06.762165 [info     ] Task execute_workload[7b287860-ad2c-4fc9-9a03-90f429aa3666] succeeded in 0.15771018699888373s: None [celery.app.trace]
2025-06-14 12:51:12.682274 [info     ] Task execute_workload[c2480fb0-ba96-4bbd-b5eb-0e74cd2641cc] received [celery.worker.strategy]
2025-06-14 12:51:12.686425 [info     ] [c2480fb0-ba96-4bbd-b5eb-0e74cd2641cc] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-6580-7a9c-b89b-7b016c42fca4'), task_id='flaky_task', dag_id='retry_example_dag', run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=4, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=datetime.datetime(2025, 6, 14, 12, 51, 6, 599630, tzinfo=TzInfo(UTC))) dag_rel_path=PurePosixPath('retry-with-delay.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=4.log' type='ExecuteTask', len of workload is: 928 [airflow.providers.celery.executors.celery_executor_utils

So I can see that in normal cases it's 903 and then 928 characters.

Considering that we can max out some fields in length, for example:
• task_id can be up to 36 characters
• dag_id can be up to 250 characters
• other fields (e.g. run_id, queue, log_path, executor_config, etc.) add more

I think 2048 would be a good length here. I do not see a situation where it would exceed that, and anything shorter could be hit fairly soon and become limiting; a rough estimate is sketched below.
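
A back-of-the-envelope check of that headroom (928 is the observed baseline from the logs above; the growth figures reuse the limits listed plus an assumed slack, so this is only an estimate):

baseline = 928                                   # observed length with short identifiers
extra_dag_id = 250 - len("retry_example_dag")    # dag_id appears again inside log_path
extra_other = 300                                # assumed slack for run_id, queue, executor_config, ...
estimate = baseline + 2 * extra_dag_id + extra_other
print(estimate, estimate < 2048)                 # roughly 1694, True -> 2048 leaves headroom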

Contributor

@jscheffl jscheffl left a comment


Looks good! I will just run a short test, and CI needs to be green, then LGTM!

@dheerajturaga
Member Author

Thanks @amoghrajesh for the quick sanity check!
Thanks @jscheffl for the review!

@dheerajturaga
Member Author

Looks good! I will just run a short test, and CI needs to be green, then LGTM!

Great! To verify, I launched breeze with breeze start-airflow --db-reset -e --executor EdgeExecutor and ran the following in the airflow db shell:

SELECT column_name, data_type, character_maximum_length
FROM information_schema.columns
WHERE table_name = 'edge_job' AND column_name = 'command' AND table_schema = 'public';
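
On the Postgres backend this should report character_maximum_length = 2048 for the command column after the change (previously 1000), roughly:

 column_name | data_type         | character_maximum_length
 command     | character varying | 2048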

@jscheffl
Contributor

Manual tests with Airflow 3 and Airflow 2.11 went fine!

@jscheffl jscheffl merged commit d894843 into apache:main Jun 14, 2025
66 checks passed
@dheerajturaga dheerajturaga deleted the edge_job_command_char_increase branch June 14, 2025 15:38