Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dei 109 duration analysis group #40

Merged
merged 8 commits into from
Aug 17, 2023
111 changes: 89 additions & 22 deletions decoimpact/business/entities/rules/time_aggregation_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import List

import xarray as _xr
import numpy as _np
from xarray.core.resample import DataArrayResample

from decoimpact.business.entities.rules.i_array_based_rule import IArrayBasedRule
Expand Down Expand Up @@ -126,6 +127,12 @@ def _perform_operation(self, aggregated_values: DataArrayResample) -> _xr.DataAr
Returns:
DataArray: Values of operation type
"""
period_operations = [
TimeOperationType.COUNT_PERIODS,
TimeOperationType.MAX_DURATION_PERIODS,
TimeOperationType.AVG_DURATION_PERIODS
]

if self._operation_type is TimeOperationType.ADD:
result = aggregated_values.sum()

Expand All @@ -141,8 +148,8 @@ def _perform_operation(self, aggregated_values: DataArrayResample) -> _xr.DataAr
elif self._operation_type is TimeOperationType.MEDIAN:
result = aggregated_values.median()

elif self._operation_type is TimeOperationType.COUNT_PERIODS:
result = aggregated_values.reduce(self.count_groups, dim="time")
elif self._operation_type in period_operations:
result = aggregated_values.reduce(self.analyze_groups, dim="time")

else:
raise NotImplementedError(
Expand All @@ -152,37 +159,97 @@ def _perform_operation(self, aggregated_values: DataArrayResample) -> _xr.DataAr

return _xr.DataArray(result)

def count_groups(self, elem, axis, **kwargs):
"""In an array with 0 and 1, count the amount of times the
groups of 1 occur.
def count_groups(self, elem):
    """
    Count the number of groups of consecutive 1 values in an array.

    Args:
        elem (Array): array-like with the values 0 and 1 of a single cell
            through time (expected to be 1D); boolean values are treated
            as 0 and 1

    Returns:
        int: the number of groups of consecutive 1 values (0 for an
            empty array)
    """
    values = _np.asarray(elem)

    # an empty array contains no groups (and has no first element to inspect)
    if values.size == 0:
        return 0

    # numpy.diff on booleans returns "changed / not changed" instead of a signed
    # difference, which would also count the END of a group as a start. Convert
    # booleans to integers so that only 0 -> 1 transitions give a difference of 1
    if values.dtype == _np.bool_:
        values = values.astype(_np.int8)

    # in case of an example array with 5 values [1,1,0,1,0]:
    # subtract the first 4 values from the last 4 values: [1,0,1,0] - [1,1,0,1]
    # (the result of this example differences: [0,-1,1,-1])
    differences = _np.diff(values)

    # a difference of 1 between two neighbouring elements indicates the start of
    # a group. The differences only start at the second element, so a group that
    # starts at the very first element would be missed. Therefore the first
    # element (0 or 1) is appended to the differences before counting
    # (the result of this example: [0,-1,1,-1,1])
    differences = _np.append(differences, values[0])

    # the number of groups is the number of occurrences of 1
    # (the result of this example: 2)
    return _np.count_nonzero(differences == 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description says that the first value is added after counting the occurrences. The code does the opposite: it first appends, then counts. Which is correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point: the code was changed and I forgot to update the text. The first element of the array is either 0 or 1, so it simply adds to the sum; this could be done before or after counting, but because I switched to count_nonzero I moved the append to the line above. I will adjust the comment.


def duration_groups(self, elem):
"""
Create an array that cumulatively sums the values of the groups in the array,
but restarts when a 0 occurs. For example, for the array [0, 1, 1, 0, 1, 1, 1, 0, 1]
this function will return [0, 1, 2, 0, 1, 2, 3, 0, 1].

Args:
elem (List): the data array in N-dimensions

Returns:
List: List with the duration of the periods
"""
# Function to create a cumsum over the groups (where the elements in elem are 1)
cumsum_groups = _np.frompyfunc(lambda a, b: a + b if b == 1 else 0, 2, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leuke functie, maar ik snap het niet goed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an example in the description of the duration_groups function explaining what the result is of this function. a and b are consecutive values when looping over the array in this function.

return cumsum_groups.accumulate(elem)

def analyze_groups(self, elem, axis, **kwargs):
"""This function analyzes the input array (N-dimensional array containing 0
and 1). The function will reduce the array over the time axis, depending on a
certain time operation type. Below are the operation types with what this
function will do to this example input array: [0, 1, 1, 0, 1, 0]. A period
is a run of consecutive 1 values.
- COUNT_PERIODS: count the amount of periods (result: 2)
- MAX_DURATION_PERIODS: gives the longest period (result: 2)
- AVG_DURATION_PERIODS: gives the average of periods (result: 1.5)

Args:
elem (Array): the data array in N-dimensions
axis (integer): the number of axis of the array
axis (integer): the value describing the time axis

Returns:
array: array with the counted periods, with the same dimensions as elem
array: array with the analyzed periods, with the same dimensions as elem
"""
# in case of 1 dimension:
if axis == 0:
# in case of an example array with 5 values [1,1,0,1,0]:
# subtract last 4 values from the first 4 values: [1,0,1,0] - [1,1,0,1]:
# (the result of this example differences: [0,-1,1,0])
differences = elem[1:] - elem[:-1]
# when the difference of two neighbouring elements is 1,
# this indicates the start of a group; to count the number of groups:
# count the occurences of difference=1, and then add the first value:
# (the result of this examples: 1 + 1 = 2)
group_count = sum(map(lambda x: x == 1, differences)) + elem[0]
no_axis = len(_np.shape(elem))

# The reduce call that invokes this analyze_groups function should reduce over
# the time axis. The argument axis in this function gives the number of the
# axis that is in fact the time axis. This axis needs to move to the last
# position, because we need to reduce the N-dimensional array to a 1D array
# with all the values in time for a specific cell in order to do the calculation
# for that cell. Because we are looping over the N-dimensional array recursively,
# we should only move the time axis the first time this function is called (so
# when the axis is not yet set to -1!)
if axis != -1:
elem = _np.moveaxis(elem, axis, -1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this exactly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll put in a comment

axis = -1

# in case of 1 dimension:
if no_axis == 1:
if self._operation_type is TimeOperationType.COUNT_PERIODS:
group_result = self.count_groups(elem)
elif self._operation_type is TimeOperationType.MAX_DURATION_PERIODS:
group_result = _np.max((self.duration_groups(elem)))
elif self._operation_type is TimeOperationType.AVG_DURATION_PERIODS:
period = _np.sum(elem)
group_count = self.count_groups(elem)
group_result = period / group_count

# in case of multiple dimensions:
else:
group_count = []
group_result = []
for sub_elem in elem:
# loop through this recursive function, determine output per axis:
group_count_row = self.count_groups(sub_elem, axis - 1)
group_result_row = self.analyze_groups(sub_elem, axis)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I think I understand it a bit more

# add the result to the list of results, per axis:
group_count.append(group_count_row)
return group_count
group_result.append(group_result_row)

return group_result

def _get_time_dimension_name(self, variable: _xr.DataArray, logger: ILogger) -> str:
"""Retrieves the dimension name
Expand Down
2 changes: 2 additions & 0 deletions decoimpact/data/api/time_operation_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ class TimeOperationType(IntEnum):
AVERAGE = 4
MEDIAN = 5
COUNT_PERIODS = 6
MAX_DURATION_PERIODS = 7
AVG_DURATION_PERIODS = 8
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""
Tests for time aggregation rule
for operation types:
- COUNT_PERIODS
- MAX_DURATION_PERIODS
- AVG_DURATION_PERIODS
"""
import numpy as _np
import pytest
Expand Down Expand Up @@ -66,7 +70,7 @@ def test_validation_when_not_valid():
assert not valid


def test_count_groups_function_not_only_1_and_0():
def test_analyze_groups_function_not_only_1_and_0():
"""Test whether it gives an error if the data array contains
other values than 0 and 1"""
logger = Mock(ILogger)
Expand Down Expand Up @@ -115,7 +119,15 @@ def test_count_groups_function_not_only_1_and_0():
assert exception_raised.args[0] == expected_message


def test_count_groups_function():
@pytest.mark.parametrize(
"operation_type, expected_result_data",
[
("COUNT_PERIODS", [2, 2, 2, 2]),
("MAX_DURATION_PERIODS", [2, 2, 3, 3]),
("AVG_DURATION_PERIODS", [1.5, 1.5, 2, 2])
],
)
def test_analyze_groups_function(operation_type, expected_result_data):
"""Test the count_groups to count groups for several examples.

This function is being used when 'count_periods' is given
Expand All @@ -127,7 +139,7 @@ def test_count_groups_function():
rule = TimeAggregationRule(
name="test",
input_variable_names=["foo"],
operation_type=TimeOperationType.COUNT_PERIODS,
operation_type=TimeOperationType[operation_type],
)
t_data = [0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1]
t_time = [
Expand All @@ -154,25 +166,32 @@ def test_count_groups_function():
]
t_time = [_np.datetime64(t) for t in t_time]
input_array = _xr.DataArray(t_data, coords=[t_time], dims=["time"])
result = input_array.resample(time="Y").reduce(rule.count_groups)
result = input_array.resample(time="Y").reduce(rule.analyze_groups)

# expected results
expected_result_time = ["2000-12-31", "2001-12-31", "2002-12-31", "2003-12-31"]
expected_result_time = [_np.datetime64(t) for t in expected_result_time]
expected_result_data = [2, 2, 2, 2]
expected_result = _xr.DataArray(
expected_result_data, coords=[expected_result_time], dims=["time"]
)

assert _xr.testing.assert_equal(expected_result, result) is None


def test_count_groups_function_2d():
@pytest.mark.parametrize(
"operation_type, expected_result_data",
[
("COUNT_PERIODS", [[2, 2, 2, 2], [1, 2, 2, 2], [2, 1, 2, 2]]),
("MAX_DURATION_PERIODS", [[2, 2, 3, 3], [1, 2, 3, 3], [2, 2, 3, 3]]),
("AVG_DURATION_PERIODS", [[1.5, 1.5, 2, 2], [1, 1.5, 2, 2], [1.5, 2, 2, 2]])
],
)
def test_analyze_groups_function_2d(operation_type, expected_result_data):
"""Test if functional for 2d arrays"""
rule = TimeAggregationRule(
name="test",
input_variable_names=["foo"],
operation_type=TimeOperationType.COUNT_PERIODS,
operation_type=TimeOperationType[operation_type],
)

t_data = [
Expand Down Expand Up @@ -207,16 +226,11 @@ def test_count_groups_function_2d():
input_array = _xr.DataArray(
t_data, coords=[t_cells, t_time], dims=["cells", "time"]
)
result = input_array.resample(time="Y").reduce(rule.count_groups)
result = input_array.resample(time="Y").reduce(rule.analyze_groups)

# expected results
expected_result_time = ["2000-12-31", "2001-12-31", "2002-12-31", "2003-12-31"]
expected_result_time = [_np.datetime64(t) for t in expected_result_time]
expected_result_data = [
[2, 2, 2, 2],
[1, 2, 2, 2],
[2, 1, 2, 2],
]
expected_result = _xr.DataArray(
expected_result_data,
coords=[t_cells, expected_result_time],
Expand All @@ -226,12 +240,29 @@ def test_count_groups_function_2d():
assert _xr.testing.assert_equal(expected_result, result) is None


def test_count_groups_function_3d():
@pytest.mark.parametrize(
"operation_type, expected_result_data",
[
("COUNT_PERIODS", [
[[2, 2, 2, 2], [1, 2, 2, 2], [2, 1, 2, 2]],
[[2, 2, 2, 2], [1, 2, 2, 2], [2, 1, 2, 2]]
]),
("MAX_DURATION_PERIODS", [
[[2, 2, 3, 3], [1, 2, 3, 3], [2, 2, 3, 3]],
[[2, 2, 3, 3], [1, 2, 3, 3], [2, 2, 3, 3]]
]),
("AVG_DURATION_PERIODS", [
[[1.5, 1.5, 2, 2], [1, 1.5, 2, 2], [1.5, 2, 2, 2]],
[[1.5, 1.5, 2, 2], [1, 1.5, 2, 2], [1.5, 2, 2, 2]]
])
],
)
def test_count_groups_function_3d(operation_type, expected_result_data):
"""Test if functional for multiple dimensions"""
rule = TimeAggregationRule(
name="test",
input_variable_names=["foo"],
operation_type=TimeOperationType.COUNT_PERIODS,
operation_type=TimeOperationType[operation_type],
)

t_data = [[
Expand Down Expand Up @@ -271,20 +302,11 @@ def test_count_groups_function_3d():
input_array = _xr.DataArray(
t_data, coords=[t_cols, t_cells, t_time], dims=["cols", "cells", "time"]
)
result = input_array.resample(time="Y").reduce(rule.count_groups)
result = input_array.resample(time="Y").reduce(rule.analyze_groups)

# expected results
expected_result_time = ["2000-12-31", "2001-12-31", "2002-12-31", "2003-12-31"]
expected_result_time = [_np.datetime64(t) for t in expected_result_time]
expected_result_data = [[
[2, 2, 2, 2],
[1, 2, 2, 2],
[2, 1, 2, 2],
], [
[2, 2, 2, 2],
[1, 2, 2, 2],
[2, 1, 2, 2],
]]
expected_result = _xr.DataArray(
expected_result_data,
coords=[t_cols, t_cells, expected_result_time],
Expand Down