Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for gbasf2 projects with multiple output sub<xy> directories #122

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions b2luigi/batch/processes/gbasf2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from collections import Counter
from datetime import datetime
from functools import lru_cache
from glob import glob
from itertools import groupby
from typing import Iterable, Set

Expand Down Expand Up @@ -573,7 +574,7 @@ def _get_gbasf2_dataset_query(self, output_file_name: str) -> str:
"but gbasf2 batch only supports root outputs"
)
dataset_query_string = \
f"/belle/user/{self.dirac_user}/{self.gbasf2_project_name}/sub00/{output_file_stem}_*{output_file_ext}"
f"/belle/user/{self.dirac_user}/{self.gbasf2_project_name}/sub*/{output_file_stem}_*{output_file_ext}"
return dataset_query_string

def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir: bool = False) -> bool:
Expand All @@ -594,12 +595,16 @@ def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir:
output_target = task_output_dict[output_file_name]
output_dir_path = output_target.path
if check_temp_dir:
output_dir_path = os.path.join(output_dir_path + ".partial", self.gbasf2_project_name, 'sub00')
if not os.path.isdir(output_dir_path):
raise FileNotFoundError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), output_dir_path)
downloaded_dataset_basenames = os.listdir(output_dir_path)
# files in the temporary download directory
glob_expression = os.path.join(f"{output_dir_path}.partial", self.gbasf2_project_name, "sub*", "*.root")
downloaded_dataset_basenames = [os.path.basename(fpath) for fpath in glob(glob_expression)]
else:
# file in the final output directory
downloaded_dataset_basenames = os.listdir(output_dir_path)
if not downloaded_dataset_basenames:
return False

# not get the remote set of grid file names for the gbasf2 project output matching output_file_name
# get the remote set of grid file names for the gbasf2 project output matching output_file_name
ds_query_string = self._get_gbasf2_dataset_query(output_file_name)
output_dataset_grid_filepaths = query_lpns(ds_query_string)
output_dataset_basenames = [os.path.basename(grid_path) for grid_path in output_dataset_grid_filepaths]
Expand Down Expand Up @@ -685,7 +690,7 @@ def _download_dataset(self):
if e.errno != errno.ENOENT: # errno.ENOENT = no such file or directory
raise # re-raise exception if a different error occurred

tmp_output_dir = os.path.join(tmp_output_dir_path, self.gbasf2_project_name, 'sub00')
tmp_output_dir = os.path.join(tmp_output_dir_path, self.gbasf2_project_name)
if not self._local_gb2_dataset_is_complete(output_file_name, check_temp_dir=True):
raise RuntimeError(
f"Download incomplete. The downloaded set of files in {tmp_output_dir} is not equal to the " +
Expand All @@ -696,7 +701,12 @@ def _download_dataset(self):
f"Moving output files to directory: {output_dir_path}")
if os.path.exists(output_dir_path):
shutil.rmtree(output_dir_path)
shutil.move(src=tmp_output_dir, dst=output_dir_path)
# sub00 and other sub<xy> directories in case of other datasets
sub_directories = glob(os.path.join(tmp_output_dir, "sub*"))
for sub_dir in sub_directories:
if not os.path.isdir(sub_dir):
raise NotADirectoryError(f"{sub_dir} is not a directory.")
shutil.move(src=sub_dir, dst=output_dir_path)
shutil.rmtree(tmp_output_dir_path)

def _download_logs(self):
Expand Down Expand Up @@ -768,7 +778,7 @@ def exists(self):

def check_dataset_exists_on_grid(gbasf2_project_name, dirac_user=None):
"""
Use ``gb2_ds_list`` command to see if an output dataset exists for the gbasf2 project
Check if an output dataset exists for the gbasf2 project
"""
lpns = query_lpns(gbasf2_project_name, dirac_user=dirac_user)
return len(lpns) > 0
Expand Down