per #1687, modified runtime freq logic to get the relevant files for each time iteration instead of gathering all files and subsetting them for each time. Modified the logic in the SeriesAnalysis wrapper to write file list files using the RuntimeFreq logic, so that there are no longer two different approaches to handling file lists. SeriesAnalysis file list files are still written to the appropriate directory, but the file name no longer includes the range of forecasts used. This is acceptable because the other output files contain that information, and it is not needed in the file list names since the lists are found in the directory for each forecast time range.
georgemccabe committed Sep 27, 2022
1 parent 8c29a40 commit 2422b6c
Showing 2 changed files with 117 additions and 97 deletions.
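For orientation, here is a minimal, self-contained Python sketch (hypothetical names, not the actual METplus API) contrasting the two approaches described in the commit message: gathering all files up front and subsetting them per run time, versus gathering only the relevant files inside each time iteration.

def gather_files(lead):
    # Stand-in for a per-lead input file search; paths are made up.
    return [(lead, f"/data/fcst_f{lead:03d}.nc")]

# Old approach: one global gather, then repeated subsetting.
def run_old(leads):
    all_files = [f for lead in leads for f in gather_files(lead)]
    for lead in leads:
        subset = [path for (l, path) in all_files if l == lead]
        print(lead, subset)

# New approach: each iteration gathers only its own files.
def run_new(leads):
    for lead in leads:
        subset = [path for (_, path) in gather_files(lead)]
        print(lead, subset)

run_old([0, 6, 12])
run_new([0, 6, 12])

Both print the same subsets; the difference is that the new approach never builds or scans a global file list.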
99 changes: 69 additions & 30 deletions metplus/wrappers/runtime_freq_wrapper.py
@@ -129,11 +129,6 @@ def run_all_times_custom(self, custom):
@returns True on success, False on failure
"""
# get a list of all input files that are available
if not self.get_all_files(custom):
self.log_error("A problem occurred trying to obtain input files")
return None

runtime_freq = self.c_dict['RUNTIME_FREQ']
if runtime_freq == 'RUN_ONCE':
self.run_once(custom)
@@ -159,6 +154,10 @@ def run_once(self, custom):
time_input['valid'] = '*'
time_input['lead'] = '*'

if not self.get_all_files(custom):
self.log_error("A problem occurred trying to obtain input files")
return None

return self.run_at_time_once(time_input)

def run_once_per_init_or_valid(self, custom):
@@ -182,6 +181,8 @@ def run_once_per_init_or_valid(self, custom):

time_input['lead'] = '*'

self.c_dict['ALL_FILES'] = self.get_all_files_from_leads(time_input)

self.clear()
if not self.run_at_time_once(time_input):
success = False
@@ -209,6 +210,8 @@ def run_once_per_lead(self, custom):
time_input['init'] = '*'
time_input['valid'] = '*'

self.c_dict['ALL_FILES'] = self.get_all_files_for_lead(time_input)

self.clear()
if not self.run_at_time_once(time_input):
success = False
@@ -272,34 +275,67 @@ def get_all_files(self, custom=None):
instance=self.instance,
custom=custom)

# loop over all forecast leads
wildcard_if_empty = self.c_dict.get('WILDCARD_LEAD_IF_EMPTY',
False)
lead_seq = get_lead_sequence(self.config,
time_input,
wildcard_if_empty=wildcard_if_empty)
for lead in lead_seq:
time_input['lead'] = lead

# set current lead time config and environment variables
time_info = time_util.ti_calculate(time_input)

if skip_time(time_info, self.c_dict.get('SKIP_TIMES')):
continue

file_dict = self.get_files_from_time(time_info)
if file_dict:
if isinstance(file_dict, list):
all_files.extend(file_dict)
else:
all_files.append(file_dict)
lead_files = self.get_all_files_from_leads(time_input)
all_files.extend(lead_files)

if not all_files:
return False

self.c_dict['ALL_FILES'] = all_files
return True

def get_all_files_from_leads(self, time_input):
lead_files = []
# loop over all forecast leads
wildcard_if_empty = self.c_dict.get('WILDCARD_LEAD_IF_EMPTY',
False)
lead_seq = get_lead_sequence(self.config,
time_input,
wildcard_if_empty=wildcard_if_empty)
for lead in lead_seq:
current_time_input = time_input.copy()
current_time_input['lead'] = lead

# set current lead time config and environment variables
time_info = time_util.ti_calculate(current_time_input)

if skip_time(time_info, self.c_dict.get('SKIP_TIMES')):
continue

file_dict = self.get_files_from_time(time_info)
if file_dict:
if isinstance(file_dict, list):
lead_files.extend(file_dict)
else:
lead_files.append(file_dict)

return lead_files

def get_all_files_for_lead(self, time_input):
new_files = []
for run_time in time_generator(self.config):
if run_time is None:
continue

current_time_input = time_input.copy()
if 'valid' in run_time:
current_time_input['valid'] = run_time['valid']
del current_time_input['init']
elif 'init' in run_time:
current_time_input['init'] = run_time['init']
del current_time_input['valid']
time_info = time_util.ti_calculate(current_time_input)
if skip_time(time_info, self.c_dict.get('SKIP_TIMES')):
continue
file_dict = self.get_files_from_time(time_info)
if file_dict:
if isinstance(file_dict, list):
new_files.extend(file_dict)
else:
new_files.append(file_dict)

return new_files

def get_files_from_time(self, time_info):
"""! Create dictionary containing time information (key time_info) and
any relevant files for that runtime.
@@ -322,12 +358,13 @@ def compare_time_info(self, runtime, filetime):
@returns True if file's info matches the requirements for current
runtime or False if not.
"""
# False if init/valid is not wildcard and the file time doesn't match
for time_val in ['init', 'valid']:
if (runtime[time_val] != '*' and
filetime[time_val] != runtime[time_val]):
return False

if runtime['lead'] == '*':
if runtime.get('lead', '*') == '*':
return True

# convert each value to seconds to compare
@@ -377,7 +414,7 @@ def find_input_files(self, time_info, fill_missing=False):

return all_input_files

def subset_input_files(self, time_info):
def subset_input_files(self, time_info, output_dir=None):
"""! Obtain a subset of input files from the c_dict ALL_FILES based on
the time information for the current run.
@@ -414,7 +451,9 @@ def subset_input_files(self, time_info):
list_file_dict = {}
for identifier, input_files in all_input_files.items():
list_file_name = self.get_list_file_name(time_info, identifier)
list_file_path = self.write_list_file(list_file_name, input_files)
list_file_path = self.write_list_file(list_file_name,
input_files,
output_dir=output_dir)
list_file_dict[identifier] = list_file_path

return list_file_dict
@@ -439,7 +478,7 @@ def get_list_file_name(self, time_info, identifier):
else:
valid = time_info['valid'].strftime('%Y%m%d%H%M%S')

if time_info['lead'] == '*':
if time_info.get('lead', '*') == '*':
lead = 'ALL'
else:
lead = time_util.ti_get_seconds_from_lead(time_info['lead'],
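The `runtime.get('lead', '*') == '*'` changes above make a missing 'lead' key behave like a wildcard. Below is a standalone sketch of that matching logic, simplified from the diff (the real wrapper converts leads to seconds with ti_get_seconds_from_lead before comparing; this sketch compares them directly).

def matches(runtime, filetime):
    # '*' in the runtime dict matches any file time;
    # a missing 'lead' key is treated as the wildcard '*'.
    for key in ('init', 'valid'):
        if runtime[key] != '*' and filetime[key] != runtime[key]:
            return False
    if runtime.get('lead', '*') == '*':
        return True
    return runtime['lead'] == filetime['lead']

print(matches({'init': '*', 'valid': '*'},
              {'init': '20221001', 'valid': '20221002', 'lead': 6}))   # True
print(matches({'init': '20221001', 'valid': '*', 'lead': 12},
              {'init': '20221001', 'valid': '20221002', 'lead': 6}))   # False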
115 changes: 48 additions & 67 deletions metplus/wrappers/series_analysis_wrapper.py
@@ -26,7 +26,7 @@
from ..util import do_string_sub, parse_template, get_tags
from ..util import get_lead_sequence, get_lead_sequence_groups
from ..util import ti_get_hours_from_lead, ti_get_seconds_from_lead
from ..util import ti_get_lead_string
from ..util import ti_get_lead_string, ti_calculate
from ..util import parse_var_list
from ..util import add_to_time_input
from ..util import field_read_prob_info
@@ -460,16 +460,29 @@ def run_once_per_lead(self, custom):

input_dict['init'] = '*'
input_dict['valid'] = '*'
lead_hours = [ti_get_lead_string(item, plural=False) for
item in lead_group[1]]
lead_hours_str = [ti_get_lead_string(item, plural=False) for
item in lead_group[1]]

self.logger.debug(f"Processing {lead_group[0]} - forecast leads: "
f"{', '.join(lead_hours)}")
f"{', '.join(lead_hours_str)}")

lead_hours = [ti_get_hours_from_lead(item) for item in lead_group[1]]
self.c_dict['ALL_FILES'] = self.get_all_files_for_leads(input_dict,
lead_hours)
if not self.run_at_time_once(input_dict, lead_group):
success = False

return success

def get_all_files_for_leads(self, input_dict, lead_hours):
all_files = []
current_input_dict = input_dict.copy()
for lead_hour in lead_hours:
current_input_dict['lead_hours'] = lead_hour
new_files = self.get_all_files_for_lead(current_input_dict)
all_files.extend(new_files)
return all_files

def run_at_time_once(self, time_info, lead_group=None):
"""! Attempt to build series_analysis command for run time
@@ -577,12 +590,23 @@ def get_files_from_time(self, time_info):
if fcst_files is None or obs_files is None:
return None

file_dict['fcst'] = fcst_files
file_dict['obs'] = obs_files
fcst_key, obs_key = self._get_fcst_obs_keys(storm_id)

file_dict[fcst_key] = fcst_files
file_dict[obs_key] = obs_files
file_dict_list.append(file_dict)

return file_dict_list

@staticmethod
def _get_fcst_obs_keys(storm_id):
fcst_key = 'fcst'
obs_key = 'obs'
if storm_id != '*':
fcst_key = f'{fcst_key}_{storm_id}'
obs_key = f'{obs_key}_{storm_id}'
return fcst_key, obs_key

def find_input_files(self, time_info, data_type):
"""! Loop over list of input templates and find files for each
@@ -596,27 +620,6 @@ def find_input_files(self, time_info, data_type):
mandatory=False)
return input_files

def subset_input_files(self, time_info):
"""! Obtain a subset of input files from the c_dict ALL_FILES based on
the time information for the current run.
@param time_info dictionary containing time information
@returns the path to a ascii file containing the list of files
or None if could not find any files
"""
fcst_files = []
obs_files = []
for file_dict in self.c_dict['ALL_FILES']:
# compare time information for each input file
# add file to list of files to use if it matches
if not self.compare_time_info(time_info, file_dict['time_info']):
continue

fcst_files.extend(file_dict['fcst'])
obs_files.extend(file_dict['obs'])

return fcst_files, obs_files

def compare_time_info(self, runtime, filetime):
"""! Call parents implementation then if the current run time and file
time may potentially still not match, use storm_id to check
@@ -696,48 +699,20 @@ def _get_fcst_and_obs_path(self, time_info, storm_id, lead_group):

return fcst_path, obs_path

all_fcst_files = []
all_obs_files = []
lead_loop = leads if leads else [None]
for lead in lead_loop:
if lead is not None:
time_info['lead'] = lead

fcst_files, obs_files = self.subset_input_files(time_info)
if fcst_files and obs_files:
all_fcst_files.extend(fcst_files)
all_obs_files.extend(obs_files)

# skip if no files were found
if not all_fcst_files or not all_obs_files:
return None, None

output_dir = self.get_output_dir(time_info, storm_id, label)

# create forecast (or both) file list
if self.c_dict['USING_BOTH']:
data_type = 'BOTH'
else:
data_type = 'FCST'
list_file_dict = self.subset_input_files(time_info, output_dir)
if not list_file_dict:
return None, None

fcst_ascii_filename = self._get_ascii_filename(data_type,
storm_id,
leads)
fcst_path = self.write_list_file(fcst_ascii_filename,
all_fcst_files,
output_dir=output_dir)
# add storm_id and label to time_info for output filename
self._add_storm_id_and_label(time_info, storm_id, label)

fcst_key, obs_key = self._get_fcst_obs_keys(storm_id)
fcst_path = list_file_dict[fcst_key]
if self.c_dict['USING_BOTH']:
return fcst_path, fcst_path

# create analysis file list
obs_ascii_filename = self._get_ascii_filename('OBS',
storm_id,
leads)
obs_path = self.write_list_file(obs_ascii_filename,
all_obs_files,
output_dir=output_dir)

obs_path = list_file_dict[obs_key]
return fcst_path, obs_path

def _check_python_embedding(self):
@@ -818,17 +793,23 @@ def get_output_dir(self, time_info, storm_id, label):
output_dir_template = os.path.join(self.c_dict['OUTPUT_DIR'],
self.c_dict['OUTPUT_TEMPLATE'])
output_dir_template = os.path.dirname(output_dir_template)

# get output directory including storm ID and label
current_time_info = time_info.copy()
self._add_storm_id_and_label(current_time_info, storm_id, label)
output_dir = do_string_sub(output_dir_template,
**current_time_info)
return output_dir

@staticmethod
def _add_storm_id_and_label(time_info, storm_id, label):
if storm_id == '*':
storm_id_out = 'all_storms'
else:
storm_id_out = storm_id

# get output directory including storm ID and label
time_info['storm_id'] = storm_id_out
time_info['label'] = label
output_dir = do_string_sub(output_dir_template,
**time_info)
return output_dir

def build_and_run_series_request(self, time_info, fcst_path, obs_path):
"""! Build up the -obs, -fcst, -out necessary for running the
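The new `_get_fcst_obs_keys` helper above keys file lists by storm ID so that entries for multiple storms can coexist in one dictionary. A small usage sketch follows, reproducing the helper's logic from the diff; the file paths and storm ID are made up for illustration.

def get_fcst_obs_keys(storm_id):
    # Same logic as the _get_fcst_obs_keys staticmethod in the diff:
    # wildcard storm IDs use the plain keys, real IDs get a suffix.
    fcst_key = 'fcst'
    obs_key = 'obs'
    if storm_id != '*':
        fcst_key = f'{fcst_key}_{storm_id}'
        obs_key = f'{obs_key}_{storm_id}'
    return fcst_key, obs_key

file_dict = {}
for storm_id in ('*', 'AL092022'):
    fcst_key, obs_key = get_fcst_obs_keys(storm_id)
    file_dict[fcst_key] = [f'/fake/fcst_{storm_id}.nc']
    file_dict[obs_key] = [f'/fake/obs_{storm_id}.nc']

print(sorted(file_dict))
# ['fcst', 'fcst_AL092022', 'obs', 'obs_AL092022']

These keys then serve as the identifiers under which subset_input_files in the RuntimeFreq wrapper writes and looks up the list files.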
