From 3e0d062b6d8b4ce091a080996fa15003c4485f1a Mon Sep 17 00:00:00 2001
From: Michael Eliachevitch
Date: Tue, 14 Sep 2021 17:14:18 +0200
Subject: [PATCH 1/3] Allow for gbasf2 projects with multiple output sub directories

Previously, the assumption was that all outputs are saved in a single
sub00 directory. But starting with gbasf2 release v5r1p3, to be released
on 2021-09-16, jobs with many outputs (> 1000) can have outputs in
additional sub<xy> directories. This means that when downloading, we
will also get multiple sub directories in the download.

However, the b2luigi user expects all downloaded files to end up in a
common directory, so after a completed download we move the contents of
all sub* directories into the final output directory. We can then still
cross-check that the download was complete by replacing sub00 with a
wildcard in the remote gb2_ds_list command.

See https://github.com/nils-braun/b2luigi/issues/80
---
 b2luigi/batch/processes/gbasf2.py | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/b2luigi/batch/processes/gbasf2.py b/b2luigi/batch/processes/gbasf2.py
index 2c7a234c..0d01843c 100644
--- a/b2luigi/batch/processes/gbasf2.py
+++ b/b2luigi/batch/processes/gbasf2.py
@@ -11,6 +11,7 @@
 from collections import Counter
 from datetime import datetime
 from functools import lru_cache
+from glob import glob
 from itertools import groupby
 from typing import Iterable, Set
@@ -573,7 +574,7 @@ def _get_gbasf2_dataset_query(self, output_file_name: str) -> str:
                 "but gbasf2 batch only supports root outputs"
             )
         dataset_query_string = \
-            f"/belle/user/{self.dirac_user}/{self.gbasf2_project_name}/sub00/{output_file_stem}_*{output_file_ext}"
+            f"/belle/user/{self.dirac_user}/{self.gbasf2_project_name}/*/{output_file_stem}_*{output_file_ext}"
         return dataset_query_string
 
     def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir: bool = False) -> bool:
@@ -593,13 +594,16 @@ def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir:
         task_output_dict = flatten_to_dict(self.task.output())
         output_target = task_output_dict[output_file_name]
         output_dir_path = output_target.path
-        if check_temp_dir:
-            output_dir_path = os.path.join(output_dir_path + ".partial", self.gbasf2_project_name, 'sub00')
         if not os.path.isdir(output_dir_path):
             raise FileNotFoundError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), output_dir_path)
-        downloaded_dataset_basenames = os.listdir(output_dir_path)
+        if check_temp_dir:
+            # files in the temporary download directory
+            downloaded_dataset_basenames = glob(os.path.join(output_dir_path, ".partial", self.gbasf2_project_name, "sub*"))
+        else:
+            # files in the final output directory
+            downloaded_dataset_basenames = os.listdir(output_dir_path)
 
-        # not get the remote set of grid file names for the gbasf2 project output matching output_file_name
+        # get the remote set of grid file names for the gbasf2 project output matching output_file_name
         ds_query_string = self._get_gbasf2_dataset_query(output_file_name)
         ds_list_command = shlex.split(f"gb2_ds_list {ds_query_string}")
         output_dataset_grid_filepaths = run_with_gbasf2(ds_list_command, capture_output=True).stdout.splitlines()
@@ -686,7 +690,7 @@
             if e.errno != errno.ENOENT:  # errno.ENOENT = no such file or directory
                 raise  # re-raise exception if a different error occurred
 
-        tmp_output_dir = os.path.join(tmp_output_dir_path, self.gbasf2_project_name, 'sub00')
+        tmp_output_dir = os.path.join(tmp_output_dir_path, self.gbasf2_project_name)
         if not self._local_gb2_dataset_is_complete(output_file_name, check_temp_dir=True):
             raise RuntimeError(
                 f"Download incomplete. The downloaded set of files in {tmp_output_dir} is not equal to the " +
@@ -697,7 +701,12 @@
                     f"Moving output files to directory: {output_dir_path}")
         if os.path.exists(output_dir_path):
             shutil.rmtree(output_dir_path)
-        shutil.move(src=tmp_output_dir, dst=output_dir_path)
+        # sub00 and other sub directories in case of other datasets
+        sub_directories = glob(os.path.join(tmp_output_dir, "sub*"))
+        for sub_dir in sub_directories:
+            if not os.path.isdir(sub_dir):
+                raise NotADirectoryError(f"{sub_dir} is not a directory.")
+            shutil.move(src=sub_dir, dst=output_dir_path)
         shutil.rmtree(tmp_output_dir_path)
 
     def _download_logs(self):
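Note: the finalisation step this patch adds boils down to flattening the
downloaded sub* directories into a single output directory. A minimal
standalone sketch of that idea follows; the function name and the
file-by-file merge are illustrative assumptions, not the exact code in
the hunk above (which moves each sub* directory as a whole):

    import os
    import shutil
    from glob import glob

    def flatten_sub_dirs(tmp_output_dir: str, output_dir_path: str) -> None:
        # Merge the contents of sub00, sub01, ... into one flat output
        # directory, as the commit message describes.
        os.makedirs(output_dir_path, exist_ok=True)
        for sub_dir in glob(os.path.join(tmp_output_dir, "sub*")):
            if not os.path.isdir(sub_dir):
                raise NotADirectoryError(f"{sub_dir} is not a directory.")
            for fpath in glob(os.path.join(sub_dir, "*")):
                shutil.move(src=fpath, dst=output_dir_path)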
From 614e9186cab1d20717c0f17708d3829ea8594643 Mon Sep 17 00:00:00 2001
From: Michael Eliachevitch
Date: Wed, 15 Sep 2021 12:03:24 +0200
Subject: [PATCH 2/3] Fix tmp dir path in check if downloaded dataset is complete

---
 b2luigi/batch/processes/gbasf2.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/b2luigi/batch/processes/gbasf2.py b/b2luigi/batch/processes/gbasf2.py
index 0d01843c..0f551a8a 100644
--- a/b2luigi/batch/processes/gbasf2.py
+++ b/b2luigi/batch/processes/gbasf2.py
@@ -594,20 +594,21 @@ def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir:
         task_output_dict = flatten_to_dict(self.task.output())
         output_target = task_output_dict[output_file_name]
         output_dir_path = output_target.path
-        if not os.path.isdir(output_dir_path):
-            raise FileNotFoundError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), output_dir_path)
         if check_temp_dir:
             # files in the temporary download directory
-            downloaded_dataset_basenames = glob(os.path.join(output_dir_path, ".partial", self.gbasf2_project_name, "sub*"))
+            glob_expression = os.path.join(f"{output_dir_path}.partial", self.gbasf2_project_name, "sub*", "*.root")
+            downloaded_dataset_basenames = [os.path.basename(fpath) for fpath in glob(glob_expression)]
         else:
             # files in the final output directory
             downloaded_dataset_basenames = os.listdir(output_dir_path)
+        if not downloaded_dataset_basenames:
+            return False
 
         # get the remote set of grid file names for the gbasf2 project output matching output_file_name
         ds_query_string = self._get_gbasf2_dataset_query(output_file_name)
         ds_list_command = shlex.split(f"gb2_ds_list {ds_query_string}")
         output_dataset_grid_filepaths = run_with_gbasf2(ds_list_command, capture_output=True).stdout.splitlines()
-        output_dataset_basenames = [os.path.basename(grid_path) for grid_path in output_dataset_grid_filepaths]
+        output_dataset_basenames = [os.path.basename(lfn) for lfn in output_dataset_grid_filepaths]
         # remove duplicate LFNs that gb2_ds_list returns for outputs from rescheduled jobs
         output_dataset_basenames = get_unique_lfns(output_dataset_basenames)
         # check if local and remote datasets are equal
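Note: at its core, the completeness check this patch fixes is a set
comparison between local and remote file basenames. A rough standalone
sketch of that logic follows; the function name and the remote_lfns
parameter are made up for illustration. In the real code the remote list
comes from gb2_ds_list via run_with_gbasf2, and duplicates from
rescheduled jobs are removed with get_unique_lfns first:

    import os
    from glob import glob

    def local_dataset_matches_remote(output_dir_path, project_name, remote_lfns):
        # Basenames of the files downloaded into <output>.partial/<project>/sub*/
        glob_expression = os.path.join(f"{output_dir_path}.partial", project_name, "sub*", "*.root")
        local_basenames = {os.path.basename(fpath) for fpath in glob(glob_expression)}
        if not local_basenames:
            return False  # nothing downloaded (yet)
        # Plain set comparison; stands in for the duplicate-LFN handling above
        return local_basenames == {os.path.basename(lfn) for lfn in remote_lfns}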
From 486f357ee7939140a1e9418974a5306c0023b131 Mon Sep 17 00:00:00 2001
From: Michael Eliachevitch
Date: Wed, 15 Sep 2021 12:29:31 +0200
Subject: [PATCH 3/3] Make wildcard for sub in dataset query more specific

Instead of "/*/" for matching the different sub directories, use
"/sub*/" to prevent other subdirectories from being matched (even if
that should never happen, better safe than sorry).
---
 b2luigi/batch/processes/gbasf2.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/b2luigi/batch/processes/gbasf2.py b/b2luigi/batch/processes/gbasf2.py
index c1778468..b5268c87 100644
--- a/b2luigi/batch/processes/gbasf2.py
+++ b/b2luigi/batch/processes/gbasf2.py
@@ -574,7 +574,7 @@ def _get_gbasf2_dataset_query(self, output_file_name: str) -> str:
                 "but gbasf2 batch only supports root outputs"
             )
         dataset_query_string = \
-            f"/belle/user/{self.dirac_user}/{self.gbasf2_project_name}/*/{output_file_stem}_*{output_file_ext}"
+            f"/belle/user/{self.dirac_user}/{self.gbasf2_project_name}/sub*/{output_file_stem}_*{output_file_ext}"
         return dataset_query_string
 
     def _local_gb2_dataset_is_complete(self, output_file_name: str, check_temp_dir: bool = False) -> bool:
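Note: to make the effect of this last change concrete, here is the shape
of the resulting dataset query for placeholder values (user and project
names are invented for illustration):

    dirac_user = "jdoe"
    gbasf2_project_name = "MyProject2021"
    output_file_stem, output_file_ext = "ntuple", ".root"

    query = f"/belle/user/{dirac_user}/{gbasf2_project_name}/sub*/{output_file_stem}_*{output_file_ext}"
    # -> "/belle/user/jdoe/MyProject2021/sub*/ntuple_*.root"
    # "sub*" still matches sub00, sub01, ..., but unlike a bare "*" it
    # cannot pick up unrelated subdirectories of the project directory.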