From a762f4c62f6412289243a9d5783dee8ce7dbd44b Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Thu, 3 Aug 2023 22:46:02 +0300 Subject: [PATCH 01/13] Add multiprocessing to Quidel indicator --- .../delphi_quidel_covidtest/run.py | 79 ++++++++++++------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index b74d617d7..69213cb67 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -6,6 +6,7 @@ """ import atexit from datetime import datetime +from multiprocessing import Pool, cpu_count import time from typing import Dict, Any @@ -126,19 +127,28 @@ def run_module(params: Dict[str, Any]): for geo_res in NONPARENT_GEO_RESOLUTIONS: geo_data, res_key = geo_map(geo_res, data) geo_groups = geo_data.groupby(res_key) - for agegroup in AGE_GROUPS: - for sensor in sensors: - if agegroup == "total": - sensor_name = sensor - else: - sensor_name = "_".join([sensor, agegroup]) - logger.info("Generating signal and exporting to CSV", - geo_res=geo_res, - sensor=sensor_name) - state_df = generate_sensor_for_nonparent_geo( - geo_groups, res_key, smooth=smoothers[sensor][1], - device=smoothers[sensor][0], first_date=first_date, - last_date=last_date, suffix=agegroup) + # Parallelize generate_sensor_for_nonparent_geo calls + n_cpu = min(8, cpu_count()) + with Pool(n_cpu) as pool: + pool_results = [] + for agegroup in AGE_GROUPS: + for sensor in sensors: + if agegroup == "total": + sensor_name = sensor + else: + sensor_name = "_".join([sensor, agegroup]) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor_name) + pool_results.append(( + pool.apply_async( + generate_sensor_for_nonparent_geo, + args=(geo_groups, res_key, smoothers[sensor][1], smoothers[sensor][0], first_date, last_date, agegroup) + ), + sensor_name + )) + pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] + for state_df, sensor_name in pool_results: dates = create_export_csv( state_df, geo_res=geo_res, @@ -152,24 +162,33 @@ def run_module(params: Dict[str, Any]): # County/HRR/MSA level for geo_res in PARENT_GEO_RESOLUTIONS: geo_data, res_key = geo_map(geo_res, data) - for agegroup in AGE_GROUPS: - for sensor in sensors: - if agegroup == "total": - sensor_name = sensor - else: - sensor_name = "_".join([sensor, agegroup]) - logger.info("Generating signal and exporting to CSV", - geo_res=geo_res, - sensor=sensor_name) - res_df = generate_sensor_for_parent_geo( - geo_groups, geo_data, res_key, smooth=smoothers[sensor][1], - device=smoothers[sensor][0], first_date=first_date, - last_date=last_date, suffix=agegroup) + # Parallelize generate_sensor_for_parent_geo calls + n_cpu = min(8, cpu_count()) + with Pool(n_cpu) as pool: + pool_results = [] + for agegroup in AGE_GROUPS: + for sensor in sensors: + if agegroup == "total": + sensor_name = sensor + else: + sensor_name = "_".join([sensor, agegroup]) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor_name) + pool_results.append(( + pool.apply_async( + generate_sensor_for_parent_geo, + args=(geo_groups, geo_data, res_key, smoothers[sensor][1], smoothers[sensor][0], first_date, last_date, agegroup) + ), + sensor_name + )) + pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] + for res_df, sensor_name in pool_results: dates = create_export_csv(res_df, geo_res=geo_res, - 
sensor=sensor_name, export_dir=export_dir, - start_date=export_start_date, - end_date=export_end_date, - remove_null_samples=True) + sensor=sensor_name, export_dir=export_dir, + start_date=export_start_date, + end_date=export_end_date, + remove_null_samples=True) if len(dates) > 0: stats.append((max(dates), len(dates))) From 915bd7cf9680ada381ddaea9a3c48aa823a36469 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Thu, 3 Aug 2023 23:19:21 +0300 Subject: [PATCH 02/13] Fix linting --- .../delphi_quidel_covidtest/run.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 69213cb67..8f90f7a05 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -124,11 +124,11 @@ def run_module(params: Dict[str, Any]): wip_signal=params["indicator"]["wip_signal"], prefix="wip_") smoothers = get_smooth_info(sensors, SMOOTHERS) + n_cpu = min(8, cpu_count()) # for parallelization for geo_res in NONPARENT_GEO_RESOLUTIONS: geo_data, res_key = geo_map(geo_res, data) geo_groups = geo_data.groupby(res_key) # Parallelize generate_sensor_for_nonparent_geo calls - n_cpu = min(8, cpu_count()) with Pool(n_cpu) as pool: pool_results = [] for agegroup in AGE_GROUPS: @@ -143,19 +143,19 @@ def run_module(params: Dict[str, Any]): pool_results.append(( pool.apply_async( generate_sensor_for_nonparent_geo, - args=(geo_groups, res_key, smoothers[sensor][1], smoothers[sensor][0], first_date, last_date, agegroup) + args=(geo_groups, res_key, + smoothers[sensor][1], smoothers[sensor][0], + first_date, last_date, agegroup) ), sensor_name )) pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] for state_df, sensor_name in pool_results: - dates = create_export_csv( - state_df, - geo_res=geo_res, - sensor=sensor_name, - export_dir=export_dir, - start_date=export_start_date, - end_date=export_end_date) + dates = create_export_csv(state_df, geo_res=geo_res, + sensor=sensor_name, + export_dir=export_dir, + start_date=export_start_date, + end_date=export_end_date) if len(dates) > 0: stats.append((max(dates), len(dates))) assert geo_res == "state" # Make sure geo_groups is for state level @@ -163,7 +163,6 @@ def run_module(params: Dict[str, Any]): for geo_res in PARENT_GEO_RESOLUTIONS: geo_data, res_key = geo_map(geo_res, data) # Parallelize generate_sensor_for_parent_geo calls - n_cpu = min(8, cpu_count()) with Pool(n_cpu) as pool: pool_results = [] for agegroup in AGE_GROUPS: @@ -178,7 +177,9 @@ def run_module(params: Dict[str, Any]): pool_results.append(( pool.apply_async( generate_sensor_for_parent_geo, - args=(geo_groups, geo_data, res_key, smoothers[sensor][1], smoothers[sensor][0], first_date, last_date, agegroup) + args=(geo_groups, geo_data, res_key, + smoothers[sensor][1], smoothers[sensor][0], + first_date, last_date, agegroup) ), sensor_name )) From 92f51de16c1ade0ebafb07b9620ab018b1e495b3 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Thu, 3 Aug 2023 23:27:34 +0300 Subject: [PATCH 03/13] Spacing --- quidel_covidtest/delphi_quidel_covidtest/run.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 8f90f7a05..c2de93d0c 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -152,10 
+152,10 @@ def run_module(params: Dict[str, Any]): pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] for state_df, sensor_name in pool_results: dates = create_export_csv(state_df, geo_res=geo_res, - sensor=sensor_name, - export_dir=export_dir, - start_date=export_start_date, - end_date=export_end_date) + sensor=sensor_name, + export_dir=export_dir, + start_date=export_start_date, + end_date=export_end_date) if len(dates) > 0: stats.append((max(dates), len(dates))) assert geo_res == "state" # Make sure geo_groups is for state level @@ -186,10 +186,10 @@ def run_module(params: Dict[str, Any]): pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] for res_df, sensor_name in pool_results: dates = create_export_csv(res_df, geo_res=geo_res, - sensor=sensor_name, export_dir=export_dir, - start_date=export_start_date, - end_date=export_end_date, - remove_null_samples=True) + sensor=sensor_name, export_dir=export_dir, + start_date=export_start_date, + end_date=export_end_date, + remove_null_samples=True) if len(dates) > 0: stats.append((max(dates), len(dates))) From f2b42934dd585f9272db99cb37b233c7b2a10510 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Thu, 3 Aug 2023 23:36:03 +0300 Subject: [PATCH 04/13] suppress lint warning --- quidel_covidtest/delphi_quidel_covidtest/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index c2de93d0c..7d008c57d 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -56,7 +56,7 @@ def get_smooth_info(sensors, _SMOOTHERS): smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE) return smoothers -def run_module(params: Dict[str, Any]): +def run_module(params: Dict[str, Any]): # pylint: disable=too-many-statements """Run the quidel_covidtest indicator. 
The `params` argument is expected to have the following structure: From bf9f6216af38c4d9b025de28231885419e1b6c0d Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Fri, 4 Aug 2023 19:42:29 +0300 Subject: [PATCH 05/13] Rework --- .../delphi_quidel_covidtest/run.py | 139 +++++++++++------- 1 file changed, 88 insertions(+), 51 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 7d008c57d..49d299d40 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -7,6 +7,7 @@ import atexit from datetime import datetime from multiprocessing import Pool, cpu_count +import os import time from typing import Dict, Any @@ -56,7 +57,46 @@ def get_smooth_info(sensors, _SMOOTHERS): smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE) return smoothers -def run_module(params: Dict[str, Any]): # pylint: disable=too-many-statements +def generate_and_export_for_nonparent_geo(state_groups, res_key, smooth, device, + first_date, last_date, suffix, # generate args + geo_res, sensor_name, export_dir, + export_start_date, export_end_date, # export args + log_filename, log_exceptions): # logger args + """Generate sensors, create export CSV then return stats.""" + logger = get_structured_logger(__name__, log_filename, log_exceptions) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor_name, + pid=os.getpid()) + res_df = generate_sensor_for_nonparent_geo(state_groups, res_key, smooth, device, + first_date, last_date, suffix) + dates = create_export_csv(res_df, geo_res=geo_res, + sensor=sensor_name, export_dir=export_dir, + start_date=export_start_date, + end_date=export_end_date) + return dates + +def generate_and_export_for_parent_geo(state_groups, geo_data, res_key, smooth, device, + first_date, last_date, suffix, # generate args + geo_res, sensor_name, export_dir, + export_start_date, export_end_date, # export args + log_filename, log_exceptions): # logger args + """Generate sensors, create export CSV then return stats.""" + logger = get_structured_logger(__name__, log_filename, log_exceptions) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor_name, + pid=os.getpid()) + res_df = generate_sensor_for_parent_geo(state_groups, geo_data, res_key, smooth, device, + first_date, last_date, suffix) + dates = create_export_csv(res_df, geo_res=geo_res, + sensor=sensor_name, export_dir=export_dir, + start_date=export_start_date, + end_date=export_end_date, + remove_null_samples=True) + return dates + +def run_module(params: Dict[str, Any]): """Run the quidel_covidtest indicator. 
The `params` argument is expected to have the following structure: @@ -125,46 +165,39 @@ def run_module(params: Dict[str, Any]): # pylint: disable=too-many-statements prefix="wip_") smoothers = get_smooth_info(sensors, SMOOTHERS) n_cpu = min(8, cpu_count()) # for parallelization - for geo_res in NONPARENT_GEO_RESOLUTIONS: - geo_data, res_key = geo_map(geo_res, data) - geo_groups = geo_data.groupby(res_key) - # Parallelize generate_sensor_for_nonparent_geo calls - with Pool(n_cpu) as pool: - pool_results = [] + logger.info("Parallelizing sensor generation", n_cpu=n_cpu) + with Pool(n_cpu) as pool: + pool_results = [] + for geo_res in NONPARENT_GEO_RESOLUTIONS: + geo_data, res_key = geo_map(geo_res, data) + geo_groups = geo_data.groupby(res_key) for agegroup in AGE_GROUPS: for sensor in sensors: if agegroup == "total": sensor_name = sensor else: sensor_name = "_".join([sensor, agegroup]) - logger.info("Generating signal and exporting to CSV", - geo_res=geo_res, - sensor=sensor_name) - pool_results.append(( + pool_results.append( pool.apply_async( - generate_sensor_for_nonparent_geo, - args=(geo_groups, res_key, - smoothers[sensor][1], smoothers[sensor][0], - first_date, last_date, agegroup) - ), - sensor_name - )) - pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] - for state_df, sensor_name in pool_results: - dates = create_export_csv(state_df, geo_res=geo_res, - sensor=sensor_name, - export_dir=export_dir, - start_date=export_start_date, - end_date=export_end_date) - if len(dates) > 0: - stats.append((max(dates), len(dates))) - assert geo_res == "state" # Make sure geo_groups is for state level - # County/HRR/MSA level - for geo_res in PARENT_GEO_RESOLUTIONS: - geo_data, res_key = geo_map(geo_res, data) - # Parallelize generate_sensor_for_parent_geo calls - with Pool(n_cpu) as pool: - pool_results = [] + generate_and_export_for_nonparent_geo, + args=( + # generate_sensors_for_parent_geo + geo_groups, res_key, + smoothers[sensor][1], smoothers[sensor][0], + first_date, last_date, agegroup, + # create_export_csv + geo_res, sensor_name, export_dir, + export_start_date, export_end_date, + # logger params + params["common"].get("log_filename"), + params["common"].get("log_exceptions", True) + ) + ) + ) + assert geo_res == "state" # Make sure geo_groups is for state level + # County/HRR/MSA level + for geo_res in PARENT_GEO_RESOLUTIONS: + geo_data, res_key = geo_map(geo_res, data) for agegroup in AGE_GROUPS: for sensor in sensors: if agegroup == "total": @@ -174,24 +207,28 @@ def run_module(params: Dict[str, Any]): # pylint: disable=too-many-statements logger.info("Generating signal and exporting to CSV", geo_res=geo_res, sensor=sensor_name) - pool_results.append(( + pool_results.append( pool.apply_async( - generate_sensor_for_parent_geo, - args=(geo_groups, geo_data, res_key, - smoothers[sensor][1], smoothers[sensor][0], - first_date, last_date, agegroup) - ), - sensor_name - )) - pool_results = [(proc.get(), sensor_name) for (proc, sensor_name) in pool_results] - for res_df, sensor_name in pool_results: - dates = create_export_csv(res_df, geo_res=geo_res, - sensor=sensor_name, export_dir=export_dir, - start_date=export_start_date, - end_date=export_end_date, - remove_null_samples=True) - if len(dates) > 0: - stats.append((max(dates), len(dates))) + generate_and_export_for_parent_geo, + args=( + # generate_sensors_for_parent_geo + geo_groups, geo_data, res_key, + smoothers[sensor][1], smoothers[sensor][0], + first_date, last_date, agegroup, + # 
create_export_csv + geo_res, sensor_name, export_dir, + export_start_date, export_end_date, + # logger params + params["common"].get("log_filename"), + params["common"].get("log_exceptions", True) + ) + ) + ) + pool_results = [proc.get() for proc in pool_results] + for dates in pool_results: + if len(dates) > 0: + stats.append((max(dates), len(dates))) + # Export the cache file if the pipeline runs successfully. # Otherwise, don't update the cache file From 4bd32ff85d581ffd48999a98e4506ec72e8ddfc6 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Fri, 4 Aug 2023 19:44:11 +0300 Subject: [PATCH 06/13] More consistent param names --- quidel_covidtest/delphi_quidel_covidtest/run.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 49d299d40..c914fffde 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -57,7 +57,7 @@ def get_smooth_info(sensors, _SMOOTHERS): smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE) return smoothers -def generate_and_export_for_nonparent_geo(state_groups, res_key, smooth, device, +def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device, first_date, last_date, suffix, # generate args geo_res, sensor_name, export_dir, export_start_date, export_end_date, # export args @@ -68,7 +68,7 @@ def generate_and_export_for_nonparent_geo(state_groups, res_key, smooth, device, geo_res=geo_res, sensor=sensor_name, pid=os.getpid()) - res_df = generate_sensor_for_nonparent_geo(state_groups, res_key, smooth, device, + res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device, first_date, last_date, suffix) dates = create_export_csv(res_df, geo_res=geo_res, sensor=sensor_name, export_dir=export_dir, @@ -76,7 +76,7 @@ def generate_and_export_for_nonparent_geo(state_groups, res_key, smooth, device, end_date=export_end_date) return dates -def generate_and_export_for_parent_geo(state_groups, geo_data, res_key, smooth, device, +def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix, # generate args geo_res, sensor_name, export_dir, export_start_date, export_end_date, # export args @@ -87,7 +87,7 @@ def generate_and_export_for_parent_geo(state_groups, geo_data, res_key, smooth, geo_res=geo_res, sensor=sensor_name, pid=os.getpid()) - res_df = generate_sensor_for_parent_geo(state_groups, geo_data, res_key, smooth, device, + res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix) dates = create_export_csv(res_df, geo_res=geo_res, sensor=sensor_name, export_dir=export_dir, From 17aaf173620dd422f181698c9157d5d98d60d016 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Fri, 4 Aug 2023 19:46:13 +0300 Subject: [PATCH 07/13] newline --- quidel_covidtest/delphi_quidel_covidtest/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index c914fffde..db7df47cb 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -229,7 +229,6 @@ def run_module(params: Dict[str, Any]): if len(dates) > 0: stats.append((max(dates), len(dates))) - # Export the cache file if the pipeline runs successfully. 
# Otherwise, don't update the cache file update_cache_file(df, _end_date, cache_dir) From 62a6b23e3f3b52f6b5f04bd43078a50e7c09d5c6 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Fri, 4 Aug 2023 19:57:18 +0300 Subject: [PATCH 08/13] remove extra logging --- quidel_covidtest/delphi_quidel_covidtest/run.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index db7df47cb..d88e75870 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -204,9 +204,6 @@ def run_module(params: Dict[str, Any]): sensor_name = sensor else: sensor_name = "_".join([sensor, agegroup]) - logger.info("Generating signal and exporting to CSV", - geo_res=geo_res, - sensor=sensor_name) pool_results.append( pool.apply_async( generate_and_export_for_parent_geo, From 4689206e5bf924f719ca9618a126850c717138b7 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Mon, 7 Aug 2023 13:59:32 +0300 Subject: [PATCH 09/13] Review tweaks --- .../delphi_quidel_covidtest/run.py | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index d88e75870..9af3f20d6 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -6,7 +6,8 @@ """ import atexit from datetime import datetime -from multiprocessing import Pool, cpu_count +import multiprocessing +from multiprocessing import Manager, Pool, cpu_count import os import time from typing import Dict, Any @@ -61,13 +62,15 @@ def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device, first_date, last_date, suffix, # generate args geo_res, sensor_name, export_dir, export_start_date, export_end_date, # export args - log_filename, log_exceptions): # logger args + lock, log_filename, log_exceptions): # logger args """Generate sensors, create export CSV then return stats.""" - logger = get_structured_logger(__name__, log_filename, log_exceptions) - logger.info("Generating signal and exporting to CSV", - geo_res=geo_res, - sensor=sensor_name, - pid=os.getpid()) + # logger cannot be passed to child processes, so has to be recreated + with lock: + logger = get_structured_logger(__name__, log_filename, log_exceptions) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor_name, + pid=multiprocessing.current_process().pid) res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device, first_date, last_date, suffix) dates = create_export_csv(res_df, geo_res=geo_res, @@ -80,20 +83,22 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de first_date, last_date, suffix, # generate args geo_res, sensor_name, export_dir, export_start_date, export_end_date, # export args - log_filename, log_exceptions): # logger args + lock, log_filename, log_exceptions): # logger args """Generate sensors, create export CSV then return stats.""" - logger = get_structured_logger(__name__, log_filename, log_exceptions) - logger.info("Generating signal and exporting to CSV", - geo_res=geo_res, - sensor=sensor_name, - pid=os.getpid()) + # logger cannot be passed to child processes, so has to be recreated + with lock: + logger = get_structured_logger(__name__, log_filename, log_exceptions) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + 
sensor=sensor_name, + pid=multiprocessing.current_process().pid) res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix) dates = create_export_csv(res_df, geo_res=geo_res, sensor=sensor_name, export_dir=export_dir, start_date=export_start_date, end_date=export_end_date, - remove_null_samples=True) + remove_null_samples=True) # for parent geo, remove null sample size return dates def run_module(params: Dict[str, Any]): @@ -165,6 +170,7 @@ def run_module(params: Dict[str, Any]): prefix="wip_") smoothers = get_smooth_info(sensors, SMOOTHERS) n_cpu = min(8, cpu_count()) # for parallelization + lock = Manager().Lock() # for using loggers in multiple threads logger.info("Parallelizing sensor generation", n_cpu=n_cpu) with Pool(n_cpu) as pool: pool_results = [] @@ -189,6 +195,7 @@ def run_module(params: Dict[str, Any]): geo_res, sensor_name, export_dir, export_start_date, export_end_date, # logger params + lock, params["common"].get("log_filename"), params["common"].get("log_exceptions", True) ) @@ -197,7 +204,7 @@ def run_module(params: Dict[str, Any]): assert geo_res == "state" # Make sure geo_groups is for state level # County/HRR/MSA level for geo_res in PARENT_GEO_RESOLUTIONS: - geo_data, res_key = geo_map(geo_res, data) + geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups for agegroup in AGE_GROUPS: for sensor in sensors: if agegroup == "total": @@ -216,6 +223,7 @@ def run_module(params: Dict[str, Any]): geo_res, sensor_name, export_dir, export_start_date, export_end_date, # logger params + lock, params["common"].get("log_filename"), params["common"].get("log_exceptions", True) ) From 20abd8c8873159f30759021bb77bc2068aabc814 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Mon, 7 Aug 2023 14:01:00 +0300 Subject: [PATCH 10/13] Better imports --- quidel_covidtest/delphi_quidel_covidtest/run.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 9af3f20d6..a502bfd9a 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -6,8 +6,7 @@ """ import atexit from datetime import datetime -import multiprocessing -from multiprocessing import Manager, Pool, cpu_count +from multiprocessing import Manager, Pool, cpu_count, current_process import os import time from typing import Dict, Any @@ -70,7 +69,7 @@ def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device, logger.info("Generating signal and exporting to CSV", geo_res=geo_res, sensor=sensor_name, - pid=multiprocessing.current_process().pid) + pid=current_process().pid) res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device, first_date, last_date, suffix) dates = create_export_csv(res_df, geo_res=geo_res, @@ -91,7 +90,7 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de logger.info("Generating signal and exporting to CSV", geo_res=geo_res, sensor=sensor_name, - pid=multiprocessing.current_process().pid) + pid=current_process().pid) res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix) dates = create_export_csv(res_df, geo_res=geo_res, From b90e9415f1cfffd06f2949c8892a5b35f4fa106c Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Mon, 7 Aug 2023 14:07:10 +0300 Subject: [PATCH 11/13] Linting --- 
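The parallel pipeline this patch re-indents leans on two multiprocessing
details. Logger objects cannot be passed to child processes, which is why
get_structured_logger is re-invoked inside each worker, and a plain
multiprocessing.Lock cannot be pickled into a Pool task, which is presumably
why the lock handed to the workers is a Manager() proxy. A minimal standalone
sketch of the same pattern, with the stdlib logging module standing in for
get_structured_logger and with hypothetical worker/task names:

    from multiprocessing import Manager, Pool, cpu_count, current_process
    import logging

    def get_worker_logger(log_filename):
        # Stand-in for get_structured_logger: rebuilt inside the worker
        # because logger objects cannot be shipped to child processes.
        logging.basicConfig(filename=log_filename, level=logging.INFO)
        return logging.getLogger(__name__)

    def worker(task_id, lock, log_filename):
        # The manager lock keeps concurrent workers from interleaving
        # their logger setup and log writes.
        with lock:
            logger = get_worker_logger(log_filename)
            logger.info("task %s in pid %s", task_id, current_process().pid)
        return task_id * 2  # placeholder for the generate-and-export work

    if __name__ == "__main__":
        with Manager() as manager:
            lock = manager.Lock()
            with Pool(min(8, cpu_count())) as pool:
                handles = [pool.apply_async(worker, args=(i, lock, "run.log"))
                           for i in range(4)]
                results = [h.get() for h in handles]
        print(results)

Queuing all the AsyncResult handles first and calling .get() only afterwards
mirrors how run_module gathers pool_results before aggregating stats, so the
tasks run concurrently instead of serializing on each result.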
.../delphi_quidel_covidtest/run.py | 121 +++++++++--------- 1 file changed, 61 insertions(+), 60 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index a502bfd9a..758e66a4f 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -7,7 +7,6 @@ import atexit from datetime import datetime from multiprocessing import Manager, Pool, cpu_count, current_process -import os import time from typing import Dict, Any @@ -169,69 +168,71 @@ def run_module(params: Dict[str, Any]): prefix="wip_") smoothers = get_smooth_info(sensors, SMOOTHERS) n_cpu = min(8, cpu_count()) # for parallelization - lock = Manager().Lock() # for using loggers in multiple threads - logger.info("Parallelizing sensor generation", n_cpu=n_cpu) - with Pool(n_cpu) as pool: - pool_results = [] - for geo_res in NONPARENT_GEO_RESOLUTIONS: - geo_data, res_key = geo_map(geo_res, data) - geo_groups = geo_data.groupby(res_key) - for agegroup in AGE_GROUPS: - for sensor in sensors: - if agegroup == "total": - sensor_name = sensor - else: - sensor_name = "_".join([sensor, agegroup]) - pool_results.append( - pool.apply_async( - generate_and_export_for_nonparent_geo, - args=( - # generate_sensors_for_parent_geo - geo_groups, res_key, - smoothers[sensor][1], smoothers[sensor][0], - first_date, last_date, agegroup, - # create_export_csv - geo_res, sensor_name, export_dir, - export_start_date, export_end_date, - # logger params - lock, - params["common"].get("log_filename"), - params["common"].get("log_exceptions", True) + with Manager() as manager: + # for using loggers in multiple threads + lock = manager.Lock() # pylint: disable=no-member + logger.info("Parallelizing sensor generation", n_cpu=n_cpu) + with Pool(n_cpu) as pool: + pool_results = [] + for geo_res in NONPARENT_GEO_RESOLUTIONS: + geo_data, res_key = geo_map(geo_res, data) + geo_groups = geo_data.groupby(res_key) + for agegroup in AGE_GROUPS: + for sensor in sensors: + if agegroup == "total": + sensor_name = sensor + else: + sensor_name = "_".join([sensor, agegroup]) + pool_results.append( + pool.apply_async( + generate_and_export_for_nonparent_geo, + args=( + # generate_sensors_for_parent_geo + geo_groups, res_key, + smoothers[sensor][1], smoothers[sensor][0], + first_date, last_date, agegroup, + # create_export_csv + geo_res, sensor_name, export_dir, + export_start_date, export_end_date, + # logger params + lock, + params["common"].get("log_filename"), + params["common"].get("log_exceptions", True) + ) ) ) - ) - assert geo_res == "state" # Make sure geo_groups is for state level - # County/HRR/MSA level - for geo_res in PARENT_GEO_RESOLUTIONS: - geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups - for agegroup in AGE_GROUPS: - for sensor in sensors: - if agegroup == "total": - sensor_name = sensor - else: - sensor_name = "_".join([sensor, agegroup]) - pool_results.append( - pool.apply_async( - generate_and_export_for_parent_geo, - args=( - # generate_sensors_for_parent_geo - geo_groups, geo_data, res_key, - smoothers[sensor][1], smoothers[sensor][0], - first_date, last_date, agegroup, - # create_export_csv - geo_res, sensor_name, export_dir, - export_start_date, export_end_date, - # logger params - lock, - params["common"].get("log_filename"), - params["common"].get("log_exceptions", True) + assert geo_res == "state" # Make sure geo_groups is for state level + # County/HRR/MSA level + for geo_res in PARENT_GEO_RESOLUTIONS: + 
geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups + for agegroup in AGE_GROUPS: + for sensor in sensors: + if agegroup == "total": + sensor_name = sensor + else: + sensor_name = "_".join([sensor, agegroup]) + pool_results.append( + pool.apply_async( + generate_and_export_for_parent_geo, + args=( + # generate_sensors_for_parent_geo + geo_groups, geo_data, res_key, + smoothers[sensor][1], smoothers[sensor][0], + first_date, last_date, agegroup, + # create_export_csv + geo_res, sensor_name, export_dir, + export_start_date, export_end_date, + # logger params + lock, + params["common"].get("log_filename"), + params["common"].get("log_exceptions", True) + ) ) ) - ) - pool_results = [proc.get() for proc in pool_results] - for dates in pool_results: - if len(dates) > 0: - stats.append((max(dates), len(dates))) + pool_results = [proc.get() for proc in pool_results] + for dates in pool_results: + if len(dates) > 0: + stats.append((max(dates), len(dates))) # Export the cache file if the pipeline runs successfully. # Otherwise, don't update the cache file From dfdf0aa49e8dfbe25acccf8a0341be3eade83345 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Tue, 8 Aug 2023 21:44:07 +0300 Subject: [PATCH 12/13] Clarify pylint suppression --- quidel_covidtest/delphi_quidel_covidtest/run.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 758e66a4f..314d8b567 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -169,7 +169,8 @@ def run_module(params: Dict[str, Any]): smoothers = get_smooth_info(sensors, SMOOTHERS) n_cpu = min(8, cpu_count()) # for parallelization with Manager() as manager: - # for using loggers in multiple threads + # for using loggers in multiple threads + # disabled due to a Pylint bug, resolved by version bump (#1886) lock = manager.Lock() # pylint: disable=no-member logger.info("Parallelizing sensor generation", n_cpu=n_cpu) with Pool(n_cpu) as pool: From 43a041541ddb70b6179ffa404ebef7f0abec0a1c Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Tue, 8 Aug 2023 21:48:40 +0300 Subject: [PATCH 13/13] Add useless-suppression --- quidel_covidtest/.pylintrc | 1 + quidel_covidtest/delphi_quidel_covidtest/geo_maps.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/quidel_covidtest/.pylintrc b/quidel_covidtest/.pylintrc index 854cf38d2..29bd9aac2 100644 --- a/quidel_covidtest/.pylintrc +++ b/quidel_covidtest/.pylintrc @@ -9,6 +9,7 @@ disable=logging-format-interpolation, no-self-use, # Allow pytest classes to have one test. 
too-few-public-methods +enable=useless-suppression [BASIC] diff --git a/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py b/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py index d59dab692..0c5ac4f9b 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py +++ b/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py @@ -75,11 +75,11 @@ def add_parent_state(data, geo_res, geo_key): """ fips_to_state = GMPR.get_crosswalk(from_code="fips", to_code="state") if geo_res == "county": - mix_map = fips_to_state[["fips", "state_id"]] # pylint: disable=unsubscriptable-object + mix_map = fips_to_state[["fips", "state_id"]] else: fips_to_geo_res = GMPR.get_crosswalk(from_code="fips", to_code=geo_res) mix_map = fips_to_geo_res[["fips", geo_res]].merge( - fips_to_state[["fips", "state_id"]], # pylint: disable=unsubscriptable-object + fips_to_state[["fips", "state_id"]], on="fips", how="inner") mix_map = GMPR.add_population_column(mix_map, "fips").groupby(
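A closing note on the .pylintrc change in this last patch:
enable=useless-suppression turns on pylint's I0021 check, which flags any
"# pylint: disable=..." comment that no longer silences a real message. That
is what makes it safe to drop the two unsubscriptable-object suppressions in
geo_maps.py: a stale suppression is now reported instead of lingering
silently. A small illustration on a hypothetical module:

    def demo():
        # Legitimate suppression: 'unused' really is unused, so the
        # disable comment does real work and pylint stays quiet.
        unused = 1  # pylint: disable=unused-variable
        # Stale suppression: 'used' is returned, so there is nothing to
        # silence; with useless-suppression enabled, pylint reports
        # "Useless suppression of 'unused-variable'" (I0021) here.
        used = 2  # pylint: disable=unused-variable
        return used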