From d5b72f8d5682915ae365dd290b2b43ed5f7215b6 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Wed, 26 Oct 2022 14:13:57 -0700 Subject: [PATCH 01/18] [WIP] add config file --- oneflux/config_file.yaml | 32 +++++++++++ runoneflux.py | 116 +++++++++++++++++++++++++-------------- 2 files changed, 108 insertions(+), 40 deletions(-) create mode 100644 oneflux/config_file.yaml diff --git a/oneflux/config_file.yaml b/oneflux/config_file.yaml new file mode 100644 index 00000000..7f936acc --- /dev/null +++ b/oneflux/config_file.yaml @@ -0,0 +1,32 @@ +run: + command: all + data_dir: /home/toanngo/Documents/GitHub/ameriflux/ONEFlux/data + site_id: US-ARc + site_dir: US-ARc_sample_input + first_year: 2005 + last_year: 2006 + prod: + perc: + log_file: fluxnet_pipeline_US-ARc.log + force_py: true + mcr_dir: /home/toanngo/Documents/GitHub/MATLAB_Runtime/v94 + timestamp: + record_interval: hh + version_processing: + version_data: + era_first_year: + era_last_year: + steps: + qc_auto_execute: true + ustar_mp_execute: true + user_cp_execute: true + meteo_proc_execute: true + nee_proc_execute: true + energy_proc_execute: true + nee_partition_nt_execute: true + nee_partition_dt_execute: true + prepare_ure_execute: true + ure_execute: true + fluxnet2015_execute: true + fluxnet2015_site_plots: true + simulation: false diff --git a/runoneflux.py b/runoneflux.py index 5fcbc03a..bda0f393 100644 --- a/runoneflux.py +++ b/runoneflux.py @@ -15,12 +15,14 @@ import logging import argparse import traceback +import yaml from oneflux import ONEFluxError, log_config, log_trace, VERSION_PROCESSING, VERSION_METADATA from oneflux.tools.partition_nt import run_partition_nt, PROD_TO_COMPARE, PERC_TO_COMPARE from oneflux.tools.partition_dt import run_partition_dt from oneflux.tools.pipeline import run_pipeline, NOW_TS from oneflux.pipeline.common import ERA_FIRST_YEAR, ERA_LAST_YEAR +import oneflux log = logging.getLogger(__name__) @@ -32,12 +34,12 @@ # cli arguments parser = argparse.ArgumentParser() - 
parser.add_argument('command', metavar="COMMAND", help="ONEFlux command to be run", type=str, choices=COMMAND_LIST) - parser.add_argument('datadir', metavar="DATA-DIR", help="Absolute path to general data directory", type=str) - parser.add_argument('siteid', metavar="SITE-ID", help="Site Flux ID in the form CC-XXX", type=str) - parser.add_argument('sitedir', metavar="SITE-DIR", help="Relative path to site data directory (within data-dir)", type=str) - parser.add_argument('firstyear', metavar="FIRST-YEAR", help="First year of data to be processed", type=int) - parser.add_argument('lastyear', metavar="LAST-YEAR", help="Last year of data to be processed", type=int) + parser.add_argument('--command', metavar="COMMAND", help="ONEFlux command to be run", type=str, choices=COMMAND_LIST, default=None) + parser.add_argument('--datadir', metavar="DATA-DIR", help="Absolute path to general data directory", type=str, default=None ) + parser.add_argument('--siteid', metavar="SITE-ID", help="Site Flux ID in the form CC-XXX", type=str, default=None) + parser.add_argument('--sitedir', metavar="SITE-DIR", help="Relative path to site data directory (within data-dir)", type=str, default=None) + parser.add_argument('--firstyear', metavar="FIRST-YEAR", help="First year of data to be processed", type=int, default=None) + parser.add_argument('--lastyear', metavar="LAST-YEAR", help="Last year of data to be processed", type=int, default=None) parser.add_argument('--perc', metavar="PERC", help="List of percentiles to be processed", dest='perc', type=str, choices=PERC_TO_COMPARE, action='append', nargs='*') parser.add_argument('--prod', metavar="PROD", help="List of products to be processed", dest='prod', type=str, choices=PROD_TO_COMPARE, action='append', nargs='*') parser.add_argument('-l', '--logfile', help="Logging file path", type=str, dest='logfile', default=DEFAULT_LOGGING_FILENAME) @@ -47,60 +49,94 @@ parser.add_argument('--recint', help="Record interval for site", type=str, 
choices=['hh', 'hr'], dest='recint', default='hh') parser.add_argument('--versionp', help="Version of processing (hardcoded default)", type=str, dest='versionp', default=str(VERSION_PROCESSING)) parser.add_argument('--versiond', help="Version of data (hardcoded default)", type=str, dest='versiond', default=str(VERSION_METADATA)) - parser.add_argument('--era-fy', help="ERA first year of data (default {y})".format(y=ERA_FIRST_YEAR), type=int, dest='erafy', default=int(ERA_FIRST_YEAR)) - parser.add_argument('--era-ly', help="ERA last year of data (default {y})".format(y=ERA_LAST_YEAR), type=int, dest='eraly', default=int(ERA_LAST_YEAR)) + parser.add_argument('--era-fy', help="ERA first year of data (default {y})".format(y=ERA_FIRST_YEAR), type=int, dest='erafy', default=None) + parser.add_argument('--era-ly', help="ERA last year of data (default {y})".format(y=ERA_LAST_YEAR), type=int, dest='eraly', default=None) + parser.add_argument('--configfile', help="Path to config file", type=str, dest='configfile', default=None) args = parser.parse_args() # setup logging file and stdout log_config(level=logging.DEBUG, filename=args.logfile, std=True, std_level=logging.DEBUG) - # set defaults if no perc or prod - perc = (PERC_TO_COMPARE if args.perc is None else args.perc[0]) - prod = (PROD_TO_COMPARE if args.prod is None else args.prod[0]) + command = datadir = siteid = sitedir = firstyear = lastyear = \ + prod = perc = logfile = forcepy = versionp = versiond = erafy = eraly = None + steps = {} + + if not args.configfile: + if not args.command and not args.datadir and not args.siteid \ + and not args.sitedir and not args.firstyear and not args.lastyear: + raise Exception('Please provide path to config file or required parameters: command, datadir, siteid, sitedir.') + else: + run_config = {} + with open(args.configfile, 'r') as f: + config = yaml.safe_load(f) + run_config = config.get('run', {}) + command = args.command if args.command else run_config.get('command', 'all') + 
datadir = args.datadir if args.datadir else run_config.get('data_dir', None) + siteid = args.siteid if args.siteid else run_config.get('site_id', None) + sitedir = args.sitedir if args.sitedir else run_config.get('site_dir', None) + firstyear = args.firstyear if args.firstyear else run_config.get('first_year', None) + lastyear = args.lastyear if args.lastyear else run_config.get('last_year', None) + prod = args.prod if args.prod else run_config.get('prod', None) + perc = args.perc if args.perc else run_config.get('perc', None) + logfile = args.logfile if args.logfile else run_config.get('log_file', DEFAULT_LOGGING_FILENAME) + forcepy = args.forcepy if args.forcepy else run_config.get('force_py', False) + mcr_directory = args.mrc_directory if args.mcr_directory else run_config.get('mcr_directory', None) + timestamp = args.timestamp if args.timestamp else run_config.get('record_interval', 'hh') + recint = args.recint if args.recint else run_config.get('mcr_directory', NOW_TS) + versionp = args.versionp if args.versionp else run_config.get('version_processing', str(VERSION_PROCESSING)) + versiond = args.versiond if args.versiond else run_config.get('version_data', str(VERSION_METADATA)) + erafy = args.erafy if args.erafy else run_config.get('era_first_year', int(ERA_FIRST_YEAR)) + eraly = args.eraly if args.eraly else run_config.get('era_last_year', int(ERA_LAST_YEAR)) + if not erafy: + erafy = int(ERA_FIRST_YEAR) + if not eraly: + eraly = int(ERA_LAST_YEAR) + steps = run_config.get('steps', {}) - firstyear = args.firstyear - lastyear = args.lastyear + # set defaults if no perc or prod + perc = (PERC_TO_COMPARE if perc is None else perc[0]) + prod = (PROD_TO_COMPARE if prod is None else prod[0]) msg = "Using:" - msg += "command ({c})".format(c=args.command) - msg += ", data-dir ({i})".format(i=args.datadir) - msg += ", site-id ({i})".format(i=args.siteid) - msg += ", site-dir ({d})".format(d=args.sitedir) + msg += "command ({c})".format(c=command) + msg += ", data-dir 
({i})".format(i=datadir) + msg += ", site-id ({i})".format(i=siteid) + msg += ", site-dir ({d})".format(d=sitedir) msg += ", first-year ({y})".format(y=firstyear) msg += ", last-year ({y})".format(y=lastyear) msg += ", perc ({i})".format(i=perc) msg += ", prod ({i})".format(i=prod) - msg += ", log-file ({f})".format(f=args.logfile) - msg += ", force-py ({i})".format(i=args.forcepy) - msg += ", versionp ({i})".format(i=args.versionp) - msg += ", versiond ({i})".format(i=args.versiond) - msg += ", era-fy ({i})".format(i=args.erafy) - msg += ", era-ly ({i})".format(i=args.eraly) + msg += ", log-file ({f})".format(f=logfile) + msg += ", force-py ({i})".format(i=forcepy) + msg += ", versionp ({i})".format(i=versionp) + msg += ", versiond ({i})".format(i=versiond) + msg += ", era-fy ({i})".format(i=erafy) + msg += ", era-ly ({i})".format(i=eraly) log.debug(msg) # start execution try: # check arguments - print os.path.join(args.datadir, args.sitedir) - if not os.path.isdir(os.path.join(args.datadir, args.sitedir)): - raise ONEFluxError("Site dir not found: {d}".format(d=args.sitedir)) + print os.path.join(datadir, sitedir) + if not os.path.isdir(os.path.join(datadir, sitedir)): + raise ONEFluxError("Site dir not found: {d}".format(d=sitedir)) # run command - log.info("Starting execution: {c}".format(c=args.command)) - if args.command == 'all': - run_pipeline(datadir=args.datadir, siteid=args.siteid, sitedir=args.sitedir, firstyear=firstyear, lastyear=lastyear, - prod_to_compare=prod, perc_to_compare=perc, mcr_directory=args.mcr_directory, timestamp=args.timestamp, - record_interval=args.recint, version_data=args.versiond, version_proc=args.versionp, - era_first_year=args.erafy, era_last_year=args.eraly) - elif args.command == 'partition_nt': - run_partition_nt(datadir=args.datadir, siteid=args.siteid, sitedir=args.sitedir, years_to_compare=range(firstyear, lastyear + 1), - py_remove_old=args.forcepy, prod_to_compare=prod, perc_to_compare=perc) - elif args.command == 
'partition_dt': - run_partition_dt(datadir=args.datadir, siteid=args.siteid, sitedir=args.sitedir, years_to_compare=range(firstyear, lastyear + 1), - py_remove_old=args.forcepy, prod_to_compare=prod, perc_to_compare=perc) + log.info("Starting execution: {c}".format(c=command)) + if command == 'all': + run_pipeline(datadir=datadir, siteid=siteid, sitedir=sitedir, firstyear=firstyear, lastyear=lastyear, + prod_to_compare=prod, perc_to_compare=perc, mcr_directory=mcr_directory, timestamp=timestamp, + record_interval=recint, version_data=versiond, version_proc=versionp, + era_first_year=erafy, era_last_year=eraly, steps=steps) + elif command == 'partition_nt': + run_partition_nt(datadir=datadir, siteid=siteid, sitedir=sitedir, years_to_compare=range(firstyear, lastyear + 1), + py_remove_old=forcepy, prod_to_compare=prod, perc_to_compare=perc) + elif command == 'partition_dt': + run_partition_dt(datadir=datadir, siteid=siteid, sitedir=sitedir, years_to_compare=range(firstyear, lastyear + 1), + py_remove_old=forcepy, prod_to_compare=prod, perc_to_compare=perc) else: - raise ONEFluxError("Unknown command: {c}".format(c=args.command)) - log.info("Finished execution: {c}".format(c=args.command)) + raise ONEFluxError("Unknown command: {c}".format(c=command)) + log.info("Finished execution: {c}".format(c=command)) except Exception as e: msg = log_trace(exception=e, level=logging.CRITICAL, log=log) From d292c26d20c8115865236aba3c844b6835a383b3 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Tue, 1 Nov 2022 09:27:01 -0700 Subject: [PATCH 02/18] add PyYAML lib --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 44066824..bf655d82 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ numpy>=1.11.0,<1.16.0 scipy>=0.17.0 matplotlib>=1.5.1 statsmodels>=0.8.0 +PyYAML \ No newline at end of file From 3c2bc7de1406d6099b492e470dc3e8e19b01498d Mon Sep 17 00:00:00 2001 From: toanngosy Date: Tue, 1 Nov 2022 09:48:52 
-0700 Subject: [PATCH 03/18] save run params --- runoneflux.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/runoneflux.py b/runoneflux.py index bda0f393..245ba59b 100644 --- a/runoneflux.py +++ b/runoneflux.py @@ -17,12 +17,12 @@ import traceback import yaml +from datetime import datetime from oneflux import ONEFluxError, log_config, log_trace, VERSION_PROCESSING, VERSION_METADATA from oneflux.tools.partition_nt import run_partition_nt, PROD_TO_COMPARE, PERC_TO_COMPARE from oneflux.tools.partition_dt import run_partition_dt from oneflux.tools.pipeline import run_pipeline, NOW_TS from oneflux.pipeline.common import ERA_FIRST_YEAR, ERA_LAST_YEAR -import oneflux log = logging.getLogger(__name__) @@ -81,8 +81,8 @@ logfile = args.logfile if args.logfile else run_config.get('log_file', DEFAULT_LOGGING_FILENAME) forcepy = args.forcepy if args.forcepy else run_config.get('force_py', False) mcr_directory = args.mrc_directory if args.mcr_directory else run_config.get('mcr_directory', None) - timestamp = args.timestamp if args.timestamp else run_config.get('record_interval', 'hh') - recint = args.recint if args.recint else run_config.get('mcr_directory', NOW_TS) + timestamp = args.timestamp if args.timestamp else run_config.get('timestamp', NOW_TS) + recint = args.recint if args.recint else run_config.get('mcr_directory', None) versionp = args.versionp if args.versionp else run_config.get('version_processing', str(VERSION_PROCESSING)) versiond = args.versiond if args.versiond else run_config.get('version_data', str(VERSION_METADATA)) erafy = args.erafy if args.erafy else run_config.get('era_first_year', int(ERA_FIRST_YEAR)) @@ -121,6 +121,30 @@ if not os.path.isdir(os.path.join(datadir, sitedir)): raise ONEFluxError("Site dir not found: {d}".format(d=sitedir)) + run_params = {'run': + { + 'command': command, + 'data_dir': datadir, + 'site_id': siteid, + 'first_year': firstyear, + 'last_year': lastyear, + 'prod': prod, + 
'perc': perc, + 'log_file': logfile, + 'force_py': forcepy, + 'mcr_dir': mcr_directory, + 'timestamp': timestamp, + 'record_interval': recint, + 'version_processing': versionp, + 'version_data': versiond, + 'era_first_year': erafy, + 'era_last_year': eraly, + 'steps': steps + }} + + datetime_log = datetime.now().strftime('%y_%m_%d-%H_%M_%S') + with open(os.path.join(datadir, sitedir, 'run_params_{}.yaml'.format(datetime_log)), 'w') as f: + yaml.dump(run_params, f) # run command log.info("Starting execution: {c}".format(c=command)) if command == 'all': From 34797f615a4879711c52a2b4cd01693fcd0c7106 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Thu, 17 Nov 2022 09:47:45 -0800 Subject: [PATCH 04/18] add config description and template --- data/.gitkeep | 0 oneflux/configs/__init__.py | 0 oneflux/configs/config.yaml | 5 + oneflux/configs/config_description.yaml | 133 +++++++++++++ .../config_template.yaml} | 31 +-- oneflux/configs/utils.py | 182 ++++++++++++++++++ runoneflux.py | 171 +++------------- 7 files changed, 364 insertions(+), 158 deletions(-) create mode 100644 data/.gitkeep create mode 100644 oneflux/configs/__init__.py create mode 100644 oneflux/configs/config.yaml create mode 100644 oneflux/configs/config_description.yaml rename oneflux/{config_file.yaml => configs/config_template.yaml} (51%) create mode 100644 oneflux/configs/utils.py diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/oneflux/configs/__init__.py b/oneflux/configs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oneflux/configs/config.yaml b/oneflux/configs/config.yaml new file mode 100644 index 00000000..8f47b851 --- /dev/null +++ b/oneflux/configs/config.yaml @@ -0,0 +1,5 @@ +run: + siteid: US-ARc + sitedir: US-ARc_sample_input + mcr: /home/toanngo/Documents/GitHub/MATLAB_Runtime/v94 + diff --git a/oneflux/configs/config_description.yaml b/oneflux/configs/config_description.yaml new file mode 100644 index 
00000000..1be37290 --- /dev/null +++ b/oneflux/configs/config_description.yaml @@ -0,0 +1,133 @@ +run: + command: + metavar: COMMAND + help: ONEFlux command to be run + type: str + choices: ['partition_nt', 'partition_dt', 'all'] + datadir: + metavar: DATA-DIR + help: Absolute path to general data directory + type: str + siteid: + metavar: SITE-ID + help: Site Flux ID in the form CC-XXX + type: str + sitedir: + metavar: SITE-DIR + help: Relative path to site data directory (within data-dir) + type: str + firstyear: + metavar: FIRST-YEAR + help: First year of data to be processed + type: int + lastyear: + metavar: LAST-YEAR + help: Last year of data to be processed + type: int + perc: + metavar: PERC + help: List of percentiles to be processed + dest: perc + type: str + choices: oneflux.tools.partition_nt:PERC_TO_COMPARE + action: append + nargs: '*' + prod: + metavar: PROD + help: List of products to be processed + dest: prod + type: str + choices: oneflux.tools.partition_nt:PROD_TO_COMPARE + action: append + nargs: '*' + logfile: + help: Logging file path + type: str + dest: logfile + force_py: + help: Force execution of PY partitioning (saves original output, generates new) + action: store_true + dest: forcepy + mcr: + help: Path to MCR directory + type: str + dest: mcr_directory + ts: + help: Timestamp to be used in processing IDs + type: str + dest: timestamp + recint: + help: Record interval for site + type: str + choices: ['hh', 'hr'] + dest: recint + versionp: + help: Version of processing (hardcoded default) + type: str + dest: versionp + versiond: + help: Version of data (hardcoded default) + type: str + dest: versiond + era_fy: + help: ERA first year of data + type: int + dest: erafy + era_ly: + help: ERA last year of data + type: int + dest: eraly + configfile: + help: Path to config file + type: str + dest: configfile + +01_qc_visual: + dummy-step-1: + help: dummy-step-1 + type: str +02_qc_auto: + dummy-step-2: + help: dummy-step-2 + type: str 
+04_ustar_mp: + dummy-step-4: + help: dummy-step-4 + type: str +05_ustar_cp: + dummy-step-5: + help: dummy-step-5 + type: str +06_meteo_era: + dummy-step-6: + help: dummy-step-6 + type: str +07_meteo_proc: + dummy-step-7: + help: dummy-step-7 + type: str +08_nee_proc: + dummy-step-8: + help: dummy-step-8 + type: str +09_energy_proc: + dummy-step-9: + help: dummy-step-9 + type: str +10_nee_partition_nt: + dummy-step-10: + help: dummy-step-10 + type: str +11_nee_partition_dt: + dummy-step-11: + help: dummy-step-11 + type: str +12_ure: + dummy-step-12: + help: dummy-step-12 + type: str +99_fluxnet2015: + dummy-step-99: + help: dummy-step-99 + type: str + diff --git a/oneflux/config_file.yaml b/oneflux/configs/config_template.yaml similarity index 51% rename from oneflux/config_file.yaml rename to oneflux/configs/config_template.yaml index 7f936acc..8c24bfe9 100644 --- a/oneflux/config_file.yaml +++ b/oneflux/configs/config_template.yaml @@ -1,21 +1,22 @@ run: command: all - data_dir: /home/toanngo/Documents/GitHub/ameriflux/ONEFlux/data - site_id: US-ARc - site_dir: US-ARc_sample_input - first_year: 2005 - last_year: 2006 - prod: + datadir: ./data + siteid: + sitedir: + firstyear: 2005 + lastyear: 2006 perc: - log_file: fluxnet_pipeline_US-ARc.log - force_py: true - mcr_dir: /home/toanngo/Documents/GitHub/MATLAB_Runtime/v94 - timestamp: - record_interval: hh - version_processing: - version_data: - era_first_year: - era_last_year: + prod: + logfile: oneflux.log + force_py: false + mcr: /MATLAB_Runtime/v94 + ts: oneflux.tools.pipeline:NOW_TS + recint: hh + versionp: oneflux:VERSION_PROCESSING + versiond: oneflux:VERSION_METADATA + era_fy: oneflux.pipeline.common:ERA_FIRST_YEAR + era_ly: oneflux.pipeline.common:ERA_LAST_YEAR + configfile: steps: qc_auto_execute: true ustar_mp_execute: true diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py new file mode 100644 index 00000000..1d26e201 --- /dev/null +++ b/oneflux/configs/utils.py @@ -0,0 +1,182 @@ +import 
argparse +import datetime as dt +import importlib +import logging +import os +import sys +import yaml + +from oneflux import ONEFluxError +from oneflux.tools.partition_nt import PROD_TO_COMPARE, PERC_TO_COMPARE +from oneflux.tools.partition_nt import run_partition_nt +from oneflux.tools.partition_dt import run_partition_dt +from oneflux.tools.pipeline import run_pipeline + + +log = logging.getLogger(__name__) + +RUN_MODE = {'all': run_pipeline, + 'partition_nt': run_partition_nt, + 'partition_dt': run_partition_dt + } +YAML_TEMPLATE_PATH = 'oneflux/configs/config_template.yaml' +YAML_DESCRIPTION_PATH ='oneflux/configs/config_description.yaml' + +class ONEFluxConfig: + def __init__(self): + self.param_dest = {} + self.args = self.argparse_from_yaml() + self.perc = (PERC_TO_COMPARE if self.args.perc is None else self.args.perc[0]) + self.prod = (PROD_TO_COMPARE if self.args.prod is None else self.args.prod[0]) + + def argparse_from_yaml(self): + with open(YAML_DESCRIPTION_PATH, 'r') as f: + config_description = yaml.safe_load(f) + with open(YAML_TEMPLATE_PATH, 'r') as f: + config_default = yaml.safe_load(f) + parser = argparse.ArgumentParser() + + self.config_default = config_default + for parser_group_name, params in config_description.items(): + group = parser.add_argument_group(parser_group_name) + default_val = config_default.get(parser_group_name, {}) + for param_name, param_details in params.items(): + if 'type' in param_details: + param_details['type'] = _type_from_str(param_details['type']) + if 'choices' in param_details: + param_details['choices'] = _parse_choices(param_details['choices']) + if 'dest' in param_details: + self.param_dest[param_name] = param_details['dest'] + if param_name in default_val: + param_details['default'] = default_val[param_name] + if isinstance(param_details['default'], str) and ':' in param_details['default']: + module, class_name = default_val[param_name].split(':') + param_details['default'] = _str_to_class(module, class_name) 
+ param_details['help'] += ' (default to {})'.format(param_details['default']) + param_name = param_name.replace('_', '-') + group.add_argument('--{}'.format(param_name), **param_details) + args = parser.parse_args() + commandline_args = [arg for arg in sys.argv[1:] if '-' in arg or '--' in arg] + configfile = args.configfile + args_dict = vars(args) + if configfile: + with open(configfile, 'r') as f: + config = yaml.safe_load(f) + for _, params in config.items(): + for param_name, param_value in params.items(): + if '--{}'.format(param_name.replace('_', '-')) not in commandline_args: + if param_name in self.param_dest: + param_name = self.param_dest[param_name] + args_dict[param_name] = param_value + return argparse.Namespace(**args_dict) + + def get_pipeline_params(self): + params = { + 'datadir': self.args.datadir, + 'siteid': self.args.siteid, + 'sitedir': self.args.sitedir, + 'firstyear': self.args.firstyear, + 'lastyear': self.args.lastyear, + 'prod_to_compare': self.prod, + 'perc_to_compare': self.perc, + 'mcr_directory': self.args.mcr_directory, + 'timestamp': self.args.timestamp, + 'record_interval': self.args.recint, + 'version_data': self.args.versiond, + 'version_proc': self.args.versionp, + 'era_first_year': self.args.erafy, + 'era_last_year': self.args.eraly + } + return params + + def get_partition_nt_params(self): + params = { + 'datadir': self.args.datadir, + 'siteid': self.args.siteid, + 'sitedir': self.args.sitedir, + 'years_to_compare':range(self.args.firstyear, self.args.lastyear + 1), + 'py_remove_old': self.args.forcepy, + 'prod_to_compare': self.prod, + 'perc_to_compare': self.perc + } + return params + + def get_partition_dt_params(self): + params = { + 'datadir': self.args.datadir, + 'siteid': self.args.siteid, + 'sitedir': self.args.sitedir, + 'years_to_compare':range(self.args.firstyear, self.args.lastyear + 1), + 'py_remove_old': self.args.forcepy, + 'prod_to_compare': self.prod, + 'perc_to_compare': self.perc + } 
+ return params + + def export_to_yaml(self): + saved_dict = {} + args_dict = vars(self.args) + for group_name, params in self.config_default.items(): + saved_dict[group_name] = {} + for param, param_value in params.items(): + if param not in self.param_dest: + p = param + if p in args_dict and args_dict[p] != param_value: + saved_dict[group_name][p] = args_dict[p] + else: + p = self.param_dest[param] + if p in args_dict and args_dict[p] != param_value: + saved_dict[group_name][param] = args_dict[p] + with open('run_params_{}.yaml'.format(dt.datetime.now().strftime('%d-%m-%Y_%H-%M-%S')), 'w') as f: + yaml.dump(saved_dict, f, default_flow_style=False) + + + def log_msg(self): + msg = "Using: \n" + msg += "\tcommand ({c}) \n".format(c=self.args.command) + msg += "\tdata-dir ({i}) \n".format(i=self.args.datadir) + msg += "\tsite-id ({i}) \n".format(i=self.args.siteid) + msg += "\tsite-dir ({d}) \n".format(d=self.args.sitedir) + msg += "\tfirst-year ({y}) \n".format(y=self.args.firstyear) + msg += "\tlast-year ({y}) \n".format(y=self.args.lastyear) + msg += "\tperc ({i}) \n".format(i=self.perc) + msg += "\tprod ({i}) \n".format(i=self.prod) + msg += "\tlog-file ({f}) \n".format(f=self.args.logfile) + msg += "\tforce-py ({i}) \n".format(i=self.args.forcepy) + msg += "\tversionp ({i}) \n".format(i=self.args.versionp) + msg += "\tversiond ({i}) \n".format(i=self.args.versiond) + msg += "\tera-fy ({i}) \n".format(i=self.args.erafy) + msg += "\tera-ly ({i}) \n".format(i=self.args.eraly) + log.debug(msg) + + def run_check(self): + if self.args.command not in RUN_MODE: + raise ONEFluxError("Unknown command: {c}".format(c=self.args.command)) + if not os.path.isdir(os.path.join(self.args.datadir, self.args.sitedir)): + raise ONEFluxError("Site dir not found: {d}".format(d=self.args.sitedir)) + + def get_run_mode_func(self): + return RUN_MODE[self.args.command] + +def _type_from_str(s): + s_dict = {'str': str, 'int': int, 'float': float} + return s_dict[s] + +def 
_str_to_class(module, class_name): + try: + m = importlib.import_module(module) + try: + c = getattr(m, class_name) + except AttributeError: + raise AttributeError('Class does not exist') + except ImportError: + raise ImportError('Module does not exist') + return c + +def _parse_choices(choices): + if isinstance(choices, list): + return choices + # import from file + elif isinstance(choices, str): + module, class_name = choices.split(':') + return _str_to_class(module, class_name) diff --git a/runoneflux.py b/runoneflux.py index 245ba59b..d92328b3 100644 --- a/runoneflux.py +++ b/runoneflux.py @@ -10,163 +10,48 @@ @contact: gzpastorello@lbl.gov @date: 2017-01-31 ''' -import os import sys import logging -import argparse import traceback -import yaml -from datetime import datetime -from oneflux import ONEFluxError, log_config, log_trace, VERSION_PROCESSING, VERSION_METADATA -from oneflux.tools.partition_nt import run_partition_nt, PROD_TO_COMPARE, PERC_TO_COMPARE -from oneflux.tools.partition_dt import run_partition_dt -from oneflux.tools.pipeline import run_pipeline, NOW_TS -from oneflux.pipeline.common import ERA_FIRST_YEAR, ERA_LAST_YEAR +from oneflux import log_config, log_trace +from oneflux.configs.utils import ONEFluxConfig log = logging.getLogger(__name__) - -DEFAULT_LOGGING_FILENAME = 'oneflux.log' -COMMAND_LIST = ['partition_nt', 'partition_dt', 'all'] - # main function if __name__ == '__main__': # cli arguments - parser = argparse.ArgumentParser() - parser.add_argument('--command', metavar="COMMAND", help="ONEFlux command to be run", type=str, choices=COMMAND_LIST, default=None) - parser.add_argument('--datadir', metavar="DATA-DIR", help="Absolute path to general data directory", type=str, default=None ) - parser.add_argument('--siteid', metavar="SITE-ID", help="Site Flux ID in the form CC-XXX", type=str, default=None) - parser.add_argument('--sitedir', metavar="SITE-DIR", help="Relative path to site data directory (within data-dir)", type=str, 
default=None) - parser.add_argument('--firstyear', metavar="FIRST-YEAR", help="First year of data to be processed", type=int, default=None) - parser.add_argument('--lastyear', metavar="LAST-YEAR", help="Last year of data to be processed", type=int, default=None) - parser.add_argument('--perc', metavar="PERC", help="List of percentiles to be processed", dest='perc', type=str, choices=PERC_TO_COMPARE, action='append', nargs='*') - parser.add_argument('--prod', metavar="PROD", help="List of products to be processed", dest='prod', type=str, choices=PROD_TO_COMPARE, action='append', nargs='*') - parser.add_argument('-l', '--logfile', help="Logging file path", type=str, dest='logfile', default=DEFAULT_LOGGING_FILENAME) - parser.add_argument('--force-py', help="Force execution of PY partitioning (saves original output, generates new)", action='store_true', dest='forcepy', default=False) - parser.add_argument('--mcr', help="Path to MCR directory", type=str, dest='mcr_directory', default=None) - parser.add_argument('--ts', help="Timestamp to be used in processing IDs", type=str, dest='timestamp', default=NOW_TS) - parser.add_argument('--recint', help="Record interval for site", type=str, choices=['hh', 'hr'], dest='recint', default='hh') - parser.add_argument('--versionp', help="Version of processing (hardcoded default)", type=str, dest='versionp', default=str(VERSION_PROCESSING)) - parser.add_argument('--versiond', help="Version of data (hardcoded default)", type=str, dest='versiond', default=str(VERSION_METADATA)) - parser.add_argument('--era-fy', help="ERA first year of data (default {y})".format(y=ERA_FIRST_YEAR), type=int, dest='erafy', default=None) - parser.add_argument('--era-ly', help="ERA last year of data (default {y})".format(y=ERA_LAST_YEAR), type=int, dest='eraly', default=None) - parser.add_argument('--configfile', help="Path to config file", type=str, dest='configfile', default=None) - args = parser.parse_args() + config = ONEFluxConfig() # setup logging 
file and stdout - log_config(level=logging.DEBUG, filename=args.logfile, std=True, std_level=logging.DEBUG) - - command = datadir = siteid = sitedir = firstyear = lastyear = \ - prod = perc = logfile = forcepy = versionp = versiond = erafy = eraly = None - steps = {} - - if not args.configfile: - if not args.command and not args.datadir and not args.siteid \ - and not args.sitedir and not args.firstyear and not args.lastyear: - raise Exception('Please provide path to config file or required parameters: command, datadir, siteid, sitedir.') - else: - run_config = {} - with open(args.configfile, 'r') as f: - config = yaml.safe_load(f) - run_config = config.get('run', {}) - command = args.command if args.command else run_config.get('command', 'all') - datadir = args.datadir if args.datadir else run_config.get('data_dir', None) - siteid = args.siteid if args.siteid else run_config.get('site_id', None) - sitedir = args.sitedir if args.sitedir else run_config.get('site_dir', None) - firstyear = args.firstyear if args.firstyear else run_config.get('first_year', None) - lastyear = args.lastyear if args.lastyear else run_config.get('last_year', None) - prod = args.prod if args.prod else run_config.get('prod', None) - perc = args.perc if args.perc else run_config.get('perc', None) - logfile = args.logfile if args.logfile else run_config.get('log_file', DEFAULT_LOGGING_FILENAME) - forcepy = args.forcepy if args.forcepy else run_config.get('force_py', False) - mcr_directory = args.mrc_directory if args.mcr_directory else run_config.get('mcr_directory', None) - timestamp = args.timestamp if args.timestamp else run_config.get('timestamp', NOW_TS) - recint = args.recint if args.recint else run_config.get('mcr_directory', None) - versionp = args.versionp if args.versionp else run_config.get('version_processing', str(VERSION_PROCESSING)) - versiond = args.versiond if args.versiond else run_config.get('version_data', str(VERSION_METADATA)) - erafy = args.erafy if args.erafy else 
run_config.get('era_first_year', int(ERA_FIRST_YEAR)) - eraly = args.eraly if args.eraly else run_config.get('era_last_year', int(ERA_LAST_YEAR)) - if not erafy: - erafy = int(ERA_FIRST_YEAR) - if not eraly: - eraly = int(ERA_LAST_YEAR) - steps = run_config.get('steps', {}) + log_config(level=logging.DEBUG, + filename=config.args.logfile, + std=True, + std_level=logging.DEBUG) # set defaults if no perc or prod - perc = (PERC_TO_COMPARE if perc is None else perc[0]) - prod = (PROD_TO_COMPARE if prod is None else prod[0]) - - msg = "Using:" - msg += "command ({c})".format(c=command) - msg += ", data-dir ({i})".format(i=datadir) - msg += ", site-id ({i})".format(i=siteid) - msg += ", site-dir ({d})".format(d=sitedir) - msg += ", first-year ({y})".format(y=firstyear) - msg += ", last-year ({y})".format(y=lastyear) - msg += ", perc ({i})".format(i=perc) - msg += ", prod ({i})".format(i=prod) - msg += ", log-file ({f})".format(f=logfile) - msg += ", force-py ({i})".format(i=forcepy) - msg += ", versionp ({i})".format(i=versionp) - msg += ", versiond ({i})".format(i=versiond) - msg += ", era-fy ({i})".format(i=erafy) - msg += ", era-ly ({i})".format(i=eraly) - log.debug(msg) - + config.log_msg() + config.run_check() + config.export_to_yaml() # start execution - try: - # check arguments - print os.path.join(datadir, sitedir) - if not os.path.isdir(os.path.join(datadir, sitedir)): - raise ONEFluxError("Site dir not found: {d}".format(d=sitedir)) - - run_params = {'run': - { - 'command': command, - 'data_dir': datadir, - 'site_id': siteid, - 'first_year': firstyear, - 'last_year': lastyear, - 'prod': prod, - 'perc': perc, - 'log_file': logfile, - 'force_py': forcepy, - 'mcr_dir': mcr_directory, - 'timestamp': timestamp, - 'record_interval': recint, - 'version_processing': versionp, - 'version_data': versiond, - 'era_first_year': erafy, - 'era_last_year': eraly, - 'steps': steps - }} - - datetime_log = datetime.now().strftime('%y_%m_%d-%H_%M_%S') - with 
open(os.path.join(datadir, sitedir, 'run_params_{}.yaml'.format(datetime_log)), 'w') as f: - yaml.dump(run_params, f) - # run command - log.info("Starting execution: {c}".format(c=command)) - if command == 'all': - run_pipeline(datadir=datadir, siteid=siteid, sitedir=sitedir, firstyear=firstyear, lastyear=lastyear, - prod_to_compare=prod, perc_to_compare=perc, mcr_directory=mcr_directory, timestamp=timestamp, - record_interval=recint, version_data=versiond, version_proc=versionp, - era_first_year=erafy, era_last_year=eraly, steps=steps) - elif command == 'partition_nt': - run_partition_nt(datadir=datadir, siteid=siteid, sitedir=sitedir, years_to_compare=range(firstyear, lastyear + 1), - py_remove_old=forcepy, prod_to_compare=prod, perc_to_compare=perc) - elif command == 'partition_dt': - run_partition_dt(datadir=datadir, siteid=siteid, sitedir=sitedir, years_to_compare=range(firstyear, lastyear + 1), - py_remove_old=forcepy, prod_to_compare=prod, perc_to_compare=perc) - else: - raise ONEFluxError("Unknown command: {c}".format(c=command)) - log.info("Finished execution: {c}".format(c=command)) - - except Exception as e: - msg = log_trace(exception=e, level=logging.CRITICAL, log=log) - log.critical("***Problem during execution*** {e}".format(e=str(e))) - tb = traceback.format_exc() - log.critical("***Problem traceback*** {s}".format(s=str(tb))) - sys.exit(msg) + # try: + # # run command + # log.info("Starting execution: {c}".format(c=config.args.command)) + # run_mode_func = config.get_run_mode_func() + # if config.args.command == 'all': + # run_mode_func(**config.get_pipeline_params()) + # elif config.args.command == 'partition_nt': + # run_mode_func(**config.get_partition_nt_params()) + # elif config.args.command == 'partition_dt': + # run_mode_func(**config.get_partition_dt_params()) + # log.info("Finished execution: {c}".format(c=config.args.command)) + + # except Exception as e: + # msg = log_trace(exception=e, level=logging.CRITICAL, log=log) + # 
log.critical("***Problem during execution*** {e}".format(e=str(e))) + # tb = traceback.format_exc() + # log.critical("***Problem traceback*** {s}".format(s=str(tb))) + # sys.exit(msg) sys.exit(0) From c8639a1890b3f39e36ab0630b6236fc7e9bac582 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Thu, 17 Nov 2022 10:51:44 -0800 Subject: [PATCH 05/18] update runoneflux --- oneflux/configs/utils.py | 1 + runoneflux.py | 36 ++++++++++++++++++------------------ 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 1d26e201..480bfab6 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -19,6 +19,7 @@ 'partition_nt': run_partition_nt, 'partition_dt': run_partition_dt } + YAML_TEMPLATE_PATH = 'oneflux/configs/config_template.yaml' YAML_DESCRIPTION_PATH ='oneflux/configs/config_description.yaml' diff --git a/runoneflux.py b/runoneflux.py index d92328b3..e8bc8a1d 100644 --- a/runoneflux.py +++ b/runoneflux.py @@ -35,23 +35,23 @@ config.run_check() config.export_to_yaml() # start execution - # try: - # # run command - # log.info("Starting execution: {c}".format(c=config.args.command)) - # run_mode_func = config.get_run_mode_func() - # if config.args.command == 'all': - # run_mode_func(**config.get_pipeline_params()) - # elif config.args.command == 'partition_nt': - # run_mode_func(**config.get_partition_nt_params()) - # elif config.args.command == 'partition_dt': - # run_mode_func(**config.get_partition_dt_params()) - # log.info("Finished execution: {c}".format(c=config.args.command)) - - # except Exception as e: - # msg = log_trace(exception=e, level=logging.CRITICAL, log=log) - # log.critical("***Problem during execution*** {e}".format(e=str(e))) - # tb = traceback.format_exc() - # log.critical("***Problem traceback*** {s}".format(s=str(tb))) - # sys.exit(msg) + try: + # run command + log.info("Starting execution: {c}".format(c=config.args.command)) + run_mode_func = config.get_run_mode_func() + 
if config.args.command == 'all': + run_mode_func(**config.get_pipeline_params()) + elif config.args.command == 'partition_nt': + run_mode_func(**config.get_partition_nt_params()) + elif config.args.command == 'partition_dt': + run_mode_func(**config.get_partition_dt_params()) + log.info("Finished execution: {c}".format(c=config.args.command)) + + except Exception as e: + msg = log_trace(exception=e, level=logging.CRITICAL, log=log) + log.critical("***Problem during execution*** {e}".format(e=str(e))) + tb = traceback.format_exc() + log.critical("***Problem traceback*** {s}".format(s=str(tb))) + sys.exit(msg) sys.exit(0) From 3c81d88dc7d10442e2cc14352f27467e26193615 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Wed, 23 Nov 2022 09:23:16 -0800 Subject: [PATCH 06/18] seperate groups --- oneflux/configs/config_description.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/oneflux/configs/config_description.yaml b/oneflux/configs/config_description.yaml index 1be37290..b349eab0 100644 --- a/oneflux/configs/config_description.yaml +++ b/oneflux/configs/config_description.yaml @@ -86,46 +86,57 @@ run: dummy-step-1: help: dummy-step-1 type: str + 02_qc_auto: dummy-step-2: help: dummy-step-2 type: str + 04_ustar_mp: dummy-step-4: help: dummy-step-4 type: str + 05_ustar_cp: dummy-step-5: help: dummy-step-5 type: str + 06_meteo_era: dummy-step-6: help: dummy-step-6 type: str + 07_meteo_proc: dummy-step-7: help: dummy-step-7 type: str + 08_nee_proc: dummy-step-8: help: dummy-step-8 type: str + 09_energy_proc: dummy-step-9: help: dummy-step-9 type: str + 10_nee_partition_nt: dummy-step-10: help: dummy-step-10 type: str + 11_nee_partition_dt: dummy-step-11: help: dummy-step-11 type: str + 12_ure: dummy-step-12: help: dummy-step-12 type: str + 99_fluxnet2015: dummy-step-99: help: dummy-step-99 From 91e96f09c90cd4cac5d5b8a2194bd951a691b766 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Mon, 5 Dec 2022 10:36:56 -0800 Subject: [PATCH 07/18] change datatype of erafy 
to int --- oneflux/pipeline/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oneflux/pipeline/common.py b/oneflux/pipeline/common.py index 866b259e..4eead754 100644 --- a/oneflux/pipeline/common.py +++ b/oneflux/pipeline/common.py @@ -42,7 +42,7 @@ MODE_PRODUCT = 'FLUXNET2015' MODE_ERA = 'ERAI' # most recent year available for ERA -- assuming new ERA year available after March each year - ERA_FIRST_YEAR = '1989' + ERA_FIRST_YEAR = 1989 ERA_LAST_YEAR = (NOW_DATETIME.year - 1 if (NOW_DATETIME.month > 3) else NOW_DATETIME.year - 2) ERA_FIRST_TIMESTAMP_START_TEMPLATE = '{y}01010000' From 7fbab898322418435242dcdc1ac2c5713ab3c6d8 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Mon, 5 Dec 2022 10:54:35 -0800 Subject: [PATCH 08/18] add test argparse --- oneflux/configs/utils.py | 85 ++++++++++++++++----------------- tests/argparse/__init__.py | 0 tests/argparse/config.yaml | 4 ++ tests/argparse/test_argparse.py | 54 +++++++++++++++++++++ 4 files changed, 100 insertions(+), 43 deletions(-) create mode 100644 tests/argparse/__init__.py create mode 100644 tests/argparse/config.yaml create mode 100644 tests/argparse/test_argparse.py diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 480bfab6..b3b314d1 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -23,54 +23,53 @@ YAML_TEMPLATE_PATH = 'oneflux/configs/config_template.yaml' YAML_DESCRIPTION_PATH ='oneflux/configs/config_description.yaml' +def argparse_from_yaml_and_cli(): + param_dest = {} + config_default = {} + with open(YAML_DESCRIPTION_PATH, 'r') as f: + config_description = yaml.safe_load(f) + with open(YAML_TEMPLATE_PATH, 'r') as f: + config_default = yaml.safe_load(f) + parser = argparse.ArgumentParser() + for parser_group_name, params in config_description.items(): + group = parser.add_argument_group(parser_group_name) + default_val = config_default.get(parser_group_name, {}) + for param_name, param_details in params.items(): + if 'type' in 
param_details: + param_details['type'] = _type_from_str(param_details['type']) + if 'choices' in param_details: + param_details['choices'] = _parse_choices(param_details['choices']) + if 'dest' in param_details: + param_dest[param_name] = param_details['dest'] + if param_name in default_val: + param_details['default'] = default_val[param_name] + if isinstance(param_details['default'], str) and ':' in param_details['default']: + module, class_name = default_val[param_name].split(':') + param_details['default'] = _str_to_class(module, class_name) + param_details['help'] += ' (default to {})'.format(param_details['default']) + param_name = param_name.replace('_', '-') + group.add_argument('--{}'.format(param_name), **param_details) + args = parser.parse_args() + commandline_args = [arg.strip('-_') for arg in sys.argv[1:] if '-' in arg or '--' in arg] + configfile = args.configfile + args_dict = vars(args) + if configfile: + with open(configfile, 'r') as f: + config = yaml.safe_load(f) + for _, params in config.items(): + for param_name, param_value in params.items(): + if param_name not in commandline_args: + if param_name in param_dest: + param_name = param_dest[param_name] + args_dict[param_name] = param_value + return config_default, param_dest, argparse.Namespace(**args_dict) + class ONEFluxConfig: def __init__(self): - self.param_dest = {} - self.args = self.argparse_from_yaml() + self.config_default, self.param_dest, self.args = argparse_from_yaml_and_cli() self.perc = (PERC_TO_COMPARE if self.args.perc is None else self.args.perc[0]) self.prod = (PROD_TO_COMPARE if self.args.prod is None else self.args.prod[0]) - def argparse_from_yaml(self): - with open(YAML_DESCRIPTION_PATH, 'r') as f: - config_description = yaml.safe_load(f) - with open(YAML_TEMPLATE_PATH, 'r') as f: - config_default = yaml.safe_load(f) - parser = argparse.ArgumentParser() - - self.config_default = config_default - for parser_group_name, params in config_description.items(): - group = 
parser.add_argument_group(parser_group_name) - default_val = config_default.get(parser_group_name, {}) - for param_name, param_details in params.items(): - if 'type' in param_details: - param_details['type'] = _type_from_str(param_details['type']) - if 'choices' in param_details: - param_details['choices'] = _parse_choices(param_details['choices']) - if 'dest' in param_details: - self.param_dest[param_name] = param_details['dest'] - if param_name in default_val: - param_details['default'] = default_val[param_name] - if isinstance(param_details['default'], str) and ':' in param_details['default']: - module, class_name = default_val[param_name].split(':') - param_details['default'] = _str_to_class(module, class_name) - param_details['help'] += ' (default to {})'.format(param_details['default']) - param_name = param_name.replace('_', '-') - group.add_argument('--{}'.format(param_name), **param_details) - args = parser.parse_args() - commandline_args = [arg for arg in sys.argv[1:] if '-' in arg or '--' in arg] - configfile = args.configfile - args_dict = vars(args) - if configfile: - with open(configfile, 'r') as f: - config = yaml.safe_load(f) - for _, params in config.items(): - for param_name, param_value in params.items(): - if param_name not in commandline_args or param_name.replace('_', '-') not in commandline_args: - if param_name in self.param_dest: - param_name = self.param_dest[param_name] - args_dict[param_name] = param_value - return argparse.Namespace(**args_dict) - def get_pipeline_params(self): params = { 'datadir': self.args.datadir, diff --git a/tests/argparse/__init__.py b/tests/argparse/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/argparse/config.yaml b/tests/argparse/config.yaml new file mode 100644 index 00000000..187b4d35 --- /dev/null +++ b/tests/argparse/config.yaml @@ -0,0 +1,4 @@ +run: + siteid: dummy_siteid + sitedir: dummy_sitedir + mcr: dummy_mcrdir/MATLAB_Runtime/v94 diff --git 
a/tests/argparse/test_argparse.py b/tests/argparse/test_argparse.py new file mode 100644 index 00000000..6f2270ce --- /dev/null +++ b/tests/argparse/test_argparse.py @@ -0,0 +1,54 @@ +import unittest +from unittest.mock import patch +import sys +import oneflux +from oneflux.configs.utils import ONEFluxConfig, argparse_from_yaml_and_cli + +class TestArgParse(unittest.TestCase): + def test_argparse_default(self): + with patch("sys.argv", ["python runoneflux.py"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.command, 'all') + self.assertEqual(args.datadir, './data') + self.assertEqual(args.siteid, '') + self.assertEqual(args.sitedir, '') + self.assertEqual(args.firstyear, 2005) + self.assertEqual(args.lastyear, 2006) + self.assertEqual(args.perc, None) + self.assertEqual(args.prod, None) + self.assertEqual(args.logfile, 'oneflux.log') + self.assertEqual(args.forcepy, False) + self.assertEqual(args.mcr_directory, '/MATLAB_Runtime/v94') + self.assertEqual(args.recint, 'hh') + self.assertEqual(args.versionp, oneflux.VERSION_PROCESSING) + self.assertEqual(args.versiond, oneflux.VERSION_METADATA) + self.assertEqual(args.erafy, oneflux.pipeline.common.ERA_FIRST_YEAR) + self.assertEqual(args.eraly, oneflux.pipeline.common.ERA_LAST_YEAR) + self.assertEqual(args.configfile, None) + + def test_argparse_cli_value(self): + eraly_val = 3000 + with patch("sys.argv", ["python runoneflux.py", + "--era-ly", f"{eraly_val}"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.eraly, eraly_val) + + def test_argparse_yaml(self): + test_yaml_path = './tests/argparse/config.yaml' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", f"{test_yaml_path}"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.siteid, 'dummy_siteid') + self.assertEqual(args.sitedir, 'dummy_sitedir') + self.assertEqual(args.mcr_directory, 'dummy_mcrdir/MATLAB_Runtime/v94') + + def test_arg_cli_overwrite_yaml(self): + test_yaml_path = 
'./tests/argparse/config.yaml' + overwritten_siteid = 'dummy_overwritten_siteid' + with patch("sys.argv", ["python runoneflux.py", + "--siteid", f"{overwritten_siteid}", + "--configfile", f"{test_yaml_path}"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.siteid, overwritten_siteid) + self.assertEqual(args.sitedir, 'dummy_sitedir') + self.assertEqual(args.mcr_directory, 'dummy_mcrdir/MATLAB_Runtime/v94') From 68eb17e5b7e8963d0f01ff622e50b54ba4654a3c Mon Sep 17 00:00:00 2001 From: toanngosy Date: Mon, 5 Dec 2022 14:41:41 -0800 Subject: [PATCH 09/18] update with more tests --- tests/argparse/config_faulty.yaml | 5 +++++ tests/argparse/test_argparse.py | 17 +++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 tests/argparse/config_faulty.yaml diff --git a/tests/argparse/config_faulty.yaml b/tests/argparse/config_faulty.yaml new file mode 100644 index 00000000..d05cbda9 --- /dev/null +++ b/tests/argparse/config_faulty.yaml @@ -0,0 +1,5 @@ +run: + siteid: dummy_siteid + sitedir: dummy_sitedir + mcr: dummy_mcrdir/MATLAB_Runtime/v94 + wrong_arg: wrong_arg diff --git a/tests/argparse/test_argparse.py b/tests/argparse/test_argparse.py index 6f2270ce..4ae05f82 100644 --- a/tests/argparse/test_argparse.py +++ b/tests/argparse/test_argparse.py @@ -52,3 +52,20 @@ def test_arg_cli_overwrite_yaml(self): self.assertEqual(args.siteid, overwritten_siteid) self.assertEqual(args.sitedir, 'dummy_sitedir') self.assertEqual(args.mcr_directory, 'dummy_mcrdir/MATLAB_Runtime/v94') + + def test_arg_cli_wrong_args(self): + wrong_arg = 'wrong_arg' + wrong_arg_value = 'wrong_arg_value' + test_yaml_path = './tests/argparse/config.yaml' + with patch("sys.argv", ["python runoneflux.py", + f"--{wrong_arg}", f"{wrong_arg_value}", + "--configfile", f"{test_yaml_path}"]): + with self.assertRaises(SystemExit): + _, _, args = argparse_from_yaml_and_cli() + + def test_arg_yaml_wrong_args(self): + test_yaml_path = './tests/argparse/config_faulty.yaml' + with 
patch("sys.argv", ["python runoneflux.py", + "--configfile", f"{test_yaml_path}"]): + with self.assertRaises(ValueError): + _, _, args = argparse_from_yaml_and_cli() From 790e91890fb70353c32b8f804828cf401b6636ff Mon Sep 17 00:00:00 2001 From: toanngosy Date: Mon, 5 Dec 2022 14:58:40 -0800 Subject: [PATCH 10/18] update not avail arg in yaml --- oneflux/configs/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index b3b314d1..9c91523a 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -26,6 +26,7 @@ def argparse_from_yaml_and_cli(): param_dest = {} config_default = {} + all_params = [] with open(YAML_DESCRIPTION_PATH, 'r') as f: config_description = yaml.safe_load(f) with open(YAML_TEMPLATE_PATH, 'r') as f: @@ -35,6 +36,7 @@ def argparse_from_yaml_and_cli(): group = parser.add_argument_group(parser_group_name) default_val = config_default.get(parser_group_name, {}) for param_name, param_details in params.items(): + all_params.append(param_name) if 'type' in param_details: param_details['type'] = _type_from_str(param_details['type']) if 'choices' in param_details: @@ -58,6 +60,8 @@ def argparse_from_yaml_and_cli(): config = yaml.safe_load(f) for _, params in config.items(): for param_name, param_value in params.items(): + if param_name not in all_params: + raise ValueError(f'{param_name} is unknown: it has not been declared in the config_description.yaml and the default value in config_template.yaml') if param_name not in commandline_args: if param_name in param_dest: param_name = param_dest[param_name] From 2fd6ab6ccaffebd1a131879336c20fec389f9a71 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Mon, 5 Dec 2022 15:36:31 -0800 Subject: [PATCH 11/18] update export yaml function --- oneflux/configs/utils.py | 16 +++++++++++++--- tests/argparse/output_config.yaml | 10 ++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) create mode 100644 tests/argparse/output_config.yaml diff 
--git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 9c91523a..4e0f34a6 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -117,7 +117,7 @@ def get_partition_dt_params(self): } return params - def export_to_yaml(self): + def export_to_yaml(self, dir=None, name=None, overwritten=False): saved_dict = {} args_dict = vars(self.args) for group_name, params in self.config_default.items(): @@ -131,8 +131,18 @@ def export_to_yaml(self): p = self.param_dest[param] if p in args_dict and args_dict[p] != param_value: saved_dict[group_name][param] = args_dict[p] - with open('run_params_{}.yaml'.format(dt.datetime.now().strftime('%d-%m-%Y_%H-%M-%S')), 'w') as f: - yaml.dump(saved_dict, f, default_flow_style=False) + if not name: + name = 'run_params_{}.yaml'.format(dt.datetime.now().strftime('%d-%m-%Y_%H-%M-%S')) + if not dir: + path = name + else: + path = f'{dir}/{name}' + if dir and not os.path.exists(dir): + os.makedirs(dir) + if overwritten and os.path.isfile(path): + os.remove(path) + with open(path, 'w') as f: + yaml.dump(saved_dict, f, default_flow_style=False) def log_msg(self): diff --git a/tests/argparse/output_config.yaml b/tests/argparse/output_config.yaml new file mode 100644 index 00000000..56df22a4 --- /dev/null +++ b/tests/argparse/output_config.yaml @@ -0,0 +1,10 @@ +run: + configfile: ./tests/argparse/config.yaml + era_fy: 1989 + era_ly: 2021 + mcr: dummy_mcrdir/MATLAB_Runtime/v94 + sitedir: dummy_sitedir + siteid: dummy_siteid + ts: 20221205T153559 + versiond: beta + versionp: 5 From 6170df660934de8fed3accb58428b67addceb79c Mon Sep 17 00:00:00 2001 From: toanngosy Date: Mon, 5 Dec 2022 16:23:28 -0800 Subject: [PATCH 12/18] update tests --- tests/argparse/test_argparse.py | 43 +++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/argparse/test_argparse.py b/tests/argparse/test_argparse.py index 4ae05f82..654592b6 100644 --- a/tests/argparse/test_argparse.py +++ b/tests/argparse/test_argparse.py 
@@ -1,8 +1,10 @@ import unittest from unittest.mock import patch import sys +import yaml import oneflux from oneflux.configs.utils import ONEFluxConfig, argparse_from_yaml_and_cli +from oneflux.tools.partition_nt import PROD_TO_COMPARE, PERC_TO_COMPARE class TestArgParse(unittest.TestCase): def test_argparse_default(self): @@ -69,3 +71,44 @@ def test_arg_yaml_wrong_args(self): "--configfile", f"{test_yaml_path}"]): with self.assertRaises(ValueError): _, _, args = argparse_from_yaml_and_cli() + + def test_onefluxconfig_args(self): + test_yaml_path = './tests/argparse/config.yaml' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", f"{test_yaml_path}"]): + onefluxconfig = ONEFluxConfig() + self.assertEqual(onefluxconfig.perc, PERC_TO_COMPARE) + self.assertEqual(onefluxconfig.prod, PROD_TO_COMPARE) + + def test_onefluxconfig_export_to_yaml(self): + dir = './tests/argparse' + yaml_filename = 'config.yaml' + output_yaml_filename = 'output_config.yaml' + test_yaml_path = f'{dir}/{yaml_filename}' + test_output_yaml_path = f'{dir}/{output_yaml_filename}' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", test_yaml_path]): + onefluxconfig = ONEFluxConfig() + onefluxconfig.export_to_yaml(dir, + output_yaml_filename, + overwritten=True) + with open(test_output_yaml_path) as f: + conf = yaml.safe_load(f) + self.assertEqual(conf['run']['configfile'], onefluxconfig.args.configfile) + self.assertEqual(conf['run']['era_fy'], onefluxconfig.args.erafy) + self.assertEqual(conf['run']['era_ly'], onefluxconfig.args.eraly) + self.assertEqual(conf['run']['mcr'], onefluxconfig.args.mcr_directory) + self.assertEqual(conf['run']['sitedir'], onefluxconfig.args.sitedir) + self.assertEqual(conf['run']['siteid'], onefluxconfig.args.siteid) + self.assertEqual(conf['run']['versiond'], onefluxconfig.args.versiond) + self.assertEqual(conf['run']['versionp'], onefluxconfig.args.versionp) + + with patch("sys.argv", ["python runoneflux.py", + "--configfile", 
test_output_yaml_path]): + new_onefluxconfig = ONEFluxConfig() + + no_check_args_list = ['timestamp', 'configfile'] + for name, val in onefluxconfig.args._get_kwargs(): + if name not in no_check_args_list: + self.assertEqual(val, + new_onefluxconfig.args.__getattribute__(name)) \ No newline at end of file From 27cc39cc92f9ef604af756c99b6256906556d6c0 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Tue, 6 Dec 2022 00:42:22 -0800 Subject: [PATCH 13/18] save seperate step params --- oneflux/configs/config_description.yaml | 41 +++++++++++++++++++++++++ oneflux/configs/config_template.yaml | 32 ++++++++++--------- oneflux/configs/utils.py | 33 ++++++++++++++++++-- tests/argparse/test_argparse.py | 2 +- 4 files changed, 91 insertions(+), 17 deletions(-) diff --git a/oneflux/configs/config_description.yaml b/oneflux/configs/config_description.yaml index b349eab0..f2a998f2 100644 --- a/oneflux/configs/config_description.yaml +++ b/oneflux/configs/config_description.yaml @@ -82,6 +82,47 @@ run: type: str dest: configfile +steps: + qc_auto_execute: + help: enable qc auto execution + type: bool + ustar_mp_execute: + help: enable ustar mp execution + type: bool + user_cp_execute: + help: enable user cp execution + type: bool + meteo_proc_execute: + help: enable meteo proc execution + type: bool + nee_proc_execute: + help: enable nee proc execution + type: bool + energy_proc_execute: + help: enable energy proc execution + type: bool + nee_partition_nt_execute: + help: enable nee partition nt execution + type: bool + nee_partition_dt_execute: + help: enable nee partition dt execution + type: bool + prepare_ure_execute: + help: enable prepare ure execution + type: bool + ure_execute: + help: enable ure execution + type: bool + fluxnet2015_execute: + help: enable qc auto execution + type: bool + fluxnet2015_site_plots: + help: enable fluxnet2015 site plots + type: bool + simulation: + help: enable simulation + type: bool + 01_qc_visual: dummy-step-1: help: dummy-step-1 diff --git 
a/oneflux/configs/config_template.yaml b/oneflux/configs/config_template.yaml index 8c24bfe9..2342c8cc 100644 --- a/oneflux/configs/config_template.yaml +++ b/oneflux/configs/config_template.yaml @@ -17,17 +17,21 @@ run: era_fy: oneflux.pipeline.common:ERA_FIRST_YEAR era_ly: oneflux.pipeline.common:ERA_LAST_YEAR configfile: - steps: - qc_auto_execute: true - ustar_mp_execute: true - user_cp_execute: true - meteo_proc_execute: true - nee_proc_execute: true - energy_proc_execute: true - nee_partition_nt_execute: true - nee_partition_dt_execute: true - prepare_ure_execute: true - ure_execute: true - fluxnet2015_execute: true - fluxnet2015_site_plots: true - simulation: false + +steps: + qc_auto_execute: true + ustar_mp_execute: true + user_cp_execute: true + meteo_proc_execute: true + nee_proc_execute: true + energy_proc_execute: true + nee_partition_nt_execute: true + nee_partition_dt_execute: true + prepare_ure_execute: true + ure_execute: true + fluxnet2015_execute: true + fluxnet2015_site_plots: true + simulation: false + +01_qc_visual: + dummy-step-1: dummy_step_1_test_string diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 4e0f34a6..7e36bd74 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -89,7 +89,22 @@ def get_pipeline_params(self): 'version_data': self.args.versiond, 'version_proc': self.args.versionp, 'era_first_year': self.args.erafy, - 'era_last_year': self.args.eraly + 'era_last_year': self.args.eraly, + 'steps': { + 'qc_auto_execute': self.args.qc_auto_execute, + 'ustar_mp_execute': self.args.ustar_mp_execute, + 'user_cp_execute': self.args.user_cp_execute, + 'meteo_proc_execute': self.args.meteo_proc_execute, + 'nee_proc_execute': self.args.nee_proc_execute, + 'energy_proc_execute': self.args.energy_proc_execute, + 'nee_partition_nt_execute': self.args.nee_partition_nt_execute, + 'nee_partition_dt_execute': self.args.nee_partition_dt_execute, + 'prepare_ure_execute': self.args.prepare_ure_execute, + 
'ure_execute': self.args.ure_execute, + 'fluxnet2015_execute': self.args.fluxnet2015_execute, + 'fluxnet2015_site_plots': self.args.fluxnet2015_site_plots, + 'simulation': self.args.simulation + } } return params @@ -143,6 +158,20 @@ def export_to_yaml(self, dir=None, name=None, overwritten=False): os.remove(path) with open(path, 'w') as f: yaml.dump(saved_dict, f, default_flow_style=False) + step_name_list = ['01_qc_visual', '02_qc_auto', '04_ustar_mp', '05_ustar_cp', + '06_meteo_era', '07_meteo_proc', '08_nee_proc', '09_energy_proc', + '10_nee_partition_nt', '11_nee_partition_dt', '12_ure', + '99_fluxnet2015'] + for step in step_name_list: + v = {} + for arg in self.config_default.get(step, []): + v[arg] = self.args.__getattribute__(arg.replace('-', '_')) + if v: + path = f'{dir}/{step}_{name}' + if overwritten and os.path.isfile(path): + os.remove(path) + with open(path, 'w') as f: + yaml.dump(v, f, default_flow_style=False) def log_msg(self): @@ -173,7 +202,7 @@ def get_run_mode_func(self): return RUN_MODE[self.args.command] def _type_from_str(s): - s_dict = {'str': str, 'int': int, 'float': float} + s_dict = {'str': str, 'int': int, 'float': float, 'bool': bool} return s_dict[s] def _str_to_class(module, class_name): diff --git a/tests/argparse/test_argparse.py b/tests/argparse/test_argparse.py index 654592b6..166e6d34 100644 --- a/tests/argparse/test_argparse.py +++ b/tests/argparse/test_argparse.py @@ -106,7 +106,7 @@ def test_onefluxconfig_export_to_yaml(self): with patch("sys.argv", ["python runoneflux.py", "--configfile", test_output_yaml_path]): new_onefluxconfig = ONEFluxConfig() - + no_check_args_list = ['timestamp', 'configfile'] for name, val in onefluxconfig.args._get_kwargs(): if name not in no_check_args_list: From 26a07f4f5e147638116f610d07d512b3b2b87fc1 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Wed, 7 Dec 2022 10:17:08 -0800 Subject: [PATCH 14/18] rename argparse to config --- tests/configs/01_qc_visual_output_config.yaml | 1 + 
tests/configs/__init__.py | 0 tests/configs/config.yaml | 4 + tests/configs/config_faulty.yaml | 5 + tests/configs/output_config.yaml | 12 ++ tests/configs/test_configs.py | 114 ++++++++++++++++++ 6 files changed, 136 insertions(+) create mode 100644 tests/configs/01_qc_visual_output_config.yaml create mode 100644 tests/configs/__init__.py create mode 100644 tests/configs/config.yaml create mode 100644 tests/configs/config_faulty.yaml create mode 100644 tests/configs/output_config.yaml create mode 100644 tests/configs/test_configs.py diff --git a/tests/configs/01_qc_visual_output_config.yaml b/tests/configs/01_qc_visual_output_config.yaml new file mode 100644 index 00000000..f6cc6c0b --- /dev/null +++ b/tests/configs/01_qc_visual_output_config.yaml @@ -0,0 +1 @@ +dummy-step-1: dummy_step_1_test_string diff --git a/tests/configs/__init__.py b/tests/configs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/configs/config.yaml b/tests/configs/config.yaml new file mode 100644 index 00000000..187b4d35 --- /dev/null +++ b/tests/configs/config.yaml @@ -0,0 +1,4 @@ +run: + siteid: dummy_siteid + sitedir: dummy_sitedir + mcr: dummy_mcrdir/MATLAB_Runtime/v94 diff --git a/tests/configs/config_faulty.yaml b/tests/configs/config_faulty.yaml new file mode 100644 index 00000000..d05cbda9 --- /dev/null +++ b/tests/configs/config_faulty.yaml @@ -0,0 +1,5 @@ +run: + siteid: dummy_siteid + sitedir: dummy_sitedir + mcr: dummy_mcrdir/MATLAB_Runtime/v94 + wrong_arg: wrong_arg diff --git a/tests/configs/output_config.yaml b/tests/configs/output_config.yaml new file mode 100644 index 00000000..93338422 --- /dev/null +++ b/tests/configs/output_config.yaml @@ -0,0 +1,12 @@ +01_qc_visual: {} +run: + configfile: ./tests/configs/config.yaml + era_fy: 1989 + era_ly: 2021 + mcr: dummy_mcrdir/MATLAB_Runtime/v94 + sitedir: dummy_sitedir + siteid: dummy_siteid + ts: 20221207T101625 + versiond: beta + versionp: 5 +steps: {} diff --git a/tests/configs/test_configs.py 
b/tests/configs/test_configs.py new file mode 100644 index 00000000..cc73995b --- /dev/null +++ b/tests/configs/test_configs.py @@ -0,0 +1,114 @@ +import unittest +from unittest.mock import patch +import sys +import yaml +import oneflux +from oneflux.configs.utils import ONEFluxConfig, argparse_from_yaml_and_cli +from oneflux.tools.partition_nt import PROD_TO_COMPARE, PERC_TO_COMPARE + +class TestConfigs(unittest.TestCase): + def test_argparse_default(self): + with patch("sys.argv", ["python runoneflux.py"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.command, 'all') + self.assertEqual(args.datadir, './data') + self.assertEqual(args.siteid, '') + self.assertEqual(args.sitedir, '') + self.assertEqual(args.firstyear, 2005) + self.assertEqual(args.lastyear, 2006) + self.assertEqual(args.perc, None) + self.assertEqual(args.prod, None) + self.assertEqual(args.logfile, 'oneflux.log') + self.assertEqual(args.forcepy, False) + self.assertEqual(args.mcr_directory, '/MATLAB_Runtime/v94') + self.assertEqual(args.recint, 'hh') + self.assertEqual(args.versionp, oneflux.VERSION_PROCESSING) + self.assertEqual(args.versiond, oneflux.VERSION_METADATA) + self.assertEqual(args.erafy, oneflux.pipeline.common.ERA_FIRST_YEAR) + self.assertEqual(args.eraly, oneflux.pipeline.common.ERA_LAST_YEAR) + self.assertEqual(args.configfile, None) + + def test_argparse_cli_value(self): + eraly_val = 3000 + with patch("sys.argv", ["python runoneflux.py", + "--era-ly", f"{eraly_val}"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.eraly, eraly_val) + + def test_argparse_yaml(self): + test_yaml_path = './tests/configs/config.yaml' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", f"{test_yaml_path}"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.siteid, 'dummy_siteid') + self.assertEqual(args.sitedir, 'dummy_sitedir') + self.assertEqual(args.mcr_directory, 'dummy_mcrdir/MATLAB_Runtime/v94') + + def 
test_arg_cli_overwrite_yaml(self): + test_yaml_path = './tests/configs/config.yaml' + overwritten_siteid = 'dummy_overwritten_siteid' + with patch("sys.argv", ["python runoneflux.py", + "--siteid", f"{overwritten_siteid}", + "--configfile", f"{test_yaml_path}"]): + _, _, args = argparse_from_yaml_and_cli() + self.assertEqual(args.siteid, overwritten_siteid) + self.assertEqual(args.sitedir, 'dummy_sitedir') + self.assertEqual(args.mcr_directory, 'dummy_mcrdir/MATLAB_Runtime/v94') + + def test_arg_cli_wrong_args(self): + wrong_arg = 'wrong_arg' + wrong_arg_value = 'wrong_arg_value' + test_yaml_path = './tests/configs/config.yaml' + with patch("sys.argv", ["python runoneflux.py", + f"--{wrong_arg}", f"{wrong_arg_value}", + "--configfile", f"{test_yaml_path}"]): + with self.assertRaises(SystemExit): + _, _, args = argparse_from_yaml_and_cli() + + def test_arg_yaml_wrong_args(self): + test_yaml_path = './tests/configs/config_faulty.yaml' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", f"{test_yaml_path}"]): + with self.assertRaises(ValueError): + _, _, args = argparse_from_yaml_and_cli() + + def test_onefluxconfig_args(self): + test_yaml_path = './tests/configs/config.yaml' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", f"{test_yaml_path}"]): + onefluxconfig = ONEFluxConfig() + self.assertEqual(onefluxconfig.perc, PERC_TO_COMPARE) + self.assertEqual(onefluxconfig.prod, PROD_TO_COMPARE) + + def test_onefluxconfig_export_to_yaml(self): + dir = './tests/configs' + yaml_filename = 'config.yaml' + output_yaml_filename = 'output_config.yaml' + test_yaml_path = f'{dir}/{yaml_filename}' + test_output_yaml_path = f'{dir}/{output_yaml_filename}' + with patch("sys.argv", ["python runoneflux.py", + "--configfile", test_yaml_path]): + onefluxconfig = ONEFluxConfig() + onefluxconfig.export_to_yaml(dir, + output_yaml_filename, + overwritten=True) + with open(test_output_yaml_path) as f: + conf = yaml.safe_load(f) + 
self.assertEqual(conf['run']['configfile'], onefluxconfig.args.configfile) + self.assertEqual(conf['run']['era_fy'], onefluxconfig.args.erafy) + self.assertEqual(conf['run']['era_ly'], onefluxconfig.args.eraly) + self.assertEqual(conf['run']['mcr'], onefluxconfig.args.mcr_directory) + self.assertEqual(conf['run']['sitedir'], onefluxconfig.args.sitedir) + self.assertEqual(conf['run']['siteid'], onefluxconfig.args.siteid) + self.assertEqual(conf['run']['versiond'], onefluxconfig.args.versiond) + self.assertEqual(conf['run']['versionp'], onefluxconfig.args.versionp) + + with patch("sys.argv", ["python runoneflux.py", + "--configfile", test_output_yaml_path]): + new_onefluxconfig = ONEFluxConfig() + + no_check_args_list = ['timestamp', 'configfile'] + for name, val in onefluxconfig.args._get_kwargs(): + if name not in no_check_args_list: + self.assertEqual(val, + new_onefluxconfig.args.__getattribute__(name)) \ No newline at end of file From 4ce0ecd43b5342ffffdf7370ce95e8b066283750 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Thu, 22 Dec 2022 12:47:27 -0800 Subject: [PATCH 15/18] update save partial config --- oneflux/configs/utils.py | 47 +++++++++++++-------------------- oneflux/pipeline/wrappers.py | 38 ++++++++++++++++++++------- oneflux/tools/pipeline.py | 51 ++++-------------------------------- runoneflux.py | 10 ++++--- 4 files changed, 57 insertions(+), 89 deletions(-) diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 7e36bd74..ece0ec82 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -8,21 +8,15 @@ from oneflux import ONEFluxError from oneflux.tools.partition_nt import PROD_TO_COMPARE, PERC_TO_COMPARE -from oneflux.tools.partition_nt import run_partition_nt -from oneflux.tools.partition_dt import run_partition_dt -from oneflux.tools.pipeline import run_pipeline - log = logging.getLogger(__name__) -RUN_MODE = {'all': run_pipeline, - 'partition_nt': run_partition_nt, - 'partition_dt': run_partition_dt - } - 
YAML_TEMPLATE_PATH = 'oneflux/configs/config_template.yaml' YAML_DESCRIPTION_PATH ='oneflux/configs/config_description.yaml' - +STEP_NAME_LIST = ['01_qc_visual', '02_qc_auto', '04_ustar_mp', '05_ustar_cp', + '06_meteo_era', '07_meteo_proc', '08_nee_proc', '09_energy_proc', + '10_nee_partition_nt', '11_nee_partition_dt', '12_ure', + '99_fluxnet2015'] def argparse_from_yaml_and_cli(): param_dest = {} config_default = {} @@ -158,20 +152,20 @@ def export_to_yaml(self, dir=None, name=None, overwritten=False): os.remove(path) with open(path, 'w') as f: yaml.dump(saved_dict, f, default_flow_style=False) - step_name_list = ['01_qc_visual', '02_qc_auto', '04_ustar_mp', '05_ustar_cp', - '06_meteo_era', '07_meteo_proc', '08_nee_proc', '09_energy_proc', - '10_nee_partition_nt', '11_nee_partition_dt', '12_ure', - '99_fluxnet2015'] - for step in step_name_list: - v = {} - for arg in self.config_default.get(step, []): - v[arg] = self.args.__getattribute__(arg.replace('-', '_')) - if v: - path = f'{dir}/{step}_{name}' - if overwritten and os.path.isfile(path): - os.remove(path) - with open(path, 'w') as f: - yaml.dump(v, f, default_flow_style=False) + + def export_step_to_yaml(self, dir=None, name='config.yaml', overwritten=False): + step = os.path.split(dir)[-1] + if step not in STEP_NAME_LIST: + raise ONEFluxError(f"Step does not exist: {step}") + v = {} + for arg in self.config_default.get(step, []): + v[arg] = self.args.__getattribute__(arg.replace('-', '_')) + if v: + path = f'{dir}/{name}' + if overwritten and os.path.isfile(path): + os.remove(path) + with open(path, 'w') as f: + yaml.dump(v, f, default_flow_style=False) def log_msg(self): @@ -193,14 +187,9 @@ def log_msg(self): log.debug(msg) def run_check(self): - if self.args.command not in RUN_MODE: - raise ONEFluxError("Unknown command: {c}".format(c=self.args.command)) if not os.path.isdir(os.path.join(self.args.datadir, self.args.sitedir)): raise ONEFluxError("Site dir not found: {d}".format(d=self.args.sitedir)) - 
def get_run_mode_func(self): - return RUN_MODE[self.args.command] - def _type_from_str(s): s_dict = {'str': str, 'int': int, 'float': float, 'bool': bool} return s_dict[s] diff --git a/oneflux/pipeline/wrappers.py b/oneflux/pipeline/wrappers.py index ca273b3a..3b8ac0b1 100644 --- a/oneflux/pipeline/wrappers.py +++ b/oneflux/pipeline/wrappers.py @@ -54,10 +54,13 @@ class Pipeline(object): VALIDATE_ON_CREATE = False SIMULATION = False - def __init__(self, siteid, timestamp=datetime.now().strftime("%Y%m%d%H%M%S"), *args, **kwargs): + def __init__(self, config_obj): ''' Initializes pipeline execution object, including initialization tests (e.g., directories and initial datasets tests) ''' + self.config_obj = config_obj + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + log.info("ONEFlux Pipeline: initialization started") self.run_id = socket.getfqdn() + '_run' + timestamp @@ -68,10 +71,11 @@ def __init__(self, siteid, timestamp=datetime.now().strftime("%Y%m%d%H%M%S"), *a ### basic checks # extra configs - if args: - log.warning("ONEFlux Pipeline: non-keyword arguments provided, ignoring: {a}".format(a=args)) + kwargs = self.config_obj.get_pipeline_params() log.debug("ONEFlux Pipeline: keyword arguments: {a}".format(a=kwargs)) self.configs = kwargs + siteid = self.configs.get('siteid') + self.configs['datadir'] = os.path.abspath(os.path.join(self.configs['datadir'], self.configs['sitedir'])) # check valid config attribute labels from defaults from classes self.driver_classes = [PipelineFPCreator, @@ -96,9 +100,9 @@ def __init__(self, siteid, timestamp=datetime.now().strftime("%Y%m%d%H%M%S"), *a self.valid_attribute_labels = ['data_dir', 'tool_dir', 'data_dir_main', 'prod_to_compare', 'perc_to_compare', 'first_year', 'last_year'] for driver in self.driver_classes: - labels = [k.lower() for k, v in driver.__dict__.iteritems() if ((not callable(v)) and (not k.startswith('_')))] + labels = [k.lower() for k, v in driver.__dict__.items() if ((not callable(v)) and (not 
k.startswith('_')))] self.valid_attribute_labels.extend(labels) - labels = [k.lower() for k, v in Pipeline.__dict__.iteritems() if ((not callable(v)) and (not k.startswith('_')))] + labels = [k.lower() for k, v in Pipeline.__dict__.items() if ((not callable(v)) and (not k.startswith('_')))] self.valid_attribute_labels.extend(labels) for k in self.configs.keys(): if k not in self.valid_attribute_labels: @@ -106,7 +110,7 @@ def __init__(self, siteid, timestamp=datetime.now().strftime("%Y%m%d%H%M%S"), *a self.first_year = self.configs.get('first_year', None) self.last_year = self.configs.get('last_year', None) self.data_dir_main = self.configs.get('data_dir_main', None) - self.site_dir = self.configs.get('site_dir', None) + self.site_dir = self.configs.get('sitedir', None) # check OS if (os.name != 'posix') and (os.name != 'nt'): @@ -126,7 +130,7 @@ def __init__(self, siteid, timestamp=datetime.now().strftime("%Y%m%d%H%M%S"), *a log.debug("ONEFlux Pipeline: setting up for site '{v}'".format(v=self.siteid)) # main data directory (all sites) - self.data_dir = self.configs.get('data_dir', os.path.join(DATA_DIR, self.siteid)) # TODO: default should be self.site_dir? + self.data_dir = self.configs.get('datadir', os.path.join(DATA_DIR, self.siteid)) # TODO: default should be self.site_dir? 
log.debug("ONEFlux Pipeline: using data dir '{v}'".format(v=self.data_dir)) self.prodfile_template = os.path.join(self.data_dir, FLUXNET_PRODUCT_CLASS.FLUXNET2015_DIR, PRODFILE_TEMPLATE_F) @@ -384,6 +388,7 @@ def __init__(self, pipeline): self.qc_visual_dir_inner = self.pipeline.configs.get('qc_visual_files_dir', os.path.join(self.qc_visual_dir, self.QC_VISUAL_DIR_INNER)) self.output_file_pattern = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS] self.output_file_patterns_inner = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_INNER] + self.pipeline.config_obj.export_step_to_yaml(self.qc_visual_dir) def pre_validate(self): ''' @@ -454,6 +459,7 @@ def __init__(self, pipeline): self.output_log = os.path.join(self.qc_auto_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) self.cmd_txt = 'cd "{o}" {cmd_sep} {c} -input_path="{i}" -output_path=. -ustar -graph -nee -energy -meteo -solar > "{log}"' self.cmd = self.cmd_txt.format(c=self.qc_auto_ex, i=self.input_qc_visual_dir, o=self.qc_auto_dir_fmt, log=self.output_log, cmd_sep=CMD_SEP) + self.pipeline.config_obj.export_step_to_yaml(self.qc_auto_dir) def pre_validate(self): ''' @@ -525,6 +531,7 @@ def __init__(self, pipeline): self.qc_auto_convert_original = self._QC_AUTO_CONVERT_ORIGINAL self.output_file_patterns = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS] self.qc_auto_convert_files_to_convert = [] + def pre_validate(self): ''' @@ -672,6 +679,7 @@ def __init__(self, pipeline): self.output_log = os.path.join(self.ustar_mp_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) self.cmd_txt = 'cd "{o}" {cmd_sep} mkdir input {cmd_sep} {cp} "{i}"*_ustar_*.csv ./input/ {cmd_sep} {c} -input_path=./input/ -output_path=./ > "{log}"' self.cmd = self.cmd_txt.format(c=self.ustar_mp_ex, i=self.input_qc_auto_dir, o=self.ustar_mp_dir_fmt, log=self.output_log, cmd_sep=CMD_SEP, cp=COPY) + self.pipeline.config_obj.export_step_to_yaml(self.ustar_mp_dir) def 
pre_validate(self): ''' @@ -760,6 +768,7 @@ def __init__(self, pipeline): cmd_sep=CMD_SEP, cp=COPY, dl=DELETE) + self.pipeline.config_obj.export_step_to_yaml(self.ustar_cp_dir) def pre_validate(self): ''' @@ -887,6 +896,7 @@ def __init__(self, pipeline): self.meteo_era_dir = self.pipeline.configs.get('meteo_era_dir', os.path.join(self.pipeline.data_dir, self.METEO_ERA_DIR)) self.output_file_patterns = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS] self.output_file_patterns_extra = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_EXTRA] + self.pipeline.config_obj.export_step_to_yaml(self.meteo_era_dir) def pre_validate(self): ''' @@ -1162,6 +1172,7 @@ def __init__(self, pipeline): q=self.input_qc_auto_dir, e=self.input_meteo_era_dir, log=self.output_log) + self.pipeline.config_obj.export_step_to_yaml(self.meteo_proc_dir) def pre_validate(self): ''' @@ -1276,6 +1287,7 @@ def __init__(self, pipeline): ucp=self.input_ustar_cp_dir, m=self.input_meteo_proc_dir, log=self.output_log) + self.pipeline.config_obj.export_step_to_yaml(self.nee_proc_dir) def pre_validate(self): ''' @@ -1372,6 +1384,7 @@ def __init__(self, pipeline): self.cmd_execute = self.cmd_execute_txt.format(cmd_sep=CMD_SEP, o=self.output_energy_proc_dir, c=os.path.basename(self.energy_proc_ex), log=self.output_log) self.cmd_del_tool_txt = '{delete} "{o}{c}"' self.cmd_del_tool = self.cmd_del_tool_txt.format(delete=DELETE, o=self.output_energy_proc_dir, c=os.path.basename(self.energy_proc_ex)) + self.pipeline.config_obj.export_step_to_yaml(self.energy_proc_dir) def pre_validate(self): ''' @@ -1453,6 +1466,7 @@ def __init__(self, pipeline): self.output_file_patterns_c = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_C] self.prod_to_compare = self.pipeline.configs.get('prod_to_compare', PROD_TO_COMPARE) self.perc_to_compare = self.pipeline.configs.get('perc_to_compare', PERC_TO_COMPARE) + 
self.pipeline.config_obj.export_step_to_yaml(self.nee_partition_nt_dir) def pre_validate(self): ''' @@ -1531,6 +1545,7 @@ def __init__(self, pipeline): self.output_file_patterns_c = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_C] self.prod_to_compare = self.pipeline.configs.get('prod_to_compare', PROD_TO_COMPARE) self.perc_to_compare = self.pipeline.configs.get('perc_to_compare', PERC_TO_COMPARE) + self.pipeline.config_obj.export_step_to_yaml(self.nee_partition_dt_dir) def pre_validate(self): ''' @@ -1738,16 +1753,16 @@ def run(self): # # TODO: finish implementation for root, _, filenames in os.walk(self.pipeline.nee_partition_nt.nee_partition_nt_dir): for f in filenames: - print os.path.join(root, f) + print(os.path.join(root, f)) for root, _, filenames in os.walk(self.pipeline.nee_partition_dt.nee_partition_dt_dir): for f in filenames: - print os.path.join(root, f) + print(os.path.join(root, f)) if os.path.isdir(self.pipeline.nee_partition_sr.nee_partition_sr_dir): for root, _, filenames in os.walk(self.pipeline.nee_partition_dt.nee_partition_sr_dir): for f in filenames: - print os.path.join(root, f) + print(os.path.join(root, f)) # execute prepare_ure step if not self.execute and not self.pipeline.simulation: @@ -2128,6 +2143,7 @@ def __init__(self, pipeline): self.output_log = os.path.join(self.ure_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) self.cmd_txt = 'cd "{o}" {cmd_sep} {c} -input_path={i} -output_path={o} > "{log}"' self.cmd = self.cmd_txt.format(c=self.ure_ex, i=self.input_prepare_ure_dir, o=self.ure_dir_fmt, log=self.output_log, cmd_sep=CMD_SEP, cp=COPY) + self.pipeline.config_obj.export_step_to_yaml(self.ure_dir) def pre_validate(self): ''' @@ -2224,6 +2240,7 @@ def __init__(self, pipeline): self.zip_manifest_lines = [ZIPMANIFEST_HEADER, ] self.csv_manifest_file = os.path.join(self.fluxnet2015_dir, OUTPUT_LOG_TEMPLATE.format(t='CSV_' + self.pipeline.run_id)[:-4] + '.csv') self.zip_manifest_file = 
os.path.join(self.fluxnet2015_dir, OUTPUT_LOG_TEMPLATE.format(t='ZIP_' + self.pipeline.run_id)[:-4] + '.csv') + self.pipeline.config_obj.export_step_to_yaml(self.fluxnet2015_dir) def pre_validate(self): ''' @@ -2370,6 +2387,7 @@ def __init__(self, pipeline): self.zip_manifest_lines = [ZIPMANIFEST_HEADER, ] self.csv_manifest_file = os.path.join(self.fluxnet2015_dir, OUTPUT_LOG_TEMPLATE.format(t='CSV_' + self.pipeline.run_id)[:-4] + '.csv') self.zip_manifest_file = os.path.join(self.fluxnet2015_dir, OUTPUT_LOG_TEMPLATE.format(t='ZIP_' + self.pipeline.run_id)[:-4] + '.csv') + self.pipeline.config_obj.export_step_to_yaml(self.fluxnet2015_dir) def pre_validate(self): ''' diff --git a/oneflux/tools/pipeline.py b/oneflux/tools/pipeline.py index 436d12cb..9b2127f6 100644 --- a/oneflux/tools/pipeline.py +++ b/oneflux/tools/pipeline.py @@ -27,22 +27,11 @@ log = logging.getLogger(__name__) -def run_pipeline(datadir, - siteid, - sitedir, - firstyear, - lastyear, - version_data=VERSION_METADATA, - version_proc=VERSION_PROCESSING, - prod_to_compare=PROD_TO_COMPARE, - perc_to_compare=PERC_TO_COMPARE, - mcr_directory=None, - timestamp=NOW_TS, - record_interval='hh', - era_first_year=ERA_FIRST_YEAR, - era_last_year=ERA_LAST_YEAR, - steps={}): +def run_pipeline(config): + datadir = config.args.datadir + sitedir = config.args.sitedir + siteid = config.args.siteid sitedir_full = os.path.abspath(os.path.join(datadir, sitedir)) if not sitedir or not os.path.isdir(sitedir_full): msg = "Site directory for {s} not found: '{d}'".format(s=siteid, d=sitedir) @@ -51,37 +40,7 @@ def run_pipeline(datadir, log.info("Started processing site dir {d}".format(d=sitedir)) try: - pipeline = Pipeline(siteid=siteid, - data_dir=sitedir_full, - data_dir_main=os.path.abspath(datadir), - site_dir=sitedir, - tool_dir=TOOL_DIRECTORY, - first_year=firstyear, - last_year=lastyear, - prod_to_compare=prod_to_compare, - perc_to_compare=perc_to_compare, - timestamp=timestamp, - record_interval=record_interval, - 
fluxnet2015_first_t1=firstyear, - fluxnet2015_last_t1=lastyear, - fluxnet2015_version_data=version_data, - fluxnet2015_version_processing=version_proc, - ustar_cp_mcr_dir=mcr_directory, - era_first_timestamp_start=ERA_FIRST_TIMESTAMP_START_TEMPLATE.format(y=era_first_year), - era_last_timestamp_start=ERA_LAST_TIMESTAMP_START_TEMPLATE.format(y=era_last_year), - qc_auto_execute=steps.get('qc_auto_execute', True), - ustar_mp_execute=steps.get('ustar_mp_execute', True), - ustar_cp_execute=steps.get('ustar_cp_execute', True), - meteo_proc_execute=steps.get('meteo_proc_execute', True), - nee_proc_execute=steps.get('nee_proc_execute', True), - energy_proc_execute=steps.get('energy_proc_execute', True), - nee_partition_nt_execute=steps.get('nee_partition_nt_execute', True), - nee_partition_dt_execute=steps.get('nee_partition_dt_execute', True), - prepare_ure_execute=steps.get('prepare_ure_execute', True), - ure_execute=steps.get('ure_execute', True), - fluxnet2015_execute=steps.get('fluxnet2015_execute', True), - fluxnet2015_site_plots=steps.get('fluxnet2015_site_plots', True), - simulation=False) + pipeline = Pipeline(config) pipeline.run() #csv_manifest_entries, zip_manifest_entries = pipeline.fluxnet2015.csv_manifest_entries, pipeline.fluxnet2015.zip_manifest_entries log.info("Finished processing site dir {d}".format(d=sitedir_full)) diff --git a/runoneflux.py b/runoneflux.py index e8bc8a1d..28675b2f 100644 --- a/runoneflux.py +++ b/runoneflux.py @@ -16,6 +16,9 @@ from oneflux import log_config, log_trace from oneflux.configs.utils import ONEFluxConfig +from oneflux.tools.partition_nt import run_partition_nt +from oneflux.tools.partition_dt import run_partition_dt +from oneflux.tools.pipeline import run_pipeline log = logging.getLogger(__name__) # main function @@ -38,13 +41,12 @@ try: # run command log.info("Starting execution: {c}".format(c=config.args.command)) - run_mode_func = config.get_run_mode_func() if config.args.command == 'all': - 
run_mode_func(**config.get_pipeline_params()) + run_pipeline(config) elif config.args.command == 'partition_nt': - run_mode_func(**config.get_partition_nt_params()) + run_partition_nt(**config.get_partition_nt_params()) elif config.args.command == 'partition_dt': - run_mode_func(**config.get_partition_dt_params()) + run_partition_dt(**config.get_partition_dt_params()) log.info("Finished execution: {c}".format(c=config.args.command)) except Exception as e: From 0a1b0c12eb14f4de9b5cce2ca768359863474df2 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Thu, 22 Dec 2022 13:38:52 -0800 Subject: [PATCH 16/18] add saved config into data dir --- oneflux/configs/utils.py | 19 +++++++------------ oneflux/pipeline/wrappers.py | 2 ++ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index ece0ec82..6c14bbb9 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -126,7 +126,7 @@ def get_partition_dt_params(self): } return params - def export_to_yaml(self, dir=None, name=None, overwritten=False): + def export_to_yaml(self, dir='.', name='config.yaml', replaced_name=None): saved_dict = {} args_dict = vars(self.args) for group_name, params in self.config_default.items(): @@ -140,20 +140,17 @@ def export_to_yaml(self, dir=None, name=None, overwritten=False): p = self.param_dest[param] if p in args_dict and args_dict[p] != param_value: saved_dict[group_name][param] = args_dict[p] - if not name: - name = 'run_params_{}.yaml'.format(dt.datetime.now().strftime('%d-%m-%Y_%H-%M-%S')) - if not dir: - path = name - else: - path = f'{dir}/{name}' + + path = f'{dir}/{name}' + renamed_path = f'{dir}/{replaced_name}' if dir and not os.path.exists(dir): os.makedirs(dir) - if overwritten and os.path.isfile(path): - os.remove(path) + if os.path.isfile(path): + os.rename(path, renamed_path) with open(path, 'w') as f: yaml.dump(saved_dict, f, default_flow_style=False) - def export_step_to_yaml(self, dir=None, 
name='config.yaml', overwritten=False): + def export_step_to_yaml(self, dir=None, name='config.yaml'): step = os.path.split(dir)[-1] if step not in STEP_NAME_LIST: raise ONEFluxError(f"Step does not exist: {step}") @@ -162,8 +159,6 @@ def export_step_to_yaml(self, dir=None, name='config.yaml', overwritten=False): v[arg] = self.args.__getattribute__(arg.replace('-', '_')) if v: path = f'{dir}/{name}' - if overwritten and os.path.isfile(path): - os.remove(path) with open(path, 'w') as f: yaml.dump(v, f, default_flow_style=False) diff --git a/oneflux/pipeline/wrappers.py b/oneflux/pipeline/wrappers.py index 3b8ac0b1..6cc93910 100644 --- a/oneflux/pipeline/wrappers.py +++ b/oneflux/pipeline/wrappers.py @@ -77,6 +77,8 @@ def __init__(self, config_obj): siteid = self.configs.get('siteid') self.configs['datadir'] = os.path.abspath(os.path.join(self.configs['datadir'], self.configs['sitedir'])) + self.config_obj.export_to_yaml(dir=self.configs['datadir'], replaced_name=f'config_{self.run_id}.yaml') + # check valid config attribute labels from defaults from classes self.driver_classes = [PipelineFPCreator, PipelineQCVisual, From 98f2477f35e5c93d076dff0ee9ed44fddd327ba7 Mon Sep 17 00:00:00 2001 From: toanngosy Date: Fri, 6 Jan 2023 09:05:32 -0800 Subject: [PATCH 17/18] save configs into long and short form --- oneflux/configs/utils.py | 30 ++++++++++++++++++++---------- oneflux/pipeline/wrappers.py | 29 ++++++++++++++++------------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 6c14bbb9..361fca84 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -126,20 +126,30 @@ def get_partition_dt_params(self): } return params - def export_to_yaml(self, dir='.', name='config.yaml', replaced_name=None): + def export_to_yaml(self, dir='.', name='config.yaml', replaced_name=None, is_compact=False): saved_dict = {} args_dict = vars(self.args) - for group_name, params in 
self.config_default.items(): - saved_dict[group_name] = {} - for param, param_value in params.items(): - if param not in self.param_dest: - p = param - if p in args_dict and args_dict[p] != param_value: - saved_dict[group_name][p] = args_dict[p] - else: - p = self.param_dest[param] + if is_compact: + for group_name, params in self.config_default.items(): + saved_dict[group_name] = {} + for param, param_value in params.items(): + if param not in self.param_dest: + p = param + else: + p = self.param_dest[param] + p = p.replace('-', '_') if p in args_dict and args_dict[p] != param_value: saved_dict[group_name][param] = args_dict[p] + else: + for group_name, params in self.config_default.items(): + saved_dict[group_name] = {} + for param, param_value in params.items(): + if param not in self.param_dest: + p = param + else: + p = self.param_dest[param] + p = p.replace('-', '_') + saved_dict[group_name][param] = args_dict[p] path = f'{dir}/{name}' renamed_path = f'{dir}/{replaced_name}' diff --git a/oneflux/pipeline/wrappers.py b/oneflux/pipeline/wrappers.py index 6cc93910..b2f7d300 100644 --- a/oneflux/pipeline/wrappers.py +++ b/oneflux/pipeline/wrappers.py @@ -42,7 +42,7 @@ from oneflux.tools.partition_dt import run_partition_dt DEFAULT_LOGGING_FILENAME = 'report_{s}_{h}_{t}.log'.format(h=HOSTNAME, t=NOW_TS, s='{s}') - +DEFAULT_CONFIG_FILE_NAME = 'config_{s}_{h}_{t}.yaml'.format(h=HOSTNAME, t=NOW_TS, s='{s}') log = logging.getLogger(__name__) @@ -75,9 +75,12 @@ def __init__(self, config_obj): log.debug("ONEFlux Pipeline: keyword arguments: {a}".format(a=kwargs)) self.configs = kwargs siteid = self.configs.get('siteid') + self.configs['data_dir_main'] = os.path.abspath(self.configs['datadir']) self.configs['datadir'] = os.path.abspath(os.path.join(self.configs['datadir'], self.configs['sitedir'])) - - self.config_obj.export_to_yaml(dir=self.configs['datadir'], replaced_name=f'config_{self.run_id}.yaml') + # export compact form + 
self.config_obj.export_to_yaml(dir=self.configs['datadir'], name=f'compact_{DEFAULT_CONFIG_FILE_NAME.format(s=siteid)}', is_compact=True) + # export long form + self.config_obj.export_to_yaml(dir=self.configs['datadir'], name=f'full_{DEFAULT_CONFIG_FILE_NAME.format(s=siteid)}') # check valid config attribute labels from defaults from classes self.driver_classes = [PipelineFPCreator, @@ -100,7 +103,7 @@ def __init__(self, config_obj): FLUXNET_PRODUCT_CLASS, ] - self.valid_attribute_labels = ['data_dir', 'tool_dir', 'data_dir_main', 'prod_to_compare', 'perc_to_compare', 'first_year', 'last_year'] + self.valid_attribute_labels = ['data_dir', 'tool_dir', 'data_dir_main', 'prod_to_compare', 'perc_to_compare', 'firstyear', 'lastyear'] for driver in self.driver_classes: labels = [k.lower() for k, v in driver.__dict__.items() if ((not callable(v)) and (not k.startswith('_')))] self.valid_attribute_labels.extend(labels) @@ -108,9 +111,9 @@ def __init__(self, config_obj): self.valid_attribute_labels.extend(labels) for k in self.configs.keys(): if k not in self.valid_attribute_labels: - log.error("Pipeline: unknown config attribute: '{p}'".format(p=k)) - self.first_year = self.configs.get('first_year', None) - self.last_year = self.configs.get('last_year', None) + log.warning("Pipeline: unknown config attribute: '{p}'".format(p=k)) + self.first_year = self.configs.get('firstyear', None) + self.last_year = self.configs.get('lastyear', None) self.data_dir_main = self.configs.get('data_dir_main', None) self.site_dir = self.configs.get('sitedir', None) @@ -458,7 +461,7 @@ def __init__(self, pipeline): self.qc_auto_dir_fmt = self.qc_auto_dir + os.sep self.output_file_patterns = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS] self.input_qc_visual_dir = '..' 
+ os.sep + os.path.basename(self.pipeline.qc_visual.qc_visual_dir) + os.sep + os.path.basename(self.pipeline.qc_visual.qc_visual_dir_inner) + os.sep - self.output_log = os.path.join(self.qc_auto_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) + self.output_log = os.path.join(self.qc_auto_dir, 'report_{s}_{t}.txt'.format(t=self.pipeline.run_id, s=self.pipeline.siteid)) self.cmd_txt = 'cd "{o}" {cmd_sep} {c} -input_path="{i}" -output_path=. -ustar -graph -nee -energy -meteo -solar > "{log}"' self.cmd = self.cmd_txt.format(c=self.qc_auto_ex, i=self.input_qc_visual_dir, o=self.qc_auto_dir_fmt, log=self.output_log, cmd_sep=CMD_SEP) self.pipeline.config_obj.export_step_to_yaml(self.qc_auto_dir) @@ -678,7 +681,7 @@ def __init__(self, pipeline): self.ustar_mp_dir_fmt = self.ustar_mp_dir + os.sep self.output_file_patterns = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS] self.input_qc_auto_dir = self.pipeline.qc_auto.qc_auto_dir + os.sep - self.output_log = os.path.join(self.ustar_mp_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) + self.output_log = os.path.join(self.ustar_mp_dir, 'report_{s}_{t}.txt'.format(t=self.pipeline.run_id, s=self.pipeline.siteid)) self.cmd_txt = 'cd "{o}" {cmd_sep} mkdir input {cmd_sep} {cp} "{i}"*_ustar_*.csv ./input/ {cmd_sep} {c} -input_path=./input/ -output_path=./ > "{log}"' self.cmd = self.cmd_txt.format(c=self.ustar_mp_ex, i=self.input_qc_auto_dir, o=self.ustar_mp_dir_fmt, log=self.output_log, cmd_sep=CMD_SEP, cp=COPY) self.pipeline.config_obj.export_step_to_yaml(self.ustar_mp_dir) @@ -752,7 +755,7 @@ def __init__(self, pipeline): self.output_file_patterns = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS] self.input_qc_auto_dir = self.pipeline.qc_auto.qc_auto_dir + os.sep self.ustar_cp_mcr_dir = self.pipeline.configs.get('ustar_cp_mcr_dir', self.USTAR_CP_MCR_DIR) - self.output_log = os.path.join(self.ustar_cp_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) + self.output_log = 
os.path.join(self.ustar_cp_dir, 'report_{s}_{t}.txt'.format(t=self.pipeline.run_id, s=self.pipeline.siteid)) self.cmd_txt = '' self.cmd_txt += 'cd "{o}"' + ' {cmd_sep} ' self.cmd_txt += 'mkdir input' + ' {cmd_sep} ' @@ -1166,7 +1169,7 @@ def __init__(self, pipeline): self.input_meteo_era_dir = '..' + os.sep + os.path.basename(self.pipeline.meteo_era.meteo_era_dir) + os.sep self.input_qc_auto_dir = '..' + os.sep + os.path.basename(self.pipeline.qc_auto.qc_auto_dir) + os.sep self.output_meteo_proc_dir = self.meteo_proc_dir + os.sep - self.output_log = os.path.join(self.meteo_proc_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) + self.output_log = os.path.join(self.meteo_proc_dir, 'report_{s}_{t}.txt'.format(t=self.pipeline.run_id, s=self.pipeline.siteid)) self.cmd_txt = 'cd "{o}" {cmd_sep} {c} -qc_auto_path="{q}" -era_path="{e}" -output_path=. > "{log}"' self.cmd = self.cmd_txt.format(o=self.output_meteo_proc_dir, cmd_sep=CMD_SEP, @@ -1279,7 +1282,7 @@ def __init__(self, pipeline): self.input_ustar_cp_dir = '..' + os.sep + os.path.basename(self.pipeline.ustar_cp.ustar_cp_dir) + os.sep self.input_meteo_proc_dir = '..' + os.sep + os.path.basename(self.pipeline.meteo_proc.meteo_proc_dir) + os.sep self.output_nee_proc_dir = self.nee_proc_dir + os.sep - self.output_log = os.path.join(self.nee_proc_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) + self.output_log = os.path.join(self.nee_proc_dir, 'report_{s}_{t}.txt'.format(t=self.pipeline.run_id, s=self.pipeline.siteid)) self.cmd_txt = 'cd "{o}" {cmd_sep} {c} -qc_auto_path="{q}" -ustar_mp_path="{ump}" -ustar_cp_path="{ucp}" -meteo_path="{m}" -output_path=. 
> "{log}"' self.cmd = self.cmd_txt.format(o=self.output_nee_proc_dir, cmd_sep=CMD_SEP, @@ -2142,7 +2145,7 @@ def __init__(self, pipeline): self.output_file_patterns_info = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_INFO] self.output_file_patterns_mef = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_MEF] self.input_prepare_ure_dir = self.pipeline.prepare_ure.prepare_ure_dir_fmt - self.output_log = os.path.join(self.ure_dir, 'report_{t}.txt'.format(t=self.pipeline.run_id)) + self.output_log = os.path.join(self.ure_dir, 'report_{s}_{t}.txt'.format(t=self.pipeline.run_id, s=self.pipeline.siteid)) self.cmd_txt = 'cd "{o}" {cmd_sep} {c} -input_path={i} -output_path={o} > "{log}"' self.cmd = self.cmd_txt.format(c=self.ure_ex, i=self.input_prepare_ure_dir, o=self.ure_dir_fmt, log=self.output_log, cmd_sep=CMD_SEP, cp=COPY) self.pipeline.config_obj.export_step_to_yaml(self.ure_dir) From 6f01085c0bc6424f17c87d9cc792bec26554298c Mon Sep 17 00:00:00 2001 From: toanngosy Date: Thu, 19 Jan 2023 10:23:37 -0800 Subject: [PATCH 18/18] update config pipeline --- oneflux/configs/config_description.yaml | 4 +-- oneflux/configs/config_template.yaml | 2 +- oneflux/configs/utils.py | 35 ++++++++++++++++--------- oneflux/pipeline/wrappers.py | 28 ++++++++++---------- runoneflux.py | 1 - 5 files changed, 39 insertions(+), 31 deletions(-) diff --git a/oneflux/configs/config_description.yaml b/oneflux/configs/config_description.yaml index f2a998f2..e8634eb6 100644 --- a/oneflux/configs/config_description.yaml +++ b/oneflux/configs/config_description.yaml @@ -89,8 +89,8 @@ steps: ustar_mp_execute: help: enable ustar mp execution type: bool - user_cp_execute: - help: enable user cp execution + ustar_cp_execute: + help: enable ustar cp execution type: bool meteo_proc_execute: help: enable meteo proc execution diff --git a/oneflux/configs/config_template.yaml b/oneflux/configs/config_template.yaml index 2342c8cc..83785674 100644 --- 
a/oneflux/configs/config_template.yaml +++ b/oneflux/configs/config_template.yaml @@ -21,7 +21,7 @@ run: steps: qc_auto_execute: true ustar_mp_execute: true - user_cp_execute: true + ustar_cp_execute: true meteo_proc_execute: true nee_proc_execute: true energy_proc_execute: true diff --git a/oneflux/configs/utils.py b/oneflux/configs/utils.py index 361fca84..294719ff 100644 --- a/oneflux/configs/utils.py +++ b/oneflux/configs/utils.py @@ -7,6 +7,8 @@ import yaml from oneflux import ONEFluxError +from oneflux.pipeline.common import TOOL_DIRECTORY, \ + ERA_FIRST_TIMESTAMP_START_TEMPLATE, ERA_LAST_TIMESTAMP_START_TEMPLATE from oneflux.tools.partition_nt import PROD_TO_COMPARE, PERC_TO_COMPARE log = logging.getLogger(__name__) @@ -17,6 +19,7 @@ '06_meteo_era', '07_meteo_proc', '08_nee_proc', '09_energy_proc', '10_nee_partition_nt', '11_nee_partition_dt', '12_ure', '99_fluxnet2015'] + def argparse_from_yaml_and_cli(): param_dest = {} config_default = {} @@ -69,25 +72,31 @@ def __init__(self): self.prod = (PROD_TO_COMPARE if self.args.prod is None else self.args.prod[0]) def get_pipeline_params(self): + if self.args.timestamp: + timestamp = self.args.timestamp + else: + timestamp = dt.datetime.now().strftime("%Y%m%d%H%M%S") params = { - 'datadir': self.args.datadir, - 'siteid': self.args.siteid, - 'sitedir': self.args.sitedir, - 'firstyear': self.args.firstyear, - 'lastyear': self.args.lastyear, + 'data_dir': os.path.abspath(os.path.join(self.args.datadir, self.args.sitedir)), + 'data_dir_main': os.path.abspath(self.args.datadir), + 'site_dir': self.args.sitedir, + 'tool_dir': TOOL_DIRECTORY, + 'first_year': self.args.firstyear, + 'last_year': self.args.lastyear, 'prod_to_compare': self.prod, 'perc_to_compare': self.perc, - 'mcr_directory': self.args.mcr_directory, - 'timestamp': self.args.timestamp, + 'ustar_cp_mcr_dir': self.args.mcr_directory, 'record_interval': self.args.recint, - 'version_data': self.args.versiond, - 'version_proc': self.args.versionp, - 
'era_first_year': self.args.erafy, - 'era_last_year': self.args.eraly, + 'fluxnet2015_first_t1': self.args.firstyear, + 'fluxnet2015_last_t1': self.args.lastyear, + 'fluxnet2015_version_data': self.args.versiond, + 'fluxnet2015_version_processing': self.args.versionp, + 'era_first_timestamp_start': ERA_FIRST_TIMESTAMP_START_TEMPLATE.format(y=self.args.erafy), + 'era_last_timestamp_start': ERA_LAST_TIMESTAMP_START_TEMPLATE.format(y=self.args.eraly), 'steps': { 'qc_auto_execute': self.args.qc_auto_execute, 'ustar_mp_execute': self.args.ustar_mp_execute, - 'user_cp_execute': self.args.user_cp_execute, + 'ustar_cp_execute': self.args.ustar_cp_execute, 'meteo_proc_execute': self.args.meteo_proc_execute, 'nee_proc_execute': self.args.nee_proc_execute, 'energy_proc_execute': self.args.energy_proc_execute, @@ -100,7 +109,7 @@ def get_pipeline_params(self): 'simulation': self.args.simulation } } - return params + return self.args.siteid, timestamp, params def get_partition_nt_params(self): params = { diff --git a/oneflux/pipeline/wrappers.py b/oneflux/pipeline/wrappers.py index b2f7d300..e5f1e368 100644 --- a/oneflux/pipeline/wrappers.py +++ b/oneflux/pipeline/wrappers.py @@ -59,7 +59,7 @@ def __init__(self, config_obj): Initializes pipeline execution object, including initialization tests (e.g., directories and initial datasets tests) ''' self.config_obj = config_obj - timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + siteid, timestamp, kwargs = self.config_obj.get_pipeline_params() log.info("ONEFlux Pipeline: initialization started") self.run_id = socket.getfqdn() + '_run' + timestamp @@ -71,16 +71,12 @@ def __init__(self, config_obj): ### basic checks # extra configs - kwargs = self.config_obj.get_pipeline_params() log.debug("ONEFlux Pipeline: keyword arguments: {a}".format(a=kwargs)) self.configs = kwargs - siteid = self.configs.get('siteid') - self.configs['data_dir_main'] = os.path.abspath(self.configs['datadir']) - self.configs['datadir'] = 
os.path.abspath(os.path.join(self.configs['datadir'], self.configs['sitedir'])) # export compact form - self.config_obj.export_to_yaml(dir=self.configs['datadir'], name=f'compact_{DEFAULT_CONFIG_FILE_NAME.format(s=siteid)}', is_compact=True) + self.config_obj.export_to_yaml(dir=self.configs['data_dir'], name=f'compact_{DEFAULT_CONFIG_FILE_NAME.format(s=siteid)}', is_compact=True) # export long form - self.config_obj.export_to_yaml(dir=self.configs['datadir'], name=f'full_{DEFAULT_CONFIG_FILE_NAME.format(s=siteid)}') + self.config_obj.export_to_yaml(dir=self.configs['data_dir'], name=f'full_{DEFAULT_CONFIG_FILE_NAME.format(s=siteid)}') # check valid config attribute labels from defaults from classes self.driver_classes = [PipelineFPCreator, @@ -103,19 +99,23 @@ def __init__(self, config_obj): FLUXNET_PRODUCT_CLASS, ] - self.valid_attribute_labels = ['data_dir', 'tool_dir', 'data_dir_main', 'prod_to_compare', 'perc_to_compare', 'firstyear', 'lastyear'] + self.valid_attribute_labels = ['data_dir', 'tool_dir', 'site_dir', 'data_dir_main', 'prod_to_compare', 'perc_to_compare', 'first_year', 'last_year'] for driver in self.driver_classes: labels = [k.lower() for k, v in driver.__dict__.items() if ((not callable(v)) and (not k.startswith('_')))] self.valid_attribute_labels.extend(labels) labels = [k.lower() for k, v in Pipeline.__dict__.items() if ((not callable(v)) and (not k.startswith('_')))] self.valid_attribute_labels.extend(labels) for k in self.configs.keys(): - if k not in self.valid_attribute_labels: - log.warning("Pipeline: unknown config attribute: '{p}'".format(p=k)) - self.first_year = self.configs.get('firstyear', None) - self.last_year = self.configs.get('lastyear', None) + if k == 'steps': + for sk in self.configs[k]: + if sk not in self.valid_attribute_labels: + log.error("Pipeline: unknown config attribute: '{p}'".format(p=sk)) + elif k not in self.valid_attribute_labels: + log.error("Pipeline: unknown config attribute: '{p}'".format(p=k)) + 
self.first_year = self.configs.get('first_year', None) + self.last_year = self.configs.get('last_year', None) self.data_dir_main = self.configs.get('data_dir_main', None) - self.site_dir = self.configs.get('sitedir', None) + self.site_dir = self.configs.get('site_dir', None) # check OS if (os.name != 'posix') and (os.name != 'nt'): @@ -135,7 +135,7 @@ def __init__(self, config_obj): log.debug("ONEFlux Pipeline: setting up for site '{v}'".format(v=self.siteid)) # main data directory (all sites) - self.data_dir = self.configs.get('datadir', os.path.join(DATA_DIR, self.siteid)) # TODO: default should be self.site_dir? + self.data_dir = self.configs.get('data_dir', os.path.join(DATA_DIR, self.siteid)) # TODO: default should be self.site_dir? log.debug("ONEFlux Pipeline: using data dir '{v}'".format(v=self.data_dir)) self.prodfile_template = os.path.join(self.data_dir, FLUXNET_PRODUCT_CLASS.FLUXNET2015_DIR, PRODFILE_TEMPLATE_F) diff --git a/runoneflux.py b/runoneflux.py index 28675b2f..c63c3312 100644 --- a/runoneflux.py +++ b/runoneflux.py @@ -36,7 +36,6 @@ # set defaults if no perc or prod config.log_msg() config.run_check() - config.export_to_yaml() # start execution try: # run command