From ee6cb442cc49b4d7a24f927935e50fdb08932b22 Mon Sep 17 00:00:00 2001 From: matt bowen Date: Wed, 17 Aug 2022 09:48:32 -0400 Subject: [PATCH] Add flag for memory intensive ETLs (#1780) --- .../data_pipeline/etl/constants.py | 29 +++++++++++++++++++ .../data-pipeline/data_pipeline/etl/runner.py | 8 ++--- .../data_pipeline/etl/sources/eamlis/etl.py | 1 - 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/data/data-pipeline/data_pipeline/etl/constants.py b/data/data-pipeline/data_pipeline/etl/constants.py index 4363b9a73..6657cb44f 100644 --- a/data/data-pipeline/data_pipeline/etl/constants.py +++ b/data/data-pipeline/data_pipeline/etl/constants.py @@ -3,36 +3,43 @@ "name": "cdc_places", "module_dir": "cdc_places", "class_name": "CDCPlacesETL", + "is_memory_intensive": False, }, { "name": "national_risk_index", "module_dir": "national_risk_index", "class_name": "NationalRiskIndexETL", + "is_memory_intensive": False, }, { "name": "travel_composite", "module_dir": "dot_travel_composite", "class_name": "TravelCompositeETL", + "is_memory_intensive": False, }, { "name": "tree_equity_score", "module_dir": "tree_equity_score", "class_name": "TreeEquityScoreETL", + "is_memory_intensive": False, }, { "name": "census_decennial", "module_dir": "census_decennial", "class_name": "CensusDecennialETL", + "is_memory_intensive": False, }, { "name": "housing_and_transportation", "module_dir": "housing_and_transportation", "class_name": "HousingTransportationETL", + "is_memory_intensive": False, }, { "name": "mapping_for_ej", "module_dir": "mapping_for_ej", "class_name": "MappingForEJETL", + "is_memory_intensive": False, }, { "name": "fsf_flood_risk", @@ -48,112 +55,134 @@ "name": "ejscreen", "module_dir": "ejscreen", "class_name": "EJSCREENETL", + "is_memory_intensive": False, }, { "name": "hud_housing", "module_dir": "hud_housing", "class_name": "HudHousingETL", + "is_memory_intensive": False, }, { "name": "census_acs_median_income", "module_dir": "census_acs_median_income", "class_name": "CensusACSMedianIncomeETL", + "is_memory_intensive": False, }, { "name": "cdc_life_expectancy", "module_dir": "cdc_life_expectancy", "class_name": "CDCLifeExpectancy", + "is_memory_intensive": False, }, { "name": "doe_energy_burden", "module_dir": "doe_energy_burden", "class_name": "DOEEnergyBurden", + "is_memory_intensive": False, }, { "name": "geocorr", "module_dir": "geocorr", "class_name": "GeoCorrETL", + "is_memory_intensive": False, }, { "name": "child_opportunity_index", "module_dir": "child_opportunity_index", "class_name": "ChildOpportunityIndex", + "is_memory_intensive": False, }, { "name": "mapping_inequality", "module_dir": "mapping_inequality", "class_name": "MappingInequalityETL", + "is_memory_intensive": False, }, { "name": "persistent_poverty", "module_dir": "persistent_poverty", "class_name": "PersistentPovertyETL", + "is_memory_intensive": False, }, { "name": "ejscreen_areas_of_concern", "module_dir": "ejscreen_areas_of_concern", "class_name": "EJSCREENAreasOfConcernETL", + "is_memory_intensive": False, }, { "name": "calenviroscreen", "module_dir": "calenviroscreen", "class_name": "CalEnviroScreenETL", + "is_memory_intensive": False, }, { "name": "hud_recap", "module_dir": "hud_recap", "class_name": "HudRecapETL", + "is_memory_intensive": False, }, { "name": "epa_rsei", "module_dir": "epa_rsei", "class_name": "EPARiskScreeningEnvironmentalIndicatorsETL", + "is_memory_intensive": False, }, { "name": "energy_definition_alternative_draft", "module_dir": "energy_definition_alternative_draft", "class_name": "EnergyDefinitionAlternativeDraft", + "is_memory_intensive": False, }, { "name": "michigan_ejscreen", "module_dir": "michigan_ejscreen", "class_name": "MichiganEnviroScreenETL", + "is_memory_intensive": False, }, { "name": "cdc_svi_index", "module_dir": "cdc_svi_index", "class_name": "CDCSVIIndex", + "is_memory_intensive": False, }, { "name": "maryland_ejscreen", "module_dir": "maryland_ejscreen", "class_name": "MarylandEJScreenETL", + "is_memory_intensive": False, }, { "name": "historic_redlining", "module_dir": "historic_redlining", "class_name": "HistoricRedliningETL", + "is_memory_intensive": False, }, # This has to come after us.json exists { "name": "census_acs", "module_dir": "census_acs", "class_name": "CensusACSETL", + "is_memory_intensive": False, }, { "name": "census_acs_2010", "module_dir": "census_acs_2010", "class_name": "CensusACS2010ETL", + "is_memory_intensive": False, }, { "name": "us_army_fuds", "module_dir": "us_army_fuds", "class_name": "USArmyFUDS", + "is_memory_intensive": True, }, { "name": "eamlis", "module_dir": "eamlis", "class_name": "AbandonedMineETL", + "is_memory_intensive": True, }, ] diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py index 1b814b8f4..11f64a9ce 100644 --- a/data/data-pipeline/data_pipeline/etl/runner.py +++ b/data/data-pipeline/data_pipeline/etl/runner.py @@ -1,7 +1,6 @@ import importlib import concurrent.futures import typing -import os from data_pipeline.etl.score.etl_score import ScoreETL from data_pipeline.etl.score.etl_score_geo import GeoScoreETL @@ -77,10 +76,10 @@ def etl_runner(dataset_to_run: str = None) -> None: None """ dataset_list = _get_datasets_to_run(dataset_to_run) - # try running the high memory tasks separately - concurrent_datasets = dataset_list[:-2] - high_memory_datasets = dataset_list[-2:] + concurrent_datasets = [dataset for dataset in dataset_list if not dataset['is_memory_intensive']] + high_memory_datasets = [dataset for dataset in dataset_list if dataset['is_memory_intensive']] + logger.info("Running concurrent jobs") with concurrent.futures.ThreadPoolExecutor() as executor: futures = { @@ -92,6 +91,7 @@ def etl_runner(dataset_to_run: str = None) -> None: # Calling result will raise an exception if one occurred. # Otherwise, the exceptions are silently ignored. fut.result() + logger.info("Running high-memory jobs") for dataset in high_memory_datasets: _run_one_dataset(dataset=dataset) diff --git a/data/data-pipeline/data_pipeline/etl/sources/eamlis/etl.py b/data/data-pipeline/data_pipeline/etl/sources/eamlis/etl.py index 8fc34673e..0c09b7118 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/eamlis/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/eamlis/etl.py @@ -60,4 +60,3 @@ def transform(self) -> None: gdf_tracts = gdf_tracts.drop_duplicates(self.GEOID_TRACT_FIELD_NAME) gdf_tracts[self.AML_BOOLEAN] = True self.output_df = gdf_tracts[self.COLUMNS_TO_KEEP] -