Skip to content

Commit e80f30c

Browse files
authored
fix: PLT-936: Scheduled migrations (#8767)
Co-authored-by: triklozoid <triklozoid@users.noreply.github.com>
1 parent f0a5b51 commit e80f30c

File tree

9 files changed

+725
-1
lines changed

9 files changed

+725
-1
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Async Migrations in Label Studio
2+
3+
## Overview
4+
5+
Async migrations are a mechanism for executing heavy database operations (e.g., creating indexes, bulk data updates) in the background without blocking the main Django migration process.
6+
7+
## Key Components
8+
9+
### AsyncMigrationStatus
10+
11+
Model for tracking async migration status (`label_studio/core/models.py:12`).
12+
13+
**Statuses:**
14+
- `SCHEDULED` - Migration is scheduled but not yet started
15+
- `STARTED` - Migration is started or queued
16+
- `IN PROGRESS` - Migration is in progress (check meta for job_id or status)
17+
- `FINISHED` - Migration completed successfully
18+
- `ERROR` - Migration completed with errors (check meta for details)
19+
20+
### make_sql_migration Helper
21+
22+
The main tool for creating new async migrations is located in `label_studio/core/migration_helpers.py:55`.
23+
24+
## How to Create a New Async Migration
25+
26+
### Using the make_sql_migration Helper
27+
28+
```python
29+
from django.db import migrations
30+
from core.migration_helpers import make_sql_migration
31+
32+
# SQL for forward migration
33+
sql_forwards = (
34+
'CREATE INDEX CONCURRENTLY IF NOT EXISTS my_index_name '
35+
'ON my_table (column1, column2);'
36+
)
37+
38+
# SQL for rollback
39+
sql_backwards = (
40+
'DROP INDEX CONCURRENTLY IF EXISTS my_index_name;'
41+
)
42+
43+
class Migration(migrations.Migration):
44+
atomic = False # Important for CONCURRENTLY operations in PostgreSQL
45+
46+
dependencies = [
47+
("tasks", "0054_previous_migration"),
48+
]
49+
50+
operations = [
51+
migrations.RunPython(
52+
*make_sql_migration(
53+
sql_forwards,
54+
sql_backwards,
55+
migration_name=__name__, # Automatically uses module name
56+
apply_on_sqlite=False, # Optional: whether to apply on SQLite (default False)
57+
execute_immediately=False, # Optional: execute immediately or allow scheduling
58+
)
59+
),
60+
]
61+
```
62+
63+
### make_sql_migration Parameters
64+
65+
- **sql_forwards** (required): SQL for forward migration
66+
- **sql_backwards** (required): SQL for rollback
67+
- **migration_name** (required): Migration name. Use `__name__` to automatically use the module name (e.g., `tasks.migrations.0055_task_proj_octlen_idx_async`)
68+
- **apply_on_sqlite** (optional, default `False`): Whether to apply SQL on SQLite
69+
- **execute_immediately** (optional, default `False`): Execute immediately or allow scheduling
70+
71+
## Migration Scheduling
72+
73+
### Automatic Scheduling
74+
75+
By default, migrations execute immediately when running `manage.py migrate`. However, you can enable scheduling mode.
76+
77+
**Enabling scheduling mode:**
78+
79+
Set the environment variable:
80+
```bash
81+
export ALLOW_SCHEDULED_MIGRATIONS=true
82+
```
83+
84+
### Scheduling Behavior
85+
86+
When `ALLOW_SCHEDULED_MIGRATIONS=true` and `execute_immediately=False`:
87+
88+
1. When the migration runs, an `AsyncMigrationStatus` record is created with `SCHEDULED` status
89+
2. The migration is not executed automatically
90+
3. Administrators can manually trigger scheduled migrations via Django Admin
91+
92+
### Forcing Immediate Execution
93+
94+
If you need a migration to execute immediately regardless of `ALLOW_SCHEDULED_MIGRATIONS`:
95+
96+
```python
97+
operations = [
98+
migrations.RunPython(
99+
*make_sql_migration(
100+
sql_forwards,
101+
sql_backwards,
102+
migration_name=__name__,
103+
execute_immediately=True, # Force immediate execution
104+
)
105+
),
106+
]
107+
```
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import logging
2+
from typing import Callable, Tuple
3+
4+
from core.redis import start_job_async_or_sync
5+
from django.conf import settings
6+
from django.db import connection
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
def execute_sql_job(*, migration_name: str, sql: str, apply_on_sqlite: bool = False, reverse: bool = False) -> None:
12+
from core.models import AsyncMigrationStatus
13+
14+
if not reverse:
15+
migration, created = AsyncMigrationStatus.objects.get_or_create(
16+
name=migration_name,
17+
defaults={'status': AsyncMigrationStatus.STATUS_STARTED},
18+
)
19+
if not created and migration.status == AsyncMigrationStatus.STATUS_FINISHED:
20+
logger.info(f'Migration {migration_name} already executed with status FINISHED')
21+
return
22+
if migration.status == AsyncMigrationStatus.STATUS_SCHEDULED:
23+
migration.status = AsyncMigrationStatus.STATUS_STARTED
24+
migration.save()
25+
26+
try:
27+
if connection.vendor == 'sqlite' and not apply_on_sqlite:
28+
logger.info('SQLite detected; skipping SQL execution as requested')
29+
else:
30+
with connection.cursor() as cursor:
31+
cursor.execute(sql)
32+
migration.status = AsyncMigrationStatus.STATUS_FINISHED
33+
migration.save()
34+
except Exception as e:
35+
logger.exception(f'Migration {migration_name} failed: {e}')
36+
migration.status = AsyncMigrationStatus.STATUS_ERROR
37+
if not migration.meta:
38+
migration.meta = {}
39+
migration.meta['error'] = str(e)
40+
migration.save()
41+
raise
42+
else:
43+
# Reverse path: don't create/update AsyncMigrationStatus. Just run SQL.
44+
try:
45+
if connection.vendor == 'sqlite' and not apply_on_sqlite:
46+
logger.info('SQLite detected; skipping SQL execution as requested (reverse)')
47+
return
48+
with connection.cursor() as cursor:
49+
cursor.execute(sql)
50+
except Exception as e:
51+
logger.exception(f'Reverse migration {migration_name} failed: {e}')
52+
raise
53+
54+
55+
def make_sql_migration(
56+
sql_forwards: str,
57+
sql_backwards: str,
58+
*,
59+
apply_on_sqlite: bool = False,
60+
execute_immediately: bool = False,
61+
migration_name: str | None = None,
62+
) -> Tuple[Callable, Callable]:
63+
"""Return (forwards, backwards) for migrations.RunPython.
64+
65+
- forwards: either schedules job or marks as SCHEDULED
66+
- backwards: always schedules job to execute reverse SQL
67+
"""
68+
if not migration_name:
69+
raise ValueError("make_sql_migration requires explicit migration_name like 'app_label:migration_module'")
70+
mig_key = migration_name
71+
72+
def forwards(apps, schema_editor): # noqa: ARG001
73+
if schema_editor.connection.vendor == 'sqlite' and not apply_on_sqlite:
74+
logger.info('Skipping migration for SQLite (apply_on_sqlite=False)')
75+
return
76+
should_execute = execute_immediately or not settings.ALLOW_SCHEDULED_MIGRATIONS
77+
if should_execute:
78+
start_job_async_or_sync(
79+
execute_sql_job,
80+
migration_name=mig_key,
81+
sql=sql_forwards,
82+
apply_on_sqlite=apply_on_sqlite,
83+
reverse=False,
84+
)
85+
else:
86+
AsyncMigrationStatus = apps.get_model('core', 'AsyncMigrationStatus')
87+
AsyncMigrationStatus.objects.get_or_create(
88+
name=mig_key,
89+
defaults={'status': 'SCHEDULED'},
90+
)
91+
92+
def backwards(apps, schema_editor): # noqa: ARG001
93+
start_job_async_or_sync(
94+
execute_sql_job,
95+
migration_name=mig_key,
96+
sql=sql_backwards,
97+
apply_on_sqlite=apply_on_sqlite,
98+
reverse=True,
99+
)
100+
101+
return forwards, backwards
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Generated by Django 5.1.9 on 2025-11-03 00:00
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
('core', '0002_deletedrow'),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name='asyncmigrationstatus',
15+
name='status',
16+
field=models.CharField(
17+
choices=[
18+
('SCHEDULED', 'Migration is scheduled but not yet started.'),
19+
('STARTED', 'Migration is started or queued.'),
20+
('IN PROGRESS', 'Migration is in progress. Check meta for job_id or status.'),
21+
('FINISHED', 'Migration completed successfully.'),
22+
('ERROR', 'Migration completed with errors. Check meta for more info.'),
23+
],
24+
default=None,
25+
max_length=100,
26+
null=True,
27+
),
28+
),
29+
]

label_studio/core/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ class AsyncMigrationStatus(models.Model):
2727

2828
name = models.TextField('migration_name', help_text='Migration name')
2929

30+
STATUS_SCHEDULED = 'SCHEDULED'
3031
STATUS_STARTED = 'STARTED'
3132
STATUS_IN_PROGRESS = 'IN PROGRESS'
3233
STATUS_FINISHED = 'FINISHED'
3334
STATUS_ERROR = 'ERROR'
3435
STATUS_CHOICES = (
36+
(STATUS_SCHEDULED, 'Migration is scheduled but not yet started.'),
3537
(STATUS_STARTED, 'Migration is started or queued.'),
3638
(STATUS_IN_PROGRESS, 'Migration is in progress. Check meta for job_id or status.'),
3739
(STATUS_FINISHED, 'Migration completed successfully.'),

label_studio/core/settings/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@
141141
# This indicates whether the code is running in a Continuous Integration environment.
142142
CI = get_bool_env('CI', False)
143143

144+
# Control whether async SQL migrations can be scheduled (SCHEDULED status) instead of running immediately.
145+
# If False, migrations that would normally be scheduled will be executed immediately.
146+
ALLOW_SCHEDULED_MIGRATIONS = get_bool_env('ALLOW_SCHEDULED_MIGRATIONS', False)
147+
144148
# Databases
145149
# https://docs.djangoproject.com/en/2.1/ref/settings/#databases
146150
DJANGO_DB_MYSQL = 'mysql'

0 commit comments

Comments
 (0)