Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@


def remove(apps, schema_editor):
db_alias = schema_editor.connection.alias
View = apps.get_model('data_manager', 'View')
views = View.objects.all()
views = View.objects.using(db_alias).all()

for view in views:
if 'hiddenColumns' in view.data:
Expand All @@ -16,7 +17,7 @@ def remove(apps, schema_editor):
view.data['hiddenColumns']['labeling'].append('tasks:annotations_ids')
view.data['hiddenColumns']['labeling'] = list(set(view.data['hiddenColumns']['labeling']))

view.save()
view.save(using=db_alias)


class Migration(migrations.Migration):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@


def remove(apps, schema_editor):
db_alias = schema_editor.connection.alias
View = apps.get_model('data_manager', 'View')
views = View.objects.all()
views = View.objects.using(db_alias).all()

for view in views:
if 'hiddenColumns' in view.data:
Expand All @@ -16,7 +17,7 @@ def remove(apps, schema_editor):
view.data['hiddenColumns']['labeling'].append('tasks:predictions_model_versions')
view.data['hiddenColumns']['labeling'] = list(set(view.data['hiddenColumns']['labeling']))

view.save()
view.save(using=db_alias)


class Migration(migrations.Migration):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from django.db import migrations, connection
from django.db import migrations
from copy import deepcopy
from django.apps import apps as django_apps
from django.conf import settings
Expand All @@ -12,7 +12,7 @@
logger = logging.getLogger(__name__)


def forward_migration():
def forward_migration(db_alias):
"""
Migrates views that have agreement_selected populated to the new structure

Expand All @@ -36,7 +36,7 @@ def forward_migration():
'ground_truth': bool
}
"""
migration, created = AsyncMigrationStatus.objects.get_or_create(
migration, created = AsyncMigrationStatus.objects.using(db_alias).get_or_create(
name=migration_name,
defaults={'status': AsyncMigrationStatus.STATUS_STARTED}
)
Expand All @@ -49,7 +49,7 @@ def forward_migration():
# Iterate using values() to avoid loading full model instances
# Fetch only the fields we need, filtering to views that have 'agreement_selected' in data
qs = (
View.objects
View.objects.using(db_alias)
.filter(data__has_key='agreement_selected')
.filter(data__agreement_selected__isnull=False)
.values('id', 'data')
Expand All @@ -69,18 +69,19 @@ def forward_migration():
}

# Update only the JSON field via update(); do not load model instance or call save()
View.objects.filter(id=view_id).update(data=new_data)
View.objects.using(db_alias).filter(id=view_id).update(data=new_data)
logger.info(f'Updated View {view_id} agreement selected to default all annotators + all models')
updated += 1

if updated:
logger.info(f'{migration_name} Updated {updated} View rows')

migration.status = AsyncMigrationStatus.STATUS_FINISHED
migration.save(update_fields=['status'])
migration.save(update_fields=['status'], using=db_alias)

def forwards(apps, schema_editor):
    """Kick off the async agreement_selected data migration on the DB being migrated.

    Passes the connection alias so the (possibly async) worker operates on the
    same database this migration runs against, not the implicit default.
    """
    db_alias = schema_editor.connection.alias
    start_job_async_or_sync(forward_migration, db_alias=db_alias, queue_name=settings.SERVICE_QUEUE_NAME)


def backwards(apps, schema_editor):
Expand All @@ -100,4 +101,3 @@ class Migration(migrations.Migration):
]



17 changes: 10 additions & 7 deletions label_studio/io_storages/migrations/0014_init_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@
logger = logging.getLogger(__name__)


def update_storage(storage):
def update_storage(storage, db_alias=None):
logger.info(f'=> Migration for {storage._meta.label} statuses started')
storage.objects.update(status='initialized')
instances = list(storage.objects.all().only('id', 'meta', 'status', 'last_sync_count'))
manager = storage.objects.using(db_alias) if db_alias else storage.objects
manager.update(status='initialized')
instances = list(manager.all().only('id', 'meta', 'status', 'last_sync_count', 'project_id'))

for instance in instances:
prefix = f'Project ID={instance.project.id} {instance}'
prefix = f'Project ID={instance.project_id} {instance}'

# import source storages
if 'import' in storage._meta.label_lower:
count = instance.links.count() - instance.last_sync_count if instance.last_sync_count else 0
links_manager = instance.links.using(db_alias) if db_alias else instance.links
count = links_manager.count() - instance.last_sync_count if instance.last_sync_count else 0
instance.meta['tasks_existed'] = count if count > 0 else 0
if instance.meta['tasks_existed'] and instance.meta['tasks_existed'] > 0:
instance.status = 'completed'
Expand All @@ -29,11 +31,12 @@ def update_storage(storage):
instance.status = 'completed'
logger.info(f'{prefix} total_annotations = {instance.last_sync_count}')

storage.objects.bulk_update(instances, fields=['meta', 'status'], batch_size=100)
manager.bulk_update(instances, fields=['meta', 'status'], batch_size=100)
logger.info(f'=> Migration for {storage._meta.label} statuses finished')


def forwards(apps, schema_editor):
db_alias = schema_editor.connection.alias
storages = [
apps.get_model('io_storages', 'AzureBlobImportStorage'),
apps.get_model('io_storages', 'AzureBlobExportStorage'),
Expand All @@ -48,7 +51,7 @@ def forwards(apps, schema_editor):
]

for storage in storages:
update_storage(storage)
update_storage(storage, db_alias)


def backwards(apps, schema_editor):
Expand Down
25 changes: 14 additions & 11 deletions label_studio/io_storages/migrations/0017_auto_20240731_1638.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def drop_index_sql(table_name, index_name, column_name):
]


def forward_migration(migration_name):
migration = AsyncMigrationStatus.objects.create(
def forward_migration(migration_name, db_alias):
migration = AsyncMigrationStatus.objects.using(db_alias).create(
name=migration_name,
status=AsyncMigrationStatus.STATUS_STARTED,
)
Expand All @@ -79,7 +79,8 @@ def forward_migration(migration_name):
)

# Get db cursor
cursor = connection.cursor()
from django.db import connections
cursor = connections[db_alias].cursor()
for table in tables:
index_sql = create_index_sql(table['table_name'], table['index_name'], table['column_name'])
fk_sql = create_fk_sql(table['table_name'], table['fk_constraint'], table['column_name'], "task_completion",
Expand All @@ -90,13 +91,13 @@ def forward_migration(migration_name):
cursor.execute(fk_sql)

migration.status = AsyncMigrationStatus.STATUS_FINISHED
migration.save()
migration.save(using=db_alias)
logger.debug(
f'Async migration {migration_name} complete'
)

def reverse_migration(migration_name):
migration = AsyncMigrationStatus.objects.create(
def reverse_migration(migration_name, db_alias):
migration = AsyncMigrationStatus.objects.using(db_alias).create(
name=migration_name,
status=AsyncMigrationStatus.STATUS_STARTED,
)
Expand All @@ -105,26 +106,29 @@ def reverse_migration(migration_name):
)

# Get db cursor
cursor = connection.cursor()
from django.db import connections
cursor = connections[db_alias].cursor()
for table in tables:
reverse_sql = drop_index_sql(table['table_name'], table['index_name'], table['column_name'])
# Run reverse_sql
cursor.execute(reverse_sql)

migration.status = AsyncMigrationStatus.STATUS_FINISHED
migration.save()
migration.save(using=db_alias)
logger.debug(
f'Async migration {migration_name} complete'
)


def forwards(apps, schema_editor):
    """Dispatch the index/FK creation migration to rqworkers (or run inline).

    The connection alias is forwarded so the job builds its cursor against the
    database this migration is actually running on.
    """
    # Dispatch migrations to rqworkers
    db_alias = schema_editor.connection.alias
    start_job_async_or_sync(forward_migration, migration_name=migration_name, db_alias=db_alias)


def backwards(apps, schema_editor):
    """Dispatch the index-drop rollback job against the migrated database alias."""
    db_alias = schema_editor.connection.alias
    start_job_async_or_sync(reverse_migration, migration_name=migration_name, db_alias=db_alias)


def get_operations():
Expand Down Expand Up @@ -158,4 +162,3 @@ class Migration(migrations.Migration):
]

operations = get_operations()

Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@
from ml_model_providers.models import ModelProviderConnection, ModelProviders


def _fill_model_version_model_provider_connection(db_alias: str):
    """Backfill ThirdPartyModelVersion.model_provider_connection for OpenAI providers.

    For each OPENAI / AZURE_OPENAI model version, pick the first matching
    ModelProviderConnection in the same organization (for Azure, also matching
    deployment_name) and store its id; rows with no match get NULL.

    All queries run through ``using(db_alias)`` so the backfill targets the
    database the migration is applied to.
    """
    for provider in [ModelProviders.OPENAI, ModelProviders.AZURE_OPENAI]:
        # values() keeps this lightweight: only the fields needed for matching.
        this_provider_model_versions = (
            ThirdPartyModelVersion.objects.using(db_alias)
            .filter(provider=provider)
            .values('id', 'organization_id', 'provider_model_id')
        )
        for provider_model_version in this_provider_model_versions:
            # Azure connections are deployment-specific; other providers match on org alone.
            connection_ids = ModelProviderConnection.objects.using(db_alias).filter(
                organization_id=provider_model_version['organization_id'],
                provider=provider,
                **({'deployment_name': provider_model_version['provider_model_id']} if provider == ModelProviders.AZURE_OPENAI else {}),
            ).values_list('id', flat=True)[:1]
            connection_id = connection_ids[0] if connection_ids else None
            # update() avoids loading the model instance and triggering save() signals.
            ThirdPartyModelVersion.objects.using(db_alias).filter(id=provider_model_version['id']).update(model_provider_connection_id=connection_id)

def forwards(apps, schema_editor):
    """Run the model-provider-connection backfill, pinned to the migrated DB alias."""
    db_alias = schema_editor.connection.alias
    start_job_async_or_sync(_fill_model_version_model_provider_connection, db_alias=db_alias)


def backwards(apps, schema_editor):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@


def rename_disabled_to_off0006(apps, schema_editor):
db_alias = schema_editor.connection.alias
OrganizationMember = apps.get_model('organizations', 'OrganizationMember')
OrganizationMember.objects.filter(role="Disabled").update(role="Off")
OrganizationMember.objects.using(db_alias).filter(role="Disabled").update(role="Off")

migrations.AlterField(
model_name='organizationmember',
Expand All @@ -23,8 +24,9 @@ def rename_disabled_to_off0006(apps, schema_editor):


def rename_disabled_to_off0007(apps, schema_editor):
db_alias = schema_editor.connection.alias
OrganizationMember = apps.get_model('organizations', 'OrganizationMember')
OrganizationMember.objects.filter(role="Off").update(role="Deactivated")
OrganizationMember.objects.using(db_alias).filter(role="Off").update(role="Deactivated")

migrations.AlterField(
model_name='organizationmember',
Expand Down
17 changes: 9 additions & 8 deletions label_studio/projects/migrations/0026_auto_20231103_0020.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,31 @@
logger = logging.getLogger(__name__)


def _fill_label_config_hash(migration_name, db_alias):
    """Populate Project.label_config_hash for every project on the given DB alias.

    Records an AsyncMigrationStatus row per project (STARTED -> FINISHED) so
    progress is observable. The hash uses Python's built-in hash() of the
    stringified parsed config.

    NOTE(review): hash() is salted per-process for str (PYTHONHASHSEED), so the
    stored value is not stable across interpreter runs — confirm downstream
    comparisons happen within one process.
    """
    project_tuples = Project.objects.using(db_alias).all().values_list('id', 'parsed_label_config')
    for project_id, parsed_label_config in project_tuples:
        migration = AsyncMigrationStatus.objects.using(db_alias).create(
            project_id=project_id,
            name=migration_name,
            status=AsyncMigrationStatus.STATUS_STARTED,
        )

        hashed_label_config = hash(str(parsed_label_config))
        # update() keeps this a single UPDATE without model save() side effects.
        Project.objects.using(db_alias).filter(id=project_id).update(label_config_hash=hashed_label_config)

        migration.status = AsyncMigrationStatus.STATUS_FINISHED
        migration.save(using=db_alias)


def fill_label_config_hash(migration_name, db_alias):
    """Run the label-config-hash backfill (async or inline) with progress logging."""
    logger.info('Start filling label config hash')
    start_job_async_or_sync(_fill_label_config_hash, migration_name=migration_name, db_alias=db_alias)
    logger.info('Finished filling label config hash')


def forward(apps, schema_editor):
    """Migration entry point: backfill label_config_hash on the migrated database."""
    db_alias = schema_editor.connection.alias
    fill_label_config_hash('0026_auto_20231103_0020', db_alias)


def backwards(apps, schema_editor):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from django.db import migrations, connection
from django.db import migrations, connections
from django.conf import settings
from core.redis import start_job_async_or_sync
from core.models import AsyncMigrationStatus
Expand All @@ -8,8 +8,8 @@
migration_name = '0030_project_search_vector_index'

# Actual DDL to run
def forward_migration(migration_name):
migration = AsyncMigrationStatus.objects.create(
def forward_migration(migration_name, db_alias):
migration = AsyncMigrationStatus.objects.using(db_alias).create(
name=migration_name,
status=AsyncMigrationStatus.STATUS_STARTED,
)
Expand All @@ -21,16 +21,16 @@ def forward_migration(migration_name):
CREATE INDEX CONCURRENTLY IF NOT EXISTS project_search_vector_idx ON project USING GIN (search_vector);
'''

with connection.cursor() as cursor:
with connections[db_alias].cursor() as cursor:
cursor.execute(sql)

migration.status = AsyncMigrationStatus.STATUS_FINISHED
migration.save()
migration.save(using=db_alias)
logger.debug(f'Async migration {migration_name} complete')

# Reverse DDL
def reverse_migration(migration_name):
migration = AsyncMigrationStatus.objects.create(
def reverse_migration(migration_name, db_alias):
migration = AsyncMigrationStatus.objects.using(db_alias).create(
name=migration_name,
status=AsyncMigrationStatus.STATUS_STARTED,
)
Expand All @@ -39,23 +39,27 @@ def reverse_migration(migration_name):
# Drop index (handle database differences)
sql = 'DROP INDEX CONCURRENTLY IF EXISTS "project_search_vector_idx";'

with connection.cursor() as cursor:
with connections[db_alias].cursor() as cursor:
cursor.execute(sql)

migration.status = AsyncMigrationStatus.STATUS_FINISHED
migration.save()
migration.save(using=db_alias)
logger.debug(f'Async migration rollback {migration_name} complete')

# Hook into Django migration
# Hook into Django migration
def forwards(apps, schema_editor):
    """Create the search_vector GIN index, but only on PostgreSQL.

    CREATE INDEX CONCURRENTLY is a Postgres-only feature, so on other backends
    (e.g. SQLite) the migration is a logged no-op. The vendor check uses the
    connection for the alias being migrated, not the global default connection.
    """
    db_alias = schema_editor.connection.alias
    conn = connections[db_alias]
    if conn.vendor == 'postgresql':
        start_job_async_or_sync(forward_migration, migration_name=migration_name, db_alias=db_alias)
    else:
        logger.debug(f'No index to create if is sqllite')

def backwards(apps, schema_editor):
    """Drop the search_vector index on PostgreSQL; no-op elsewhere.

    Mirrors forwards(): DROP INDEX CONCURRENTLY is Postgres-specific, so the
    vendor of the per-alias connection gates the job dispatch.
    """
    db_alias = schema_editor.connection.alias
    conn = connections[db_alias]
    if conn.vendor == 'postgresql':
        start_job_async_or_sync(reverse_migration, migration_name=migration_name, db_alias=db_alias)
    else:
        logger.debug(f'No index to drop if is sqllite')

Expand All @@ -66,4 +70,4 @@ class Migration(migrations.Migration):
]
operations = [
migrations.RunPython(forwards, backwards),
]
]
Loading
Loading