Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Aug 22, 2025

The MySQL UUID v7 generation function in migration 0042 was creating malformed UUIDs that fail Pydantic validation when the scheduler attempts to enqueue task instances.

Problem

The original MySQL function had two critical issues:

  1. Generated only 16 random hex characters instead of the required 20
  2. Used SUBSTRING(rand_hex, 9) without length limit, producing 8-character final segments instead of the required 12 characters

This resulted in malformed UUIDs like:

  • Bad: 0198cf6d-fb98-4555-7301-e29b8403 (32 chars, last segment: 8 chars)
  • Good: 0198cf6d-fb98-4555-7301-e29b8403abcd (36 chars, last segment: 12 chars)

Simple Reproduction

The issue can be demonstrated with pure Python and the malformed UUIDs:

from pydantic import BaseModel
from uuid import UUID

class TaskInstanceDemo(BaseModel):
    id: UUID

# This fails with the exact error from the issue
bad_uuid = "0198cf6d-fb98-4555-7301-e29b8403"  # 32 chars
TaskInstanceDemo(id=bad_uuid)
# ValidationError: Input should be a valid UUID, invalid group length in group 4: expected 12, found 8

# This works fine  
good_uuid = "0198cf6d-fb98-4555-7301-e29b8403abcd"  # 36 chars
TaskInstanceDemo(id=good_uuid)  # ✓ Success

When This Issue Occurs

The validation error happens when:

  1. Task instances exist in 'scheduled' state before migrating from 2.10 to 3.0.x
  2. These tasks receive malformed UUIDs during migration
  3. Scheduler tries to enqueue these tasks via ExecuteTask.make()
  4. Pydantic validation fails: 'invalid group length in group 4: expected 12, found 8'

Users with no scheduled tasks during migration or who create new DAG runs typically don't encounter this issue since new task instances get proper UUIDs from the Python uuid7() function.

Solution

Updated the MySQL uuid_generate_v7 function to:

  • Use RANDOM_BYTES(10) for cryptographically secure 20-character hex data
  • Apply explicit SUBSTRING(rand_hex, 9, 12) to ensure 12-character final segment
  • Mark function as NOT DETERMINISTIC (correct for random functions)
  • Use CHAR(20) declaration matching actual usage

Why No New Data Migration

I decided against creating a separate migration to fix existing malformed UUIDs because:

  1. Limited scope - Only affects task instances in 'scheduled' state during migration
  2. Self-healing - System recovers as old tasks complete and new ones are created
  3. Risk mitigation - Avoid complex primary key modifications in production
  4. Alternative available - Manual fix script provided below for affected users
  5. Prevention focus - Fixing root cause prevents future occurrences

Manual Fix for Affected Users

If you encounter the UUID validation error, you can fix existing malformed UUIDs:

-- Fix malformed UUIDs by extending them to proper length
UPDATE task_instance
SET id = CONCAT(
    SUBSTRING(id, 1, 23),  -- Keep first 23 chars (including last dash)
    LPAD(HEX(FLOOR(RAND() * POW(2,32))), 8, '0')  -- Add 8 random hex chars
)
WHERE LENGTH(SUBSTRING_INDEX(id, '-', -1)) = 8;  -- Find 8-char final segments

-- Verify the fix
SELECT id, LENGTH(id) as uuid_length,
       LENGTH(SUBSTRING_INDEX(id, '-', -1)) as last_segment_length
FROM task_instance
WHERE LENGTH(SUBSTRING_INDEX(id, '-', -1)) != 12
LIMIT 5;

Testing

Error in Scheduler:

[2025-08-22T02:17:43.203+0000] {scheduler_job_runner.py:710} INFO - Trying to enqueue tasks: [<TaskInstance: as.simplest_dag manual__2025-08-22T01:39:07.403055+00:00 [scheduled]>, <TaskInstance: as.simplest_dag scheduled__2025-08-22T02:15:00+00:00 [scheduled]>] for executor: LocalExecutor(parallelism=32)
[2025-08-22T02:17:43.207+0000] {scheduler_job_runner.py:984} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 980, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1266, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1408, in _do_scheduling
    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 716, in _critical_section_enqueue_task_instances
    self._enqueue_task_instances_with_queued_state(queued_tis_per_executor, executor, session=session)
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 665, in _enqueue_task_instances_with_queued_state
    workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
  File "/opt/airflow/airflow-core/src/airflow/executors/workloads.py", line 114, in make
    ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
  File "/usr/local/lib/python3.10/site-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
id
  Input should be a valid UUID, invalid group length in group 4: expected 12, found 8 [type=uuid_parsing, input_value='0198cf6e-670d-3797-7578-d9f380de', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/uuid_parsing
[2025-08-22T02:17:43.208+0000] {local_executor.py:230} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2025-08-22T02:17:43.208+0000] {scheduler_job_runner.py:996} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 368, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 397, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 980, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1266, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1408, in _do_scheduling
    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 716, in _critical_section_enqueue_task_instances
    self._enqueue_task_instances_with_queued_state(queued_tis_per_executor, executor, session=session)
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 665, in _enqueue_task_instances_with_queued_state
    workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
  File "/opt/airflow/airflow-core/src/airflow/executors/workloads.py", line 114, in make
    ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
  File "/usr/local/lib/python3.10/site-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
id
  Input should be a valid UUID, invalid group length in group 4: expected 12, found 8 [type=uuid_parsing, input_value='0198cf6e-670d-3797-7578-d9f380de', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/uuid_parsing
image

Closes #54554

@kaxil kaxil added this to the Airflow 3.0.6 milestone Aug 22, 2025
@kaxil kaxil requested a review from ephraimbuddy as a code owner August 22, 2025 02:46
@kaxil kaxil added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Aug 22, 2025
The MySQL UUID v7 generation function in migration 0042 was creating malformed UUIDs
that fail Pydantic validation when the scheduler attempts to enqueue task instances.

## Problem

The original MySQL function had two critical issues:
1. Generated only 16 random hex characters instead of the required 20
2. Used SUBSTRING(rand_hex, 9) without length limit, producing 8-character final
   segments instead of the required 12 characters

This resulted in malformed UUIDs like:
- Bad:  0198cf6d-fb98-4555-7301-e29b8403 (32 chars, last segment: 8 chars)
- Good: 0198cf6d-fb98-4555-7301-e29b8403abcd (36 chars, last segment: 12 chars)

## Simple Reproduction

The issue can be demonstrated with pure Python and the malformed UUIDs:

```python
from pydantic import BaseModel
from uuid import UUID

class TaskInstanceDemo(BaseModel):
    id: UUID

# This fails with the exact error from the issue
bad_uuid = "0198cf6d-fb98-4555-7301-e29b8403"  # 32 chars
TaskInstanceDemo(id=bad_uuid)
# ValidationError: Input should be a valid UUID, invalid group length in group 4: expected 12, found 8

# This works fine
good_uuid = "0198cf6d-fb98-4555-7301-e29b8403abcd"  # 36 chars
TaskInstanceDemo(id=good_uuid)  # ✓ Success
```

## When This Issue Occurs

The validation error happens when:
1. Task instances exist in 'scheduled' state before migrating from 2.10 to 3.0.x
2. These tasks receive malformed UUIDs during migration
3. Scheduler tries to enqueue these tasks via ExecuteTask.make()
4. Pydantic validation fails: 'invalid group length in group 4: expected 12, found 8'

Users with no scheduled tasks during migration or who create new DAG runs typically
don't encounter this issue since new task instances get proper UUIDs from the
Python uuid7() function.

## Solution

Updated the MySQL uuid_generate_v7 function to:
- Use RANDOM_BYTES(10) for cryptographically secure 20-character hex data
- Apply explicit SUBSTRING(rand_hex, 9, 12) to ensure 12-character final segment
- Mark function as NOT DETERMINISTIC (correct for random functions)
- Use CHAR(20) declaration matching actual usage

## Why No Data Migration

We decided against creating a separate migration to fix existing malformed UUIDs because:

1. **Limited scope** - Only affects task instances in 'scheduled' state during migration
2. **Self-healing** - System recovers as old tasks complete and new ones are created
3. **Risk mitigation** - Avoid complex primary key modifications in production
4. **Alternative available** - Manual fix script provided below for affected users
5. **Prevention focus** - Fixing root cause prevents future occurrences

## Manual Fix for Affected Users

If you encounter the UUID validation error, you can fix existing malformed UUIDs:

```sql
-- Fix malformed UUIDs by extending them to proper length
UPDATE task_instance
SET id = CONCAT(
    SUBSTRING(id, 1, 23),  -- Keep first 23 chars (including last dash)
    LPAD(HEX(FLOOR(RAND() * POW(2,32))), 8, '0')  -- Add 8 random hex chars
)
WHERE LENGTH(SUBSTRING_INDEX(id, '-', -1)) = 8;  -- Find 8-char final segments

-- Verify the fix
SELECT id, LENGTH(id) as uuid_length,
       LENGTH(SUBSTRING_INDEX(id, '-', -1)) as last_segment_length
FROM task_instance
WHERE LENGTH(SUBSTRING_INDEX(id, '-', -1)) != 12
LIMIT 5;
```

## Testing

Verified the fix generates valid UUIDs:
- Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 chars total)
- Final segment: 12 characters (not 8)
- Passes standard UUID validation patterns

Fixes apache#54554
@kaxil kaxil force-pushed the fix/mysql-uuid-migration-malformed-ids branch from debe60a to c48b1ef Compare August 22, 2025 03:57
@kaxil kaxil requested a review from vatsrahul1001 August 22, 2025 04:30
@kaxil kaxil merged commit 600716f into apache:main Aug 22, 2025
57 checks passed
@kaxil kaxil deleted the fix/mysql-uuid-migration-malformed-ids branch August 22, 2025 04:42
@github-actions
Copy link

Backport failed to create: v3-0-test. View the failure log Run details

Status Branch Result
v3-0-test Commit Link

You can attempt to backport this manually by running:

cherry_picker 600716f v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

kaxil added a commit that referenced this pull request Aug 22, 2025
mangal-vairalkar pushed a commit to mangal-vairalkar/airflow that referenced this pull request Aug 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:db-migrations PRs with DB migration backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch kind:documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

taskInstance Id format wrong while migrating to 3.0.4 and using MySQL as database

2 participants