Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 37: Kill Prefect/Globus prune tasks when encountering "PERMISSION_DENIED" error. #39

Merged
2 commits merged on Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,11 @@ def process_new_832_ALCF_flow(folder_name: str,
# Step 4: Schedule deletion of files from ALCF, NERSC, and data832
logger.info("Scheduling deletion of files from ALCF, NERSC, and data832")
nersc_transfer_success = False
# alcf_transfer_success = True
# alcf_reconstruction_success = True
# alcf_tiff_to_zarr_success = True
# data832_tiff_transfer_success = True
# data832_zarr_transfer_success = True

schedule_pruning(
alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
Expand Down
17 changes: 9 additions & 8 deletions orchestration/flows/bl832/prune.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from prefect import flow, get_run_logger
from prefect.blocks.system import JSON
from typing import Union

from orchestration.flows.bl832.config import Config832
from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
Expand All @@ -12,7 +13,7 @@
def prune_files(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint = None,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None
):
"""
Expand Down Expand Up @@ -47,7 +48,7 @@ def prune_files(
def prune_spot832(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -62,7 +63,7 @@ def prune_spot832(
def prune_data832(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -76,7 +77,7 @@ def prune_data832(
def prune_data832_raw(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -90,7 +91,7 @@ def prune_data832_raw(
def prune_data832_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -104,7 +105,7 @@ def prune_data832_scratch(
def prune_alcf832_raw(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -118,7 +119,7 @@ def prune_alcf832_raw(
def prune_alcf832_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -132,7 +133,7 @@ def prune_alcf832_scratch(
def prune_nersc832_alsdev_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand Down
11 changes: 8 additions & 3 deletions orchestration/globus/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from pathlib import Path
from time import time
from typing import Dict, List
from typing import Dict, List, Union
from dotenv import load_dotenv
from globus_sdk import (
ClientCredentialsAuthorizer,
Expand Down Expand Up @@ -243,7 +243,12 @@ def task_wait(

if task["nice_status"] in ["FILE_NOT_FOUND"]:
transfer_client.cancel_task(task_id)
raise TransferError("Received FILE_NOT_FOUND, cancelling task")
raise TransferError(f"Received FILE_NOT_FOUND, cancelling Globus task {task_id}")

if task["nice_status"] in ["PERMISSION_DENIED"]:
transfer_client.cancel_task(task_id)
raise TransferError(f"Received PERMISSION_DENIED, cancelling Globus task {task_id}")

return True


Expand All @@ -252,7 +257,7 @@ def prune_one_safe(
if_older_than_days: int,
tranfer_client: TransferClient,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None],
max_wait_seconds: int = 120,
logger=logger,
):
Expand Down
6 changes: 5 additions & 1 deletion scripts/polaris/tiff_to_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def main():
last_part = os.path.basename(os.path.normpath(tiff_dir))
zarr_dir = os.path.abspath(os.path.join(tiff_dir, '..', last_part + '.zarr'))
if not os.path.exists(zarr_dir):
os.makedirs(zarr_dir)
os.makedirs(zarr_dir, mode=0o2775, exist_ok=True)

print('Output directory: ' + zarr_dir)

Expand All @@ -80,6 +80,10 @@ def main():
# Set permissions for the output directory and its contents
set_permissions_recursive(zarr_dir)

# Extract and set permissions for the parent directory (folder_name)
parent_dir = os.path.abspath(os.path.join(tiff_dir, '../')) # Extract parent directory
set_permissions_recursive(parent_dir) # Set permissions for parent directory


if __name__ == "__main__":
main()
Loading