Skip to content

Commit

Permalink
Merge branch 'feature/parallel-deposited-exposures' into 'master'
Browse files Browse the repository at this point in the history
model: compute deposited exposures in parallel

See merge request caimira/caimira!490
  • Loading branch information
lrdossan committed Mar 26, 2024
2 parents 4ed285d + ddf7684 commit 5042f5d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
6 changes: 5 additions & 1 deletion caimira/apps/calculator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ async def post(self) -> None:
timeout=300,
)
model = form.build_model()
report_data_task = executor.submit(calculate_report_data, form, model)
report_data_task = executor.submit(calculate_report_data, form, model,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor,
self.settings['report_generation_parallelism'],
),)
report_data: dict = await asyncio.wrap_future(report_data_task)
await self.finish(report_data)

Expand Down
52 changes: 37 additions & 15 deletions caimira/apps/calculator/report_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,17 @@ def concentrations_with_sr_breathing(form: VirusFormData, model: models.Exposure
lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean())
return lower_concentrations

def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name

def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name

def _calculate_co2_concentration(CO2_model, time, fn_name=None):
return np.array(CO2_model.concentration(float(time))).mean(), fn_name

@profile
def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
Expand All @@ -127,20 +135,34 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> t
]
lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals)

cumulative_doses = np.cumsum([
np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean()
for time1, time2 in zip(times[:-1], times[1:])
])
long_range_cumulative_doses = np.cumsum([
np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean()
for time1, time2 in zip(times[:-1], times[1:])
])

CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
CO2_concentrations = {'CO₂': {'concentrations': [
np.array(CO2_model.concentration(float(time))).mean()
for time in times
]}}

# compute deposited exposures and CO2 concentrations in parallel to increase performance
deposited_exposures = []
long_range_deposited_exposures = []
CO2_concentrations = []

tasks = []
with executor_factory() as executor:
for time1, time2 in zip(times[:-1], times[1:]):
tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
# co2 concentration: takes each time as param, not the interval
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
# co2 concentration: calculate the last time too
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2"))

for task in tasks:
result, fn_name = task.result()
if fn_name == "de":
deposited_exposures.append(result)
elif fn_name == "lr":
long_range_deposited_exposures.append(result)
elif fn_name == "co2":
CO2_concentrations.append(result)

cumulative_doses = np.cumsum(deposited_exposures)
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)

prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
Expand Down Expand Up @@ -513,7 +535,7 @@ def prepare_context(
}

scenario_sample_times = interesting_times(model)
report_data = calculate_report_data(form, model)
report_data = calculate_report_data(form, model, executor_factory=executor_factory)
context.update(report_data)

alternative_scenarios = manufacture_alternative_scenarios(form)
Expand Down

0 comments on commit 5042f5d

Please sign in to comment.