Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate downscaling code into main ONEFlux pipeline #72

Merged
merged 11 commits into from
Nov 13, 2024
35 changes: 34 additions & 1 deletion oneflux/pipeline/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
HOME_DIRECTORY = os.path.expanduser('~')
TOOL_DIRECTORY = os.path.join(HOME_DIRECTORY, 'bin', 'oneflux')
MCR_DIRECTORY = os.path.join(HOME_DIRECTORY, 'bin', 'MATLAB_Compiler_Runtime', 'v717')
WORKING_DIRECTORY = os.path.join(HOME_DIRECTORY, 'data', 'fluxnet', 'FLUXNET2015')
ERA_SOURCE_DIRECTORY = os.path.join(HOME_DIRECTORY, 'data', 'ERA')
WORKING_DIRECTORY = os.path.join(HOME_DIRECTORY, 'data', 'fluxnet', 'FLUXNET')
WORKING_DIRECTORY_SITE = os.path.join(WORKING_DIRECTORY, '{sd}')
QCDIR = os.path.join(WORKING_DIRECTORY_SITE, "01_qc_visual", "qcv_files") # NEW FOR APRIL2016
MPDIR = os.path.join(WORKING_DIRECTORY_SITE, "04_ustar_mp")
Expand Down Expand Up @@ -446,6 +447,38 @@ def create_and_empty_dir(tdir, label, suffix=datetime.now().strftime("%Y%m%d%H%M
log.debug("Created '{d}'".format(d=tdir))
return True

def copy_files_pattern(src_dir, tgt_dir, file_pattern='*', label='common.copy_files_pattern', simulation=False):
    """
    Copy all files matching pattern inside src_dir into tgt_dir (non-recursive)

    Validates that src_dir is readable and tgt_dir is writable before copying;
    matched entries that are not regular files (e.g. subdirectories) are logged
    as errors and skipped.

    :param src_dir: Source directory for files to be copied from
    :type src_dir: str
    :param tgt_dir: Target directory for files to be copied into
    :type tgt_dir: str
    :param file_pattern: Filename pattern to be matched, defaults to '*'
    :type file_pattern: str, optional
    :param label: Label identifying the pipeline step in log/error messages,
                  defaults to 'common.copy_files_pattern'
    :type label: str, optional
    :param simulation: If True, runs in dry-run mode: files are listed and logged
                       but NOT copied; defaults to False
    :type simulation: bool, optional
    :raises ONEFluxPipelineError: If src_dir is not a readable directory or
                                  tgt_dir is not a writable directory
    :return: True (always, if no exception was raised)
    :rtype: bool
    """
    # fail fast if source directory is missing or unreadable
    if (not os.path.isdir(src_dir)) or (not os.access(src_dir, os.R_OK)):
        msg = "Cannot read from pipeline {l} directory '{d}'".format(l=label, d=src_dir)
        log.critical(msg)
        raise ONEFluxPipelineError(msg)
    # fail fast if target directory is missing or unwritable
    if (not os.path.isdir(tgt_dir)) or (not os.access(tgt_dir, os.W_OK)):
        msg = "Cannot write to pipeline {l} directory '{d}'".format(l=label, d=tgt_dir)
        log.critical(msg)
        raise ONEFluxPipelineError(msg)
    # test_pattern returns the list of filenames (not paths) matching file_pattern in src_dir
    filelist = test_pattern(tdir=src_dir, tpattern=file_pattern, label='Copy files from {s} to {t}'.format(s=src_dir, t=tgt_dir))
    log.debug('Found files in source dir ({s}): {f}'.format(s=src_dir, f=filelist))
    # in simulation (dry-run) mode the matched files are only logged above, never copied
    if not simulation:
        for filename in [os.path.join(src_dir, f) for f in filelist]:
            if os.path.isfile(filename):
                shutil.copy(filename, tgt_dir)
                log.debug('Copied {f} to {t}'.format(f=filename, t=tgt_dir))
            else:
                # matched entry is not a regular file (e.g. a directory): log and continue
                log.error('Cannot copy {f} into {t}'.format(f=filename, t=tgt_dir))
    return True

def get_headers(filename):
"""
Parse headers from FPFileV2 format and returns list
Expand Down
51 changes: 39 additions & 12 deletions oneflux/pipeline/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
from oneflux.pipeline.common import CSVMANIFEST_HEADER, ZIPMANIFEST_HEADER, ONEFluxPipelineError, \
run_command, test_dir, test_file, test_file_list, test_file_list_or, \
test_create_dir, create_replace_dir, create_and_empty_dir, test_pattern, \
check_headers_fluxnet2015, get_empty_array_year, \
check_headers_fluxnet2015, get_empty_array_year, copy_files_pattern,\
PRODFILE_TEMPLATE_F, PRODFILE_AUX_TEMPLATE_F, PRODFILE_YEARS_TEMPLATE_F, \
PRODFILE_FIGURE_TEMPLATE_F, ZIPFILE_TEMPLATE_F, NEE_PERC_USTAR_VUT_PATTERN, \
NEE_PERC_USTAR_CUT_PATTERN, UNC_INFO_F, UNC_INFO_ALT_F, NEE_PERC_NEE_F, \
METEO_INFO_F, NEE_INFO_F, \
HOSTNAME, NOW_TS, ERA_FIRST_TIMESTAMP_START, ERA_LAST_TIMESTAMP_START,\
MODE_ISSUER, MODE_PRODUCT
MODE_ISSUER, MODE_PRODUCT, ERA_SOURCE_DIRECTORY
from oneflux.partition.library import PARTITIONING_DT_ERROR_FILE, EXTRA_FILENAME
from oneflux.downscaling.rundownscaling import run as run_downscaling
from oneflux.partition.auxiliary import nan, nan_ext, NAN, NAN_TEST
from oneflux.partition.daytime import ONEFluxPartitionBrokenOptError
from oneflux.pipeline.site_plots import gen_site_plots
Expand Down Expand Up @@ -129,6 +130,10 @@ def __init__(self, siteid, timestamp=datetime.now().strftime("%Y%m%d%H%M%S"), *a
self.data_dir = self.configs.get('data_dir', os.path.join(DATA_DIR, self.siteid)) # TODO: default should be self.site_dir?
log.debug("ONEFlux Pipeline: using data dir '{v}'".format(v=self.data_dir))

# ERA pre-extracted, unit adjusted, data files for pixel(s) corresponding to site location
self.era_source_dir = self.configs.get('era_source_dir', os.path.join(ERA_SOURCE_DIRECTORY, self.siteid))
log.debug("ONEFlux Pipeline: using ERA dir '{v}'".format(v=self.era_source_dir))

self.prodfile_template = os.path.join(self.data_dir, FLUXNET_PRODUCT_CLASS.FLUXNET2015_DIR, PRODFILE_TEMPLATE_F)
self.prodfile_aux_template = os.path.join(self.data_dir, FLUXNET_PRODUCT_CLASS.FLUXNET2015_DIR, PRODFILE_AUX_TEMPLATE_F)
self.prodfile_years_template = os.path.join(self.data_dir, FLUXNET_PRODUCT_CLASS.FLUXNET2015_DIR, PRODFILE_YEARS_TEMPLATE_F)
Expand Down Expand Up @@ -853,16 +858,18 @@ class PipelineMeteoERA(object):

N.B.: Step dependent on external Python code to be integrated in future releases.
'''
METEO_ERA_EXECUTE = False # TODO: change default when method implemented
METEO_ERA_EXECUTE = True
METEO_ERA_DIR = "06_meteo_era"
METEO_ERA_DIR_INPUT = "reanalysis_input"
_INPUT_SOURCE_FILE_PATTERN = "{s}__ERA5__reanalysis-era5-single-levels__????__*.csv"
_OUTPUT_FILE_PATTERNS = [
"{s}_????.csv",
"stat_{s}.txt",
"stat30_{s}_nocorr.txt",
"stat30_{s}.txt",
]
_OUTPUT_FILE_PATTERNS_EXTRA = [
"{s}_????-????.nc",
"{s}_????-????.nc", # missing for some sites
"{s}_LWin_????-????.pdf", # missing for some sites
"{s}_LWin_calc_????-????.pdf", # missing for some sites
"{s}_nocorr_????.csv",
Expand All @@ -883,16 +890,25 @@ def __init__(self, pipeline):
'''
self.pipeline = pipeline
self.execute = self.pipeline.configs.get('meteo_era_execute', self.METEO_ERA_EXECUTE)
self.execute = self.METEO_ERA_EXECUTE # TODO: remove when method implemented
self.meteo_era_dir = self.pipeline.configs.get('meteo_era_dir', os.path.join(self.pipeline.data_dir, self.METEO_ERA_DIR))
self.meteo_era_input_dir = self.pipeline.configs.get('era_input_dir', os.path.join(self.meteo_era_dir, self.METEO_ERA_DIR_INPUT))
self.meteo_era_source_dir = self.pipeline.configs.get('era_source_dir', self.pipeline.era_source_dir)
self.input_source_file_pattern = self._INPUT_SOURCE_FILE_PATTERN.format(s=self.pipeline.siteid)
self.output_file_patterns = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS]
self.output_file_patterns_extra = [i.format(s=self.pipeline.siteid) for i in self._OUTPUT_FILE_PATTERNS_EXTRA]

def pre_validate(self):
'''
Validate pre-execution requirements
'''
pass
# check ERA source directory
test_dir(tdir=self.meteo_era_source_dir, label='meteo_era.pre_validate')

# check ERA input source files
test_file_list(file_list=[self.input_source_file_pattern,], tdir=self.meteo_era_source_dir, label='meteo_era.pre_validate', log_only=False)

# check dependency steps
self.pipeline.qc_auto.post_validate()

def post_validate(self):
'''
Expand All @@ -902,7 +918,7 @@ def post_validate(self):
test_dir(tdir=self.meteo_era_dir, label='meteo_era.post_validate')

# check output files and result report (log)
test_file_list(file_list=self.output_file_patterns, tdir=self.meteo_era_dir, label='meteo_era.post_validate', log_only=True)
test_file_list(file_list=self.output_file_patterns, tdir=self.meteo_era_dir, label='meteo_era.post_validate', log_only=False)
test_file_list(file_list=self.output_file_patterns_extra, tdir=self.meteo_era_dir, label='meteo_era.post_validate', log_only=True)

# test "stat_{s}.txt" file for 100.0 % missing data on variables
Expand Down Expand Up @@ -1004,8 +1020,19 @@ def run(self):
self.pre_validate()

create_replace_dir(tdir=self.meteo_era_dir, label='meteo_era.run', suffix=self.pipeline.run_id, simulation=self.pipeline.simulation)

# TODO: implement run

# copy ERA source files into meteo_era folder for running downscaling
create_replace_dir(tdir=self.meteo_era_input_dir, label='meteo_era.run', suffix=self.pipeline.run_id, simulation=self.pipeline.simulation)
copy_files_pattern(src_dir=self.meteo_era_source_dir,
tgt_dir=self.meteo_era_input_dir,
file_pattern=self.input_source_file_pattern,
label='meteo_era.run',
simulation=self.pipeline.simulation)

# run downscaling
run_downscaling(dir_era5_co=self.meteo_era_input_dir,
dir_input=self.pipeline.qc_auto.qc_auto_dir,
dir_output=self.meteo_era_dir)

self.post_validate()
log.info("Pipeline meteo_era execution finished")
Expand Down Expand Up @@ -1738,16 +1765,16 @@ def run(self):
# # TODO: finish implementation
for root, _, filenames in os.walk(self.pipeline.nee_partition_nt.nee_partition_nt_dir):
for f in filenames:
print os.path.join(root, f)
print(os.path.join(root, f))

for root, _, filenames in os.walk(self.pipeline.nee_partition_dt.nee_partition_dt_dir):
for f in filenames:
print os.path.join(root, f)
print(os.path.join(root, f))

if os.path.isdir(self.pipeline.nee_partition_sr.nee_partition_sr_dir):
for root, _, filenames in os.walk(self.pipeline.nee_partition_dt.nee_partition_sr_dir):
for f in filenames:
print os.path.join(root, f)
print(os.path.join(root, f))

# execute prepare_ure step
if not self.execute and not self.pipeline.simulation:
Expand Down
3 changes: 3 additions & 0 deletions oneflux/tools/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
def run_pipeline(datadir,
siteid,
sitedir,
era_source_dir,
firstyear,
lastyear,
version_data=VERSION_METADATA,
Expand All @@ -55,6 +56,7 @@ def run_pipeline(datadir,
data_dir=sitedir_full,
data_dir_main=os.path.abspath(datadir),
site_dir=sitedir,
era_source_dir=era_source_dir,
tool_dir=TOOL_DIRECTORY,
first_year=firstyear,
last_year=lastyear,
Expand All @@ -72,6 +74,7 @@ def run_pipeline(datadir,
qc_auto_execute=steps.get('qc_auto_execute', True),
ustar_mp_execute=steps.get('ustar_mp_execute', True),
ustar_cp_execute=steps.get('ustar_cp_execute', True),
meteo_era_execute=steps.get('meteo_era_execute', (False if era_source_dir is None else True)),
meteo_proc_execute=steps.get('meteo_proc_execute', True),
nee_proc_execute=steps.get('nee_proc_execute', True),
energy_proc_execute=steps.get('energy_proc_execute', True),
Expand Down
9 changes: 6 additions & 3 deletions runoneflux.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
parser.add_argument('--versiond', help="Version of data (hardcoded default)", type=str, dest='versiond', default=str(VERSION_METADATA))
parser.add_argument('--era-fy', help="ERA first year of data (default {y})".format(y=ERA_FIRST_YEAR), type=int, dest='erafy', default=int(ERA_FIRST_YEAR))
parser.add_argument('--era-ly', help="ERA last year of data (default {y})".format(y=ERA_LAST_YEAR), type=int, dest='eraly', default=int(ERA_LAST_YEAR))
parser.add_argument('--era-source', help="Absolute path to directory with ERA pre-extracted, unit adjusted, data files for pixel(s)", type=str, dest='erasource', default=None)
args = parser.parse_args()

# setup logging file and stdout
Expand Down Expand Up @@ -76,13 +77,15 @@
msg += ", versiond ({i})".format(i=args.versiond)
msg += ", era-fy ({i})".format(i=args.erafy)
msg += ", era-ly ({i})".format(i=args.eraly)
msg += ", era-source ({i})".format(i=args.erasource)
log.debug(msg)

# start execution
try:
# check arguments
print os.path.join(args.datadir, args.sitedir)
if not os.path.isdir(os.path.join(args.datadir, args.sitedir)):
site_dir = os.path.join(args.datadir, args.sitedir)
log.debug('Using site dir: {s}'.format(s=site_dir))
if not os.path.isdir(site_dir):
raise ONEFluxError("Site dir not found: {d}".format(d=args.sitedir))

# run command
Expand All @@ -91,7 +94,7 @@
run_pipeline(datadir=args.datadir, siteid=args.siteid, sitedir=args.sitedir, firstyear=firstyear, lastyear=lastyear,
prod_to_compare=prod, perc_to_compare=perc, mcr_directory=args.mcr_directory, timestamp=args.timestamp,
record_interval=args.recint, version_data=args.versiond, version_proc=args.versionp,
era_first_year=args.erafy, era_last_year=args.eraly)
era_first_year=args.erafy, era_last_year=args.eraly, era_source_dir=args.erasource)
elif args.command == 'partition_nt':
run_partition_nt(datadir=args.datadir, siteid=args.siteid, sitedir=args.sitedir, years_to_compare=range(firstyear, lastyear + 1),
py_remove_old=args.forcepy, prod_to_compare=prod, perc_to_compare=perc)
Expand Down