-
Notifications
You must be signed in to change notification settings - Fork 124
/
base.py
1160 lines (994 loc) · 45.4 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Hazard.
"""
__all__ = ['Hazard']
import copy
import datetime as dt
import logging
from typing import Optional,List
import warnings
import geopandas as gpd
import numpy as np
from pathos.pools import ProcessPool as Pool
from scipy import sparse
from climada import CONFIG
from climada.hazard.plot import HazardPlot
from climada.hazard.io import HazardIO
from climada.hazard.centroids.centr import Centroids
import climada.util.checker as u_check
import climada.util.constants as u_const
import climada.util.coordinates as u_coord
import climada.util.dates_times as u_dt
LOGGER = logging.getLogger(__name__)
class Hazard(HazardIO, HazardPlot):
"""
Contains events of some hazard type defined at centroids. Loads from
files with format defined in FILE_EXT.
Attention
---------
This class uses instances of
`scipy.sparse.csr_matrix
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html>`_
to store :py:attr:`intensity` and :py:attr:`fraction`. This data types comes with
its particular pitfalls. Depending on how the objects are instantiated and modified,
a matrix might end up in a "non-canonical" state. In this state, its ``.data``
attribute does not necessarily represent the values apparent in the final matrix.
In particular, a "non-canonical" matrix may store "duplicates", i.e. multiple values
that map to the same matrix position. This is supported, and the default behavior is
to sum up these values. To avoid any inconsistencies, call :py:meth:`check_matrices`
before accessing the ``data`` attribute of either matrix. This will explicitly sum
all values at the same matrix position and eliminate explicit zeros.
Attributes
----------
haz_type : str
two-letters hazard-type string, e.g., "TC" (tropical cyclone), "RF" (river flood) or "WF"
(wild fire).
Note: The acronym is used as reference to the hazard when centroids of multiple hazards
are assigned to an ``Exposures`` object.
units : str
units of the intensity
centroids : Centroids
centroids of the events
event_id : np.array
id (>0) of each event
event_name : list(str)
name of each event (default: event_id)
date : np.array
integer date corresponding to the proleptic
Gregorian ordinal, where January 1 of year 1 has ordinal 1
(ordinal format of datetime library)
orig : np.array
flags indicating historical events (True)
or probabilistic (False)
frequency : np.array
frequency of each event
frequency_unit : str
unit of the frequency (default: "1/year")
intensity : sparse.csr_matrix
intensity of the events at centroids
fraction : sparse.csr_matrix
fraction of affected exposures for each event at each centroid.
If empty (all 0), it is ignored in the impact computations
(i.e., is equivalent to fraction is 1 everywhere).
"""
intensity_thres = 10
"""Intensity threshold per hazard used to filter lower intensities. To be
set for every hazard type"""
vars_oblig = {'units',
'centroids',
'event_id',
'frequency',
'intensity',
'fraction'
}
"""Name of the variables needed to compute the impact. Types: scalar, str,
list, 1dim np.array of size num_events, scipy.sparse matrix of shape
num_events x num_centroids, Centroids."""
vars_def = {'date',
'orig',
'event_name',
'frequency_unit'
}
"""Name of the variables used in impact calculation whose value is
descriptive and can therefore be set with default values. Types: scalar,
string, list, 1dim np.array of size num_events.
"""
vars_opt = set()
"""Name of the variables that aren't need to compute the impact. Types:
scalar, string, list, 1dim np.array of size num_events."""
def __init__(self,
             haz_type: str = "",
             pool: Optional[Pool] = None,
             units: str = "",
             centroids: Optional[Centroids] = None,
             event_id: Optional[np.ndarray] = None,
             frequency: Optional[np.ndarray] = None,
             frequency_unit: str = u_const.DEF_FREQ_UNIT,
             event_name: Optional[List[str]] = None,
             date: Optional[np.ndarray] = None,
             orig: Optional[np.ndarray] = None,
             intensity: Optional[sparse.csr_matrix] = None,
             fraction: Optional[sparse.csr_matrix] = None):
    """Create a Hazard, filling in empty defaults for anything not given.

    Parameters
    ----------
    haz_type : str, optional
        acronym of the hazard type (e.g. 'TC').
    pool : pathos.pool, optional
        Pool that will be used for parallel computation when applicable. Default: None
    units : str, optional
        units of the intensity. Defaults to empty string.
    centroids : Centroids, optional
        centroids of the events. Defaults to an empty Centroids object.
    event_id : np.array, optional
        id (>0) of each event. Defaults to empty array.
    frequency : np.array, optional
        frequency of each event. Defaults to empty array.
    frequency_unit : str, optional
        unit of the frequency (default: "1/year").
    event_name : list(str), optional
        name of each event (default: event_id). Defaults to empty list.
    date : np.array, optional
        integer date corresponding to the proleptic Gregorian ordinal,
        where January 1 of year 1 has ordinal 1 (ordinal format of the
        datetime library). Defaults to empty array.
    orig : np.array, optional
        flags indicating historical events (True) or probabilistic (False).
        Defaults to empty array.
    intensity : sparse.csr_matrix, optional
        intensity of the events at centroids. Defaults to empty matrix.
    fraction : sparse.csr_matrix, optional
        fraction of affected exposures for each event at each centroid.
        Defaults to an all-zero matrix of the same shape as ``intensity``.

    Examples
    --------
    Initialize using keyword arguments:

    >>> haz = Hazard('TC', intensity=sparse.csr_matrix(np.zeros((2, 2))))

    Take hazard values from file:

    >>> haz = Hazard.from_mat(HAZ_DEMO_MAT, 'demo')
    """
    self.haz_type = haz_type
    self.units = units
    if centroids is None:
        centroids = Centroids(lat=np.empty(0), lon=np.empty(0))
    self.centroids = centroids
    # attributes defined per event
    self.event_id = np.array([], int) if event_id is None else event_id
    self.frequency = np.array([], float) if frequency is None else frequency
    self.frequency_unit = frequency_unit
    self.event_name = list() if event_name is None else event_name
    self.date = np.array([], int) if date is None else date
    self.orig = np.array([], bool) if orig is None else orig
    # attributes defined per event and per centroid
    if intensity is None:
        intensity = sparse.csr_matrix(np.empty((0, 0)))  # events x centroids
    self.intensity = intensity
    if fraction is None:
        # default fraction: all-zero, aligned with the intensity matrix
        fraction = sparse.csr_matrix(self.intensity.shape)
    self.fraction = fraction
    self.pool = pool
    if self.pool:
        LOGGER.info('Using %s CPUs.', self.pool.ncpus)
def check_matrices(self):
    """Bring both sparse matrices to canonical form and verify their shapes.

    It is good practice to call this method before accessing the ``data``
    attribute of either :py:attr:`intensity` or :py:attr:`fraction`:
    pruning sums up duplicate entries and removes explicit zeros.

    The shape comparison is only performed for a non-empty fraction matrix,
    because an all-zero fraction is a valid placeholder of any shape.

    See Also
    --------
    :py:func:`climada.util.checker.prune_csr_matrix`

    Todo
    -----
    * Check consistency with centroids

    Raises
    ------
    ValueError
        If matrices are ill-formed or ill-shaped in relation to each other
    """
    u_check.prune_csr_matrix(self.intensity)
    u_check.prune_csr_matrix(self.fraction)
    if self.fraction.nnz > 0 and self.intensity.shape != self.fraction.shape:
        raise ValueError(
            "Intensity and fraction matrices must have the same shape"
        )
@classmethod
def get_default(cls, attribute):
    """Get the Hazard type default for a given attribute.

    Parameters
    ----------
    attribute : str
        attribute name

    Returns
    ------
    Any
        The default value, or None if no default is registered.
    """
    defaults = {
        'frequency_unit': u_const.DEF_FREQ_UNIT,
    }
    return defaults.get(attribute)
def check(self):
    """Verify dimensional consistency of all event attributes.

    Delegates to :py:meth:`_check_events`, which also fills in defaults
    for ``date``, ``event_name`` and ``orig`` where missing.

    Raises
    ------
    ValueError
    """
    self._check_events()
def reproject_vector(self, dst_crs):
    """Reproject the centroid point data to a given CRS, in place.

    The hazard is re-checked afterwards to catch any inconsistency.

    Parameters
    ----------
    dst_crs : crs
        reproject to given crs
    """
    self.centroids.gdf.to_crs(dst_crs, inplace=True)
    self.check()
def select(self, event_names=None, event_id=None, date=None, orig=None,
           reg_id=None, extent=None, reset_frequency=False):
    """Select events matching provided criteria

    The frequency of events may need to be recomputed (see `reset_frequency`)!

    Parameters
    ----------
    event_names : list of str, optional
        Names of events.
    event_id : list of int, optional
        Id of events. Default is None.
    date : array-like of length 2 containing str or int, optional
        (initial date, final date) in string ISO format ('2011-01-02') or datetime
        ordinal integer. The two bounds may mix both formats.
    orig : bool, optional
        Select only historical (True) or only synthetic (False) events.
    reg_id : int, optional
        Region identifier of the centroids' region_id attibute.
    extent: tuple(float, float, float, float), optional
        Extent of centroids as (min_lon, max_lon, min_lat, max_lat).
        The default is None.
    reset_frequency : bool, optional
        Change frequency of events proportional to difference between first and last
        year (old and new). Default: False.

    Returns
    -------
    haz : Hazard or None
        If no event matching the specified criteria is found, None is returned.

    Raises
    ------
    RuntimeError
        If the selection yields a fraction matrix that is zero everywhere
        while the original one was not.
    """
    # pylint: disable=unidiomatic-typecheck
    if type(self) is Hazard:
        haz = Hazard(self.haz_type)
    else:
        haz = self.__class__()
    # filter events
    sel_ev = np.ones(self.event_id.size, dtype=bool)
    # filter events by date
    if date is not None:
        date_ini, date_end = date
        # Convert each bound independently: previously date_end was only
        # converted when date_ini was a string, so mixed (int, str) bounds
        # raised a TypeError in the comparison below.
        if isinstance(date_ini, str):
            date_ini = u_dt.str_to_date(date_ini)
        if isinstance(date_end, str):
            date_end = u_dt.str_to_date(date_end)
        sel_ev &= (date_ini <= self.date) & (self.date <= date_end)
        if not np.any(sel_ev):
            LOGGER.info('No hazard in date range %s.', date)
            return None
    # filter events hist/synthetic
    if orig is not None:
        sel_ev &= (self.orig.astype(bool) == orig)
        if not np.any(sel_ev):
            LOGGER.info('No hazard with %s original events.', str(orig))
            return None
    # filter events based on name
    sel_ev = np.argwhere(sel_ev).reshape(-1)
    if event_names is not None:
        filtered_events = [self.event_name[i] for i in sel_ev]
        try:
            new_sel = [filtered_events.index(n) for n in event_names]
        except ValueError as err:
            name = str(err).replace(" is not in list", "")
            LOGGER.info('No hazard with name %s', name)
            return None
        sel_ev = sel_ev[new_sel]
    # filter events based on id
    if event_id is not None:
        # preserves order of event_id
        sel_ev = np.array([
            np.argwhere(self.event_id == n)[0,0]
            for n in event_id
            if n in self.event_id[sel_ev]
        ])
    # filter centroids
    sel_cen = self.centroids.select_mask(reg_id=reg_id, extent=extent)
    if not np.any(sel_cen):
        LOGGER.info('No hazard centroids within extent and region')
        return None
    # Sanitize fraction, because we check non-zero entries later
    self.fraction.eliminate_zeros()
    # Perform attribute selection: slice per-event arrays/lists by sel_ev,
    # matrices by both sel_ev and sel_cen, copy everything else over.
    for (var_name, var_val) in self.__dict__.items():
        if isinstance(var_val, np.ndarray) and var_val.ndim == 1 \
                and var_val.size > 0:
            setattr(haz, var_name, var_val[sel_ev])
        elif isinstance(var_val, sparse.csr_matrix):
            setattr(haz, var_name, var_val[sel_ev, :][:, sel_cen])
        elif isinstance(var_val, list) and var_val:
            setattr(haz, var_name, [var_val[idx] for idx in sel_ev])
        elif var_name == 'centroids':
            if reg_id is None and extent is None:
                new_cent = var_val
            else:
                new_cent = var_val.select(sel_cen=sel_cen)
            setattr(haz, var_name, new_cent)
        else:
            setattr(haz, var_name, var_val)
    # reset frequency if date span has changed (optional):
    if reset_frequency:
        if self.frequency_unit not in ['1/year', 'annual', '1/y', '1/a']:
            LOGGER.warning("Resetting the frequency is based on the calendar year of given"
                " dates but the frequency unit here is %s. Consider setting the frequency"
                " manually for the selection or changing the frequency unit to %s.",
                self.frequency_unit, u_const.DEF_FREQ_UNIT)
        year_span_old = np.abs(dt.datetime.fromordinal(self.date.max()).year -
                               dt.datetime.fromordinal(self.date.min()).year) + 1
        year_span_new = np.abs(dt.datetime.fromordinal(haz.date.max()).year -
                               dt.datetime.fromordinal(haz.date.min()).year) + 1
        haz.frequency = haz.frequency * year_span_old / year_span_new
    # Check if new fraction is zero everywhere
    if self._get_fraction() is not None and haz._get_fraction() is None:
        raise RuntimeError(
            "Your selection created a Hazard object where the fraction matrix is "
            "zero everywhere. This hazard will have zero impact everywhere. "
            "We are catching this condition because of an implementation detail: "
            "A fraction matrix without nonzero-valued entries will be completely "
            "ignored. This is surely not what you intended. If you really want to, "
            "you can circumvent this error by setting your original fraction "
            "matrix to zero everywhere, but there probably is no point in doing so."
        )
    haz.sanitize_event_ids()
    return haz
def select_tight(self, buffer=u_coord.NEAREST_NEIGHBOR_THRESHOLD / u_const.ONE_LAT_KM,
                 val='intensity'):
    """
    Reduce hazard to those centroids spanning a minimal box which
    contains all non-zero intensity or fraction points.

    Parameters
    ----------
    buffer : float, optional
        Buffer of box in the units of the centroids.
        The default is approximately equal to the default threshold
        from the assign_centroids method (works if centroids in
        lat/lon)
    val: string, optional
        Select tight by non-zero 'intensity' or 'fraction'. The
        default is 'intensity'.

    Returns
    -------
    Hazard
        Copy of the Hazard with centroids reduced to minimal box. All other
        hazard properties are carried over without changes.

    Raises
    ------
    ValueError
        If ``val`` is neither 'intensity' nor 'fraction'.

    See also
    --------
    self.select: Method to select centroids by lat/lon extent
    util.coordinates.match_coordinates: algorithm to match centroids.
    """
    if val == 'intensity':
        cent_nz = (self.intensity != 0).sum(axis=0).nonzero()[1]
    elif val == 'fraction':
        cent_nz = (self.fraction != 0).sum(axis=0).nonzero()[1]
    else:
        # Previously, an unrecognized `val` left cent_nz unbound and raised
        # an opaque UnboundLocalError below; fail early with a clear message.
        raise ValueError(
            f"Unknown value for `val`: {val}. Use 'intensity' or 'fraction'.")
    lon_nz = self.centroids.lon[cent_nz]
    lat_nz = self.centroids.lat[cent_nz]
    return self.select(extent=u_coord.toggle_extent_bounds(
        u_coord.latlon_bounds(lat=lat_nz, lon=lon_nz, buffer=buffer)
    ))
def local_exceedance_inten(self, return_periods=(25, 50, 100, 250)):
    """Compute exceedance intensity map for given return periods.

    Centroids are processed in column chunks so that the densified
    intensity submatrix never exceeds the configured maximum matrix size.

    Parameters
    ----------
    return_periods : np.array
        return periods to consider

    Returns
    -------
    inten_stats : np.array
        exceedance intensities, shape (len(return_periods), num centroids)
    """
    # warn about return periods rarer than the rarest event
    max_period = 1 / self.frequency.min()
    for period in return_periods:
        if period > max_period:
            LOGGER.warning('Return period %1.1f exceeds max. event return period.', period)
    LOGGER.info('Computing exceedance intenstiy map for return periods: %s',
                return_periods)
    num_cen = self.intensity.shape[1]
    inten_stats = np.zeros((len(return_periods), num_cen))
    cen_step = CONFIG.max_matrix_size.int() // self.intensity.shape[0]
    if not cen_step:
        raise ValueError('Increase max_matrix_size configuration parameter to >'
                         f' {self.intensity.shape[0]}')
    # process the centroids chunk by chunk (writes into inten_stats views)
    periods_arr = np.array(return_periods)
    for start in range(0, num_cen, cen_step):
        stop = start + cen_step
        self._loc_return_inten(
            periods_arr,
            self.intensity[:, start:stop].toarray(),
            inten_stats[:, start:stop])
    # set values below 0 to zero if minimum of hazard.intensity >= 0:
    if np.min(inten_stats) < 0 <= self.intensity.min():
        LOGGER.warning('Exceedance intenstiy values below 0 are set to 0. \
Reason: no negative intensity values were found in hazard.')
        inten_stats[inten_stats < 0] = 0
    return inten_stats
def sanitize_event_ids(self):
    """Reset event ids to 1..n whenever they are not unique."""
    n_events = self.event_id.size
    if np.unique(self.event_id).size != n_events:
        LOGGER.debug('Resetting event_id.')
        self.event_id = np.arange(1, n_events + 1)
def local_return_period(self, threshold_intensities=(5., 10., 20.)):
    """Compute local return periods for given hazard intensities.

    The ordered intensities per centroid are fitted to the corresponding
    cumulated frequency with a step function.

    Parameters
    ----------
    threshold_intensities : np.array
        User-specified hazard intensities for which the return period should be
        calculated locally (at each centroid). Defaults to (5, 10, 20)

    Returns
    -------
    gdf : gpd.GeoDataFrame
        GeoDataFrame with one column per threshold intensity and one row per
        centroid; each value is the local return period for that pair.
    label : str
        GeoDataFrame label, for reporting and plotting
    column_label : function
        Column-label-generating function, for reporting and plotting
    """
    # map the hazard's frequency unit to a return-period unit
    rp_unit = None
    for known_units, unit_name in (
            (('1/year', 'annual', '1/y', '1/a'), 'Years'),
            (('1/month', 'monthly', '1/m'), 'Months'),
            (('1/week', 'weekly', '1/w'), 'Weeks')):
        if self.frequency_unit in known_units:
            rp_unit = unit_name
            break
    if rp_unit is None:
        LOGGER.warning("Hazard's frequency unit %s is not known, "
                       "years will be used as return period unit.", self.frequency_unit)
        rp_unit = 'Years'
    # Ensure threshold_intensities is a numpy array
    threshold_intensities = np.array(threshold_intensities)
    num_cen = self.intensity.shape[1]
    return_periods = np.zeros((len(threshold_intensities), num_cen))
    # number of centroids handled per batch, bounded by the configured
    # maximal (dense) matrix size divided by the number of events
    batch_centroids = CONFIG.max_matrix_size.int() // self.intensity.shape[0]
    if batch_centroids < 1:
        raise ValueError('Increase max_matrix_size configuration parameter to >'
                         f'{self.intensity.shape[0]}')
    # process the intensities in chunks of centroids
    for start_col in range(0, num_cen, batch_centroids):
        end_col = min(start_col + batch_centroids, num_cen)
        return_periods[:, start_col:end_col] = self._loc_return_period(
            threshold_intensities,
            self.intensity[:, start_col:end_col].toarray()
        )
    # assemble the output GeoDataFrame
    gdf = gpd.GeoDataFrame(geometry=self.centroids.gdf['geometry'],
                           crs=self.centroids.gdf.crs)
    col_names = [f'{thresh}' for thresh in threshold_intensities]
    gdf[col_names] = return_periods.T
    # label and column_label for reporting/plotting
    label = f'Return Periods ({rp_unit})'
    column_label = lambda column_names: [f'Threshold Intensity: {col} {self.units}'
                                         for col in column_names]
    return gdf, label, column_label
def get_event_id(self, event_name):
    """Get all event ids matching a name (several events may share one name).

    Parameters
    ----------
    event_name: str
        Event name

    Returns
    -------
    list_id: np.array(int)

    Raises
    ------
    ValueError
        If no event carries the given name.
    """
    matching = [pos for pos, name in enumerate(self.event_name)
                if name == event_name]
    list_id = self.event_id[matching]
    if list_id.size == 0:
        raise ValueError(f"No event with name: {event_name}")
    return list_id
def get_event_name(self, event_id):
    """Get the name of an event id.

    Parameters
    ----------
    event_id: int
        id of the event

    Returns
    -------
    str

    Raises
    ------
    ValueError
        If no event with the given id exists.
    """
    try:
        # both the argwhere indexing and the list lookup may raise IndexError
        position = np.argwhere(self.event_id == event_id)[0][0]
        return self.event_name[position]
    except IndexError as err:
        raise ValueError(f"No event with id: {event_id}") from err
def get_event_date(self, event=None):
    """Return list of date strings for given event or for all events,
    if no event provided.

    Parameters
    ----------
    event: str or int, optional
        event name or id.

    Returns
    -------
    l_dates: list(str)
    """
    if event is None:
        # all events
        return [u_dt.date_to_str(date) for date in self.date]
    if isinstance(event, str):
        # a name may match several events
        return [
            u_dt.date_to_str(self.date[np.argwhere(self.event_id == ev_id)[0][0]])
            for ev_id in self.get_event_id(event)]
    # numeric id: exactly one event
    ev_idx = np.argwhere(self.event_id == event)[0][0]
    return [u_dt.date_to_str(self.date[ev_idx])]
def calc_year_set(self):
    """From the dates of the original events, get number yearly events.

    Returns
    -------
    orig_yearset: dict
        key are years, values array with event_ids of that year
    """
    # only historical (orig) events are considered
    orig_ids = self.event_id[self.orig]
    orig_year = np.array([dt.datetime.fromordinal(date).year
                          for date in self.date[self.orig]])
    return {year: orig_ids[orig_year == year]
            for year in np.unique(orig_year)}
def remove_duplicates(self):
    """Remove duplicate events (events with same name and date).

    For each (name, date) pair, only the first occurrence is kept; the
    original event order is preserved. All per-event attributes (1-d arrays,
    lists) and the per-event/centroid matrices are filtered accordingly.
    """
    events = list(zip(self.event_name, self.date))
    if len(set(events)) == self.event_id.size:
        return
    # Map each event to its first position in a single O(n) pass.
    # (Previously this used list.index per unique event: O(n^2).)
    first_pos = {}
    for pos, event in enumerate(events):
        first_pos.setdefault(event, pos)
    unique_pos = sorted(first_pos.values())
    for var_name, var_val in vars(self).items():
        if isinstance(var_val, sparse.csr_matrix):
            setattr(self, var_name, var_val[unique_pos, :])
        elif isinstance(var_val, np.ndarray) and var_val.ndim == 1:
            setattr(self, var_name, var_val[unique_pos])
        elif isinstance(var_val, list) and len(var_val) > 0:
            setattr(self, var_name, [var_val[p] for p in unique_pos])
def set_frequency(self, yearrange=None):
    """Set hazard frequency from yearrange or intensity matrix.

    Each event gets frequency 1 / (year span * ensemble size), where the
    ensemble size is the number of events per historical (orig) event.

    Parameters
    ----------
    yearrange: tuple or list, optional
        year range to be used to compute frequency
        per event. If yearrange is not given (None), the year range is
        derived from self.date
    """
    if self.frequency_unit not in ['1/year', 'annual', '1/y', '1/a']:
        # NOTE: the original message had a missing space ("unitis") due to
        # implicit string concatenation; fixed here.
        LOGGER.warning("setting the frequency on a hazard object whose frequency unit"
                       " is %s and not %s will most likely lead to unexpected results",
                       self.frequency_unit, u_const.DEF_FREQ_UNIT)
    if not yearrange:
        delta_time = dt.datetime.fromordinal(int(np.max(self.date))).year - \
                     dt.datetime.fromordinal(int(np.min(self.date))).year + 1
    else:
        delta_time = max(yearrange) - min(yearrange) + 1
    num_orig = self.orig.nonzero()[0].size
    # ensemble size: events per historical event (1 if none are historical)
    if num_orig > 0:
        ens_size = self.event_id.size / num_orig
    else:
        ens_size = 1
    self.frequency = np.ones(self.event_id.size) / delta_time / ens_size
@property
def size(self):
    """Number of events contained in the hazard."""
    # event_id is a 1-d array, so its length equals the event count
    return len(self.event_id)
def _events_set(self):
    """Generate set of tuples with (event_name, event_date)"""
    return set(zip(self.event_name, self.date))
def _loc_return_inten(self, return_periods, inten, exc_inten):
    """Compute local exceedance intensity for given return periods.

    Writes the result into ``exc_inten`` in place, one column (centroid)
    at a time.

    Parameters
    ----------
    return_periods : np.array
        return periods to consider
    inten : np.array
        dense intensity values (events x centroids), a column subset of
        the full intensity matrix
    exc_inten : np.array
        output array of shape (len(return_periods), centroids); filled
        in place
    """
    # sort each column's intensities in descending order; sort_pos holds
    # the row indices of that ordering per column
    sort_pos = np.argsort(inten, axis=0)[::-1, :]
    # build a matching column-index array so that fancy indexing below
    # picks element (sort_pos[i, j], j)
    columns = np.ones(inten.shape, int)
    # pylint: disable=unsubscriptable-object # pylint/issues/3139
    columns *= np.arange(columns.shape[1])
    inten_sort = inten[sort_pos, columns]
    # cumulative frequency at sorted intensity (computed in place)
    freq_sort = self.frequency[sort_pos]
    np.cumsum(freq_sort, axis=0, out=freq_sort)
    # fit exceedance intensities per centroid
    for cen_idx in range(inten.shape[1]):
        exc_inten[:, cen_idx] = self._cen_return_inten(
            inten_sort[:, cen_idx], freq_sort[:, cen_idx],
            self.intensity_thres, return_periods)
def _loc_return_period(self, threshold_intensities, inten):
    """Compute local return periods for user-specified threshold intensities
    for a subset of hazard centroids

    Parameters
    ----------
    threshold_intensities: np.array
        User-specified hazard intensities for which the return period should be calculated
        locally (at each centroid).
    inten: np.array
        subarray of full hazard intensities corresponding to a subset of the centroids
        (rows corresponds to events, columns correspond to centroids)

    Returns
    -------
    np.array
        (rows corresponds to threshold_intensities, columns correspond to centroids)
        np.nan where the threshold intensity is never exceeded
    """
    # sort each column's intensities in descending order and accumulate
    # the event frequencies along that ordering
    sort_pos = np.argsort(inten, axis=0)[::-1, :]
    inten_sort = inten[sort_pos, np.arange(inten.shape[1])]
    freq_sort = self.frequency[sort_pos]
    freq_sort = np.cumsum(freq_sort, axis=0)
    return_periods = np.zeros((len(threshold_intensities), inten.shape[1]))
    for cen_idx in range(inten.shape[1]):
        sorted_inten_cen = inten_sort[:, cen_idx]
        cum_freq_cen = freq_sort[:, cen_idx]
        for i, intensity in enumerate(threshold_intensities):
            # searchsorted on the re-reversed (ascending) intensities gives
            # the count of events strictly below the threshold
            exceedance_index = np.searchsorted(sorted_inten_cen[::-1], intensity, side='left')
            # NOTE(review): the negative index maps the ascending position back
            # into the descending cumulative-frequency array; presumably
            # cum_freq_cen[-exceedance_index - 1] is intended to be the
            # cumulated frequency of all events >= threshold — confirm the
            # boundary case exceedance_index == 0 (picks cum_freq_cen[-1],
            # i.e. the total frequency) is the intended semantics.
            if exceedance_index < len(cum_freq_cen):
                exceedance_probability = cum_freq_cen[-exceedance_index - 1]
            else:
                exceedance_probability = 0  # Or set a default minimal probability
            # return period is the reciprocal of the exceedance frequency;
            # NaN marks thresholds never exceeded at this centroid
            if exceedance_probability > 0:
                return_periods[i, cen_idx] = 1 / exceedance_probability
            else:
                return_periods[i, cen_idx] = np.nan
    return return_periods
def _check_events(self):
    """Check that all attributes but centroids contain consistent data.

    Fills in default date, event_name and orig where not provided, and
    verifies that no two events share the same id or the same
    (name, date) pair.

    Raises
    ------
    ValueError
    """
    n_events = len(self.event_id)
    n_centroids = self.centroids.size
    if np.unique(self.event_id).size != n_events:
        raise ValueError("There are events with the same identifier.")
    u_check.check_obligatories(self.__dict__, self.vars_oblig, 'Hazard.',
                               n_events, n_events, n_centroids)
    u_check.check_optionals(self.__dict__, self.vars_opt, 'Hazard.', n_events)
    # fill defaults for the descriptive per-event attributes
    self.event_name = u_check.array_default(
        n_events, self.event_name, 'Hazard.event_name', list(self.event_id))
    self.date = u_check.array_default(
        n_events, self.date, 'Hazard.date',
        np.ones(self.event_id.shape, dtype=int))
    self.orig = u_check.array_default(
        n_events, self.orig, 'Hazard.orig',
        np.zeros(self.event_id.shape, dtype=bool))
    if len(self._events_set()) != n_events:
        raise ValueError("There are events with same date and name.")
@staticmethod
def _cen_return_inten(inten, freq, inten_th, return_periods):
    """From ordered intensity and cumulative frequency at a centroid, get
    exceedance intensity at the input return periods.

    Fits intensity linearly against log-frequency and evaluates the fit
    at the requested return periods.

    Parameters
    ----------
    inten: np.array
        sorted intensity at centroid
    freq: np.array
        cumulative frequency at centroid
    inten_th: float
        intensity threshold
    return_periods: np.array
        return periods

    Returns
    -------
    np.array
    """
    # keep only values above the threshold (renamed from the original,
    # which reused `inten_th` for the mask and shadowed the parameter)
    above_thres = np.asarray(inten > inten_th).squeeze()
    inten_cen = inten[above_thres]
    freq_cen = freq[above_thres]
    if inten_cen.size == 0:
        return np.zeros((return_periods.size,))
    try:
        with warnings.catch_warnings():
            # polyfit may warn on poorly conditioned fits; results are used anyway
            warnings.simplefilter("ignore")
        pol_coef = np.polyfit(np.log(freq_cen), inten_cen, deg=1)
    except ValueError:
        # degenerate data: fall back to a constant fit
        pol_coef = np.polyfit(np.log(freq_cen), inten_cen, deg=0)
    inten_fit = np.polyval(pol_coef, np.log(1 / return_periods))
    # zero out NaN extrapolations beyond the rarest observed event
    beyond_data = (return_periods > np.max(1 / freq_cen)) & np.isnan(inten_fit)
    inten_fit[beyond_data] = 0.
    return inten_fit
def append(self, *others):
    """Append the events and centroids to this hazard object.

    All of the given hazards must be of the same type and use the same units as self. The
    centroids of all hazards must have the same CRS.

    The following kinds of object attributes are processed:

    - All centroids are combined together using `Centroids.union`.
    - Lists, 1-dimensional arrays (NumPy) and sparse CSR matrices (SciPy) are concatenated.
      Sparse matrices are concatenated along the first (vertical) axis.

    For any other type of attribute: A ValueError is raised if an attribute of that name is
    not defined in all of the non-empty hazards at least. However, there is no check that the
    attribute value is identical among the given hazard objects. The initial attribute value of
    `self` will not be modified.

    Note: Each of the hazard's `centroids` attributes might be modified in place in the sense
    that missing properties are added, but existing ones are not overwritten. In case of raster
    centroids, conversion to point centroids is applied so that raster information (meta) is
    lost. For more information, see `Centroids.union`.

    Parameters
    ----------
    others : one or more climada.hazard.Hazard objects
        Hazard instances to append to self

    Raises
    ------
    TypeError, ValueError

    See Also
    --------
    Hazard.concat : concatenate 2 or more hazards
    Centroids.union : combine centroids
    """
    # pylint: disable=no-member, protected-access
    if len(others) == 0:
        return
    haz_list = [self] + list(others)
    haz_list_nonempty = [haz for haz in haz_list if haz.size > 0]
    for haz in haz_list:
        haz._check_events()

    # check type, unit, and attribute consistency among hazards
    haz_types = {haz.haz_type for haz in haz_list if haz.haz_type != ''}
    if len(haz_types) > 1:
        raise ValueError(f"The given hazards are of different types: {haz_types}. "
                         "The hazards are incompatible and cannot be concatenated.")
    # BUGFIX: if every hazard has an empty haz_type, the set is empty and
    # pop() would raise KeyError; keep '' instead (mirrors the units handling).
    if len(haz_types) == 0:
        haz_types = {''}
    self.haz_type = haz_types.pop()

    haz_classes = {type(haz) for haz in haz_list}
    if len(haz_classes) > 1:
        raise TypeError(f"The given hazards are of different classes: {haz_classes}. "
                        "The hazards are incompatible and cannot be concatenated.")

    freq_units = {haz.frequency_unit for haz in haz_list}
    if len(freq_units) > 1:
        raise ValueError(f"The given hazards have different frequency units: {freq_units}. "
                         "The hazards are incompatible and cannot be concatenated.")
    self.frequency_unit = freq_units.pop()

    units = {haz.units for haz in haz_list if haz.units != ''}
    if len(units) > 1:
        raise ValueError(f"The given hazards use different units: {units}. "
                         "The hazards are incompatible and cannot be concatenated.")
    if len(units) == 0:
        units = {''}
    self.units = units.pop()

    # every attribute present anywhere must exist on all non-empty hazards
    attributes = sorted(set.union(*[set(vars(haz).keys()) for haz in haz_list]))
    for attr_name in attributes:
        if not all(hasattr(haz, attr_name) for haz in haz_list_nonempty):
            raise ValueError(f"Attribute {attr_name} is not shared by all hazards. "
                             "The hazards are incompatible and cannot be concatenated.")

    # map individual centroids objects to union
    centroids = Centroids.union(*[haz.centroids for haz in haz_list])
    hazcent_in_cent_idx_list = [
        u_coord.match_coordinates(haz.centroids.coord, centroids.coord, threshold=0)
        for haz in haz_list_nonempty
    ]

    # concatenate array and list attributes of non-empty hazards
    for attr_name in attributes:
        attr_val_list = [getattr(haz, attr_name) for haz in haz_list_nonempty]
        if isinstance(attr_val_list[0], sparse.csr_matrix):
            # map sparse matrix columns onto the union centroids before stacking
            setattr(self, attr_name, sparse.vstack([
                sparse.csr_matrix(
                    (matrix.data, cent_idx[matrix.indices], matrix.indptr),
                    shape=(matrix.shape[0], centroids.size)
                )
                for matrix, cent_idx in zip(attr_val_list, hazcent_in_cent_idx_list)
            ], format='csr'))
        elif isinstance(attr_val_list[0], np.ndarray) and attr_val_list[0].ndim == 1:
            setattr(self, attr_name, np.hstack(attr_val_list))
        elif isinstance(attr_val_list[0], list):
            setattr(self, attr_name, sum(attr_val_list, []))

    self.centroids = centroids
    self.sanitize_event_ids()
@classmethod
def concat(cls, haz_list):
    """
    Concatenate events of several hazards of same type.

    This function creates a new hazard of the same class as the first hazard in the given list
    and then applies the `append` method. Please refer to the docs of `Hazard.append` for
    caveats and limitations of the concatenation procedure.

    For centroids, lists, arrays and sparse matrices, the remarks in `Hazard.append`
    apply. All other attributes are copied from the first object in `haz_list`.

    Note that `Hazard.concat` can be used to concatenate hazards of a subclass. The result's
    type will be the subclass. However, calling `concat([])` (with an empty list) is equivalent
    to instantiation without init parameters. So, `Hazard.concat([])` is equivalent to
    `Hazard()`. If `HazardB` is a subclass of `Hazard`, then `HazardB.concat([])` is equivalent
    to `HazardB()` (unless `HazardB` overrides the `concat` method).

    Parameters
    ----------
    haz_list : list of climada.hazard.Hazard objects
        Hazard instances of the same hazard type (subclass).

    Returns
    -------
    haz_concat : instance of climada.hazard.Hazard
        This will be of the same type (subclass) as all the hazards in `haz_list`.

    See Also
    --------
    Hazard.append : append hazards to a hazard in place
    Centroids.union : combine centroids
    """
    if not haz_list:
        return cls()

    first = haz_list[0]
    # Start from an empty hazard of the same (sub)class, sharing the CRS.
    haz_concat = first.__class__(
        centroids=Centroids(lat=[], lon=[], crs=first.centroids.crs))

    # To save memory, only copy simple attributes like "units" that are
    # not explicitly handled by Hazard.append.
    for attr_name, attr_val in vars(first).items():
        is_container = isinstance(attr_val, (list, np.ndarray, sparse.csr_matrix))
        if not (is_container or attr_name in ["centroids"]):
            setattr(haz_concat, attr_name, copy.deepcopy(attr_val))

    haz_concat.append(*haz_list)
    return haz_concat
def change_centroids(self, centroids, threshold=u_coord.NEAREST_NEIGHBOR_THRESHOLD):
"""
Assign (new) centroids to hazard.
Centoids of the hazard not in centroids are mapped onto the nearest
point. Fails if a point is further than threshold from the closest
centroid.
The centroids must have the same CRS as self.centroids.
Parameters
----------
haz: Hazard