
Commit

Add support for reading data in multiple files
`NCFileReader`s can now read multiple files at the same time. The files
have to contain temporal data for the given variable and they are
aggregated along the time dimension. To use this feature, just pass a
vector of file paths to the constructor.

This capability still has to be surfaced to the TimeVaryingInputs.
Sbozzolo committed Nov 15, 2024
1 parent f2c4a62 commit 3b19e7a
Showing 7 changed files with 103 additions and 35 deletions.
7 changes: 7 additions & 0 deletions NEWS.md
@@ -4,6 +4,13 @@ ClimaUtilities.jl Release Notes
main
------

#### Support reading time data across multiple files. PR [#127](https://github.com/CliMA/ClimaUtilities.jl/pull/127)

`NCFileReader`s can now read multiple files at the same time. The files have to
contain temporal data for the given variable and they are aggregated along the
time dimension. To use this feature, just pass a vector of file paths to the
constructor.

v0.1.18
------

28 changes: 18 additions & 10 deletions docs/src/filereaders.md
@@ -11,7 +11,6 @@ At this point, the implemented `FileReaders` are always linked to a specific
variable and they come with a caching system to avoid unnecessary reads.

Future extensions might include:
- doing chunked reads;
- async reads.

@@ -20,8 +19,10 @@ Future extensions might include:
> This extension is loaded when loading `NCDatasets`
The only file reader currently implemented is the `NCFileReader`, used to read
NetCDF files. Each `NCFileReader` is associated with a collection of files
(possibly just one) and one variable (but multiple `NCFileReader`s can share the
same file). When an `NCFileReader` is constructed with multiple files, the
files should contain the time development of the given variable.

Once created, `NCFileReader` is accessed with the `read!(file_reader, date)`
function, which returns the `Array` associated with the given `date` (if available).
@@ -31,13 +32,14 @@ preallocated array so it can be accessed multiple times without reallocating.
`NCFileReader`s implement two additional features: (1) optional preprocessing,
and (2) cached reads. `NCFileReader`s can be created with a `preprocessing_func`
keyword argument, a function that is applied to the data when `read`ing.
`preprocessing_func` should be a lightweight function, such as removing `NaN`s
or changing units. Every time `read(file_reader, date)` is called, the
`NCFileReader` checks if the `date` is currently stored in the cache. If yes, it
just returns the value (without accessing the disk). If not, it reads and
processes the data and adds it to the cache. This uses a least-recently-used
(LRU) cache implemented in `DataStructures`, which removes the least-recently-used
data stored in the cache when its maximum size is reached (the default max size
is 128).
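
The read-through LRU pattern described above can be sketched with a toy cache. This is a simplified stand-in, not the actual `DataStructures` cache the reader uses; `ToyLRU`, `fake_read`, the integer "dates", and the capacity of 3 are all illustrative:

```julia
# Toy read-through LRU cache: a stand-in for the cache used by NCFileReader.
# The capacity of 3 and `fake_read` are illustrative, not the real defaults.
struct ToyLRU
    data::Dict{Int, Float64}   # "date" => processed value
    order::Vector{Int}         # least-recently used first
    max_size::Int
end
ToyLRU(max_size) = ToyLRU(Dict{Int, Float64}(), Int[], max_size)

reads_from_disk = Ref(0)
fake_read(date) = (reads_from_disk[] += 1; float(date) * 2)  # pretend disk read

function cached_read!(cache::ToyLRU, date)
    if haskey(cache.data, date)
        # Cache hit: move `date` to the most-recently-used position.
        deleteat!(cache.order, findfirst(==(date), cache.order))
        push!(cache.order, date)
        return cache.data[date]
    end
    # Cache miss: read from "disk", then evict the least-recently-used
    # entry if the cache is full, and store the new value.
    value = fake_read(date)
    if length(cache.order) >= cache.max_size
        lru = popfirst!(cache.order)
        delete!(cache.data, lru)
    end
    cache.data[date] = value
    push!(cache.order, date)
    return value
end

cache = ToyLRU(3)
cached_read!(cache, 1)  # miss: disk read
cached_read!(cache, 1)  # hit: served from the cache
cached_read!(cache, 2)  # miss
cached_read!(cache, 3)  # miss
cached_read!(cache, 4)  # miss: evicts 1, the least-recently-used entry
```

The real reader keys the cache by `DateTime` and stores preprocessed arrays; integers and scalars keep the sketch short.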

It is good practice to always close the `NCFileReader`s when they are no longer
needed. The function `close_all_ncfiles` closes all the ones that are currently
@@ -74,6 +76,12 @@ close(v_var)
# Alternatively: FileReaders.close_all_ncfiles()
```

Suppose now that the data is split across multiple years; we can read it with a
single `NCFileReader` simply by passing the list of files:
```julia
u_var = FileReaders.NCFileReader(["era5_2000.nc", "era5_2001.nc", "era5_2002.nc"], "u")
```
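
Internally, when more than one file is passed, the extension looks for a dimension whose name looks like time and aggregates along it, erroring out if none is found. A self-contained sketch of that detection step (`aggregation_dimension` is a hypothetical name; the accepted dimension names `"time"`, `"date"`, and `"t"` match the extension code, and `dimnames` stands in for what `NCDatasets.dimnames` returns):

```julia
# Pick the dimension to aggregate along when several files are passed.
is_time(name) = name in ("time", "date", "t")

function aggregation_dimension(dimnames)
    time_dims = filter(is_time, dimnames)
    isempty(time_dims) && error("Multiple files given, but no time dimension")
    return first(time_dims)
end

aggregation_dimension(["lon", "lat", "time"])  # → "time"
```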

## API

```@docs
86 changes: 62 additions & 24 deletions ext/NCFileReaderExt.jl
@@ -10,43 +10,45 @@ include("nc_common.jl")

# We allow multiple NCFileReader to share the same underlying NCDataset. For this, we put
# all the NCDataset into a dictionary where we keep track of them. OPEN_NCFILES is
# dictionary that maps Vector of file paths (or a single string) to a Tuple with the first
# element being the NCDataset and the second element being a Set of Strings, the variables
# that are being read from that file. Every time a NCFileReader is created, this Set is
# modified by adding or removing a varname.
const OPEN_NCFILES =
Dict{Union{String, Vector{String}}, Tuple{NetCDFDataset, Set{String}}}()

"""
NCFileReader
A struct to read and process NetCDF files.
NCFileReader wants to be smart, e.g., caching reads, or spinning the I/O off to a different
thread (not implemented yet). Multiple NetCDF files can be read at the same time as long as
they can be aggregated along the time dimension.
"""
struct NCFileReader{
VSTR <: Vector{STR} where {STR <: AbstractString},
STR2 <: AbstractString,
DIMS <: Tuple,
NC <: NetCDFDataset,
DATES <: AbstractArray{Dates.DateTime},
PREP <: Function,
CACHE <: DataStructures.LRUCache{Dates.DateTime, <:AbstractArray},
} <: FileReaders.AbstractFileReader
"""Path of the NetCDF file(s)"""
file_paths::VSTR

"""Name of the dataset in the NetCDF files"""
varname::STR2

"""A tuple of arrays with the various physical dimensions where the data is defined
(e.g., lon/lat)"""
dimensions::DIMS

"""A vector of DateTime collecting all the available dates in the files"""
available_dates::DATES

"""NetCDF dataset opened by NCDataset. Don't forget to close the reader!"""
dataset::NC

"""Optional function that is applied to the read dataset. Useful to do unit-conversion
@@ -64,27 +66,63 @@ end

"""
FileReaders.NCFileReader(
file_paths,
varname::AbstractString;
preprocess_func = identity,
cache_max_size::Int = 128,
)
A struct to efficiently read and process NetCDF files.
When more than one file is passed, the files should contain the time development of one or
multiple variables. Files are joined along the time dimension.
## Argument
`file_paths` can be a string, or a collection of paths to files that contain the
same variables but at different times.
"""
function FileReaders.NCFileReader(
file_paths,
varname::AbstractString;
preprocess_func = identity,
cache_max_size::Int = 128,
)
# file_paths could be a vector/tuple or a string. Let's start by standardizing to a
# vector
file_paths isa AbstractString && (file_paths = [file_paths])
only_one_file = length(file_paths) == 1

# If we have more than one file, we have to aggregate them
aggtime_kwarg = ()
if !only_one_file
# Let's first try to identify the time dimension, if it exists. To do that, we open the
# first dataset. We need this to aggregate multiple datasets, if data is split across
# multiple files
NCDatasets.NCDataset(first(file_paths)) do first_dataset
is_time = x -> x == "time" || x == "date" || x == "t"
time_dims = filter(is_time, NCDatasets.dimnames(first_dataset))
if !isempty(time_dims)
aggtime_kwarg = (:aggdim => first(time_dims),)
else
error("Multiple files given, but no time dimension")
end
end
end

# When we have only one file, we have to pass it as a String
file_path_to_ncdataset = only_one_file ? first(file_paths) : file_paths

# Get dataset from global dictionary. If not available, open the new dataset and add
# entry to global dictionary
dataset, open_varnames = get!(
OPEN_NCFILES,
file_paths, # We map the collection of files to the dataset
(
NCDatasets.NCDataset(file_path_to_ncdataset; aggtime_kwarg...),
Set([varname]),
),
)
# push! will do nothing when file is opened for the first time
push!(open_varnames, varname)
@@ -124,7 +162,7 @@
)

return NCFileReader(
file_paths,
varname,
dimensions,
available_dates,
@@ -143,14 +181,14 @@ Close `NCFileReader`. If no other `NCFileReader` is using the same file, close the file.
function Base.close(file_reader::NCFileReader)
# If we don't have the key, we don't have to do anything (we already closed
# the file)
files_are_not_open = !haskey(OPEN_NCFILES, file_reader.file_paths)
files_are_not_open && return nothing

open_variables = OPEN_NCFILES[file_reader.file_paths][end]
pop!(open_variables, file_reader.varname)
if isempty(open_variables)
NCDatasets.close(file_reader.dataset)
delete!(OPEN_NCFILES, file_reader.file_paths)
end
return nothing
end
6 changes: 5 additions & 1 deletion ext/nc_common.jl
@@ -1,10 +1,14 @@
# For a single and multi-file dataset
const NetCDFDataset =
Union{NCDatasets.NCDataset, NCDatasets.CommonDataModel.MFDataset}

"""
    read_available_dates(ds::NetCDFDataset)
Return all the dates in the given NCDataset. The dates are read from the "time"
or "date" datasets. If none is available, return an empty vector.
"""
function read_available_dates(ds::NetCDFDataset)
if "time" in keys(ds.dim)
return Dates.DateTime.(
reinterpret.(Ref(NCDatasets.DateTimeStandard), ds["time"][:])
11 changes: 11 additions & 0 deletions test/file_readers.jl
@@ -55,6 +55,17 @@ using NCDatasets
close(ncreader_u)
@test isempty(open_ncfiles)
end

# Test times split across multiple files
PATHS = [
joinpath(@__DIR__, "test_data", "era5_1979_1.0x1.0_lai.nc"),
joinpath(@__DIR__, "test_data", "era5_1980_1.0x1.0_lai.nc"),
]
NCDataset(PATHS, aggdim = "time") do nc
ncreader_agg = FileReaders.NCFileReader(PATHS, "lai_lv")
@test FileReaders.available_dates(ncreader_agg) == nc["time"][:]
@test length(FileReaders.available_dates(ncreader_agg)) == 8
end
end

@testset "NCFileReader without time" begin
Binary file added test/test_data/era5_1979_1.0x1.0_lai.nc
Binary file added test/test_data/era5_1980_1.0x1.0_lai.nc
