"""This file contains helpful utility functions."""
import hashlib
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import numpy as np
import pandas as pd
from pandas import DataFrame, DateOffset
from tlo import Population, Property, Types

# Default mother_id value, assigned to individuals initialised as adults at the start of the simulation.
DEFAULT_MOTHER_ID = -1e7


def create_age_range_lookup(min_age: int, max_age: int, range_size: int = 5) -> Tuple[List[str], Dict[int, str]]:
"""Create age-range categories and a dictionary that will map all whole years to age-range categories
If the minimum age is not zero then a below minimum age category will be made,
then age ranges until maximum age will be made by the range size,
all other ages will map to the greater than maximum age category.
:param min_age: Minimum age for categories,
:param max_age: Maximum age for categories, a greater than maximum age category will be made
:param range_size: Size of each category between minimum and maximum ages
:returns:
age_categories: ordered list of age categories available
lookup: Default dict of integers to maximum age mapping to the age categories
"""
def chunks(items, n):
"""Takes a list and divides it into parts of size n"""
for index in range(0, len(items), n):
yield items[index:index + n]
# split all the ages from min to limit
parts = chunks(range(min_age, max_age), range_size)
default_category = f'{max_age}+'
lookup = defaultdict(lambda: default_category)
age_categories = []
# create category for minimum age
if min_age > 0:
under_min_age_category = f'0-{min_age}'
age_categories.append(under_min_age_category)
for i in range(0, min_age):
lookup[i] = under_min_age_category
# loop over each range and map all ages falling within the range to the range
for part in parts:
start = part.start
end = part.stop - 1
value = f'{start}-{end}'
age_categories.append(value)
for i in range(start, part.stop):
lookup[i] = value
age_categories.append(default_category)
return age_categories, lookup
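
# Example usage (illustrative sketch): build the default five-year age bands and look up a few ages.
#
#     age_categories, lookup = create_age_range_lookup(min_age=0, max_age=100, range_size=5)
#     age_categories[:2]    # ['0-4', '5-9']
#     lookup[23]            # '20-24'
#     lookup[130]           # '100+' (any age at or above max_age maps to the default category)
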
def transition_states(initial_series: pd.Series, prob_matrix: pd.DataFrame, rng: np.random.RandomState) -> pd.Series:
"""Transition a series of states based on probability matrix
This should carry out all state transitions for a Series (i.e. column in DataFrame)
based on the probability of state-transition matrix.
Timing values for 1M rows per state, 4 states, 100 times:
- Looping through groups: [59.5, 58.7, 59.5]
- Using apply: [84.2, 83.3, 84.4]
Because of this, looping through the groups was chosen
:param Series initial_series: the initial state series
:param DataFrame prob_matrix: DataFrame of state-transition probabilities
columns are the original state, rows are the new state. values are the probabilities
:param RandomState rng: RandomState from the disease module
:return: Series with states changed according to probabilities
"""
# Create final_series with index so that we are sure it's the same size as the original
final_states = pd.Series(None, index=initial_series.index, dtype=initial_series.dtype)
# for each state, get the random choice states and add to the final_states Series
state_indexes = initial_series.groupby(initial_series).groups
all_states = prob_matrix.index.tolist()
for state, state_index in state_indexes.items():
if not state_index.empty:
new_states = rng.choice(all_states, len(state_index), p=prob_matrix[state])
final_states[state_index] = new_states
return final_states
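
# Example usage (illustrative sketch; the states and probabilities below are made up for demonstration):
#
#     states = pd.Series(['a', 'a', 'b'])
#     prob_matrix = pd.DataFrame(
#         {'a': [0.8, 0.2], 'b': [0.0, 1.0]},   # columns are the current state...
#         index=['a', 'b'],                     # ...rows are the new state
#     )
#     rng = np.random.RandomState(0)
#     new_states = transition_states(states, prob_matrix, rng)
#     # new_states is a Series aligned with `states`, each entry drawn from prob_matrix[old_state]
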
def sample_outcome(probs: pd.DataFrame, rng: np.random.RandomState):
""" Helper function to randomly sample an outcome for each individual in a population from a set of probabilities
that are specific to each individual.
:param probs: Each row of this dataframe represents the person and the columns are the possible outcomes. The
values are the probability of that outcome for that individual. For each individual, the probabilities of each
outcome are assumed to be independent and mutually exclusive (but not necessarily exhaustive). If they sum to more
than 1.0, then they are (silently) scaled so that they do sum to 1.0.
    :param rng: Random number state to use for the generation of random numbers.
:return: A dict of the form {<index>:<outcome>} where an outcome is selected.
"""
    # Scale so that the sum in each row does not exceed 1.0
probs = probs.apply(lambda row: (row / row.sum() if row.sum() >= 1.0 else row), axis=1)
assert (probs.sum(axis=1) < (1.0 + 1e-6)).all(), "Probabilities across columns cannot sum to more than 1.0"
# Compare uniform deviate to cumulative sum across columns, after including a "null" column (for no event).
_probs = probs.copy()
_probs['_'] = 1.0 - _probs.sum(axis=1) # add implied "none of these events" category
cumsum = _probs.cumsum(axis=1)
draws = pd.Series(rng.rand(len(cumsum)), index=cumsum.index)
y = cumsum.gt(draws, axis=0)
outcome = y.idxmax(axis=1)
# return as a dict of form {person_id: outcome} only in those cases where the outcome is one of the events.
return outcome.loc[outcome != '_'].to_dict()
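
# Example usage (illustrative sketch; the person ids, outcomes and probabilities are made up):
#
#     probs = pd.DataFrame({'mild': [0.2, 0.0], 'severe': [0.1, 0.5]}, index=[10, 11])
#     rng = np.random.RandomState(0)
#     sample_outcome(probs, rng)
#     # e.g. {11: 'severe'} -- persons for whom no outcome was sampled are omitted from the dict
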
BitsetDType = Property.PANDAS_TYPE_MAP[Types.BITSET]


class BitsetHandler:
"""Provides methods to operate on int column(s) in the population dataframe as a bitset"""
def __init__(self, population: Population, column: Optional[str], elements: List[str]):
"""
:param population: The TLO Population object (not the props dataframe).
:param column: The integer property column that will be used as a bitset. If
set to ``None`` then the optional `columns` argument to methods which act
on the population dataframe __must__ be specified.
:param elements: A list of strings specifying the elements of the bitset.
:returns: Instance of BitsetHandler for supplied arguments.
"""
assert isinstance(population, Population), (
'First argument is the population object (not the `props` dataframe)'
)
dtype_bitwidth = BitsetDType(0).nbytes * 8
assert len(elements) <= dtype_bitwidth, (
f"A maximum of {dtype_bitwidth} elements are supported"
)
self._elements = elements
self._element_to_int_map = {el: 2 ** i for i, el in enumerate(elements)}
self._population = population
if column is not None:
assert column in population.props.columns, (
'Column not found in population dataframe'
)
assert population.props[column].dtype == BitsetDType, (
f'Column must be of {BitsetDType} type'
)
self._column = column
@property
def df(self) -> pd.DataFrame:
return self._population.props
def element_repr(self, *elements: str) -> BitsetDType:
"""Returns integer representation of the specified element(s)"""
return BitsetDType(sum(self._element_to_int_map[el] for el in elements))
def to_strings(self, integer: BitsetDType) -> Set[str]:
"""Given an integer value, returns the corresponding set of strings.
:param integer: The integer value for the bitset.
:return: Set of strings corresponding to integer value.
"""
bin_repr = format(integer, 'b')
return {
self._elements[index]
for index, bit in enumerate(reversed(bin_repr)) if bit == '1'
}
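
    # For example, with elements ['a', 'b', 'c'] (illustrative names), to_strings(BitsetDType(5))
    # returns {'a', 'c'}, because 5 == 0b101 has bits 0 and 2 set.
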
def _get_columns(self, columns):
if columns is None and self._column is None:
raise ValueError(
                'columns argument must be specified because column was not set when constructing the handler'
)
return self._column if columns is None else columns
def set(self, where, *elements: str, columns: Optional[Union[str, List[str]]] = None):
"""Set (i.e. set to True) the bits corersponding to the specified elements.
The where argument is used verbatim as the first item in a `df.loc[x, y]` call.
It can be index items, a boolean logical condition, or list of row indices e.g. "[0]".
The elements are one of more valid items from the list of elements for this bitset.
:param where: Condition to filter rows that will be set.
:param elements: One or more elements to set to True.
:param columns: Optional argument specifying column(s) containing bitsets to
update. If set to ``None`` (the default) a ``column`` argument must have
been specified when constructing the ``BitsetHandler`` object.
"""
self.df.loc[where, self._get_columns(columns)] |= self.element_repr(*elements)
def unset(self, where, *elements: str, columns: Optional[Union[str, List[str]]] = None):
"""Unset (i.e. set to False) the bits corresponding the specified elements.
The where argument is used verbatim as the first item in a `df.loc[x, y]` call.
It can be index items, a boolean logical condition, or list of row indices e.g. "[0]".
The elements are one of more valid items from the list of elements for this bitset.
:param where: Condition to filter rows that will be unset.
:param elements: one or more elements to set to False.
:param columns: Optional argument specifying column(s) containing bitsets to
update. If set to ``None`` (the default) a ``column`` argument must have
been specified when constructing the ``BitsetHandler`` object.
"""
self.df.loc[where, self._get_columns(columns)] &= ~self.element_repr(*elements) # pylint: disable=E1130
def clear(self, where, columns: Optional[Union[str, List[str]]] = None):
"""Clears all the bits for the specified rows.
        :param where: Condition to filter rows that will be cleared.
:param columns: Optional argument specifying column(s) containing bitsets to
clear. If set to ``None`` (the default) a ``column`` argument must have
been specified when constructing the ``BitsetHandler`` object.
"""
self.df.loc[where, self._get_columns(columns)] = 0
def has(
self,
where,
element: str,
first: bool = False,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, pd.Series, bool]:
"""Test whether bit(s) for a specified element are set.
        :param where: Condition to filter rows that will be checked.
        :param element: Element string to test if bit is set for.
        :param first: Boolean keyword argument specifying whether to return only the
            first item / row in the computed column / dataframe.
        :param columns: Optional argument specifying column(s) containing bitsets to
            test. If set to ``None`` (the default) a ``column`` argument must have
            been specified when constructing the ``BitsetHandler`` object.
:return: Boolean value(s) indicating whether element bit(s) are set.
"""
int_repr = self._element_to_int_map[element]
matched = (self.df.loc[where, self._get_columns(columns)] & int_repr) != 0
return matched.iloc[0] if first else matched
def has_all(
self,
where,
*elements: str,
first: bool = False,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, pd.Series, bool]:
"""Check whether individual(s) have all the elements given set to True.
The where argument is used verbatim as the first item in a `df.loc[x, y]` call.
It can be index items, a boolean logical condition, or list of row indices e.g. "[0]"
        The elements are one or more valid items from the list of elements for this bitset.

        :param where: Condition to filter rows that will be checked.
        :param elements: One or more elements to check for.
        :param first: Boolean keyword argument specifying whether to return only the
            first item / row in the computed column / dataframe.
        :param columns: Optional argument specifying column(s) containing bitsets to
            check. If set to ``None`` (the default) a ``column`` argument must have
            been specified when constructing the ``BitsetHandler`` object.
:return: Boolean value(s) indicating whether all element bit(s) are set.
"""
int_repr = self.element_repr(*elements)
matched = (self.df.loc[where, self._get_columns(columns)] & int_repr) == int_repr
return matched.iloc[0] if first else matched
def has_any(
self,
where,
*elements: str,
first: bool = False,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, pd.Series, bool]:
"""Check whether individual(s) have any of the elements given set to True.
The where argument is used verbatim as the first item in a `df.loc[x, y]` call.
It can be index items, a boolean logical condition, or list of row indices e.g. "[0]"
        The elements are one or more valid items from the list of elements for this bitset.

        :param where: Condition to filter rows that will be checked.
        :param elements: One or more elements to check for.
        :param first: Boolean keyword argument specifying whether to return only the
            first item / row in the computed column / dataframe.
        :param columns: Optional argument specifying column(s) containing bitsets to
            check. If set to ``None`` (the default) a ``column`` argument must have
            been specified when constructing the ``BitsetHandler`` object.
:return: Boolean value(s) indicating whether any element bit(s) are set.
"""
int_repr = self.element_repr(*elements)
matched = (self.df.loc[where, self._get_columns(columns)] & int_repr) != 0
return matched.iloc[0] if first else matched
def get(
self,
where,
first: bool = False,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, pd.Series, Set[str]]:
"""Returns a series or dataframe with set of string elements where bit is True.
The where argument is used verbatim as the first item in a `df.loc[x, y]` call.
It can be index items, a boolean logical condition, or list of row indices e.g. "[0]"
        The elements are one or more valid items from the list of elements for this bitset.

        :param where: Condition to filter rows that will be returned.
        :param first: Boolean keyword argument specifying whether to return only the
            first item / row in the computed column / dataframe.
        :param columns: Optional argument specifying column(s) containing bitsets to
            return elements for. If set to ``None`` (the default) a ``column`` argument
            must have been specified when constructing the ``BitsetHandler`` object.
:return: Set(s) of strings corresponding to elements with bits set to True.
"""
columns = self._get_columns(columns)
if isinstance(columns, str):
sets = self.df.loc[where, columns].apply(self.to_strings)
else:
sets = self.df.loc[where, columns].applymap(self.to_strings)
return sets.iloc[0] if first else sets
def uncompress(
self,
where=None,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""Returns an exploded representation of the bitset(s).
Each element bit becomes a column and each column is a bool indicating whether
the bit is set for the element.
:param where: Condition to filter rows that an exploded representation will be
returned for.
:param columns: Optional argument specifying column(s) containing bitsets to
return exploded representation for. If set to ``None`` (the default) a
``column`` argument must have been specified when constructing the
``BitsetHandler`` object.
:return: If ``columns`` is not set or is set to a single string, then a
dataframe is returned with a column for each element in set and boolean
values indicating whether the corresponding bit is set; if ``columns`` is
specified as a list of multiple column names a dictionary keyed by column
name and with the corresponding value a dataframe corresponding to the
exploded representation of the column bitset is returned.
"""
if where is None:
where = self.df.index
columns = self._get_columns(columns)
uncompressed = {}
for column in [columns] if isinstance(columns, str) else columns:
collect = dict()
for element in self._elements:
collect[element] = self.has(where, element, columns=column)
uncompressed[column] = pd.DataFrame(collect)
return uncompressed[columns] if isinstance(columns, str) else uncompressed
def not_empty(
self,
where,
first=False,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, pd.Series, bool]:
"""Returns Series of bool indicating whether the BitSet entry is not empty.
        True if the set is not empty, False otherwise.

        :param where: Condition to filter rows that will be checked.
        :param first: Boolean keyword argument specifying whether to return only the
            first item / row in the computed column / dataframe.
        :param columns: Optional argument specifying column(s) containing bitsets to
            check. If set to ``None`` (the default) a ``column`` argument must have
            been specified when constructing the ``BitsetHandler`` object.
:return: Boolean value(s) indicating whether any elements bits are set.
"""
return ~self.is_empty(where, first=first, columns=columns)
def is_empty(
self,
where,
first=False,
columns: Optional[Union[str, List[str]]] = None
) -> Union[pd.DataFrame, pd.Series, bool]:
"""Returns Series of bool indicating whether the BitSet entry is empty.
True if the set is empty, False otherwise.
        :param where: Condition to filter rows that will be checked.
        :param first: Boolean keyword argument specifying whether to return only the
            first item / row in the computed column / dataframe.
        :param columns: Optional argument specifying column(s) containing bitsets to
            check. If set to ``None`` (the default) a ``column`` argument must have
            been specified when constructing the ``BitsetHandler`` object.
:return: Boolean value(s) indicating whether all elements bits are not set.
"""
empty = self.df.loc[where, self._get_columns(columns)] == 0
return empty.iloc[0] if first else empty
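
# Example usage of BitsetHandler (illustrative sketch; `population` and the 'sy_symptoms' BITSET
# property are hypothetical, and the property must already exist in the population dataframe):
#
#     handler = BitsetHandler(population, 'sy_symptoms', ['fever', 'cough', 'rash'])
#     handler.set([0, 1], 'fever', 'cough')   # switch on the 'fever' and 'cough' bits for persons 0 and 1
#     handler.unset([1], 'cough')             # switch the 'cough' bit off again for person 1
#     handler.has([0, 1], 'cough')            # Series of bool: True for person 0, False for person 1
#     handler.get([0], first=True)            # {'fever', 'cough'}
#     handler.uncompress()                    # DataFrame of bools with one column per element
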
def random_date(start, end, rng):
    """Return a random date between start (inclusive) and end (exclusive), uniform over days."""
    if start >= end:
        raise ValueError("End date equal to or earlier than start date")
return start + DateOffset(days=rng.randint(0, (end - start).days))
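
# Example usage (illustrative sketch):
#
#     rng = np.random.RandomState(0)
#     random_date(pd.Timestamp(2010, 1, 1), pd.Timestamp(2011, 1, 1), rng)
#     # -> a Timestamp in 2010; the start date can be returned, the end date cannot
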
def str_to_pandas_date(date_string):
"""Convert a string with the format YYYY-MM-DD to a pandas Timestamp (aka TLO Date) object."""
return pd.to_datetime(date_string, format="%Y-%m-%d")
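
# Example usage (illustrative sketch):
#
#     str_to_pandas_date('2015-06-01')   # -> Timestamp('2015-06-01 00:00:00')
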
def hash_dataframe(dataframe: pd.DataFrame):
    """Return a SHA-1 hex digest of the contents of a dataframe."""
    def coerce_lists_to_tuples(df: pd.DataFrame) -> pd.DataFrame:
        """Coerce columns in a pd.DataFrame that are lists to tuples. This step is needed before
        hashing a pd.DataFrame as lists are not hashable."""
        return df.applymap(lambda x: tuple(x) if isinstance(x, list) else x)
return hashlib.sha1(pd.util.hash_pandas_object(coerce_lists_to_tuples(dataframe)).values).hexdigest()
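
# Example usage (illustrative sketch): two dataframes with identical contents hash to the same digest,
# even when some cells hold (unhashable) lists.
#
#     df1 = pd.DataFrame({'a': [1, 2], 'b': [[1, 2], [3]]})
#     hash_dataframe(df1) == hash_dataframe(df1.copy())   # True
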
def get_person_id_to_inherit_from(child_id, mother_id, population_dataframe, rng):
"""Get index of person to inherit properties from.
"""
if mother_id == DEFAULT_MOTHER_ID:
# Get indices of alive persons and try to drop child_id from these indices if
# present, ignoring any errors if child_id not currently in population dataframe
alive_persons_not_including_child = population_dataframe.index[
population_dataframe.is_alive
].drop(child_id, errors="ignore")
return rng.choice(alive_persons_not_including_child)
elif 0 > mother_id > DEFAULT_MOTHER_ID:
return abs(mother_id)
elif mother_id >= 0:
return mother_id
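
# Example usage (illustrative sketch; `df` is a hypothetical population dataframe with an `is_alive` column
# and `rng` a numpy RandomState):
#
#     get_person_id_to_inherit_from(child_id=5, mother_id=12, population_dataframe=df, rng=rng)
#     # -> 12 (a non-negative mother_id is returned unchanged)
#     get_person_id_to_inherit_from(child_id=5, mother_id=-12, population_dataframe=df, rng=rng)
#     # -> 12 (a negative mother_id other than DEFAULT_MOTHER_ID encodes person abs(mother_id))
#     get_person_id_to_inherit_from(child_id=5, mother_id=DEFAULT_MOTHER_ID, population_dataframe=df, rng=rng)
#     # -> a randomly chosen alive person other than the child
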
def convert_excel_files_to_csv(folder: Path, files: Optional[list[str]] = None, *, delete_excel_files: bool = False) -> None:
""" convert Excel files to csv files.
:param folder: Folder containing Excel files.
:param files: List of Excel file names to convert to csv files. When `None`, all Excel files in the folder and
subsequent folders within this folder will be converted to csv files with Excel file name becoming
folder name and sheet names becoming csv file names.
:param delete_excel_files: When true, the Excel file we are generating csv files from will get deleted.
"""
# get path to Excel files
if files is None:
excel_file_paths = sorted(folder.rglob("*.xlsx"))
else:
excel_file_paths = [folder / file for file in files]
    # exit function if no Excel file is given or found within the path
    if not excel_file_paths:
        return
for excel_file_path in excel_file_paths:
sheet_dataframes: dict[Any, DataFrame] = pd.read_excel(excel_file_path, sheet_name=None)
excel_file_directory: Path = excel_file_path.with_suffix("")
# Create a container directory for per sheet CSVs
if excel_file_directory.exists():
print(f"Directory {excel_file_directory} already exists")
else:
excel_file_directory.mkdir()
# Write a CSV for each worksheet
for sheet_name, dataframe in sheet_dataframes.items():
dataframe.to_csv(f'{excel_file_directory / sheet_name}.csv', index=False)
        if delete_excel_files:
            # Remove the no-longer-needed Excel file (excel_file_path is already a full path)
            excel_file_path.unlink()
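
# Example usage (illustrative sketch; the folder and file names are hypothetical):
#
#     convert_excel_files_to_csv(Path('resources/demography'))
#     # resources/demography/ResourceFile_Pop.xlsx -> resources/demography/ResourceFile_Pop/<sheet name>.csv
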
def read_csv_files(folder: Path, files: Optional[list[str]] = None) -> DataFrame | dict[str, DataFrame]:
"""
A function to read CSV files in a similar way pandas reads Excel files (:py:func:`pandas.read_excel`).
NB: Converting Excel files to csv files caused all columns that had no relevant data to simulation (i.e.
parameter descriptions or data references) to be named `Unnamed1, Unnamed2, ....., UnnamedN` in the csv files.
We are therefore using :py:func:`pandas.filter` to track all unnamed columns and silently drop them using
:py:func:`pandas.drop`.
:param folder: Path to folder containing CSV files to read.
:param files: preferred csv file name(s). This is the same as sheet names in Excel file. Note that if None(no files
selected) then all files in the containing folder will be loaded
"""
all_data: dict[str, DataFrame] = {} # dataframes dictionary
def clean_dataframe(dataframes_dict: dict[str, DataFrame]) -> None:
""" silently drop all columns that have no relevant data to simulation (all columns with a name starting with
Unnamed
:param dataframes_dict: Dictionary of dataframes to clean
"""
for _key, dataframe in dataframes_dict.items():
all_data[_key] = dataframe.drop(dataframe.filter(like='Unnamed'), axis=1) # filter and drop Unnamed columns
if files is None:
for f_name in folder.rglob("*.csv"):
all_data[f_name.stem] = pd.read_csv(f_name)
else:
for f_name in files:
all_data[f_name] = pd.read_csv((folder / f_name).with_suffix(".csv"))
# clean and return the dataframe dictionary
clean_dataframe(all_data)
# If only one file loaded return dataframe directly rather than dict
return next(iter(all_data.values())) if len(all_data) == 1 else all_data
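
# Example usage (illustrative sketch; the folder and file names are hypothetical):
#
#     read_csv_files(Path('resources/demography'))
#     # -> dict of {file name stem: DataFrame} for every CSV file under the folder
#     read_csv_files(Path('resources/demography'), files=['pop_2010'])
#     # -> a single DataFrame, because only one file was loaded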