From e7d786066d8ff11e8cfe7529a5d55557615ccc48 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Mon, 11 Nov 2024 15:46:23 +0300 Subject: [PATCH 01/17] not prefetching images when not needed --- cvat/apps/dataset_manager/task.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 5b72f92a1ebc..29e6e916311b 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -13,7 +13,7 @@ from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.db import transaction -from django.db.models.query import Prefetch +from django.db.models.query import Prefetch, QuerySet from django.conf import settings from rest_framework.exceptions import ValidationError @@ -81,9 +81,10 @@ def merge_table_rows(rows, keys_for_merge, field_id): return list(merged_rows.values()) + class JobAnnotation: @classmethod - def add_prefetch_info(cls, queryset): + def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool): assert issubclass(queryset.model, models.Job) label_qs = add_prefetch_fields(models.Label.objects.all(), [ @@ -93,6 +94,12 @@ def add_prefetch_info(cls, queryset): ]) label_qs = JobData.add_prefetch_info(label_qs) + task_data_queryset = models.Data.objects.select_related('video') + if prefetch_images: + task_data_queryset = task_data_queryset.prefetch_related( + Prefetch('images', queryset=models.Image.objects.order_by('frame')) + ) + return queryset.select_related( 'segment', 'segment__task', @@ -103,18 +110,15 @@ def add_prefetch_info(cls, queryset): 'segment__task__project__owner', 'segment__task__project__assignee', - Prefetch('segment__task__data', - queryset=models.Data.objects.select_related('video').prefetch_related( - Prefetch('images', queryset=models.Image.objects.order_by('frame')) - )), + Prefetch('segment__task__data', queryset=task_data_queryset), Prefetch('segment__task__label_set', queryset=label_qs), Prefetch('segment__task__project__label_set', queryset=label_qs), ) - def __init__(self, pk, *, is_prefetched=False, queryset=None): + def __init__(self, pk, *, is_prefetched: bool = False, queryset: QuerySet = None, prefetch_images: bool = True): if queryset is None: - queryset = self.add_prefetch_info(models.Job.objects) + queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) if is_prefetched: self.db_job: models.Job = queryset.select_related( @@ -1018,7 +1022,7 @@ def put_job_data(pk, data): @plugin_decorator @transaction.atomic def patch_job_data(pk, data, action): - annotation = JobAnnotation(pk) + annotation = JobAnnotation(pk, prefetch_images=False) if action == PatchAction.CREATE: annotation.create(data) elif action == PatchAction.UPDATE: @@ -1031,7 +1035,7 @@ def patch_job_data(pk, data, action): @silk_profile(name="DELETE job data") @transaction.atomic def delete_job_data(pk): - annotation = JobAnnotation(pk) + annotation = JobAnnotation(pk, prefetch_images=False) annotation.delete() def export_job(job_id, dst_file, format_name, server_url=None, save_images=False): From 1c59e5064ef0c10b37fc4297f00c42ae6e3db40d Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Tue, 12 Nov 2024 19:01:27 +0300 Subject: [PATCH 02/17] fixing tests --- cvat/apps/dataset_manager/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 29e6e916311b..0ad01b06129e 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -84,7 +84,7 @@ def merge_table_rows(rows, keys_for_merge, field_id): class JobAnnotation: @classmethod - def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool): + def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): assert issubclass(queryset.model, models.Job) label_qs = add_prefetch_fields(models.Label.objects.all(), [ From b20565295aef09d487d21672e8821d69ae04fa80 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Wed, 13 Nov 2024 13:08:48 +0300 Subject: [PATCH 03/17] changelog entry --- .../20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md diff --git a/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md new file mode 100644 index 000000000000..512c4b1cd5a6 --- /dev/null +++ b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md @@ -0,0 +1,4 @@ +### Fixed + +- Optimized memory consumption when importing annotations to a task with a lot of jobs and images + () From 3a5a26d5fb18d1fe35190fb47c0bf1996d4e14cf Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Sun, 17 Nov 2024 23:34:42 +0300 Subject: [PATCH 04/17] do not prefetch video --- cvat/apps/dataset_manager/task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 0ad01b06129e..3b0cfaf73980 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -94,9 +94,9 @@ def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): ]) label_qs = JobData.add_prefetch_info(label_qs) - task_data_queryset = models.Data.objects.select_related('video') + task_data_queryset = models.Data.objects.all() if prefetch_images: - task_data_queryset = task_data_queryset.prefetch_related( + task_data_queryset = task_data_queryset.select_related('video').prefetch_related( Prefetch('images', queryset=models.Image.objects.order_by('frame')) ) From a0911035594654825cebcbf1a986b836483d4982 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Sun, 17 Nov 2024 23:00:53 +0300 Subject: [PATCH 05/17] prefetch_images=False by default --- cvat/apps/dataset_manager/task.py | 18 +++++++++++++----- cvat/apps/quality_control/quality_reports.py | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 3b0cfaf73980..34fa4bd125f2 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -116,7 +116,7 @@ def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): Prefetch('segment__task__project__label_set', queryset=label_qs), ) - def __init__(self, pk, *, is_prefetched: bool = False, queryset: QuerySet = None, prefetch_images: bool = True): + def __init__(self, pk, *, is_prefetched: bool = False, queryset: QuerySet = None, prefetch_images: bool = False): if queryset is None: queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) @@ -1010,6 +1010,7 @@ def get_job_data(pk): return annotation.data + @silk_profile(name="POST job data") @transaction.atomic def put_job_data(pk, data): @@ -1018,11 +1019,12 @@ def put_job_data(pk, data): return annotation.data + @silk_profile(name="UPDATE job data") @plugin_decorator @transaction.atomic def patch_job_data(pk, data, action): - annotation = JobAnnotation(pk, prefetch_images=False) + annotation = JobAnnotation(pk) if action == PatchAction.CREATE: annotation.create(data) elif action == PatchAction.UPDATE: @@ -1032,12 +1034,14 @@ def patch_job_data(pk, data, action): return annotation.data + @silk_profile(name="DELETE job data") @transaction.atomic def delete_job_data(pk): - annotation = JobAnnotation(pk, prefetch_images=False) + annotation = JobAnnotation(pk) annotation.delete() + def export_job(job_id, dst_file, format_name, server_url=None, save_images=False): # For big tasks dump function may run for a long time and # we dont need to acquire lock after the task has been initialized from DB. @@ -1045,13 +1049,14 @@ def export_job(job_id, dst_file, format_name, server_url=None, save_images=False # more dump request received at the same time: # https://github.com/cvat-ai/cvat/issues/217 with transaction.atomic(): - job = JobAnnotation(job_id) + job = JobAnnotation(job_id, prefetch_images=True) job.init_from_db() exporter = make_exporter(format_name) with open(dst_file, 'wb') as f: job.export(f, exporter, host=server_url, save_images=save_images) + @silk_profile(name="GET task data") @transaction.atomic def get_task_data(pk): @@ -1060,6 +1065,7 @@ def get_task_data(pk): return annotation.data + @silk_profile(name="POST task data") @transaction.atomic def put_task_data(pk, data): @@ -1068,6 +1074,7 @@ def put_task_data(pk, data): return annotation.data + @silk_profile(name="UPDATE task data") @transaction.atomic def patch_task_data(pk, data, action): @@ -1112,9 +1119,10 @@ def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly): except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: raise CvatImportError(str(ex)) + @transaction.atomic def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly): - job = JobAnnotation(job_id) + job = JobAnnotation(job_id, prefetch_images=True) importer = make_importer(format_name) with open(src_file, 'rb') as f: diff --git a/cvat/apps/quality_control/quality_reports.py b/cvat/apps/quality_control/quality_reports.py index 627c4dc7b978..9430a8c3cc2c 100644 --- a/cvat/apps/quality_control/quality_reports.py +++ b/cvat/apps/quality_control/quality_reports.py @@ -572,7 +572,7 @@ def add_prefetch_info(cls, queryset): @transaction.atomic def __init__(self, job_id: int, *, queryset=None, included_frames=None) -> None: self.job_id = job_id - self.job_annotation = JobAnnotation(job_id, queryset=queryset) + self.job_annotation = JobAnnotation(job_id, queryset=queryset, prefetch_images=True) self.job_annotation.init_from_db() self.job_data = JobData( annotation_ir=self.job_annotation.ir_data, From ccc7be959ad32bb7b25c00b6dabd6ce8795910ac Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Mon, 18 Nov 2024 11:00:24 +0300 Subject: [PATCH 06/17] Update changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md Co-authored-by: Maria Khrustaleva --- .../20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md index 512c4b1cd5a6..8367a870fed0 100644 --- a/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md +++ b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md @@ -1,4 +1,4 @@ ### Fixed -- Optimized memory consumption when importing annotations to a task with a lot of jobs and images +- Optimized memory consumption and reduced the number of database queries when importing annotations to a task with a lot of jobs and images () From 9f2d2f29cf181969df53bbecf36b35f7440a264f Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Mon, 18 Nov 2024 12:18:12 +0300 Subject: [PATCH 07/17] fix changelog entry --- .../20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md index 8367a870fed0..8efcd99d7bf8 100644 --- a/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md +++ b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md @@ -1,4 +1,5 @@ ### Fixed -- Optimized memory consumption and reduced the number of database queries when importing annotations to a task with a lot of jobs and images +- Optimized memory consumption and reduced the number of database queries + when importing annotations to a task with a lot of jobs and images () From a008601a6251691928dbfa88b91bfdcae448f94f Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Mon, 18 Nov 2024 12:20:22 +0300 Subject: [PATCH 08/17] removing is_prefetched option --- cvat/apps/dataset_manager/task.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 34fa4bd125f2..7e1d4a14c233 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -116,16 +116,11 @@ def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): Prefetch('segment__task__project__label_set', queryset=label_qs), ) - def __init__(self, pk, *, is_prefetched: bool = False, queryset: QuerySet = None, prefetch_images: bool = False): + def __init__(self, pk, *, queryset: QuerySet | None = None, prefetch_images: bool = False): if queryset is None: queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) - if is_prefetched: - self.db_job: models.Job = queryset.select_related( - 'segment__task' - ).select_for_update().get(id=pk) - else: - self.db_job: models.Job = get_cached(queryset, pk=int(pk)) + self.db_job: models.Job = get_cached(queryset, pk=int(pk)) db_segment = self.db_job.segment self.start_frame = db_segment.start_frame @@ -951,7 +946,7 @@ def init_from_db(self): ): continue - gt_annotation = JobAnnotation(db_job.id, is_prefetched=True) + gt_annotation = JobAnnotation(db_job.id) gt_annotation.init_from_db() if gt_annotation.ir_data.version > self.ir_data.version: self.ir_data.version = gt_annotation.ir_data.version From 0dad09b0b1d130d9bb289323c88be9facd514443 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Thu, 21 Nov 2024 12:31:16 +0300 Subject: [PATCH 09/17] returning is_prefetched option with the other name --- cvat/apps/dataset_manager/task.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 7e1d4a14c233..0ba3fabc6f38 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -116,10 +116,13 @@ def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): Prefetch('segment__task__project__label_set', queryset=label_qs), ) - def __init__(self, pk, *, queryset: QuerySet | None = None, prefetch_images: bool = False): + def __init__(self, pk, *, lock_job_in_db: bool = False, queryset: QuerySet | None = None, prefetch_images: bool = False): if queryset is None: queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) + if lock_job_in_db: + queryset = queryset.select_for_update() + self.db_job: models.Job = get_cached(queryset, pk=int(pk)) db_segment = self.db_job.segment @@ -946,7 +949,7 @@ def init_from_db(self): ): continue - gt_annotation = JobAnnotation(db_job.id) + gt_annotation = JobAnnotation(db_job.id, lock_job_in_db=True) gt_annotation.init_from_db() if gt_annotation.ir_data.version > self.ir_data.version: self.ir_data.version = gt_annotation.ir_data.version From fa45afa734825770afa26333ff16a1d63a589baf Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Mon, 25 Nov 2024 14:06:53 +0300 Subject: [PATCH 10/17] sharing db_task from TaskAnnotation to JobAnnotation --- cvat/apps/dataset_manager/task.py | 68 ++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 0ba3fabc6f38..1d2cec7f50b2 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -116,14 +116,27 @@ def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): Prefetch('segment__task__project__label_set', queryset=label_qs), ) - def __init__(self, pk, *, lock_job_in_db: bool = False, queryset: QuerySet | None = None, prefetch_images: bool = False): + def __init__( + self, + pk, + *, + lock_job_in_db: bool = False, + queryset: QuerySet | None = None, + prefetch_images: bool = False, + db_task: models.Task | None = None + ): if queryset is None: - queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) + if db_task is None: + queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) + else: + queryset = models.Job.objects.select_related("segment", "segment__task") if lock_job_in_db: queryset = queryset.select_for_update() self.db_job: models.Job = get_cached(queryset, pk=int(pk)) + if db_task is not None: + self.db_job.segment.task = db_task db_segment = self.db_job.segment self.start_frame = db_segment.start_frame @@ -788,11 +801,36 @@ def import_annotations(self, src_file, importer, **options): self.create(job_data.data.slice(self.start_frame, self.stop_frame).serialize()) + class TaskAnnotation: + @classmethod + def add_prefetch_info(cls, queryset: QuerySet): + assert issubclass(queryset.model, models.Task) + + label_qs = add_prefetch_fields(models.Label.objects.all(), [ + 'skeleton', + 'parent', + 'attributespec_set', + ]) + label_qs = TaskData.add_prefetch_info(label_qs) + + return queryset.prefetch_related( + 'project', + 'owner', + 'assignee', + 'project__owner', + 'project__assignee', + + Prefetch('data', queryset=models.Data.objects.select_related('video').prefetch_related( + Prefetch('images', queryset=models.Image.objects.order_by('frame')) + )), + + Prefetch('label_set', queryset=label_qs), + Prefetch('project__label_set', queryset=label_qs), + ) + def __init__(self, pk): - self.db_task = models.Task.objects.prefetch_related( - Prefetch('data__images', queryset=models.Image.objects.order_by('frame')) - ).get(id=pk) + self.db_task = self.add_prefetch_info(models.Task.objects).get(id=pk) requested_job_types = [models.JobType.ANNOTATION] if self.db_task.data.validation_mode == models.ValidationMode.GT_POOL: @@ -828,9 +866,9 @@ def _patch_data(self, data: Union[AnnotationIR, dict], action: Optional[PatchAct for jid, job_data in splitted_data.items(): data = AnnotationIR(self.db_task.dimension) if action is None: - data.data = put_job_data(jid, job_data) + data.data = put_job_data(jid, job_data, db_task=self.db_task) else: - data.data = patch_job_data(jid, job_data, action) + data.data = patch_job_data(jid, job_data, action, db_task=self.db_task) if data.version > self.ir_data.version: self.ir_data.version = data.version @@ -938,7 +976,7 @@ def delete(self, data=None): self._patch_data(data, PatchAction.DELETE) else: for db_job in self.db_jobs: - delete_job_data(db_job.id) + delete_job_data(db_job.id, db_task=self.db_task) def init_from_db(self): self.reset() @@ -949,7 +987,7 @@ def init_from_db(self): ): continue - gt_annotation = JobAnnotation(db_job.id, lock_job_in_db=True) + gt_annotation = JobAnnotation(db_job.id, lock_job_in_db=True, db_task=self.db_task) gt_annotation.init_from_db() if gt_annotation.ir_data.version > self.ir_data.version: self.ir_data.version = gt_annotation.ir_data.version @@ -1011,8 +1049,8 @@ def get_job_data(pk): @silk_profile(name="POST job data") @transaction.atomic -def put_job_data(pk, data): - annotation = JobAnnotation(pk) +def put_job_data(pk, data, db_task: models.Task | None = None): + annotation = JobAnnotation(pk, db_task=db_task) annotation.put(data) return annotation.data @@ -1021,8 +1059,8 @@ def put_job_data(pk, data): @silk_profile(name="UPDATE job data") @plugin_decorator @transaction.atomic -def patch_job_data(pk, data, action): - annotation = JobAnnotation(pk) +def patch_job_data(pk, data, action, db_task: models.Task | None = None): + annotation = JobAnnotation(pk, db_task=db_task) if action == PatchAction.CREATE: annotation.create(data) elif action == PatchAction.UPDATE: @@ -1035,8 +1073,8 @@ def patch_job_data(pk, data, action): @silk_profile(name="DELETE job data") @transaction.atomic -def delete_job_data(pk): - annotation = JobAnnotation(pk) +def delete_job_data(pk, db_task: models.Task | None = None): + annotation = JobAnnotation(pk, db_task=db_task) annotation.delete() From 5d8fe3ed3257068aa83470884158695f72fcc081 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Tue, 26 Nov 2024 15:43:17 +0300 Subject: [PATCH 11/17] Update cvat/apps/dataset_manager/task.py Co-authored-by: Maxim Zhiltsov --- cvat/apps/dataset_manager/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 1d2cec7f50b2..7cebaf235d7d 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -1085,7 +1085,7 @@ def export_job(job_id, dst_file, format_name, server_url=None, save_images=False # more dump request received at the same time: # https://github.com/cvat-ai/cvat/issues/217 with transaction.atomic(): - job = JobAnnotation(job_id, prefetch_images=True) + job = JobAnnotation(job_id, prefetch_images=True, lock_job_in_db=True) job.init_from_db() exporter = make_exporter(format_name) From 46ae39f86a4c5e940597e2ad80ba36d9606dc248 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Tue, 26 Nov 2024 15:46:46 +0300 Subject: [PATCH 12/17] not prefetching project owner/assignee --- cvat/apps/dataset_manager/task.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 7cebaf235d7d..787148887187 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -107,8 +107,6 @@ def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): 'segment__task__project', 'segment__task__owner', 'segment__task__assignee', - 'segment__task__project__owner', - 'segment__task__project__assignee', Prefetch('segment__task__data', queryset=task_data_queryset), @@ -818,8 +816,6 @@ def add_prefetch_info(cls, queryset: QuerySet): 'project', 'owner', 'assignee', - 'project__owner', - 'project__assignee', Prefetch('data', queryset=models.Data.objects.select_related('video').prefetch_related( Prefetch('images', queryset=models.Image.objects.order_by('frame')) @@ -1124,12 +1120,14 @@ def patch_task_data(pk, data, action): return annotation.data + @silk_profile(name="DELETE task data") @transaction.atomic def delete_task_data(pk): annotation = TaskAnnotation(pk) annotation.delete() + def export_task(task_id, dst_file, format_name, server_url=None, save_images=False): # For big tasks dump function may run for a long time and # we dont need to acquire lock after the task has been initialized from DB. @@ -1144,6 +1142,7 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal with open(dst_file, 'wb') as f: task.export(f, exporter, host=server_url, save_images=save_images) + @transaction.atomic def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly): task = TaskAnnotation(task_id) From 2c84472105bd6cba0b8ee420fa966023281e8d44 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Tue, 26 Nov 2024 21:23:19 +0300 Subject: [PATCH 13/17] fixing jobs prefetching --- cvat/apps/dataset_manager/bindings.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cvat/apps/dataset_manager/bindings.py b/cvat/apps/dataset_manager/bindings.py index 1c70520a7090..c49c4e51089c 100644 --- a/cvat/apps/dataset_manager/bindings.py +++ b/cvat/apps/dataset_manager/bindings.py @@ -24,7 +24,7 @@ from datumaro.components.environment import Environment from datumaro.components.extractor import Importer from datumaro.components.format_detection import RejectionReason -from django.db.models import QuerySet +from django.db.models import Prefetch, QuerySet from django.utils import timezone from django.conf import settings @@ -861,7 +861,9 @@ def __init__(self, annotation_ir: AnnotationIR, db_task: Task, **kwargs): @staticmethod def meta_for_task(db_task, host, label_mapping=None): - db_segments = db_task.segment_set.all().prefetch_related('job_set') + db_segments = db_task.segment_set.all().prefetch_related( + Prefetch('job_set', models.Job.objects.order_by("pk")) + ) meta = OrderedDict([ ("id", str(db_task.id)), From a1994e5c8c5935323ac9063d08edfad49aea2315 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Wed, 27 Nov 2024 01:23:35 +0300 Subject: [PATCH 14/17] querying jobs for JobAnnotation in TaskAnnotation --- cvat/apps/dataset_manager/task.py | 84 ++++++++------------ cvat/apps/quality_control/quality_reports.py | 2 +- 2 files changed, 32 insertions(+), 54 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 787148887187..9b7e8e01dd84 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -121,20 +121,18 @@ def __init__( lock_job_in_db: bool = False, queryset: QuerySet | None = None, prefetch_images: bool = False, - db_task: models.Task | None = None + db_job: models.Job | None = None ): - if queryset is None: - if db_task is None: + if db_job is None: + if queryset is None: queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) - else: - queryset = models.Job.objects.select_related("segment", "segment__task") - if lock_job_in_db: - queryset = queryset.select_for_update() + if lock_job_in_db: + queryset = queryset.select_for_update() - self.db_job: models.Job = get_cached(queryset, pk=int(pk)) - if db_task is not None: - self.db_job.segment.task = db_task + self.db_job: models.Job = get_cached(queryset, pk=int(pk)) + else: + self.db_job: models.Job = db_job db_segment = self.db_job.segment self.start_frame = db_segment.start_frame @@ -801,42 +799,22 @@ def import_annotations(self, src_file, importer, **options): class TaskAnnotation: - @classmethod - def add_prefetch_info(cls, queryset: QuerySet): - assert issubclass(queryset.model, models.Task) - - label_qs = add_prefetch_fields(models.Label.objects.all(), [ - 'skeleton', - 'parent', - 'attributespec_set', - ]) - label_qs = TaskData.add_prefetch_info(label_qs) - - return queryset.prefetch_related( - 'project', - 'owner', - 'assignee', - - Prefetch('data', queryset=models.Data.objects.select_related('video').prefetch_related( - Prefetch('images', queryset=models.Image.objects.order_by('frame')) - )), - - Prefetch('label_set', queryset=label_qs), - Prefetch('project__label_set', queryset=label_qs), - ) - - def __init__(self, pk): - self.db_task = self.add_prefetch_info(models.Task.objects).get(id=pk) + def __init__(self, pk, lock_jobs_in_db: bool = False): + db_task = models.Task.objects.select_related('data').get(id=pk) requested_job_types = [models.JobType.ANNOTATION] - if self.db_task.data.validation_mode == models.ValidationMode.GT_POOL: + if db_task.data.validation_mode == models.ValidationMode.GT_POOL: requested_job_types.append(models.JobType.GROUND_TRUTH) - self.db_jobs = ( - models.Job.objects - .select_related("segment") + db_jobs_queryset = ( + JobAnnotation.add_prefetch_info(models.Job.objects) .filter(segment__task_id=pk, type__in=requested_job_types) ) + if lock_jobs_in_db: + db_jobs_queryset = db_jobs_queryset.select_for_update() + + self.db_jobs = list(db_jobs_queryset) + self.db_task = self.db_jobs[0].segment.task self.ir_data = AnnotationIR(self.db_task.dimension) @@ -857,14 +835,14 @@ def _patch_data(self, data: Union[AnnotationIR, dict], action: Optional[PatchAct start = db_job.segment.start_frame stop = db_job.segment.stop_frame jobs[jid] = { "start": start, "stop": stop } - splitted_data[jid] = data.slice(start, stop) + splitted_data[jid] = (data.slice(start, stop), db_job) - for jid, job_data in splitted_data.items(): + for jid, (job_data, db_job) in splitted_data.items(): data = AnnotationIR(self.db_task.dimension) if action is None: - data.data = put_job_data(jid, job_data, db_task=self.db_task) + data.data = put_job_data(jid, job_data, db_job=db_job) else: - data.data = patch_job_data(jid, job_data, action, db_task=self.db_task) + data.data = patch_job_data(jid, job_data, action, db_job=db_job) if data.version > self.ir_data.version: self.ir_data.version = data.version @@ -972,7 +950,7 @@ def delete(self, data=None): self._patch_data(data, PatchAction.DELETE) else: for db_job in self.db_jobs: - delete_job_data(db_job.id, db_task=self.db_task) + delete_job_data(db_job.id, db_job=db_job) def init_from_db(self): self.reset() @@ -983,7 +961,7 @@ def init_from_db(self): ): continue - gt_annotation = JobAnnotation(db_job.id, lock_job_in_db=True, db_task=self.db_task) + gt_annotation = JobAnnotation(db_job.id, lock_job_in_db=True, db_job=db_job) gt_annotation.init_from_db() if gt_annotation.ir_data.version > self.ir_data.version: self.ir_data.version = gt_annotation.ir_data.version @@ -1045,8 +1023,8 @@ def get_job_data(pk): @silk_profile(name="POST job data") @transaction.atomic -def put_job_data(pk, data, db_task: models.Task | None = None): - annotation = JobAnnotation(pk, db_task=db_task) +def put_job_data(pk, data, db_job: models.Job | None = None): + annotation = JobAnnotation(pk, db_job=db_job) annotation.put(data) return annotation.data @@ -1055,8 +1033,8 @@ def put_job_data(pk, data, db_task: models.Task | None = None): @silk_profile(name="UPDATE job data") @plugin_decorator @transaction.atomic -def patch_job_data(pk, data, action, db_task: models.Task | None = None): - annotation = JobAnnotation(pk, db_task=db_task) +def patch_job_data(pk, data, action, db_job: models.Job | None = None): + annotation = JobAnnotation(pk, db_job=db_job) if action == PatchAction.CREATE: annotation.create(data) elif action == PatchAction.UPDATE: @@ -1069,8 +1047,8 @@ def patch_job_data(pk, data, action, db_task: models.Task | None = None): @silk_profile(name="DELETE job data") @transaction.atomic -def delete_job_data(pk, db_task: models.Task | None = None): - annotation = JobAnnotation(pk, db_task=db_task) +def delete_job_data(pk, db_job: models.Job | None = None): + annotation = JobAnnotation(pk, db_job=db_job) annotation.delete() @@ -1135,7 +1113,7 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal # more dump request received at the same time: # https://github.com/cvat-ai/cvat/issues/217 with transaction.atomic(): - task = TaskAnnotation(task_id) + task = TaskAnnotation(task_id, lock_jobs_in_db=True) task.init_from_db() exporter = make_exporter(format_name) diff --git a/cvat/apps/quality_control/quality_reports.py b/cvat/apps/quality_control/quality_reports.py index 9430a8c3cc2c..627c4dc7b978 100644 --- a/cvat/apps/quality_control/quality_reports.py +++ b/cvat/apps/quality_control/quality_reports.py @@ -572,7 +572,7 @@ def add_prefetch_info(cls, queryset): @transaction.atomic def __init__(self, job_id: int, *, queryset=None, included_frames=None) -> None: self.job_id = job_id - self.job_annotation = JobAnnotation(job_id, queryset=queryset, prefetch_images=True) + self.job_annotation = JobAnnotation(job_id, queryset=queryset) self.job_annotation.init_from_db() self.job_data = JobData( annotation_ir=self.job_annotation.ir_data, From 8a247183882ac4495f85b3d2119e35a8678f76eb Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Wed, 27 Nov 2024 10:39:15 +0300 Subject: [PATCH 15/17] type annotations --- cvat/apps/dataset_manager/task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 9b7e8e01dd84..625677a7adc4 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -1023,7 +1023,7 @@ def get_job_data(pk): @silk_profile(name="POST job data") @transaction.atomic -def put_job_data(pk, data, db_job: models.Job | None = None): +def put_job_data(pk, data: AnnotationIR | dict, db_job: models.Job | None = None): annotation = JobAnnotation(pk, db_job=db_job) annotation.put(data) @@ -1033,7 +1033,7 @@ def put_job_data(pk, data, db_job: models.Job | None = None): @silk_profile(name="UPDATE job data") @plugin_decorator @transaction.atomic -def patch_job_data(pk, data, action, db_job: models.Job | None = None): +def patch_job_data(pk, data: AnnotationIR | dict, action: PatchAction, db_job: models.Job | None = None): annotation = JobAnnotation(pk, db_job=db_job) if action == PatchAction.CREATE: annotation.create(data) From 0c9388fa8650e6d4686beeabaa8a486319275a33 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Wed, 27 Nov 2024 11:15:50 +0300 Subject: [PATCH 16/17] locking jobs in TaskAnnotation.init_from_db --- cvat/apps/dataset_manager/task.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 625677a7adc4..b1302cfc5164 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -123,6 +123,9 @@ def __init__( prefetch_images: bool = False, db_job: models.Job | None = None ): + assert db_job is None or lock_job_in_db is False + assert (db_job is None and queryset is None) or prefetch_images is False + assert db_job is None or queryset is None if db_job is None: if queryset is None: queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images) @@ -799,22 +802,19 @@ def import_annotations(self, src_file, importer, **options): class TaskAnnotation: - def __init__(self, pk, lock_jobs_in_db: bool = False): - db_task = models.Task.objects.select_related('data').get(id=pk) + def __init__(self, pk): + self.db_task = models.Task.objects.prefetch_related( + Prefetch('data__images', queryset=models.Image.objects.order_by('frame')) + ).get(id=pk) requested_job_types = [models.JobType.ANNOTATION] - if db_task.data.validation_mode == models.ValidationMode.GT_POOL: + if self.db_task.data.validation_mode == models.ValidationMode.GT_POOL: requested_job_types.append(models.JobType.GROUND_TRUTH) - db_jobs_queryset = ( - JobAnnotation.add_prefetch_info(models.Job.objects) + self.db_jobs = ( + JobAnnotation.add_prefetch_info(models.Job.objects, prefetch_images=False) .filter(segment__task_id=pk, type__in=requested_job_types) ) - if lock_jobs_in_db: - db_jobs_queryset = db_jobs_queryset.select_for_update() - - self.db_jobs = list(db_jobs_queryset) - self.db_task = self.db_jobs[0].segment.task self.ir_data = AnnotationIR(self.db_task.dimension) @@ -955,13 +955,13 @@ def delete(self, data=None): def init_from_db(self): self.reset() - for db_job in self.db_jobs: + for db_job in self.db_jobs.select_for_update(): if db_job.type == models.JobType.GROUND_TRUTH and not ( self.db_task.data.validation_mode == models.ValidationMode.GT_POOL ): continue - gt_annotation = JobAnnotation(db_job.id, lock_job_in_db=True, db_job=db_job) + gt_annotation = JobAnnotation(db_job.id, db_job=db_job) gt_annotation.init_from_db() if gt_annotation.ir_data.version > self.ir_data.version: self.ir_data.version = gt_annotation.ir_data.version @@ -1113,7 +1113,7 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal # more dump request received at the same time: # https://github.com/cvat-ai/cvat/issues/217 with transaction.atomic(): - task = TaskAnnotation(task_id, lock_jobs_in_db=True) + task = TaskAnnotation(task_id) task.init_from_db() exporter = make_exporter(format_name) From 021de6e9bf6e009f61061a6e6e89fee311e7b3b1 Mon Sep 17 00:00:00 2001 From: Dmitrii Lavrukhin Date: Wed, 27 Nov 2024 12:16:41 +0300 Subject: [PATCH 17/17] making db_job params kw only --- cvat/apps/dataset_manager/task.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index b1302cfc5164..45f1eaff4e8e 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -1023,7 +1023,7 @@ def get_job_data(pk): @silk_profile(name="POST job data") @transaction.atomic -def put_job_data(pk, data: AnnotationIR | dict, db_job: models.Job | None = None): +def put_job_data(pk, data: AnnotationIR | dict, *, db_job: models.Job | None = None): annotation = JobAnnotation(pk, db_job=db_job) annotation.put(data) @@ -1033,7 +1033,7 @@ def put_job_data(pk, data: AnnotationIR | dict, db_job: models.Job | None = None @silk_profile(name="UPDATE job data") @plugin_decorator @transaction.atomic -def patch_job_data(pk, data: AnnotationIR | dict, action: PatchAction, db_job: models.Job | None = None): +def patch_job_data(pk, data: AnnotationIR | dict, action: PatchAction, *, db_job: models.Job | None = None): annotation = JobAnnotation(pk, db_job=db_job) if action == PatchAction.CREATE: annotation.create(data) @@ -1047,7 +1047,7 @@ def patch_job_data(pk, data: AnnotationIR | dict, action: PatchAction, db_job: m @silk_profile(name="DELETE job data") @transaction.atomic -def delete_job_data(pk, db_job: models.Job | None = None): +def delete_job_data(pk, *, db_job: models.Job | None = None): annotation = JobAnnotation(pk, db_job=db_job) annotation.delete()