Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AAP-16926 Delete unpartitioned tables in a separate transaction #14572

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 71 additions & 39 deletions awx/main/management/commands/cleanup_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


# Django
from django.apps import apps
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, connection
from django.db.models import Min, Max
Expand Down Expand Up @@ -198,35 +199,56 @@ def cleanup_workflow_jobs_partition(self):
delete_meta.delete_jobs()
return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)

def has_unpartitioned_table(self, model):
    """Return True if the pre-partitioning event table for this unified job
    model still exists in the database, False once it has been dropped.

    The table name is derived from the model's event table with an
    ``_unpartitioned_`` prefix; existence is checked via pg_tables.
    """
    tblname = unified_job_class_to_event_table_name(model)
    with connection.cursor() as cursor:
        # pg_tables lists every table visible in the current database
        cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';")
        row = cursor.fetchone()
    return row is not None

def _delete_unpartitioned_table(self, model):
    """Drop the legacy ``_unpartitioned_*`` event table for *model* once every
    row in it is older than the cleanup cutoff.

    Logs the newest event's timestamp (or that the table is empty) before
    deciding; the DROP runs outside the main cleanup transaction, so a
    failure here does not roll back cleanup already performed.
    """
    tblname = unified_job_class_to_event_table_name(model)

    # Already migrated: nothing left to drop
    if not self.has_unpartitioned_table(model):
        self.logger.debug(f'Table _unpartitioned_{tblname} does not exist, you are fully migrated.')
        return

    with connection.cursor() as cursor:
        # same as UnpartitionedJobEvent.objects.aggregate(Max('created'))
        cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}";')
        (last_created,) = cursor.fetchone()

    if last_created is None:
        self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')
    else:
        self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')

    # Empty table, or nothing newer than the cutoff: safe to drop
    if last_created is None or last_created < self.cutoff:
        self.logger.warning(
            f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}\n'
            'WARNING - this will happen in a separate transaction so a failure will not roll back prior cleanup'
        )
        with connection.cursor() as cursor:
            cursor.execute(f'DROP TABLE _unpartitioned_{tblname};')

def _delete_unpartitioned_events(self, model, pk_list):
    """Delete rows in the legacy ``_unpartitioned_*`` event table that belong
    to the unified jobs in *pk_list*, mirroring the ON DELETE cascade the
    partitioned tables get for free.

    No-op if the legacy table is already gone or *pk_list* is empty.
    """
    tblname = unified_job_class_to_event_table_name(model)
    rel_name = model().event_parent_key

    # Bail if the unpartitioned table does not exist anymore
    if not self.has_unpartitioned_table(model):
        return

    # Nothing selected for deletion this pass
    if not pk_list:
        return

    with connection.cursor() as cursor:
        self.logger.debug(f'Deleting {len(pk_list)} events from _unpartitioned_{tblname}, use a longer cleanup window to delete the table.')
        pk_list_csv = ','.join(map(str, pk_list))
        cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv});")

def cleanup_jobs(self):
# Hack to avoid doing N+1 queries as each item in the Job query set does
Expand All @@ -249,7 +271,8 @@ def cleanup_jobs(self):

_, results = qs_batch.delete()
deleted += results['main.Job']
self._handle_unpartitioned_events(Job, pk_list)
# Avoid dropping the job event table in case we have interacted with it already
self._delete_unpartitioned_events(Job, pk_list)

return skipped, deleted

Expand All @@ -272,7 +295,7 @@ def cleanup_ad_hoc_commands(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(AdHocCommand, pk_list)
self._delete_unpartitioned_events(AdHocCommand, pk_list)

skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
Expand Down Expand Up @@ -300,7 +323,7 @@ def cleanup_project_updates(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(ProjectUpdate, pk_list)
self._delete_unpartitioned_events(ProjectUpdate, pk_list)

skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
Expand Down Expand Up @@ -328,7 +351,7 @@ def cleanup_inventory_updates(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(InventoryUpdate, pk_list)
self._delete_unpartitioned_events(InventoryUpdate, pk_list)

skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
Expand All @@ -352,7 +375,7 @@ def cleanup_management_jobs(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(SystemJob, pk_list)
self._delete_unpartitioned_events(SystemJob, pk_list)

skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
Expand Down Expand Up @@ -397,7 +420,6 @@ def cleanup_notifications(self):
skipped += Notification.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted

@transaction.atomic
def handle(self, *args, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()
Expand Down Expand Up @@ -425,19 +447,29 @@ def handle(self, *args, **options):
del s.receivers[:]
s.sender_receivers_cache.clear()

for m in model_names:
if m not in models_to_cleanup:
continue
with transaction.atomic():
for m in models_to_cleanup:
skipped, deleted = getattr(self, 'cleanup_%s' % m)()

skipped, deleted = getattr(self, 'cleanup_%s' % m)()
func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition

func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition
if self.dry_run:
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
else:
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)

if self.dry_run:
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
else:
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
# Deleting unpartitioned tables cannot be done in same transaction as updates to related tables
if not self.dry_run:
with transaction.atomic():
for m in models_to_cleanup:
unified_job_class_name = m[:-1].title().replace('Management', 'System').replace('_', '')
unified_job_class = apps.get_model('main', unified_job_class_name)
try:
unified_job_class().event_class
except (NotImplementedError, AttributeError):
continue # no need to run this for models without events
self._delete_unpartitioned_table(unified_job_class)