Skip to content

Commit

Permalink
Use parallel processing to speed up obs processing (#733)
Browse files Browse the repository at this point in the history
Using python multiprocessing to generate obs in parallel. The current
list of obs goes from 10+ minutes to completing in ~5.5 minutes.
  • Loading branch information
CoryMartin-NOAA authored Nov 16, 2023
1 parent 5c305aa commit 9fc4d73
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions ush/ioda/bufr2ioda/run_bufr2ioda.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#!/usr/bin/env python3
import argparse
import glob
import multiprocessing as mp
import os
import shutil
from itertools import repeat
from pathlib import Path
from gen_bufr2ioda_json import gen_bufr_json
from gen_bufr2ioda_yaml import gen_bufr_yaml
Expand All @@ -12,6 +14,24 @@
# Initialize root logger
logger = Logger('run_bufr2ioda.py', level='INFO', colored_log=True)

# get parallel processing info
num_cores = mp.cpu_count()


def mp_bufr_py(script, infile):
cmd = Executable(script)
cmd.add_default_arg('-c')
cmd.add_default_arg(infile)
logger.info(f"Executing {cmd}")
cmd()


def mp_bufr_yaml(bufr2iodaexe, yamlfile):
cmd = Executable(bufr2iodaexe)
cmd.add_default_arg(yamlfile)
logger.info(f"Executing {cmd}")
cmd()


@logit(logger)
def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):
Expand Down Expand Up @@ -44,7 +64,8 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):
BUFR_py_files = [os.path.basename(f) for f in BUFR_py_files]
BUFR_py = [f.replace('bufr2ioda_', '').replace('.py', '') for f in BUFR_py_files]

# NOTE or TODO - how to parallelize these loops????
json_files = []
scripts = []
for obtype in BUFR_py:
logger.info(f"Convert {obtype}...")
json_output_file = os.path.join(COM_OBS, f"{obtype}_{datetime_to_YMDH(current_cycle)}.json")
Expand All @@ -54,21 +75,25 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):

# Use the converter script for the ob type
bufr2iodapy = USH_IODA + '/bufr2ioda_' + obtype + ".py"
cmd = Executable(bufr2iodapy)
cmd.add_default_arg('-c')
cmd.add_default_arg(json_output_file)
logger.info(f"Executing {cmd}")
cmd()

# append the values to the lists
json_files.append(json_output_file)
scripts.append(bufr2iodapy)

# Check if the converter was successful
# if os.path.exists(json_output_file):
# rm_p(json_output_file)

# run all python scripts in parallel
with mp.Pool(num_cores) as pool:
pool.starmap(mp_bufr_py, zip(scripts, json_files))

# Specify observation types to be processed by the bufr2ioda executable
BUFR_yaml_files = glob.glob(os.path.join(config_template_dir, '*.yaml'))
BUFR_yaml_files = [os.path.basename(f) for f in BUFR_yaml_files]
BUFR_yaml = [f.replace('bufr2ioda_', '').replace('.yaml', '') for f in BUFR_yaml_files]

yaml_files = []
for obtype in BUFR_yaml:
logger.info(f"Convert {obtype}...")
yaml_output_file = os.path.join(COM_OBS, f"{obtype}_{datetime_to_YMDH(current_cycle)}.yaml")
Expand All @@ -78,15 +103,18 @@ def bufr2ioda(current_cycle, RUN, DMPDIR, config_template_dir, COM_OBS):

# use the bufr2ioda executable for the ob type
bufr2iodaexe = BIN_GDAS + '/bufr2ioda.x'
cmd = Executable(bufr2iodaexe)
cmd.add_default_arg(yaml_output_file)
logger.info(f"Executing {cmd}")
cmd()

# append the values to the lists
yaml_files.append(yaml_output_file)

# Check if the converter was successful
# if os.path.exists(yaml_output_file):
# rm_p(yaml_output_file)

# run all bufr2ioda yamls in parallel
with mp.Pool(num_cores) as pool:
pool.starmap(mp_bufr_yaml, zip(repeat(bufr2iodaexe), yaml_files))


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Convert bufr dump file to ioda format')
Expand Down

0 comments on commit 9fc4d73

Please sign in to comment.