Merge pull request #8 from M3Works/qa
Start of QA Events
micah-prime authored Nov 28, 2023
2 parents 58782db + 0bde1f6 commit a2e1c0a
Showing 4 changed files with 834 additions and 15 deletions.
290 changes: 276 additions & 14 deletions metevents/events.py
@@ -1,9 +1,17 @@
from .periods import CumulativePeriod
import logging
import numpy as np
import pandas as pd
from datetime import timedelta
from metloom.pointdata import CDECPointData, SnotelPointData, MesowestPointData
from pandas.tseries.frequencies import to_offset
from .utilities import determine_freq
from scipy.signal import find_peaks

# local imports
from metevents.periods import CumulativePeriod, BaseTimePeriod
from metevents.utilities import determine_freq


LOG = logging.getLogger(__name__)


class BaseEvents:
@@ -32,7 +40,8 @@ def find(self, *args, **kwargs):
def group_condition_by_time(ind):
ind_sum = ind.eq(False).cumsum()

# Isolate the ind_sum by positions that are True and group them together
# Isolate the ind_sum by positions that are
# True and group them together
time_groups = ind_sum.loc[ind.eq(True)].groupby(ind_sum)
groups = time_groups.groups
return groups, ind_sum
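
For reference, a quick sketch of how this grouping behaves. The sample series is invented, and it assumes the helper is the staticmethod its signature suggests:

import pandas as pd

from metevents.events import BaseEvents

# Boolean condition with two separate runs of True values
ind = pd.Series(
    [False, True, True, False, True, False],
    index=pd.date_range("2023-01-01", periods=6, freq="D"),
)
groups, ind_sum = BaseEvents.group_condition_by_time(ind)
# groups maps a group id to the DatetimeIndex of each consecutive run
# of True: here one group for Jan 2-3 and another for Jan 5
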
@@ -44,17 +53,17 @@ def from_station(cls, station_id, start, end):

class StormEvents(BaseEvents):

def find(self, instant_mass_to_start=0.1, min_storm_total=0.5, hours_to_stop=24,
max_storm_hours=336):
def find(self, instant_mass_to_start=0.1, min_storm_total=0.5,
hours_to_stop=24, max_storm_hours=336):
"""
Find all the storms that are initiated by a mass greater than the
instant_mass_to_start and receive less than that threshold for at
least hours_to_stop to end it. Storm delineation is further bounded by
min_storm_total and max_storm_hours.
Args:
instant_mass_to_start: mass per time step to consider the beginning of a
storm
instant_mass_to_start: mass per time step to consider the
beginning of a storm
min_storm_total: Total storm mass to be considered a complete storm
hours_to_stop: minimum hours of mass less than instant threshold to
end a storm
@@ -97,7 +106,9 @@ def find(self, instant_mass_to_start=0.1, min_storm_total=0.5, hours_to_stop=24,
storm_duration_too_long = duration > max_storm
# Has enough mass accumulated to be considered a storm
enough_storm_mass = total >= min_storm_total
base_condition = (enough_hours_wo_precip or storm_duration_too_long)
base_condition = (
enough_hours_wo_precip or storm_duration_too_long
)
condition = (base_condition and enough_storm_mass)

if condition or nx_idx == N_groups:
Expand All @@ -120,7 +131,8 @@ def from_station(cls, station_id, start, stop, station_name='unknown',
station_id: string id of the station of interest
start: Datetime object when to start looking for data
stop: Datetime object when to stop looking for data
source: Network/datasource to search for data options: NRCS, mesowest, CDEC
source: Network/datasource to search for data options:
NRCS, mesowest, CDEC
station_name: String name of the station to pass to pointdata
"""
pnt = None
@@ -131,19 +143,269 @@ def from_station(cls, station_id, start, stop, station_name='unknown',
break

if pnt is None:
raise ValueError(f'Datasource {source} is invalid. Use '
f'{", ".join([c.DATASOURCE for c in pnt_classes])}')
raise ValueError(
f'Datasource {source} is invalid. Use '
f'{", ".join([c.DATASOURCE for c in pnt_classes])}'
)

# Pull data
variable = pnt.ALLOWED_VARIABLES.PRECIPITATIONACCUM

df = pnt.get_daily_data(start, stop, [variable])

if df is None:
raise ValueError(f'The combination of pulling precip from {station_id} '
f'during {start}-{stop} produced no data. Check station '
f'is real and has precip data between specified dates.')
raise ValueError(
f'The combination of pulling precip from {station_id} '
f'during {start}-{stop} produced no data. Check station '
f'is real and has precip data between specified dates.'
)
else:
df = df.reset_index().set_index('datetime')

return cls(df[variable.name].diff())
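
For illustration, a minimal usage sketch that is not part of this commit. The station id, dates, and the events/start/stop attributes are assumptions about the surrounding API rather than code shown in this diff:

from datetime import datetime

from metevents.events import StormEvents

# Hypothetical SNOTEL station and winter window
storms = StormEvents.from_station(
    "637:CA:SNTL", datetime(2021, 10, 1), datetime(2022, 4, 1),
    source="NRCS"
)
storms.find(instant_mass_to_start=0.1, min_storm_total=0.5, hours_to_stop=24)
for storm in storms.events:
    print(storm.start, storm.stop)
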


class SpikeValleyEvent(BaseEvents):

def find(
self, height=None, threshold=None, prominence=100.0,
width=None
):
"""
Find instances of spikes or valleys within a timeseries
Args:
height: Required height of peaks
threshold: Required relative height to neighboring peaks
prominence: Required prominence of peaks
width: required width. Default is a min of 0 and max of 3 (0,3)
"""
ind = self.detect_spikes_using_find_peaks(
self.data, height=height, threshold=threshold,
prominence=prominence, width=width
)
# Group the events
groups, _ = self.group_condition_by_time(ind)
group_list = sorted(list(groups.items()))

# Build the list of events
for i, (event_id, curr_group) in enumerate(group_list):
curr_start = curr_group.min()
curr_stop = curr_group.max()
event = BaseTimePeriod(self.data.loc[curr_start:curr_stop])
self._events.append(event)

@staticmethod
def detect_spikes_using_find_peaks(
series, height=None, threshold=None, prominence=100.0,
width=None
):
"""
Detect spikes in time series data using the scipy find_peaks function
https://docs.scipy.org/doc/scipy/reference/generated/
scipy.signal.find_peaks.html
Args:
series: A pandas Series representing time series data.
height: Required height of peaks
threshold: Required relative height to neighboring peaks
prominence: Required prominence of peaks
width: required width. Default is a min of 0 and max of 3, i.e. (0, 3)
Returns:
Boolean pandas Series that is True where a spike or valley was detected
"""
width = width or (0, 3)

# find peaks
peaks, peak_info = find_peaks(
series, height=height, threshold=threshold, prominence=prominence,
width=width
)
# get the index span
peak_width_values = peak_info["widths"]

# find valleys
valleys, valley_info = find_peaks(
# Data gets flipped around y axis
series * -1.0,
height=height, threshold=threshold, prominence=prominence,
width=width
)
valley_width_values = valley_info["widths"]

spike_index = pd.Series(index=series.index, data=[False] * len(series))
# Set true for the width of the peak surrounding the center
for p, w in zip(peaks, peak_width_values):
p1 = int(p - w)
p2 = int(p + w) + 1
spike_index.iloc[p1:p2] = True
for p, w in zip(valleys, valley_width_values):
p1 = int(p - w)
p2 = int(p + w) + 1
spike_index.iloc[p1:p2] = True
return spike_index
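
A minimal sketch of exercising this detector on synthetic data; the events attribute is assumed to come from BaseEvents and is not shown in this diff:

import numpy as np
import pandas as pd

from metevents.events import SpikeValleyEvent

# Flat synthetic series with one large spike
values = np.full(30, 100.0)
values[15] = 600.0
series = pd.Series(values, index=pd.date_range("2023-01-01", periods=30))

spikes = SpikeValleyEvent(series)
spikes.find(prominence=100.0)
# Expect a single event spanning the samples around the spike
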


class DataGapEvent(BaseEvents):

def find(self, min_len=3, expected_frequency="1D"):
"""
Find instances of data gaps
Args:
min_len: minimum length of a gap
expected_frequency: expected frequency of timeseries
"""
# find all nan indices
ind = pd.isna(self.data)

# Ensure the dataframe is sorted by index
self.data = self.data.sort_index()

# Calculate differences between consecutive timestamps
differences = self.data.index.to_series().diff()

# Convert the expected frequency into a timedelta for comparison
expected_difference = pd.Timedelta(expected_frequency)

# Group the nan data events
groups, _ = self.group_condition_by_time(ind)

# Identify gap start points
# gap_starts = self.data.index[differences > expected_difference]
gaps = differences[differences > expected_difference].dropna()
for idg, gap in zip(gaps.index, gaps):
# TODO: this logic makes missing 4 days into a 6 day gap
gap_iloc = ind.index.get_loc(idg)
# Create a group of the missing indices
groups[gap_iloc] = pd.DatetimeIndex(
[idg - gap, idg]
)

# sort the group list
group_list = sorted(list(groups.items()))

# Build the list of events
for event_id, curr_group in group_list:
curr_start = curr_group.min()
curr_stop = curr_group.max()
event = BaseTimePeriod(self.data.loc[curr_start:curr_stop])
# only keep events that are longer than what is configured
if event.duration >= min_len * expected_difference:
self._events.append(event)
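
A hedged usage sketch with synthetic data, assuming the constructor accepts the series directly as StormEvents does above:

import pandas as pd

from metevents.events import DataGapEvent

# Daily series with five missing values in the middle
idx = pd.date_range("2023-01-01", periods=20, freq="D")
data = pd.Series(1.0, index=idx)
data.iloc[5:10] = None

gaps = DataGapEvent(data)
gaps.find(min_len=3, expected_frequency="1D")
# Expect one gap event covering the run of NaN values
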


class FlatLineEvent(BaseEvents):

def find(self, min_len=5, slope_thresh=0.0):
"""
Find instances of flat-lined data
Args:
min_len: minimum length of a flat value
slope_thresh: slope threshold for a flat line. Any absolute
value of slope <= slope_thresh will be flagged
"""
# find the slope
diff = self.data.diff()
# find the absolute slope within our threshold
ind = np.abs(diff) <= slope_thresh

# Group the flat-line events
groups, _ = self.group_condition_by_time(ind)
# sort the group list
group_list = sorted(list(groups.items()))

# Build the list of events
for event_id, curr_group in group_list:
curr_start = curr_group.min()
curr_stop = curr_group.max()
event = BaseTimePeriod(self.data.loc[curr_start:curr_stop])
# only keep events that are longer than what is configured
if len(event.data) >= min_len:
self._events.append(event)
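
A short synthetic example, illustrative only:

import pandas as pd

from metevents.events import FlatLineEvent

# Rising series that flat-lines for six days, then resumes
values = [0, 1, 2, 3, 4] + [4.0] * 6 + [5, 6, 7, 8]
series = pd.Series(
    values, index=pd.date_range("2023-01-01", periods=15, freq="D")
)

flat = FlatLineEvent(series)
flat.find(min_len=5, slope_thresh=0.0)
# Expect one event covering the six repeated values
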


class ExtremeValueEvent(BaseEvents):

def find(self, expected_max=600.0, expected_min=0.0):
"""
Find events where the values are outside of the expected range
Args:
expected_max: Maximum expected value in the data
expected_min: minimum expected value in the data
"""
# Indices where data is outside of expected range
ind = (self.data > expected_max) | (self.data < expected_min)

# Group the out-of-range events
groups, _ = self.group_condition_by_time(ind)
# sort the group list
group_list = sorted(list(groups.items()))

# Build the list of events
for event_id, curr_group in group_list:
curr_start = curr_group.min()
curr_stop = curr_group.max()
event = BaseTimePeriod(self.data.loc[curr_start:curr_stop])
# store the events
self._events.append(event)
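
A corresponding sketch for the range check, again with invented values:

import pandas as pd

from metevents.events import ExtremeValueEvent

series = pd.Series(
    [10.0, 20.0, 650.0, 700.0, 15.0, -5.0],
    index=pd.date_range("2023-01-01", periods=6, freq="D"),
)
extremes = ExtremeValueEvent(series)
extremes.find(expected_max=600.0, expected_min=0.0)
# Expect two events: the 650/700 pair and the single negative value
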


class ExtremeChangeEvent(BaseEvents):
"""
Find where the slope of the data is larger than expected over a certain
period of time
"""

def find(
self, min_len=1, positive_slope_thresh=None,
negative_slope_thresh=-3.0
):
"""
Find instances of excessive rate of change
Args:
min_len: minimum length of the event
positive_slope_thresh: positive slope threshold. Any change
>= this threshold will be flagged
negative_slope_thresh: negative slope threshold. Any change
<= this threshold will be flagged
"""

if positive_slope_thresh is None and negative_slope_thresh is None:
raise ValueError("One slope threshold must be provided")

# find the slope
diff = self.data.diff()

ind_pos = pd.Series([False] * len(diff), index=diff.index)
ind_neg = pd.Series([False] * len(diff), index=diff.index)
if positive_slope_thresh is not None:
ind_pos = diff >= positive_slope_thresh
if negative_slope_thresh is not None:
ind_neg = diff <= negative_slope_thresh

# join the index
ind = ind_pos | ind_neg

# Group the excessive rate-of-change events
groups, _ = self.group_condition_by_time(ind)
# sort the group list
group_list = sorted(list(groups.items()))

# Build the list of events
for event_id, curr_group in group_list:
curr_start = curr_group.min()
curr_stop = curr_group.max()
event = BaseTimePeriod(self.data.loc[curr_start:curr_stop])
# only keep events that are longer than what is configured
if len(event.data) >= min_len:
self._events.append(event)
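
Finally, a minimal sketch for the rate-of-change check on a synthetic snow-depth style series; attribute names are assumed as in the examples above:

import pandas as pd

from metevents.events import ExtremeChangeEvent

# Slowly rising series with one sudden 32-unit drop
series = pd.Series(
    [50.0, 51.0, 52.0, 20.0, 21.0, 22.0],
    index=pd.date_range("2023-01-01", periods=6, freq="D"),
)
changes = ExtremeChangeEvent(series)
changes.find(min_len=1, negative_slope_thresh=-3.0)
# Expect a single one-day event at the drop
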
1 change: 1 addition & 0 deletions setup.py
@@ -12,6 +12,7 @@

requirements = [
'metloom>=0.3.0,<1.0.0',
'scipy>=1.7.1,<2.0'
]

test_requirements = ['pytest>=3', ]
