From 5cda8840291fde7758f721b299966945085c065c Mon Sep 17 00:00:00 2001 From: vorozhkog Date: Sat, 30 Nov 2024 21:00:46 +0000 Subject: [PATCH] add async methods for videos and pcd projects --- dev_requirements.txt | 2 +- local.env | 10 +- src/main.py | 259 +++++++++++++++++++++++++++---------------- 3 files changed, 170 insertions(+), 101 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index b3a3959..83c5a6a 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1 +1 @@ -supervisely==6.73.162 +git+https://github.com/supervisely/supervisely.git@project-async-updates diff --git a/local.env b/local.env index 64a1d89..76648b7 100644 --- a/local.env +++ b/local.env @@ -1,3 +1,7 @@ -TEAM_ID=448 -WORKSPACE_ID=690 -PROJECT_ID=35637 \ No newline at end of file +# TEAM_ID=448 +# WORKSPACE_ID=690 +# PROJECT_ID=35637 + +TEAM_ID = 431 +WORKSPACE_ID = 1019 +PROJECT_ID = 40721 \ No newline at end of file diff --git a/src/main.py b/src/main.py index 5b3a2da..c331c45 100644 --- a/src/main.py +++ b/src/main.py @@ -30,9 +30,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.elapsed = self.end - self.start msg = self.message or f"Block execution" if self.items_cnt is not None: - log_msg = f"{msg} time: {self.elapsed:.4f} seconds per {self.items_cnt} images ({self.elapsed/self.items_cnt:.4f} seconds per image)" + log_msg = f"{msg} time: {self.elapsed:.3f} seconds per {self.items_cnt} item ({self.elapsed/self.items_cnt:.3f} seconds per item)" else: - log_msg = f"{msg} time: {self.elapsed:.4f} seconds" + log_msg = f"{msg} time: {self.elapsed:.3f} seconds" sly.logger.info(log_msg) if sly.is_development(): @@ -62,7 +62,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): os.environ["modal.state.items"] except KeyError: sly.logger.warn( - "The option to download project is not selected, project will be download with items" + "The option to download items is not selected, project will be downloaded with items" ) DOWNLOAD_ITEMS = True else: @@ -98,11 +98,6 
@@ def export_only_labeled_items(api: sly.Api): sly.fs.mkdir(RESULT_DIR, True) sly.logger.info("Export folder has been created") - if api.server_address.startswith("https://"): - semaphore = asyncio.Semaphore(10) - else: - semaphore = None - if project.type == str(sly.ProjectType.IMAGES): project_fs = Project(RESULT_DIR, OpenMode.CREATE) project_fs.set_meta(meta) @@ -110,6 +105,7 @@ def export_only_labeled_items(api: sly.Api): dataset_path = sly.Dataset._get_dataset_path(dataset_info.name, parents) dataset_id = dataset_info.id dataset_fs = project_fs.create_dataset(dataset_info.name, dataset_path) + images = api.image.get_list(dataset_id) total_items_cnt = len(images) @@ -119,14 +115,17 @@ def export_only_labeled_items(api: sly.Api): img_names = [info.name for info in images] try: ann_progress = Progress("Downloading annotations...", total_items_cnt, min_report_percent=10) + anns = [] with Timer("Annotation downloading", total_items_cnt): - coro = api.annotation.download_batch_async(dataset_id, ids, semaphore, progress_cb=ann_progress.iters_done_report) + coro = api.annotation.download_bulk_async( + dataset_id, ids, ann_progress.iters_done_report + ) loop = sly.utils.get_or_create_event_loop() if loop.is_running(): future = asyncio.run_coroutine_threadsafe(coro, loop) - anns = future.result() + anns.extend(future.result()) else: - loop.run_until_complete(coro) + anns.extend(loop.run_until_complete(coro)) ann_jsons = [ann_info.annotation for ann_info in anns] except Exception as e: sly.logger.warn( @@ -137,14 +136,17 @@ def export_only_labeled_items(api: sly.Api): if DOWNLOAD_ITEMS: try: image_progress = Progress("Downloading images...", total_items_cnt, min_report_percent=10) + img_bytes = [] with Timer("Image downloading", total_items_cnt): - coro = api.image.download_bytes_many_async(ids, semaphore, image_progress.iters_done_report) + coro = api.image.download_bytes_many_async( + ids, progress_cb=image_progress.iters_done_report + ) loop = 
sly.utils.get_or_create_event_loop() if loop.is_running(): future = asyncio.run_coroutine_threadsafe(coro, loop) - img_bytes = future.result() + img_bytes.extend(future.result()) else: - loop.run_until_complete(coro) + img_bytes.extend(loop.run_until_complete(coro)) for name, img_bytes, ann_json in zip(img_names, img_bytes, ann_jsons): ann = sly.Annotation.from_json(ann_json, meta) if ann.is_empty(): @@ -169,7 +171,7 @@ def export_only_labeled_items(api: sly.Api): ) if total_items_cnt == not_labeled_items_cnt: - sly.logger.warn( + sly.logger.warning( "There are no labeled items in dataset {}".format(dataset_info.name) ) else: @@ -182,52 +184,71 @@ def export_only_labeled_items(api: sly.Api): for dataset_info in api.dataset.get_list(project_id): dataset_fs = project_fs.create_dataset(dataset_info.name) videos = api.video.get_list(dataset_info.id) - labeled_items_cnt = 0 not_labeled_items_cnt = 0 - ds_progress = Progress( - "Downloading dataset: {}".format(dataset_info.name), - total_cnt=len(videos), - ) - for batch in sly.batched(videos, batch_size=10): - video_ids = [video_info.id for video_info in batch] - video_names = [video_info.name for video_info in batch] + total_items_cnt = len(videos) + + video_ids = [video_info.id for video_info in videos] + video_names = [video_info.name for video_info in videos] + try: + video_ann_progress = Progress( + "Downloading video annotations...", total_items_cnt, min_report_percent=10 + ) + with Timer("Video annotation downloading", total_items_cnt): + coro = api.video.annotation.download_bulk_async( + dataset_info.id, video_ids, progress_cb=video_ann_progress.iters_done_report + ) # not implemented yet + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.result() + else: + loop.run_until_complete(coro) + except Exception as e: + sly.logger.warn( + f"Can not download {len(video_ids)} annotations: {repr(e)}. Skip batch." 
+ ) + continue + video_paths = [] + for i, (video_name, ann_json) in enumerate(zip(video_names, ann_jsons)): + video_ann = sly.VideoAnnotation.from_json(ann_json, meta, key_id_map) + if video_ann.is_empty(): + not_labeled_items_cnt += 1 + video_ids.pop(i) + continue + + video_path = None + if DOWNLOAD_ITEMS: + video_path = dataset_fs.generate_item_path(video_name) + video_paths.append(video_path) + dataset_fs.add_item_file( + video_name, video_path, ann=video_ann, _validate_item=False + ) + if len(video_paths) == len(video_ids): + progress = Progress("Downloading videos", len(video_ids)) try: - ann_jsons = api.video.annotation.download_bulk(dataset_info.id, video_ids) + with Timer("Video downloading", len(video_ids)): + coro = api.video.download_paths_async( + video_ids, video_paths, progress_cb=progress.iters_done_report + ) + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.result() + else: + loop.run_until_complete(coro) except Exception as e: - sly.logger.warn( - f"Can not download {len(video_ids)} annotations: {repr(e)}. Skip batch." - ) - continue - for video_id, video_name, ann_json in zip(video_ids, video_names, ann_jsons): - video_ann = sly.VideoAnnotation.from_json(ann_json, meta, key_id_map) - if video_ann.is_empty(): - not_labeled_items_cnt += 1 - continue - video_file_path = None - labeled_items_cnt += 1 - if DOWNLOAD_ITEMS: - try: - video_file_path = dataset_fs.generate_item_path(video_name) - api.video.download_path(video_id, video_file_path) - except Exception as e: - sly.logger.warn( - f"Can not download video {video_name}: {repr(e)}. Skip video." - ) - continue - dataset_fs.add_item_file( - video_name, video_file_path, ann=video_ann, _validate_item=False + sly.logger.warning( + f"An error occurred while downloading videos. 
Error: {repr(e)}" ) - ds_progress.iters_done_report(len(batch)) - sly.logger.info( - "In dataset {} {} items labeled, {} items not labeled".format( - dataset_info.name, labeled_items_cnt, not_labeled_items_cnt - ) - ) - if len(videos) == not_labeled_items_cnt: - sly.logger.warn( + if total_items_cnt == not_labeled_items_cnt: + sly.logger.warning( "There are no labeled items in dataset {}".format(dataset_info.name) ) + else: + sly.logger.info( + f"Dataset {dataset_info.name} has {total_items_cnt-not_labeled_items_cnt}/{total_items_cnt} items labeled" + ) project_fs.set_key_id_map(key_id_map) @@ -238,57 +259,101 @@ def export_only_labeled_items(api: sly.Api): for dataset_info in api.dataset.get_list(project_id): dataset_fs = project_fs.create_dataset(dataset_info.name) pointclouds = api.pointcloud.get_list(dataset_info.id) - labeled_items_cnt = 0 not_labeled_items_cnt = 0 - ds_progress = Progress( - "Downloading dataset: {!r}".format(dataset_info.name), - total_cnt=len(pointclouds), - ) - for batch in sly.batched(pointclouds, batch_size=1): - pointcloud_ids = [pointcloud_info.id for pointcloud_info in batch] - pointcloud_names = [pointcloud_info.name for pointcloud_info in batch] + total_items_cnt = len(pointclouds) - ann_jsons = api.pointcloud.annotation.download_bulk(dataset_info.id, pointcloud_ids) + pointcloud_ids = [pointcloud_info.id for pointcloud_info in pointclouds] + pointcloud_names = [pointcloud_info.name for pointcloud_info in pointclouds] + pcd_file_paths = [dataset_fs.generate_item_path(name) for name in pointcloud_names] - for pointcloud_id, pointcloud_name, ann_json in zip( - pointcloud_ids, pointcloud_names, ann_jsons - ): - pc_ann = sly.PointcloudAnnotation.from_json(ann_json, meta, key_id_map) - if pc_ann.is_empty(): - not_labeled_items_cnt += 1 - continue - pointcloud_file_path = dataset_fs.generate_item_path(pointcloud_name) - labeled_items_cnt += 1 - if DOWNLOAD_ITEMS: - api.pointcloud.download_path(pointcloud_id, pointcloud_file_path) - 
related_images_path = dataset_fs.get_related_images_path(pointcloud_name) - related_images = api.pointcloud.get_list_related_images(pointcloud_id) - for rimage_info in related_images: - name = rimage_info[ApiField.NAME] - rimage_id = rimage_info[ApiField.ID] - path_img = os.path.join(related_images_path, name) - path_json = os.path.join(related_images_path, name + ".json") - api.pointcloud.download_related_image(rimage_id, path_img) - dump_json_file(rimage_info, path_json) - - dataset_fs.add_item_file( - pointcloud_name, - pointcloud_file_path, - ann=pc_ann, - _validate_item=False, + anns_json = [] + try: + ann_progress = Progress( + "Downloading annotations...", total_items_cnt, min_report_percent=10 + ) + with Timer("Annotation downloading", total_items_cnt): + coro = api.pointcloud.annotation.download_bulk_async( # not implemented yet + dataset_id, ids, progress_cb=ann_progress.iters_done_report + ) + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + anns_json.extend(future.result()) + else: + anns_json.extend(loop.run_until_complete(coro)) + except Exception as e: + sly.logger.warn( + f"Can not download {total_items_cnt} annotations from dataset {dataset_info.name}: {repr(e)}. Skip batch." + ) + continue + + if DOWNLOAD_ITEMS: + try: + with Timer("Pointcloud downloading", total_items_cnt): + coro = api.pointcloud.download_paths_async( + pointcloud_ids, + pcd_file_paths, + ) + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.result() + else: + loop.run_until_complete(coro) + except Exception as e: + sly.logger.warning( + f"An error occurred while downloading PCD items from dataset: {dataset_info.name}. 
Error: {repr(e)}" ) + continue - ds_progress.iters_done_report(len(batch)) - sly.logger.info( - "In dataset {} {} items labeled, {} items not labeled".format( - dataset_info.name, labeled_items_cnt, not_labeled_items_cnt + rimage_paths = [] + rimage_ids = [] + for pcd_id, pcd_name in zip(pointcloud_ids, pointcloud_names): + rimage_path = dataset_fs.get_related_images_path(pcd_name) + rimage_info = api.pointcloud.get_list_related_images(pcd_id) + name = rimage_info[ApiField.NAME] + rimage_ids.append(rimage_info[ApiField.ID]) + rimage_paths.append(os.path.join(rimage_path, name)) + path_json = os.path.join(rimage_path, name + ".json") + dump_json_file(rimage_info, path_json) + try: + with Timer("Related image downloading", len(rimage_ids)): + coro = api.pointcloud.download_related_images_async( + rimage_ids, rimage_paths + ) + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.result() + else: + loop.run_until_complete(coro) + except Exception as e: + sly.logger.warning( + f"An error occurred while downloading PCD related images. Error: {repr(e)}" + ) + continue + + for pcd_path, pointcloud_name, ann_json in zip( + pcd_file_paths, pointcloud_names, anns_json + ): + pc_ann = sly.PointcloudAnnotation.from_json(ann_json, meta, key_id_map) + if pc_ann.is_empty(): + not_labeled_items_cnt += 1 + continue + dataset_fs.add_item_file( + pointcloud_name, + pcd_path, + ann=pc_ann, + _validate_item=False, ) - ) - if len(pointclouds) == not_labeled_items_cnt: - sly.logger.warn( + if total_items_cnt == not_labeled_items_cnt: + sly.logger.warning( "There are no labeled items in dataset {}".format(dataset_info.name) ) - + else: + sly.logger.info( + f"Dataset {dataset_info.name} has {total_items_cnt-not_labeled_items_cnt}/{total_items_cnt} items labeled" + ) project_fs.set_key_id_map(key_id_map) dir_size = sly.fs.get_directory_size(RESULT_PROJECT_DIR)