Skip to content
Damien Irving edited this page Jun 4, 2021 · 29 revisions

Relevant code and packages

Relevant papers

Parallel processing on NCI

A local Dask cluster can be setup as follows:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

You can set up a Dask cluster under the OpenOnDemand environment across multiple nodes using the cloud-based SLURM cluster.

from dask.distributed import Client,Scheduler
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=16, memory="31GB")
client = Client(cluster)
cluster.scale(cores=32)

You can also use climtas.nci.GadiClient() following these instructions.

Data for playing around

Project xv83 on NCI (zarr format), e.g:
/g/data/xv83/ds0092/CAFE/forecasts/f6/WIP/c5-d60-pX-f6-19841101/ZARR/atmos_isobaric_daily.zarr.zip

The control run that the simulation branched from (c5), data assimilation method (d60), perturbation method (pX) are indicated. They are the defining features of the f5 dataset.

The f5 dataset has 10 ensemble members and was run over a longer period of time, whereas f6 has 96 members over a shorter period.

Processing steps

Step 1: Pre-processing

  • Data extraction (for examples see Dougie's prepare*.ipynb files)
  • Region extraction (using regionmask)
  • Index calculation (possibly with ability to pass a custom function)
  • Bias correction (additive or multiplicative method)
get_data(files, region)
get_index()
bias_correct(model_data, obs_data, method="additive")

Some of these functions might need to write to file, because dask gets confused if many operations pile up. Outputs are typically a single zipped zarr file.

Step 2: Fidelity testing

Returns a mask (to remove gird points that fail the test) or a test statistic/s for each grid point corresponding to different alpha levels (e.g. 0.99, 0.95) that a mask could be created from.

Dougie's paper applies the Kolmogorov–Smirnov test (the 1D function comes from SciPy, the 2D he wrote himself) and other papers apply the Multiple-ooment test (1D only).

An issue is data selection where you want the same number of samples for each lead time (e.g. for a dataset where 10-year forecasts are initiated every year from 2005-2020, you don't get full overlap until 2014). Dougie has written a function stack_super_ensemble to subset the overlap data, but it's inefficient.

ks_test_1d()
ks_test_2d()
moment_test()

Step 3: The easier stuff

i.e. counting to determine the likelihood of a particular event.

Development notes

Design choices

Define functions that can be used to construct workflows in a Jupyter notebook or linked together in the main() function of Python scripts.

Patch extraction

A common operation involves taking observational datasets and converting the time axis to a dual initial date / lead time coordinate. This basically involves duplicating the data via overlapping windows, which is analogous to a process in image processing (to make images more blurry) called patch extraction. There are implementations available as part of skimage (skimage.util.view_as_windows), numpy (numpy.lib.stride_tricks.sliding_window_view), dask (dask.array.overlap.sliding_window_view) and xarray (xarray.core.rolling).

Data selection using multi-indexes

For many analyses we ultimately want to reduce all the non-spatial dimensions down to one single dimension. For example, this array,

ds_cafe

Dimensions:    (ensemble: 96, init_date: 4, lat: 17, lead_time: 3650, lon: 17)
Coordinates:
  * ensemble   (ensemble) int64 1 2 3 4 5 6 7 8 9 ... 88 89 90 91 92 93 94 95 96
  * init_date  (init_date) datetime64[ns] 1990-11-01 1991-11-01 ... 1993-11-01
  * lat        (lat) float64 -43.48 -41.46 -39.44 ... -15.17 -13.15 -11.12
  * lead_time  (lead_time) int64 0 1 2 3 4 5 6 ... 3644 3645 3646 3647 3648 3649
  * lon        (lon) float64 113.8 116.2 118.8 121.2 ... 146.2 148.8 151.2 153.8
    time       (lead_time, init_date) datetime64[ns] dask.array<chunksize=(3650, 4), meta=np.ndarray>
Data variables:
    precip     (init_date, lead_time, ensemble, lat, lon) float64 dask.array<chunksize=(1, 50, 96, 17, 17), meta=np.ndarray>

would simply become (lat, lon, sample), where sample is a MultiIndex with related ensemble, init_date and lead_time coordinate values.

ds_stacked = ds_cafe.stack({'sample': ['ensemble', 'init_date', 'lead_time']})
ds_stacked

Dimensions:    (lat: 17, lon: 17, sample: 1401600)
Coordinates:
  * lat        (lat) float64 -43.48 -41.46 -39.44 ... -15.17 -13.15 -11.12
  * lon        (lon) float64 113.8 116.2 118.8 121.2 ... 146.2 148.8 151.2 153.8
    time       (sample) datetime64[ns] dask.array<chunksize=(1401600,), meta=np.ndarray>
  * sample     (sample) MultiIndex
  - ensemble   (sample) int64 1 1 1 1 1 1 1 1 1 1 ... 96 96 96 96 96 96 96 96 96
  - init_date  (sample) datetime64[ns] 1990-11-01 1990-11-01 ... 1993-11-01
  - lead_time  (sample) int64 0 1 2 3 4 5 6 ... 3644 3645 3646 3647 3648 3649
Data variables:
    precip     (lat, lon, sample) float64 dask.array<chunksize=(17, 17, 14600), meta=np.ndarray>

We can index (i.e. select and slice) the data using references to ensemble, init_date, lead_time, but not time, which is a bit of a limitation because the work around for time selection involves creating a boolean array the same size as the data array.

I think this limitation will be overcome with the implementation of flexible indexes. A proposal has been developed for adding this feature and work has started as part of the xarray CZI grant.

Making functions dask-aware

Use apply_ufunc. Here's some examples:

Clone this wiki locally