Merge pull request #300 from dcs4cop/forman-299-ts_perf_drop
Fixes #299
forman authored May 5, 2020
2 parents 22974c8 + 871781f commit 714593d
Showing 4 changed files with 87 additions and 34 deletions.
8 changes: 6 additions & 2 deletions CHANGES.md
@@ -2,7 +2,7 @@
 
 ### New in 0.5.0.dev2
 
-* `xcube serve CUBE` will now use the last path component of `CUBE`as dataset title.
+* `xcube serve CUBE` will now use the last path component of `CUBE` as dataset title.
 
 ### New in 0.5.0.dev1

@@ -17,7 +17,11 @@
   the cube stored in bucket with URL `CUBE` will be accessed using the
   credentials found in environment variables `AWS_ACCESS_KEY_ID` and
   `AWS_SECRET_ACCESS_KEY`.
+
+## Changes in 0.4.1
+
+* Fixed time-series performance drop (#299).
 
 ## Changes in 0.4.0
 
 ### New
47 changes: 47 additions & 0 deletions test/webapi/controllers/test_timeseries.py
@@ -1,4 +1,5 @@
 import json
+import os
 import unittest
 
 import numpy as np
@@ -7,6 +8,7 @@
 
 from test.mixins import AlmostEqualDeepMixin
 from test.webapi.helpers import new_test_service_context
+from xcube.webapi.context import ServiceContext
 from xcube.webapi.controllers.timeseries import get_time_series, _collect_timeseries_result
 
 
@@ -273,3 +275,48 @@ def test_count(self):
              {'count': 78, 'count_tot': 82, 'time': '2010-04-06T00:00:00Z'},
              {'count': 74, 'count_tot': 82, 'time': '2010-04-07T00:00:00Z'}],
             result)
+
+
+@unittest.skipUnless(os.environ.get('XCUBE_TS_PERF_TEST') == '1', 'XCUBE_TS_PERF_TEST is not 1')
+class TsPerfTest(unittest.TestCase):
+
+    def test_point_ts_perf(self):
+        TEST_CUBE = 'ts_test.zarr'
+
+        if not os.path.isdir(TEST_CUBE):
+            from xcube.core.new import new_cube
+            cube = new_cube(time_periods=2000, variables=dict(analysed_sst=280.4))
+            cube = cube.chunk(dict(time=1, lon=90, lat=90))
+            cube.to_zarr(TEST_CUBE)
+
+        ctx = ServiceContext(
+            base_dir='.',
+            config=dict(
+                Datasets=[
+                    dict(Identifier='ts_test',
+                         FileSystem='local',
+                         Path=TEST_CUBE,
+                         Format='zarr')
+                ]
+            ))
+
+        N = 5
+        import random
+        import time
+        time_sum = 0.0
+        for i in range(N):
+            lon = -180 + 360 * random.random()
+            lat = -90 + 180 * random.random()
+
+            t1 = time.perf_counter()
+            result = get_time_series(ctx, 'ts_test', 'analysed_sst', dict(type='Point', coordinates=[lon, lat]))
+            t2 = time.perf_counter()
+
+            self.assertIsInstance(result, list)
+            self.assertEqual(2000, len(result))
+
+            time_delta = t2 - t1
+            time_sum += time_delta
+            print(f'test {i + 1} took {time_delta} seconds')
+
+        print(f'each test took {time_sum / N} seconds on average')
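For reference, `TsPerfTest` above is skipped unless the environment variable `XCUBE_TS_PERF_TEST` is set to `1`. A minimal sketch of one way to run it (the dotted test name is assumed from the file path shown above):

```python
import os
import unittest

# Enable the gated performance test before the test module is loaded,
# since the skipUnless decorator reads the environment at import time.
os.environ['XCUBE_TS_PERF_TEST'] = '1'

# Dotted name assumed from test/webapi/controllers/test_timeseries.py.
suite = unittest.defaultTestLoader.loadTestsFromName(
    'test.webapi.controllers.test_timeseries.TsPerfTest')
unittest.TextTestRunner(verbosity=2).run(suite)
```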
22 changes: 11 additions & 11 deletions xcube/core/timeseries.py
@@ -102,16 +102,6 @@ def get_time_series(cube: xr.Dataset,
 
     geometry = convert_geometry(geometry)
 
-    agg_methods = normalize_agg_methods(agg_methods)
-    if include_count:
-        warnings.warn("keyword argument 'include_count' has been deprecated, "
-                      f"use 'agg_methods=[{AGG_COUNT!r}, ...]' instead")
-        agg_methods.add(AGG_COUNT)
-    if include_stdev:
-        warnings.warn("keyword argument 'include_stdev' has been deprecated, "
-                      f"use 'agg_methods=[{AGG_STD!r}, ...]' instead")
-        agg_methods.add(AGG_STD)
-
     dataset = select_variables_subset(cube, var_names)
     if len(dataset.data_vars) == 0:
         return None
@@ -127,6 +117,16 @@
         dataset = dataset.sel(lon=geometry.x, lat=geometry.y, method='Nearest')
         return dataset.assign_attrs(max_number_of_observations=1)
 
+    agg_methods = normalize_agg_methods(agg_methods)
+    if include_count:
+        warnings.warn("keyword argument 'include_count' has been deprecated, "
+                      f"use 'agg_methods=[{AGG_COUNT!r}, ...]' instead")
+        agg_methods.add(AGG_COUNT)
+    if include_stdev:
+        warnings.warn("keyword argument 'include_stdev' has been deprecated, "
+                      f"use 'agg_methods=[{AGG_STD!r}, ...]' instead")
+        agg_methods.add(AGG_STD)
+
     if geometry is not None:
         dataset = mask_dataset_by_geometry(dataset, geometry, save_geometry_mask='__mask__')
     if dataset is None:
@@ -170,7 +170,7 @@ def get_time_series(cube: xr.Dataset,
 
 
 def normalize_agg_methods(agg_methods: Union[str, Sequence[str]],
-                          exception_type = ValueError) -> Set[str]:
+                          exception_type=ValueError) -> Set[str]:
     agg_methods = agg_methods or [AGG_MEAN]
     if isinstance(agg_methods, str):
         agg_methods = [agg_methods]
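With the reordering above, a point geometry returns via the nearest-neighbour shortcut before any aggregation-method normalization or deprecation warnings run. A minimal usage sketch, assuming `new_cube` as used in the test above and that `geometry` is accepted as a keyword argument:

```python
import shapely.geometry

from xcube.core.new import new_cube
from xcube.core.timeseries import get_time_series

# Small synthetic cube, built the same way as in the perf test above.
cube = new_cube(time_periods=10, variables=dict(analysed_sst=280.4))

# A point geometry takes the early nearest-neighbour path and yields a
# time-series dataset with max_number_of_observations == 1.
point = shapely.geometry.Point(20.0, 10.0)
ts = get_time_series(cube, geometry=point)
print(ts.analysed_sst.values)
```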
44 changes: 23 additions & 21 deletions xcube/webapi/controllers/timeseries.py
@@ -27,13 +27,13 @@
 from typing import Dict, List, Optional, Union, Sequence, Any, Set, Tuple
 
 import numpy as np
+import pandas as pd
 import shapely.geometry
 import xarray as xr
 
 from xcube.constants import LOG
 from xcube.core import timeseries
 from xcube.core.ancvar import find_ancillary_var_names
-from xcube.core.timecoord import timestamp_to_iso_string
 from xcube.util.geojson import GeoJSON
 from xcube.util.perf import measure_time
 from xcube.webapi.context import ServiceContext
@@ -207,14 +207,27 @@ def _collect_timeseries_result(time_series_ds: xr.Dataset,
     if not (max_valids is None or max_valids == -1 or max_valids > 0):
         raise ValueError('max_valids must be either None, -1 or positive')
 
-    vars = {key: time_series_ds[var_name] for key, var_name in key_to_var_names.items()}
-    time = time_series_ds.time
+    var_values_map = dict()
+    for key, var_name in key_to_var_names.items():
+        values = time_series_ds[var_name].values
+        if np.issubdtype(values.dtype, np.floating):
+            num_type = float
+        elif np.issubdtype(values.dtype, np.integer):
+            num_type = int
+        elif np.issubdtype(values.dtype, np.dtype(bool)):
+            num_type = bool
+        else:
+            raise ValueError(f'cannot convert {values.dtype} into JSON-convertible value')
+        var_values_map[key] = [(num_type(v) if f else None) for f, v in zip(np.isfinite(values), values)]
+
+    time_values = [t.isoformat() + 'Z' for t in pd.DatetimeIndex(time_series_ds.time.values).round('s')]
 
     max_number_of_observations = time_series_ds.attrs.get('max_number_of_observations', 1)
-    num_times = time.size
+    num_times = len(time_values)
     time_series = []
 
-    max_valids_is_pos = max_valids is not None and max_valids > 0
-    if max_valids_is_pos:
+    max_valids_is_positive = max_valids is not None and max_valids > 0
+    if max_valids_is_positive:
         time_indexes = range(num_times - 1, -1, -1)
     else:
         time_indexes = range(num_times)
@@ -226,35 +239,24 @@
 
         time_series_value = dict()
         all_null = True
-        for key, var in vars.items():
-            var_values = var.values
+        for key, var_values in var_values_map.items():
             var_value = var_values[time_index]
-            if np.isfinite(var_value):
+            if var_value is not None:
                 all_null = False
-                if np.issubdtype(var_value.dtype, np.floating):
-                    var_value = float(var_value)
-                elif np.issubdtype(var_value.dtype, np.integer):
-                    var_value = int(var_value)
-                elif np.issubdtype(var_value.dtype, np.dtype(bool)):
-                    var_value = bool(var_value)
-                else:
-                    raise ValueError(f'cannot convert {var_value.dtype} into JSON-convertible value')
-            else:
-                var_value = None
             time_series_value[key] = var_value
 
         has_count = 'count' in time_series_value
         no_obs = all_null or (has_count and time_series_value['count'] == 0)
         if no_obs and max_valids is not None:
             continue
 
-        time_series_value['time'] = timestamp_to_iso_string(time[time_index].values)
+        time_series_value['time'] = time_values[time_index]
        if has_count:
             time_series_value['count_tot'] = max_number_of_observations
 
         time_series.append(time_series_value)
 
-    if max_valids_is_pos:
+    if max_valids_is_positive:
         time_series = time_series[::-1]
 
     return time_series
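The gist of the fix in `_collect_timeseries_result` above: validity checks, dtype dispatch, and timestamp formatting are hoisted out of the per-time-step loop into one vectorized pass per variable. A self-contained sketch of that pattern (the sample arrays are illustrative, not taken from the module):

```python
import numpy as np
import pandas as pd

values = np.array([280.4, np.nan, 281.0])
times = pd.DatetimeIndex(['2010-04-05 12:00:00.4', '2010-04-06', '2010-04-07'])

# One dtype decision and one vectorized validity mask per variable,
# instead of calling np.issubdtype()/np.isfinite() once per time step:
json_values = [(float(v) if ok else None)
               for ok, v in zip(np.isfinite(values), values)]

# One vectorized round-to-seconds pass over all timestamps, replacing a
# per-timestamp conversion call inside the loop:
time_strings = [t.isoformat() + 'Z' for t in times.round('s')]

print(json_values)   # [280.4, None, 281.0]
print(time_strings)  # ['2010-04-05T12:00:00Z', '2010-04-06T00:00:00Z', ...]
```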
