Skip to content

Commit

Permalink
Fixed several issues:
Browse files Browse the repository at this point in the history
- fixed outdated data in the response when updating disabled_frames of the task validation layout
- reduced the number of requests to the DB
- deleted outdated cache chunks in batches
  • Loading branch information
Marishka17 committed Nov 13, 2024
1 parent 1e7ff33 commit 044daa6
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 16 deletions.
3 changes: 3 additions & 0 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ def remove_segment_chunk(
self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality)
)

def bulk_delete(self, keys: Sequence[str]) -> None:
    """Delete several cache entries with a single call.

    Args:
        keys: Cache keys to remove. They are forwarded unchanged to the
            ``delete_many`` method of the underlying cache object.
    """
    self._cache.delete_many(keys)

def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]:
    """Look up the cached preview of a cloud storage.

    Builds the preview key for ``db_storage``, reads the matching cache
    item and converts it with ``_to_data_with_mime``.

    Args:
        db_storage: Cloud storage whose preview is requested.

    Returns:
        Whatever ``_to_data_with_mime`` produces for the cached item.
    """
    preview_key = self._make_preview_key(db_storage)
    cached_item = self._get_cache_item(preview_key)
    return self._to_data_with_mime(cached_item)

Expand Down
1 change: 1 addition & 0 deletions cvat/apps/engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ class Task(TimestampedModel):
blank=True, on_delete=models.SET_NULL, related_name='+')

segment_set: models.manager.RelatedManager[Segment]
project_id: int | None

# Extend default permission model
class Meta:
Expand Down
66 changes: 50 additions & 16 deletions cvat/apps/engine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from cvat.apps.engine.permissions import TaskPermission
from cvat.apps.engine.utils import parse_specific_attributes, build_field_filter_params, get_list_view_name, reverse
from cvat.apps.engine.rq_job_handler import RQJobMetaField, RQId
from django.db.models import Q
from datumaro.util import take_by

from drf_spectacular.utils import OpenApiExample, extend_schema_field, extend_schema_serializer

Expand Down Expand Up @@ -978,6 +980,12 @@ def validate(self, attrs):

return super().validate(attrs)

def __init__(self, instance: models.Job | None = None, data: dict | None = None, touch_related: bool = True, *args, **kwargs) -> None:
    """Initialise the serializer and remember whether related objects are touched.

    Args:
        instance: Job being updated; forwarded to the parent serializer.
        data: Incoming payload; forwarded to the parent serializer.
        touch_related: Stored as ``self._touch_related``. When true,
            ``update`` also refreshes ``updated_date`` of the other jobs
            on the same segment, of the task and of its project.
        *args: Extra positional arguments for the parent serializer.
        **kwargs: Extra keyword arguments for the parent serializer.
    """
    # Touch related objects' updated_date only when the validation layout of
    # a single job is updated directly (PATCH /api/jobs/id/validation_layout).
    # NOTE(review): `data` is always passed on to the parent, even when it is
    # None; confirm the parent treats None the same as an omitted argument.
    self._touch_related = touch_related
    super().__init__(instance, data, *args, **kwargs)

@transaction.atomic
def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job:
from cvat.apps.engine.cache import MediaCache
Expand Down Expand Up @@ -1139,6 +1147,8 @@ def _to_rel_frame(abs_frame: int) -> int:
segment_frame_map = dict(zip(segment_honeypots, requested_frames))

media_cache = MediaCache()
cache_keys_to_delete = []

for chunk_id in sorted(updated_segment_chunk_ids):
chunk_frames = segment_frames[
chunk_id * db_data.chunk_size :
Expand Down Expand Up @@ -1173,9 +1183,15 @@ def _iterate_chunk_frames():
f.write(chunk.getvalue())

if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM:
# FUTURE-TODO: long-term operation should be done in a background process
_write_updated_static_chunk()

media_cache.remove_segment_chunk(db_segment, chunk_id, quality=quality)
# media_cache.remove_segment_chunk(db_segment, chunk_id, quality=quality)
cache_keys_to_delete.append(media_cache._make_chunk_key(db_segment, chunk_id, quality=quality))


for batch_to_delete in take_by(cache_keys_to_delete, 1000):
media_cache.bulk_delete(batch_to_delete)

db_segment.chunks_updated_date = timezone.now()
db_segment.save(update_fields=['chunks_updated_date'])
Expand All @@ -1192,12 +1208,14 @@ def _iterate_chunk_frames():
db_data.save(update_fields=['deleted_frames'])

db_job.touch()
db_segment.job_set.exclude(id=db_job.id).update(updated_date=timezone.now())
db_task.touch()
if db_task.project:
db_task.project.touch()

return instance
if self._touch_related:
db_segment.job_set.exclude(id=db_job.id).update(updated_date=timezone.now())
db_task.touch()
if db_task.project_id is not None:
db_task.project.touch()

return db_job

class JobValidationLayoutReadSerializer(serializers.Serializer):
honeypot_count = serializers.IntegerField(min_value=0, required=False)
Expand Down Expand Up @@ -1310,10 +1328,11 @@ def update(self, instance: models.Task, validated_data: dict[str, Any]) -> model
)

gt_job_meta_serializer = JobDataMetaWriteSerializer(instance.gt_job, {
"deleted_frames": requested_disabled_frames
})
"deleted_frames": requested_disabled_frames,
}, touch_parents=False)
gt_job_meta_serializer.is_valid(raise_exception=True)
gt_job_meta_serializer.save()
updated_gt_job = gt_job_meta_serializer.save()
instance = updated_gt_job.segment.task

frame_selection_method = validated_data.get('frame_selection_method')
if frame_selection_method and not (
Expand Down Expand Up @@ -1343,12 +1362,13 @@ def update(self, instance: models.Task, validated_data: dict[str, Any]) -> model
)

if frame_selection_method:
for db_job in (
job_queryset = (
models.Job.objects.select_related("segment")
.filter(segment__task_id=instance.id, type=models.JobType.ANNOTATION)
.filter(Q(stage=models.StageChoice.ANNOTATION) & Q(state=models.StateChoice.NEW))
.order_by("segment__start_frame")
.all()
):
)
for db_job in job_queryset.all():
job_serializer_params = {
'frame_selection_method': frame_selection_method
}
Expand All @@ -1363,12 +1383,21 @@ def update(self, instance: models.Task, validated_data: dict[str, Any]) -> model
if abs_frame in segment_frame_set
]

# do not touch related objects' updated_date (task, project, segments)
# after each job update
job_validation_layout_serializer = JobValidationLayoutWriteSerializer(
db_job, job_serializer_params
db_job, job_serializer_params, touch_related=False
)
job_validation_layout_serializer.is_valid(raise_exception=True)
job_validation_layout_serializer.save()

job_queryset.update(updated_date=timezone.now())

# update task and project updated_date once at the end of the operation
instance.touch()
if instance.project_id is not None:
instance.project.touch()

return instance

class TaskValidationLayoutReadSerializer(serializers.ModelSerializer):
Expand Down Expand Up @@ -2388,6 +2417,10 @@ class Meta:
model = models.Job
fields = ('deleted_frames',)

def __init__(self, instance: models.Job | None = None, data: dict | None = None, touch_parents: bool = True, *args, **kwargs) -> None:
    """Initialise the serializer and remember whether parent objects are touched.

    Args:
        instance: Job being updated; forwarded to the parent serializer.
        data: Incoming payload; forwarded to the parent serializer.
        touch_parents: Stored as ``self._touch_parents``. When true,
            ``update`` touches the task and, if the task has a
            ``project_id``, its project as well.
        *args: Extra positional arguments for the parent serializer.
        **kwargs: Extra keyword arguments for the parent serializer.
    """
    # Callers that batch several updates pass False and touch the parents
    # themselves once at the end.
    self._touch_parents = touch_parents
    super().__init__(instance, data, *args, **kwargs)

@transaction.atomic
def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job:
db_segment = instance.segment
Expand Down Expand Up @@ -2455,9 +2488,10 @@ def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models
db_data.deleted_frames = updated_deleted_task_frames
db_data.save(update_fields=['deleted_frames'])

db_task.touch()
if db_task.project:
db_task.project.touch()
if self._touch_parents:
db_task.touch()
if db_task.project_id is not None:
db_task.project.touch()

return instance

Expand Down

0 comments on commit 044daa6

Please sign in to comment.