add async methods for videos and pcd projects
vorozhkog committed Nov 30, 2024
1 parent 4417cb7 commit 5cda884
Showing 3 changed files with 170 additions and 101 deletions.
2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -1 +1 @@
-supervisely==6.73.162
+git+https://github.com/supervisely/supervisely.git@project-async-updates
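Note on this change: the pinned release supervisely==6.73.162 is replaced by the project-async-updates development branch, so the app can use the new async SDK methods before they land in a release. A quick way to confirm which build is active at runtime (standard library only; this snippet is illustrative and not part of the commit):

from importlib.metadata import version

# Prints the installed supervisely package version; a git-branch install
# reports whatever version the branch's package metadata declares.
print(version("supervisely"))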
10 changes: 7 additions & 3 deletions local.env
@@ -1,3 +1,7 @@
-TEAM_ID=448
-WORKSPACE_ID=690
-PROJECT_ID=35637
+# TEAM_ID=448
+# WORKSPACE_ID=690
+# PROJECT_ID=35637
+
+TEAM_ID = 431
+WORKSPACE_ID = 1019
+PROJECT_ID = 40721
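The debug IDs above point the app at a specific team, workspace, and project. A typical Supervisely app loads local.env only in development mode; a minimal sketch of that pattern (assuming python-dotenv and the sly.env helpers, neither of which is shown in this diff):

import os

import supervisely as sly
from dotenv import load_dotenv

if sly.is_development():
    # Local project settings first, then personal API credentials.
    load_dotenv("local.env")
    load_dotenv(os.path.expanduser("~/supervisely.env"))

PROJECT_ID = sly.env.project_id()  # reads PROJECT_ID from the environment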
259 changes: 162 additions & 97 deletions src/main.py
@@ -30,9 +30,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.elapsed = self.end - self.start
msg = self.message or f"Block execution"
if self.items_cnt is not None:
log_msg = f"{msg} time: {self.elapsed:.4f} seconds per {self.items_cnt} images ({self.elapsed/self.items_cnt:.4f} seconds per image)"
log_msg = f"{msg} time: {self.elapsed:.3f} seconds per {self.items_cnt} item ({self.elapsed/self.items_cnt:.3f} seconds per item)"
else:
log_msg = f"{msg} time: {self.elapsed:.4f} seconds"
log_msg = f"{msg} time: {self.elapsed:.3f} seconds"
sly.logger.info(log_msg)

if sly.is_development():
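The Timer above is used as a context manager around every bulk download in this commit; on exit it logs the total elapsed time and, when an item count was supplied, the per-item average. Its usage at the call sites below follows this shape (the placeholder body is hypothetical):

with Timer("Annotation downloading", total_items_cnt):
    anns = download_annotations(ids)  # hypothetical stand-in for the timed block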
@@ -62,7 +62,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
os.environ["modal.state.items"]
except KeyError:
sly.logger.warn(
"The option to download project is not selected, project will be download with items"
"The option to download items is not selected, project will be downloaded with items"
)
DOWNLOAD_ITEMS = True
else:
@@ -98,18 +98,14 @@ def export_only_labeled_items(api: sly.Api):
sly.fs.mkdir(RESULT_DIR, True)
sly.logger.info("Export folder has been created")

if api.server_address.startswith("https://"):
semaphore = asyncio.Semaphore(10)
else:
semaphore = None
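The block above, evidently removed by this commit (the new call sites below no longer pass a semaphore), throttled concurrent requests with an explicit asyncio.Semaphore(10) when talking to an HTTPS server; the bulk async methods are now invoked without one, leaving concurrency limits to the SDK's defaults. For reference, a generic sketch of the pattern the removed code relied on (illustrative, not the SDK's internals):

import asyncio

async def bounded_download(sem, download, item_id):
    # Each task must acquire the semaphore, so at most `limit` requests run at once.
    async with sem:
        return await download(item_id)

async def download_all(download, ids, limit=10):
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(bounded_download(sem, download, i) for i in ids))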

if project.type == str(sly.ProjectType.IMAGES):
project_fs = Project(RESULT_DIR, OpenMode.CREATE)
project_fs.set_meta(meta)
for parents, dataset_info in api.dataset.tree(project_id):
dataset_path = sly.Dataset._get_dataset_path(dataset_info.name, parents)
dataset_id = dataset_info.id
dataset_fs = project_fs.create_dataset(dataset_info.name, dataset_path)

images = api.image.get_list(dataset_id)

total_items_cnt = len(images)
@@ -119,14 +115,17 @@ def export_only_labeled_items(api: sly.Api):
img_names = [info.name for info in images]
try:
ann_progress = Progress("Downloading annotations...", total_items_cnt, min_report_percent=10)
anns = []
with Timer("Annotation downloading", total_items_cnt):
coro = api.annotation.download_batch_async(dataset_id, ids, semaphore, progress_cb=ann_progress.iters_done_report)
coro = api.annotation.download_bulk_async(
dataset_id, ids, ann_progress.iters_done_report
)
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
anns = future.result()
anns.extend(future.result())
else:
loop.run_until_complete(coro)
anns.extend(loop.run_until_complete(coro))
ann_jsons = [ann_info.annotation for ann_info in anns]
except Exception as e:
sly.logger.warn(
@@ -137,14 +136,17 @@ def export_only_labeled_items(api: sly.Api):
if DOWNLOAD_ITEMS:
try:
image_progress = Progress("Downloading images...", total_items_cnt, min_report_percent=10)
img_bytes_list = []
with Timer("Image downloading", total_items_cnt):
coro = api.image.download_bytes_many_async(ids, semaphore, image_progress.iters_done_report)
coro = api.image.download_bytes_many_async(
ids, progress_cb=image_progress.iters_done_report
)
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
img_bytes = future.result()
img_bytes_list.extend(future.result())
else:
loop.run_until_complete(coro)
img_bytes_list.extend(loop.run_until_complete(coro))
for name, img_bytes, ann_json in zip(img_names, img_bytes_list, ann_jsons):
ann = sly.Annotation.from_json(ann_json, meta)
if ann.is_empty():
@@ -169,7 +171,7 @@ def export_only_labeled_items(api: sly.Api):
)

if total_items_cnt == not_labeled_items_cnt:
sly.logger.warn(
sly.logger.warning(
"There are no labeled items in dataset {}".format(dataset_info.name)
)
else:
@@ -182,52 +184,71 @@ def export_only_labeled_items(api: sly.Api):
for dataset_info in api.dataset.get_list(project_id):
dataset_fs = project_fs.create_dataset(dataset_info.name)
videos = api.video.get_list(dataset_info.id)
labeled_items_cnt = 0
not_labeled_items_cnt = 0
ds_progress = Progress(
"Downloading dataset: {}".format(dataset_info.name),
total_cnt=len(videos),
)
for batch in sly.batched(videos, batch_size=10):
video_ids = [video_info.id for video_info in batch]
video_names = [video_info.name for video_info in batch]
total_items_cnt = len(videos)

video_ids = [video_info.id for video_info in videos]
video_names = [video_info.name for video_info in videos]
ann_jsons = []
try:
video_ann_progress = Progress(
"Downloading video annotations...", total_items_cnt, min_report_percent=10
)
with Timer("Video annotation downloading", total_items_cnt):
coro = api.video.annotation.download_bulk_async(
dataset_info.id, video_ids, progress_cb=video_ann_progress.iters_done_report
) # not implemented yet
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
ann_jsons.extend(future.result())
else:
ann_jsons.extend(loop.run_until_complete(coro))
except Exception as e:
sly.logger.warn(
f"Can not download {len(video_ids)} annotations: {repr(e)}. Skip batch."
)
continue
video_paths = []
labeled_video_ids = []
for video_id, video_name, ann_json in zip(video_ids, video_names, ann_jsons):
video_ann = sly.VideoAnnotation.from_json(ann_json, meta, key_id_map)
if video_ann.is_empty():
not_labeled_items_cnt += 1
continue

video_path = None
if DOWNLOAD_ITEMS:
video_path = dataset_fs.generate_item_path(video_name)
video_paths.append(video_path)
labeled_video_ids.append(video_id)
dataset_fs.add_item_file(
video_name, video_path, ann=video_ann, _validate_item=False
)
if video_paths:
progress = Progress("Downloading videos", len(labeled_video_ids))
try:
ann_jsons = api.video.annotation.download_bulk(dataset_info.id, video_ids)
with Timer("Video downloading", len(video_ids)):
coro = api.video.download_paths_async(
labeled_video_ids, video_paths, progress_cb=progress.iters_done_report
)
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
future.result()
else:
loop.run_until_complete(coro)
except Exception as e:
sly.logger.warn(
f"Can not download {len(video_ids)} annotations: {repr(e)}. Skip batch."
)
continue
for video_id, video_name, ann_json in zip(video_ids, video_names, ann_jsons):
video_ann = sly.VideoAnnotation.from_json(ann_json, meta, key_id_map)
if video_ann.is_empty():
not_labeled_items_cnt += 1
continue
video_file_path = None
labeled_items_cnt += 1
if DOWNLOAD_ITEMS:
try:
video_file_path = dataset_fs.generate_item_path(video_name)
api.video.download_path(video_id, video_file_path)
except Exception as e:
sly.logger.warn(
f"Can not download video {video_name}: {repr(e)}. Skip video."
)
continue
dataset_fs.add_item_file(
video_name, video_file_path, ann=video_ann, _validate_item=False
sly.logger.warning(
f"An error occured while downloading videos. Error: {repr(e)}"
)

ds_progress.iters_done_report(len(batch))
sly.logger.info(
"In dataset {} {} items labeled, {} items not labeled".format(
dataset_info.name, labeled_items_cnt, not_labeled_items_cnt
)
)
if len(videos) == not_labeled_items_cnt:
sly.logger.warn(
if total_items_cnt == not_labeled_items_cnt:
sly.logger.warning(
"There are no labeled items in dataset {}".format(dataset_info.name)
)
else:
sly.logger.info(
f"Dataset {dataset_info.name} has {total_items_cnt-not_labeled_items_cnt}/{total_items_cnt} items labeled"
)

project_fs.set_key_id_map(key_id_map)
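Every *_async call in this file is followed by the same dispatch block: fetch a loop with sly.utils.get_or_create_event_loop(), then either hand the coroutine to a loop already running in another thread (run_coroutine_threadsafe) or drive it to completion on the current thread (run_until_complete). Calling run_until_complete on an already-running loop would raise a RuntimeError, hence the branch. A hypothetical helper, not part of this commit, that would factor the repetition out:

import asyncio

import supervisely as sly

def run_coroutine(coro):
    loop = sly.utils.get_or_create_event_loop()
    if loop.is_running():
        # The loop lives in another thread: schedule the coroutine there
        # and block this thread until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, loop).result()
    # No loop is running: execute the coroutine right here.
    return loop.run_until_complete(coro)

# Usage sketch: anns = run_coroutine(api.annotation.download_bulk_async(dataset_id, ids))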

@@ -238,57 +259,101 @@ def export_only_labeled_items(api: sly.Api):
for dataset_info in api.dataset.get_list(project_id):
dataset_fs = project_fs.create_dataset(dataset_info.name)
pointclouds = api.pointcloud.get_list(dataset_info.id)
labeled_items_cnt = 0
not_labeled_items_cnt = 0
ds_progress = Progress(
"Downloading dataset: {!r}".format(dataset_info.name),
total_cnt=len(pointclouds),
)
for batch in sly.batched(pointclouds, batch_size=1):
pointcloud_ids = [pointcloud_info.id for pointcloud_info in batch]
pointcloud_names = [pointcloud_info.name for pointcloud_info in batch]
total_items_cnt = len(pointclouds)

ann_jsons = api.pointcloud.annotation.download_bulk(dataset_info.id, pointcloud_ids)
pointcloud_ids = [pointcloud_info.id for pointcloud_info in pointclouds]
pointcloud_names = [pointcloud_info.name for pointcloud_info in pointclouds]
pcd_file_paths = [dataset_fs.generate_item_path(name) for name in pointcloud_names]

for pointcloud_id, pointcloud_name, ann_json in zip(
pointcloud_ids, pointcloud_names, ann_jsons
):
pc_ann = sly.PointcloudAnnotation.from_json(ann_json, meta, key_id_map)
if pc_ann.is_empty():
not_labeled_items_cnt += 1
continue
pointcloud_file_path = dataset_fs.generate_item_path(pointcloud_name)
labeled_items_cnt += 1
if DOWNLOAD_ITEMS:
api.pointcloud.download_path(pointcloud_id, pointcloud_file_path)
related_images_path = dataset_fs.get_related_images_path(pointcloud_name)
related_images = api.pointcloud.get_list_related_images(pointcloud_id)
for rimage_info in related_images:
name = rimage_info[ApiField.NAME]
rimage_id = rimage_info[ApiField.ID]
path_img = os.path.join(related_images_path, name)
path_json = os.path.join(related_images_path, name + ".json")
api.pointcloud.download_related_image(rimage_id, path_img)
dump_json_file(rimage_info, path_json)

dataset_fs.add_item_file(
pointcloud_name,
pointcloud_file_path,
ann=pc_ann,
_validate_item=False,
anns_json = []
try:
ann_progress = Progress(
"Downloading annotations...", total_items_cnt, min_report_percent=10
)
with Timer("Annotation downloading", total_items_cnt):
coro = api.pointcloud.annotation.download_bulk_async( # not implemented yet
dataset_info.id, pointcloud_ids, progress_cb=ann_progress.iters_done_report
)
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
anns_json.extend(future.result())
else:
anns_json.extend(loop.run_until_complete(coro))
except Exception as e:
sly.logger.warn(
f"Can not download {total_items_cnt} annotations from dataset {dataset_info.name}: {repr(e)}. Skip batch."
)
continue

if DOWNLOAD_ITEMS:
try:
with Timer("Pointcloud downloading", total_items_cnt):
coro = api.pointcloud.download_paths_async(
pointcloud_ids,
pcd_file_paths,
)
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
future.result()
else:
loop.run_until_complete(coro)
except Exception as e:
sly.logger.warning(
f"An error occured while downloading PCD items from dataset: {dataset_info.name}. Error: {repr(e)}"
)
continue

ds_progress.iters_done_report(len(batch))
sly.logger.info(
"In dataset {} {} items labeled, {} items not labeled".format(
dataset_info.name, labeled_items_cnt, not_labeled_items_cnt
rimage_paths = []
rimage_ids = []
for pcd_id, pcd_name in zip(pointcloud_ids, pointcloud_names):
rimage_path = dataset_fs.get_related_images_path(pcd_name)
# get_list_related_images returns a list of info dicts, so iterate over it
for rimage_info in api.pointcloud.get_list_related_images(pcd_id):
    name = rimage_info[ApiField.NAME]
    rimage_ids.append(rimage_info[ApiField.ID])
    rimage_paths.append(os.path.join(rimage_path, name))
    path_json = os.path.join(rimage_path, name + ".json")
    dump_json_file(rimage_info, path_json)
try:
with Timer("Related image downloading", len(rimage_ids)):
coro = api.pointcloud.download_related_images_async(
rimage_ids, rimage_paths
)
loop = sly.utils.get_or_create_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(coro, loop)
future.result()
else:
loop.run_until_complete(coro)
except Exception as e:
sly.logger.warning(
f"An error occured while downloading PCD related images. Error: {repr(e)}"
)
continue

for pcd_path, pointcloud_name, ann_json in zip(
pcd_file_paths, pointcloud_names, anns_json
):
pc_ann = sly.PointcloudAnnotation.from_json(ann_json, meta, key_id_map)
if pc_ann.is_empty():
not_labeled_items_cnt += 1
continue
dataset_fs.add_item_file(
pointcloud_name,
pcd_path,
ann=pc_ann,
_validate_item=False,
)
)
if len(pointclouds) == not_labeled_items_cnt:
sly.logger.warn(
if total_items_cnt == not_labeled_items_cnt:
sly.logger.warning(
"There are no labeled items in dataset {}".format(dataset_info.name)
)

else:
sly.logger.info(
f"Dataset {dataset_info.name} has {total_items_cnt-not_labeled_items_cnt}/{total_items_cnt} items labeled"
)
project_fs.set_key_id_map(key_id_map)
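Both the video and point-cloud branches end with project_fs.set_key_id_map(key_id_map). The KeyIdMap accumulates the mapping between server object IDs and the local UUID keys minted while parsing annotations, and persisting it lets the exported project be re-uploaded later without losing object identity. In sketch form (the import path is an assumption about the SDK layout; the calls mirror this file):

# Assumed import path for the SDK's KeyIdMap helper.
from supervisely.video_annotation.key_id_map import KeyIdMap

key_id_map = KeyIdMap()
# from_json fills the map as it assigns local keys to server object IDs.
ann = sly.VideoAnnotation.from_json(ann_json, meta, key_id_map)
project_fs.set_key_id_map(key_id_map)  # presumably persists key_id_map.json in the project dir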

dir_size = sly.fs.get_directory_size(RESULT_PROJECT_DIR)
