
Commit

Merge 31b2ec9 into 2f14f11
juliasloan25 authored Sep 5, 2024
2 parents 2f14f11 + 31b2ec9 commit 87a4a98
Showing 3 changed files with 127 additions and 46 deletions.
151 changes: 116 additions & 35 deletions ext/DataHandlingExt.jl
@@ -17,40 +17,51 @@ import ClimaUtilities.DataHandling

"""
DataHandler{
FR <: AbstractFileReader,
FR <: AbstractDict{String, AbstractFileReader},
REG <: AbstractRegridder,
SPACE <: ClimaCore.Spaces.AbstractSpace,
TSTART <: AbstractFloat,
DATES <: AbstractArray{Dates.DateTime},
DIMS,
TIMES <: AbstractArray{<:AbstractFloat},
CACHE <: DataStructures.LRUCache{Dates.DateTime, ClimaCore.Fields.Field},
FUNC <: Function,
}
Currently, the `DataHandler` works with one variable at a time. This might not be the most
Currently, the `DataHandler` remaps one variable at a time. This might not be the most
efficient way to tackle the problem: we should be able to reuse the interpolation weights if
multiple variables are defined on the same grid and have to be remapped to the same grid.
Multiple input variables may be stored in the `DataHandler` by providing a dictionary
`file_readers` of variable names mapped to `FileReader` objects, along with a `compose_func`
that combines them into a single data variable. This is useful when we require variables
that are not directly available in the input data, but can be computed from the available variables.
The order of the variables in `varnames` must match the argument order of `compose_func`.
Assumptions:
- There is only one file with the entire time development of the given variable
- For each input variable, there is only one file with the entire time development of that variable
- The file has well-defined physical dimensions (e.g., lat/lon)
- Currently, the time dimension has to be either "time" or "date", and the spatial
dimensions have to be lat and lon (a restriction from TempestRegridder)
- If using multiple input variables, the input variables must have the same time development
and spatial resolution
DataHandler is meant to live on the CPU, but the Fields can be on the GPU as well.
"""
struct DataHandler{
FR <: AbstractFileReader,
FR <: AbstractDict{String, AbstractFileReader},
REG <: AbstractRegridder,
SPACE <: ClimaCore.Spaces.AbstractSpace,
TSTART <: AbstractFloat,
DATES <: AbstractArray{Dates.DateTime},
DIMS,
TIMES <: AbstractArray{<:AbstractFloat},
CACHE <: DataStructures.LRUCache{Dates.DateTime, ClimaCore.Fields.Field},
FUNC <: Function,
NAMES <: Union{AbstractString, AbstractArray{<:AbstractString}},
}
"""Object responsible for getting the data from disk to memory"""
file_reader::FR
"""Dictionary of variable names and objects responsible for getting the input data from disk to memory"""
file_readers::FR

"""Object responsible for resampling a rectangular grid to the simulation grid"""
regridder::REG
@@ -76,11 +87,17 @@ struct DataHandler{

"""Private field where cached data is stored"""
_cached_regridded_fields::CACHE

"""Function to combine multiple input variables into a single data variable"""
compose_func::FUNC

"""Names of the datasets in the NetCDF that have to be read and processed"""
varnames::NAMES
end

"""
DataHandler(file_path::AbstractString,
varname::AbstractString,
DataHandler(file_paths::Union{AbstractString, AbstractArray{AbstractString}},
varnames::Union{AbstractString, AbstractArray{AbstractString}},
target_space::ClimaCore.Spaces.AbstractSpace;
reference_date::Dates.DateTime = Dates.DateTime(1979, 1, 1),
t_start::AbstractFloat = 0.0,
@@ -89,15 +106,17 @@ end
regridder_kwargs = (),
file_reader_kwargs = ())
Create a `DataHandler` to read `varname` from `file_path` and remap it to `target_space`.
Create a `DataHandler` to read `varnames` from `file_paths` and remap them to `target_space`.
`file_paths` may contain either one path for all variables or one path for each variable.
In the latter case, the entries of `file_paths` and `varnames` are expected to match based on position.
The DataHandler maintains an LRU cache of Fields that were previously computed.
Positional arguments
=====================
- `file_path`: Path of the NetCDF file that contains the data.
- `varname`: Name of the dataset in the NetCDF that has to be read and processed.
- `file_paths`: Paths of the NetCDF file(s) that contain the input data.
- `varnames`: Names of the datasets in the NetCDF that have to be read and processed.
- `target_space`: Space where the simulation is run, where the data has to be regridded to.
Keyword arguments
@@ -122,10 +141,13 @@ everything more type stable.)
It can be a NamedTuple, or a Dictionary that maps Symbols to values.
- `file_reader_kwargs`: Additional keywords to be passed to the constructor of the file reader.
It can be a NamedTuple, or a Dictionary that maps Symbols to values.
- `compose_func`: Function to combine multiple input variables into a single data variable.
The default, to be used in the case of one input variable, is the identity.
Note that the order of `varnames` must match the argument order of `compose_func`.
"""
function DataHandling.DataHandler(
file_path::AbstractString,
varname::AbstractString,
file_paths::Union{AbstractString, AbstractArray{<:AbstractString}},
varnames::Union{AbstractString, AbstractArray{<:AbstractString}},
target_space::ClimaCore.Spaces.AbstractSpace;
reference_date::Union{Dates.DateTime, Dates.Date} = Dates.DateTime(
1979,
@@ -136,16 +158,38 @@ function DataHandling.DataHandler(
regridder_type = nothing,
cache_max_size::Int = 128,
regridder_kwargs = (),
file_reader_kwargs = (),
file_reader_kwargs = (),
compose_func::Function = identity,
)
# Verify that the number of file paths and variable names match
@assert (length(file_paths) == 1 || length(file_paths) == length(varnames)) "Number of file paths ($(length(file_paths))) and variable names ($(length(varnames))) do not match."

# Verify that `compose_func` is specified when using multiple input variables
@assert (length(varnames) == 1 || !(compose_func == identity)) "`compose_func` must be specified when using multiple input variables"

# Determine which regridder to use if not already specified
regridder_type =
isnothing(regridder_type) ? Regridders.default_regridder_type() :
regridder_type

# File reader, deals with ingesting data, possibly buffered/cached
file_reader = NCFileReader(file_path, varname; file_reader_kwargs...)
# Construct a file reader, which deals with ingesting data and is possibly buffered/cached, for each variable
file_readers = Dict()
# If there is only one file, we can use the same file reader for all variables
if length(file_paths) == 1
file_reader =
NCFileReader(file_paths[1], varnames[1]; file_reader_kwargs...)
merge!(file_readers, Dict(varnames[1] => file_reader))
else
# If there are multiple files, we need a file reader for each variable
for (file_path, varname) in zip(file_paths, varnames)
merge!(
file_readers,
Dict(
varname =>
NCFileReader(file_path, varname; file_reader_kwargs...),
),
) # TODO should kwargs vary between input vars? contains e.g. preprocess_func
end
end

regridder_args = ()

@@ -165,7 +209,9 @@ function DataHandling.DataHandler(
regridder_kwargs = merge((; regrid_dir), regridder_kwargs)
end

regridder_args = (target_space, varname, file_path)
# Note: using one arbitrary element of `varnames` and of `file_paths` assumes
# that all input variables will use the same regridding
regridder_args = (target_space, varnames[1], file_paths[1])
elseif regridder_type == :InterpolationsRegridder
regridder_args = (target_space,)
end
@@ -179,30 +225,37 @@ function DataHandling.DataHandler(
max_size = cache_max_size,
)

available_dates = file_reader.available_dates
# Note: using one arbitrary element of `file_readers` assumes
# that all input variables have the same time development
available_dates = collect(values(file_readers))[1].available_dates
times_s = period_to_seconds_float.(available_dates .- reference_date)
available_times = times_s .- t_start
dimensions = collect(values(file_readers))[1].dimensions

return DataHandler(
file_reader,
file_readers,
regridder,
target_space,
file_reader.dimensions,
dimensions,
available_dates,
t_start,
Dates.DateTime(reference_date),
available_times,
_cached_regridded_fields,
compose_func,
varnames,
)
end

"""
close(data_handler::DataHandler)
Close any file associated to the given `data_handler`.
Close all files associated to the given `data_handler`.
"""
function Base.close(data_handler::DataHandler)
close(data_handler.file_reader)
for (_, v) in data_handler.file_readers
close(v)
end
return nothing
end

@@ -345,33 +398,63 @@ Return the regridded snapshot from `data_handler` associated to the given `time`
The `time` has to be available in the `data_handler`.
When using multiple input variables, the order of `data_handler.varnames` determines the order of
the arguments passed to `compose_func` to produce the data variable.
`regridded_snapshot` potentially modifies the internal state of `data_handler` and it might be a very
expensive operation.
"""
function DataHandling.regridded_snapshot(
data_handler::DataHandler,
date::Dates.DateTime,
date::Dates.DateTime;
)
varnames = data_handler.varnames
@assert (!isnothing(varnames) || length(data_handler.file_readers) == 1) "`varnames` must be specified when using multiple input variables"
compose_func = data_handler.compose_func

# Dates.DateTime(0) is the cache key for static maps
if date != Dates.DateTime(0)
date in data_handler.available_dates || error(
"Date $date not available in file $(data_handler.file_reader.file_path)",
)
file_path = data_handler.file_readers[varnames[1]].file_path
date in data_handler.available_dates ||
error("Date $date not available in file $(file_path)")
end

regridder_type = nameof(typeof(data_handler.regridder))
regrid_args = ()

return get!(data_handler._cached_regridded_fields, date) do
# If there are multiple input variables, we have to read them all and pass them to the
# compose function, then regrid the result
if length(data_handler.file_readers) > 1
# Read input data from each file, maintaining order
inputs = [read(data_handler.file_readers[varname], date) for varname in varnames]
data_composed = compose_func(inputs...)

# Regrid the combined data
if regridder_type == :TempestRegridder
regrid_args = (date,)
elseif regridder_type == :InterpolationsRegridder
regrid_args =
(read(data_handler.file_reader, date), data_handler.dimensions)
regrid_args = (data_handler.dimensions,)
else
error("Uncaught case")
error("Invalid regridder type")
end

return regrid(data_handler.regridder, data_composed, regrid_args...)
else
# If there is only one input variable, we can regrid it directly
return get!(data_handler._cached_regridded_fields, date) do
if regridder_type == :TempestRegridder
regrid_args = (date,)
elseif regridder_type == :InterpolationsRegridder
regrid_args = (
read(data_handler.file_readers[varnames[1]], date),
data_handler.dimensions,
)
else
error("Invalid regridder type")
end

regrid(data_handler.regridder, regrid_args...)
end
regrid(data_handler.regridder, regrid_args...)
end
end
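A hedged usage sketch of reading a regridded field back out; `data_handler` is assumed to have been constructed as in the examples above:

date = first(data_handler.available_dates)   # any date available in the input file(s)
field = DataHandling.regridded_snapshot(data_handler, date)
# A repeated call with the same date should be served from the LRU cache instead of re-reading disk.
close(data_handler)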

@@ -393,9 +476,7 @@ function DataHandling.regridded_snapshot(data_handler::DataHandler)
end

"""
regridded_snapshot(dest::ClimaCore.Fields.Field, data_handler::DataHandler, date::Dates.DateTime)
regridded_snapshot(data_handler::DataHandler, time::AbstractFloat)
regridded_snapshot(data_handler::DataHandler)
regridded_snapshot!(dest::ClimaCore.Fields.Field, data_handler::DataHandler, date::Dates.DateTime)
Write to `dest` the regridded snapshot from `data_handler` associated to the given `time`.
Expand All @@ -404,7 +485,7 @@ The `time` has to be available in the `data_handler`.
`regridded_snapshot!` potentially modifies the internal state of `data_handler` and it might be a very
expensive operation.
"""
function DataHandling.regridded_snapshot!(dest, data_handler, time)
function DataHandling.regridded_snapshot!(dest, data_handler, time;)
dest .= DataHandling.regridded_snapshot(data_handler, time)
return nothing
end
14 changes: 7 additions & 7 deletions ext/SpaceVaryingInputsExt.jl
@@ -126,8 +126,8 @@ end

"""
SpaceVaryingInput(data_handler::DataHandler)
SpaceVaryingInput(file_path::AbstractString,
varname::AbstractString,
SpaceVaryingInput(file_paths::Union{AbstractString, AbstractArray{String}},
varnames::Union{AbstractString, AbstractArray{String}},
target_space::Spaces.AbstractSpace;
regridder_type::Symbol,
regridder_kwargs = (),
@@ -139,22 +139,22 @@ a parameter is defined on the surface of the Earth.
Returns a ClimaCore.Fields.Field of scalars; analogous to the 1D case which also
returns a ClimaCore.Fields.Field of scalars.
"""
function SpaceVaryingInputs.SpaceVaryingInput(data_handler)
function SpaceVaryingInputs.SpaceVaryingInput(data_handler;)
return regridded_snapshot(data_handler)
end
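A hedged sketch of the file-based method documented above; the path and variable name are hypothetical and `target_space` is assumed to be a 2D surface space:

import ClimaUtilities.SpaceVaryingInputs

albedo = SpaceVaryingInputs.SpaceVaryingInput(
    "static_albedo.nc",        # hypothetical time-independent map
    "sw_alb",                  # hypothetical variable name
    target_space;
    regridder_type = :InterpolationsRegridder,
)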

function SpaceVaryingInputs.SpaceVaryingInput(
file_path,
varname,
file_paths,
varnames,
target_space;
regridder_type = nothing,
regridder_kwargs = (),
file_reader_kwargs = (),
)
return SpaceVaryingInputs.SpaceVaryingInput(
DataHandler(
file_path,
varname,
file_paths,
varnames,
target_space;
regridder_type,
regridder_kwargs,
8 changes: 4 additions & 4 deletions ext/TimeVaryingInputsExt.jl
@@ -140,8 +140,8 @@ function TimeVaryingInputs.TimeVaryingInput(
end

function TimeVaryingInputs.TimeVaryingInput(
file_path::AbstractString,
varname::AbstractString,
file_paths::Union{AbstractString, AbstractArray{String}},
varnames::Union{AbstractString, AbstractArray{String}},
target_space::ClimaCore.Spaces.AbstractSpace;
method = LinearInterpolation(),
reference_date::Dates.DateTime = Dates.DateTime(1979, 1, 1),
@@ -151,8 +151,8 @@ function TimeVaryingInputs.TimeVaryingInput(
file_reader_kwargs = (),
)
data_handler = DataHandling.DataHandler(
file_path,
varname,
file_paths,
varnames,
target_space;
reference_date,
t_start,

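Finally, a hedged sketch of the corresponding TimeVaryingInput entry point; the file, variable name, destination field `sst_field`, and time `t` are hypothetical, and the `evaluate!` call assumes the standard TimeVaryingInputs interface:

import Dates
import ClimaUtilities.TimeVaryingInputs

sst_input = TimeVaryingInputs.TimeVaryingInput(
    "sst_era5.nc",             # hypothetical forcing file
    "sst",                     # hypothetical variable name
    target_space;
    reference_date = Dates.DateTime(1979, 1, 1),
)
TimeVaryingInputs.evaluate!(sst_field, sst_input, t)   # fill `sst_field` at simulation time `t`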