1 change: 1 addition & 0 deletions .gitignore
@@ -13,6 +13,7 @@ out-*
*.c
*.pyc
*.dSYM/*
**/*.zarr/*

.idea/*
Profile.prof
1 change: 1 addition & 0 deletions environment_py3_linux.yml
@@ -30,3 +30,4 @@ dependencies:
- nbval
- scikit-learn
- pykdtree
- zarr
1 change: 1 addition & 0 deletions environment_py3_osx.yml
@@ -30,3 +30,4 @@ dependencies:
- nbval
- scikit-learn
- pykdtree
- zarr
1 change: 1 addition & 0 deletions environment_py3_win.yml
@@ -27,3 +27,4 @@ dependencies:
- pytest
- nbval
- pykdtree
- zarr
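
Note: all three conda environment files gain zarr as a dependency. As a quick orientation to what the new dependency provides (not part of this PR's code), a minimal sketch of a zarr round-trip; the store path "example.zarr" is illustrative:

import numpy as np
import zarr

# Write a small chunked array to a directory store, then read it back.
# "example.zarr" is a hypothetical path used only for illustration.
z = zarr.open("example.zarr", mode="w", shape=(4, 3), chunks=(2, 3), dtype="f8")
z[:] = np.arange(12).reshape(4, 3)

z2 = zarr.open("example.zarr", mode="r")
assert (z2[:] == np.arange(12).reshape(4, 3)).all()

Because a zarr store is a directory of chunk files, the .gitignore change above excludes **/*.zarr/* from version control.
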
213 changes: 145 additions & 68 deletions parcels/examples/tutorial_output.ipynb

Large diffs are not rendered by default.

143 changes: 53 additions & 90 deletions parcels/particlefile/baseparticlefile.py
@@ -6,7 +6,6 @@
from abc import ABC
from abc import abstractmethod

import netCDF4
import numpy as np

try:
@@ -56,7 +55,6 @@ class BaseParticleFile(ABC):
outputdt = None
lasttime_written = None
dataset = None
metadata = None
name = None
particleset = None
parcels_mesh = None
@@ -81,7 +79,6 @@ def __init__(self, name, particleset, outputdt=np.infty, write_ondelete=False, c
self.lasttime_written = None # variable to check if time has been written already

self.dataset = None
self.metadata = {}
if pset_info:
for v in pset_info.keys():
setattr(self, v, pset_info[v])
@@ -110,6 +107,11 @@ def __init__(self, name, particleset, outputdt=np.infty, write_ondelete=False, c

self.file_list = []

self.metadata = {"feature_type": "trajectory", "Conventions": "CF-1.6/CF-1.7",
"ncei_template_version": "NCEI_NetCDF_Trajectory_Template_v2.0",
"parcels_version": parcels_version,
"parcels_mesh": self.parcels_mesh}

tmp_dir = tempwritedir
if tempwritedir is None:
tmp_dir = os.path.join(os.path.dirname(str(self.name)), "out-%s" % ''.join(random.choice(string.ascii_uppercase) for _ in range(8)))
@@ -136,109 +138,73 @@ def _reserved_var_names(self):
"""
pass

def open_netcdf_file(self, data_shape):
"""Initialise NetCDF4.Dataset for trajectory output.
def open_output_file(self, data_shape):
"""Initialise file for trajectory output.
The output follows the format outlined in the Discrete Sampling Geometries
section of the CF-conventions:
http://cfconventions.org/cf-conventions/v1.6.0/cf-conventions.html#discrete-sampling-geometries
The current implementation is based on the NCEI template:
http://www.nodc.noaa.gov/data/formats/netcdf/v2.0/trajectoryIncomplete.cdl

:param data_shape: shape of the variables in the NetCDF4 file
:param data_shape: shape of the variables in the output file
"""
extension = os.path.splitext(str(self.name))[1]
fname = self.name if extension in ['.nc', '.nc4'] else "%s.nc" % self.name
if os.path.exists(str(fname)):
os.remove(str(fname))

coords = self._create_trajectory_file(fname=fname, data_shape=data_shape)
self._create_trajectory_records(coords=coords)
self._create_metadata_records()

def close_netcdf_file(self):
self.dataset.close()

def _create_trajectory_file(self, fname, data_shape):
self.dataset = netCDF4.Dataset(fname, "w", format="NETCDF4")
self.dataset.createDimension("obs", data_shape[1])
self.dataset.createDimension("traj", data_shape[0])
coords = ("traj", "obs")
self.dataset.feature_type = "trajectory"
self.dataset.Conventions = "CF-1.6/CF-1.7"
self.dataset.ncei_template_version = "NCEI_NetCDF_Trajectory_Template_v2.0"
self.dataset.parcels_version = parcels_version
self.dataset.parcels_mesh = self.parcels_mesh
return coords

def _create_trajectory_records(self, coords):
self.fname = self.name if extension in ['.nc', '.nc4', '.zarr'] else "%s.nc" % self.name
self.outputformat = extension
if os.path.exists(str(self.fname)):
if 'zarr' in self.outputformat:
shutil.rmtree(str(self.fname))
else:
os.remove(str(self.fname))
self.attrs = self._create_variables_attribute_dict()

def _create_variables_attribute_dict(self):
"""
creates the NetCDF record structure of a trajectory.
creates the dictionary with variable attributes.

Attention:
For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
"""
# Create ID variable according to CF conventions
self.id = self.dataset.createVariable("trajectory", "i8", coords, fill_value=-2**(63)) # minint64 fill_value
self.id.long_name = "Unique identifier for each particle"
self.id.cf_role = "trajectory_id"

# Create time, lat, lon and z variables according to CF conventions:
self.time = self.dataset.createVariable("time", "f8", coords, fill_value=np.nan)
self.time.long_name = ""
self.time.standard_name = "time"
if self.time_origin.calendar is None:
self.time.units = "seconds"
else:
self.time.units = "seconds since " + str(self.time_origin)
self.time.calendar = 'standard' if self.time_origin.calendar == 'np_datetime64' else self.time_origin.calendar
self.time.axis = "T"

if self.lonlatdepth_dtype is np.float64:
lonlatdepth_precision = "f8"
else:
lonlatdepth_precision = "f4"

if ('lat' in self.var_names):
self.lat = self.dataset.createVariable("lat", lonlatdepth_precision, coords, fill_value=np.nan)
self.lat.long_name = ""
self.lat.standard_name = "latitude"
self.lat.units = "degrees_north"
self.lat.axis = "Y"

if ('lon' in self.var_names):
self.lon = self.dataset.createVariable("lon", lonlatdepth_precision, coords, fill_value=np.nan)
self.lon.long_name = ""
self.lon.standard_name = "longitude"
self.lon.units = "degrees_east"
self.lon.axis = "X"

if ('depth' in self.var_names) or ('z' in self.var_names):
self.z = self.dataset.createVariable("z", lonlatdepth_precision, coords, fill_value=np.nan)
self.z.long_name = ""
self.z.standard_name = "depth"
self.z.units = "m"
self.z.positive = "down"
attrs = {'z': {"long_name": "",
"standard_name": "depth",
"units": "m",
"positive": "down"},
'trajectory': {"long_name": "Unique identifier for each particle",
"cf_role": "trajectory_id",
"_FillValue": self.fill_value_map[np.int64]},
'time': {"long_name": "",
"standard_name": "time",
"units": "seconds",
"axis": "T"},
'lon': {"long_name": "",
"standard_name": "longitude",
"units": "degrees_east",
"axis":
"X"},
'lat': {"long_name": "",
"standard_name": "latitude",
"units": "degrees_north",
"axis": "Y"}}

if self.time_origin.calendar is not None:
attrs['time']['units'] = "seconds since " + str(self.time_origin)
attrs['time']['calendar'] = 'standard' if self.time_origin.calendar == 'np_datetime64' else self.time_origin.calendar

for vname, dtype in zip(self.var_names, self.var_dtypes):
if vname not in self._reserved_var_names():
fill_value = self.fill_value_map[dtype]
nc_dtype_fmt = self.fmt_map[dtype]
setattr(self, vname, self.dataset.createVariable(vname, nc_dtype_fmt, coords, fill_value=fill_value))
getattr(self, vname).long_name = ""
getattr(self, vname).standard_name = vname
getattr(self, vname).units = "unknown"
attrs[vname] = {"_FillValue": self.fill_value_map[dtype],
"long_name": "",
"standard_name": vname,
"units": "unknown"}

for vname, dtype in zip(self.var_names_once, self.var_dtypes_once):
fill_value = self.fill_value_map[dtype]
nc_dtype_fmt = self.fmt_map[dtype]
setattr(self, vname, self.dataset.createVariable(vname, nc_dtype_fmt, "traj", fill_value=fill_value))
getattr(self, vname).long_name = ""
getattr(self, vname).standard_name = vname
getattr(self, vname).units = "unknown"
attrs[vname] = {"_FillValue": self.fill_value_map[dtype],
"long_name": "",
"standard_name": vname,
"units": "unknown"}

def _create_metadata_records(self):
for name, message in self.metadata.items():
setattr(self.dataset, name, message)
return attrs

def __del__(self):
if self.convert_at_end:
@@ -260,10 +226,7 @@ def add_metadata(self, name, message):
:param name: Name of the metadata variable
:param message: message to be written
"""
if self.dataset is None:
self.metadata[name] = message
else:
setattr(self.dataset, name, message)
self.metadata[name] = message

def dump_dict_to_npy(self, data_dict, data_dict_once):
"""Buffer data to set of temporary numpy files, using np.save"""
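
The refactor above replaces direct netCDF4 variable creation with plain dictionaries: self.metadata holds the global file attributes and _create_variables_attribute_dict returns the per-variable attributes, both attached to an xarray Dataset at export time. A minimal sketch of that pattern, with illustrative variable names, values, and output paths:

import numpy as np
import xarray as xr

# Global file attributes, mirroring self.metadata in the PR.
metadata = {"feature_type": "trajectory", "Conventions": "CF-1.6/CF-1.7"}

# Per-variable attributes, mirroring the attrs dict returned by
# _create_variables_attribute_dict (values shown are illustrative).
lon_attrs = {"long_name": "", "standard_name": "longitude",
             "units": "degrees_east", "axis": "X"}

data = np.random.rand(3, 5)  # shape (traj, obs)
ds = xr.Dataset(attrs=metadata)
ds["lon"] = xr.DataArray(data=data, dims=["traj", "obs"], attrs=lon_attrs)

# The same Dataset can be written by either backend.
ds.to_netcdf("example_out.nc")  # or: ds.to_zarr("example_out.zarr")
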
19 changes: 14 additions & 5 deletions parcels/particlefile/particlefileaos.py
@@ -2,6 +2,7 @@
import os
from glob import glob
import numpy as np
import xarray as xr

try:
from mpi4py import MPI
@@ -97,12 +98,14 @@ def read_from_npy(self, file_list, n_timesteps, var, dtype):
data[id_ind, t_ind] = data_dict[var][ii]
time_index[id_ind] = time_index[id_ind] + 1

if dtype == np.bool_:
data = data.astype(np.bool_)
# remove rows (trajectories) that were never written and contain only nan values
return data[time_index > 0, :]

def export(self):
"""
Exports outputs in temporary NPY-files to NetCDF file
Exports output buffered in temporary NPY files to the output file (either NetCDF or zarr)

Attention:
For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
@@ -145,18 +148,24 @@ def export(self):
if len(self.var_names_once) > 0:
global_file_list_once += pset_info_local['file_list_once']

ds = xr.Dataset(attrs=pset_info_local['metadata'])
for var, dtype in zip(self.var_names, self.var_dtypes):
data = self.read_from_npy(global_file_list, n_timesteps, var, dtype)
if var == self.var_names[0]:
self.open_netcdf_file(data.shape)
self.open_output_file(data.shape)
varout = 'z' if var == 'depth' else var
getattr(self, varout)[:, :] = data
varout = 'trajectory' if varout == 'id' else varout
ds[varout] = xr.DataArray(data=data, dims=["traj", "obs"], attrs=self.attrs[varout])

if len(self.var_names_once) > 0:
n_timesteps_once = {}
for i in n_timesteps:
n_timesteps_once[i] = 1
for var in self.var_names_once:
getattr(self, var)[:] = self.read_from_npy(global_file_list_once, n_timesteps_once, var, dtype)
data = self.read_from_npy(global_file_list_once, n_timesteps_once, var, dtype)
ds[var] = xr.DataArray(data=data.flatten(), dims=["traj"], attrs=self.attrs[var])

self.close_netcdf_file()
if 'zarr' in self.outputformat:
ds.to_zarr(self.fname)
else:
ds.to_netcdf(self.fname)
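
With export() now assembling an xr.Dataset and dispatching on self.outputformat, reading the result back is symmetric for both backends. A brief sketch, with illustrative file names:

import xarray as xr

# Open whichever backend was written; both expose the same
# (traj, obs) trajectory layout described by the CF attributes.
ds = xr.open_zarr("example_output.zarr")     # zarr store
# ds = xr.open_dataset("example_output.nc")  # NetCDF file
print(ds["lon"].dims)  # ('traj', 'obs')
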
19 changes: 14 additions & 5 deletions parcels/particlefile/particlefilesoa.py
@@ -2,6 +2,7 @@
import os
from glob import glob
import numpy as np
import xarray as xr

try:
from mpi4py import MPI
@@ -97,12 +98,14 @@ def read_from_npy(self, file_list, n_timesteps, var, dtype):
data[id_ind, t_ind] = data_dict[var][ii]
time_index[id_ind] = time_index[id_ind] + 1

if dtype == np.bool_:
data = data.astype(np.bool_)
# remove rows (trajectories) that were never written and contain only nan values
return data[time_index > 0, :]

def export(self):
"""
Exports outputs in temporary NPY-files to NetCDF file
Exports output buffered in temporary NPY files to the output file (either NetCDF or zarr)

Attention:
For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
@@ -146,18 +149,24 @@ def export(self):
if len(self.var_names_once) > 0:
global_file_list_once += pset_info_local['file_list_once']

ds = xr.Dataset(attrs=pset_info_local['metadata'])
for var, dtype in zip(self.var_names, self.var_dtypes):
data = self.read_from_npy(global_file_list, n_timesteps, var, dtype)
if var == self.var_names[0]:
self.open_netcdf_file(data.shape)
self.open_output_file(data.shape)
varout = 'z' if var == 'depth' else var
getattr(self, varout)[:, :] = data
varout = 'trajectory' if varout == 'id' else varout
ds[varout] = xr.DataArray(data=data, dims=["traj", "obs"], attrs=self.attrs[varout])

if len(self.var_names_once) > 0:
n_timesteps_once = {}
for i in n_timesteps:
n_timesteps_once[i] = 1
for var in self.var_names_once:
getattr(self, var)[:] = self.read_from_npy(global_file_list_once, n_timesteps_once, var, dtype)
data = self.read_from_npy(global_file_list_once, n_timesteps_once, var, dtype)
ds[var] = xr.DataArray(data=data.flatten(), dims=["traj"], attrs=self.attrs[var])

self.close_netcdf_file()
if 'zarr' in self.outputformat:
ds.to_zarr(self.fname)
else:
ds.to_netcdf(self.fname)
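
End to end, the user-facing switch is just the extension of the output name: per open_output_file, a name ending in .zarr selects the zarr backend, anything else falls back to NetCDF. A hedged sketch of driving this from a ParticleSet (the flow field, particle positions, and run lengths are illustrative; the export() call follows the pre-existing Parcels workflow):

from datetime import timedelta as delta
import numpy as np
from parcels import AdvectionRK4, FieldSet, ParticleSet, ScipyParticle

# Illustrative uniform eastward flow on a small flat mesh.
dims = {"lon": np.linspace(0.0, 1.0, 8), "lat": np.linspace(0.0, 1.0, 8)}
data = {"U": np.ones((8, 8)), "V": np.zeros((8, 8))}
fieldset = FieldSet.from_data(data, dims, mesh="flat")

pset = ParticleSet(fieldset=fieldset, pclass=ScipyParticle, lon=[0.1], lat=[0.5])

# A ".zarr" suffix now selects the zarr backend; ".nc" keeps NetCDF.
output_file = pset.ParticleFile(name="example_output.zarr", outputdt=delta(hours=1))
pset.execute(AdvectionRK4, runtime=delta(hours=6), dt=delta(minutes=5),
             output_file=output_file)
output_file.export()  # flushes the temporary NPY buffers to example_output.zarr
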