#=
Code to support input and output of data and configuration.
Input data can be loaded from netCDF files.
Output data can be written to netCDF or CSV files.
For configuration files we use TOML.
=#
"""Turn "a.aa.aaa" into (:a, :aa, :aaa)"""
symbols(s) = Tuple(Symbol(x) for x in split(s, '.'))
"""Turn symbols"a.aa.aaa" into (:a, :aa, :aaa)"""
macro symbols_str(s)
Tuple(Symbol(x) for x in split(s, '.'))
end
"Get a nested field using a tuple of Symbols"
param(obj, fields) = foldl(getproperty, fields; init = obj)
param(obj, fields::AbstractString) = param(obj, symbols(fields))
function param(obj, fields, default)
try
return param(obj, fields)
catch
return default
end
end
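#=
A minimal sketch of how `symbols` and `param` combine; the named tuple `m` below is a
hypothetical stand-in for a model struct:

    m = (vertical = (snow = [0.1, 0.2],),)
    param(m, symbols"vertical.snow")       # == m.vertical.snow
    param(m, "vertical.snow")              # same, via the String method
    param(m, "vertical.glacier", nothing)  # missing field falls back to the default
=#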
"""
Config(path::AbstractString)
Config(dict::AbstractDict)
Config(dict::Dict{String,Any}, path::Union{String,Nothing})
Struct that contains the parsed TOML configuration, as well as a reference to the TOML path,
if it exists. It behaves largely like a dictionary, but it overloads `getproperty` and
`setproperty!` to support syntax like `config.model.reinit = false`.
"""
struct Config
dict::Dict{String,Any} # nested key value mapping of all settings
path::Union{String,Nothing} # path to the TOML file, or nothing
end
Config(path::AbstractString) = Config(TOML.parsefile(path), path)
Config(dict::AbstractDict) = Config(dict, nothing)
# allows using getproperty, e.g. config.input.time instead of config["input"]["time"]
function Base.getproperty(config::Config, f::Symbol)
dict = Dict(config)
path = pathof(config)
a = dict[String(f)]
# if it is a Dict, wrap the result in Config to keep the getproperty behavior
return a isa AbstractDict ? Config(a, path) : a
end
function Base.setproperty!(config::Config, f::Symbol, x)
dict = Dict(config)
return dict[String(f)] = x
end
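#=
For illustration, the overloads above make nested TOML sections addressable with dot
syntax (the in-memory Config here is hypothetical):

    config = Config(Dict{String,Any}("model" => Dict{String,Any}()))
    config.model.reinit = false  # setproperty! writes into the nested dict
    config.model.reinit          # -> false, via getproperty
=#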
"Get a value from the Config with either the key or an alias of the key."
function get_alias(config::Config, key, alias, default)
alias_or_default = param(config, alias, default)
return param(config, key, alias_or_default)
end
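#=
Lookup order sketch for `get_alias`, using a hypothetical in-memory Config:

    cfg = Config(Dict{String,Any}("new_name" => 1, "old_name" => 2))
    get_alias(cfg, "new_name", "old_name", 3)  # -> 1 (the key itself wins)
    get_alias(cfg, "missing", "old_name", 3)   # -> 2 (alias used when key is absent)
    get_alias(cfg, "missing", "also_gone", 3)  # -> 3 (default as last resort)
=#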
"Get a value from the Config with a key, throwing an error if it is not one of the options."
function get_options(config::Config, key, options, default)
dict = Dict(config)
a = get(dict, key, default)
if !(a in options)
error("TOML key `$key` is set to $(repr(a)). Valid options are: $options.")
end
return a
end
# also used in autocomplete
Base.propertynames(config::Config) = collect(keys(Dict(config)))
Base.haskey(config::Config, key) = haskey(Dict(config), key)
Base.keys(config::Config) = keys(Dict(config))
Base.values(config::Config) = values(Dict(config))
Base.pairs(config::Config) = pairs(Dict(config))
Base.get(config::Config, key, default) = get(Dict(config), key, default)
Base.getindex(config::Config, key) = getindex(Dict(config), key)
Base.setindex!(config::Config, val, key) = setindex!(Dict(config), val, key)
Base.pop!(config::Config, key) = pop!(Dict(config), key)
Base.pop!(config::Config, key, default) = pop!(Dict(config), key, default)
Base.Dict(config::Config) = getfield(config, :dict)
Base.pathof(config::Config) = getfield(config, :path)
Base.iterate(config::Config) = iterate(Dict(config))
Base.iterate(config::Config, state) = iterate(Dict(config), state)
function Base.dirname(config::Config)
path = pathof(config)
return path === nothing ? nothing : dirname(path)
end
function combined_path(config::Config, dir::AbstractString, path::AbstractString)
tomldir = dirname(config)
return normpath(tomldir, dir, path)
end
"Construct a path relative to both the TOML directory and the optional `dir_input`"
function input_path(config::Config, path::AbstractString)
dir = get(config, "dir_input", ".")
return combined_path(config, dir, path)
end
"Construct a path relative to both the TOML directory and the optional `dir_output`"
function output_path(config::Config, path::AbstractString)
dir = get(config, "dir_output", ".")
return combined_path(config, dir, path)
end
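#=
Path resolution sketch (hypothetical layout): with the TOML at `/project/config.toml`
and `dir_input = "data"` set in it,

    input_path(config, "forcing.nc")  # -> "/project/data/forcing.nc"

`output_path` works the same way with `dir_output`; without those keys the path is
resolved relative to the TOML directory alone.
=#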
"Extract netCDF variable name `ncname` from `var` (type `String` or `Config`). If `var` has
type `Config`, either `scale`, `offset` and an optional `index` are expected (with `ncname`)
or a `value` (uniform value), these are stored as part of `NamedTuple` `modifier`."
function ncvar_name_modifier(var; config = nothing)
ncname = nothing
modifier = (scale = 1.0, offset = 0.0, value = nothing, index = nothing)
if isa(var, Config)
if haskey(var, "netcdf") &&
haskey(var.netcdf, "variable") &&
haskey(var.netcdf.variable, "name")
ncname = var.netcdf.variable.name
scale = param(var, "scale", 1.0)
offset = param(var, "offset", 0.0)
if haskey(var, "layer") || haskey(var, "class")
haskey(var, "layer") ? dim_name = "layer" : dim_name = "class"
if length(var[dim_name]) > 1
# if modifier is provided as a list for each dim item
indices = []
for i = 1:length(var[dim_name])
index = get_index_dimension(var, config, var[dim_name][i])
@info "NetCDF parameter `$ncname` is modified with scale `$(scale[i])` and offset `$(offset[i])` at index `$index`."
push!(indices, index)
end
modifier =
(scale = scale, offset = offset, value = nothing, index = indices)
else
index = get_index_dimension(var, config, var[dim_name])
modifier =
(scale = scale, offset = offset, value = nothing, index = index)
@info "NetCDF parameter `$ncname` is modified with scale `$scale` and offset `$offset` at index `$index`."
end
else
modifier =
(scale = scale, offset = offset, value = nothing, index = nothing)
@info "NetCDF parameter `$ncname` is modified with scale `$scale` and offset `$offset`."
end
elseif haskey(var, "value")
modifier =
(scale = 1.0, offset = 0.0, value = param(var, "value"), index = nothing)
else
error("Unrecognized modifier $(Dict(var))")
end
elseif isa(var, String)
ncname = var
else
error("Unknown type")
end
return ncname, modifier
end
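#=
For illustration, the TOML forms accepted above (parameter names are hypothetical):

    # plain netCDF variable name, no modifier
    precipitation = "precip"

    # scale and offset applied to the netCDF variable
    [input.vertical.temperature]
    netcdf.variable.name = "t2m"
    scale = 1.0
    offset = -273.15

    # uniform value, no netCDF variable is read at all
    [input.vertical.cfmax]
    value = 2.5
=#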
"Extract a netCDF variable at a given time"
function get_at(
ds::CFDataset,
varname::AbstractString,
times::AbstractVector{<:TimeType},
t::TimeType,
)
# this behaves like a backward fill interpolation
i = findfirst(>=(t), times)
t < first(times) && throw(DomainError("time $t before dataset begin $(first(times))"))
i === nothing && throw(DomainError("time $t after dataset end $(last(times))"))
return get_at(ds, varname, i)
end
function get_at(ds::CFDataset, varname::AbstractString, i)
return read_standardized(ds, varname, (x = :, y = :, time = i))
end
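#=
Backward-fill sketch for the time lookup above (standalone, hypothetical data):

    using Dates
    times = DateTime(2000, 1, 1):Day(1):DateTime(2000, 1, 3)
    t = DateTime(2000, 1, 1, 12)
    findfirst(>=(t), times)  # -> 2: the first timestamp at or after `t`,
                             # matching the right-labeled forcing convention
=#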
function get_param_res(model)
Dict(
symbols"vertical.precipitation" => model.lateral.river.reservoir.precipitation,
symbols"vertical.potential_evaporation" =>
model.lateral.river.reservoir.evaporation,
)
end
function get_param_lake(model)
Dict(
symbols"vertical.precipitation" => model.lateral.river.lake.precipitation,
symbols"vertical.potential_evaporation" => model.lateral.river.lake.evaporation,
)
end
const mover_params = (symbols"vertical.precipitation", symbols"vertical.potential_evaporation")
function load_fixed_forcing(model)
@unpack reader, network, config = model
@unpack forcing_parameters = reader
do_reservoirs = get(config.model, "reservoirs", false)::Bool
do_lakes = get(config.model, "lakes", false)::Bool
mover_params =
(symbols"vertical.precipitation", symbols"vertical.potential_evaporation")
reverse_indices = network.land.reverse_indices
if do_reservoirs
sel_reservoirs = network.reservoir.indices_coverage
param_res = get_param_res(model)
end
if do_lakes
sel_lakes = network.lake.indices_coverage
param_lake = get_param_lake(model)
end
for (par, ncvar) in forcing_parameters
if ncvar.name === nothing
val = ncvar.value * ncvar.scale + ncvar.offset
param_vector = param(model, par)
param_vector .= val
# set fixed precipitation and evaporation over the lakes and reservoirs, store
# these in the lake and reservoir structs, and set the precipitation and
# evaporation to 0 in the vertical model
if par in mover_params
if do_reservoirs
for (i, sel_reservoir) in enumerate(sel_reservoirs)
param_vector[reverse_indices[sel_reservoir]] .= 0
param_res[par][i] = val
end
end
if do_lakes
for (i, sel_lake) in enumerate(sel_lakes)
param_vector[reverse_indices[sel_lake]] .= 0
param_lake[par][i] = val
end
end
end
end
end
end
"Get dynamic netCDF input for the given time"
function update_forcing!(model)
@unpack vertical, clock, reader, network, config = model
@unpack dataset, dataset_times, forcing_parameters = reader
do_reservoirs = get(config.model, "reservoirs", false)::Bool
do_lakes = get(config.model, "lakes", false)::Bool
if do_reservoirs
sel_reservoirs = network.reservoir.indices_coverage
param_res = get_param_res(model)
end
if do_lakes
sel_lakes = network.lake.indices_coverage
param_lake = get_param_lake(model)
end
# Wflow expects `right` labeling of the forcing time interval, e.g. daily precipitation
# at 01-02-2000 00:00:00 is the accumulated total precipitation between 01-01-2000
# 00:00:00 and 01-02-2000 00:00:00.
# load from netCDF into the model according to the mapping
for (par, ncvar) in forcing_parameters
# no need to update fixed values
ncvar.name === nothing && continue
time = convert(eltype(dataset_times), clock.time)
data = get_at(dataset, ncvar.name, dataset_times, time)
if ncvar.scale != 1.0 || ncvar.offset != 0.0
data .= data .* ncvar.scale .+ ncvar.offset
end
# calculate the mean precipitation and evaporation over the lakes and reservoirs,
# store these in the lake and reservoir structs, and set the precipitation and
# evaporation to 0 in the vertical model
if par in mover_params
if do_reservoirs
for (i, sel_reservoir) in enumerate(sel_reservoirs)
avg = mean(data[sel_reservoir])
data[sel_reservoir] .= 0
param_res[par][i] = avg
end
end
if do_lakes
for (i, sel_lake) in enumerate(sel_lakes)
avg = mean(data[sel_lake])
data[sel_lake] .= 0
param_lake[par][i] = avg
end
end
end
param_vector = param(model, par)
sel = active_indices(network, par)
data_sel = data[sel]
if any(ismissing, data_sel)
msg = "Forcing data for `$par` has missing values on active model cells for $(ncvar.name)"
throw(ArgumentError(msg))
end
param_vector .= data_sel
end
return model
end
"""
monthday_passed(curr, avail)
Given two monthday tuples such as (12, 31) and (12, 15), return true if the first argument
falls on or after the second, compared componentwise: both the month and the day of the
first tuple must be greater than or equal to those of the second. The tuples generally
come from `Dates.monthday`.
# Examples
```julia-repl
julia> monthday_passed((12, 31), (12, 15))
true
```
"""
monthday_passed(curr, avail) = (curr[1] >= avail[1]) && (curr[2] >= avail[2])
"Get dynamic and cyclic netCDF input"
function load_dynamic_input!(model)
update_forcing!(model)
if haskey(model.config.input, "cyclic")
update_cyclic!(model)
end
end
"Get cyclic netCDF input for the given time"
function update_cyclic!(model)
@unpack vertical, clock, reader, network, config = model
@unpack cyclic_dataset, cyclic_times, cyclic_parameters = reader
# pick up the data that is valid for the past model time step
month_day = monthday(clock.time - clock.dt)
is_first_timestep = clock.iteration == 1
for (par, ncvar) in cyclic_parameters
if is_first_timestep || (month_day in cyclic_times[par])
# time for an update of the cyclic forcing
i = findlast(t -> monthday_passed(month_day, t), cyclic_times[par])
isnothing(i) &&
error("Could not find applicable cyclic timestep for $month_day")
# load from netCDF into the model according to the mapping
data = get_at(cyclic_dataset, ncvar.name, i)
param_vector = param(model, par)
sel = active_indices(network, par)
param_vector .= data[sel]
if ncvar.scale != 1.0 || ncvar.offset != 0.0
param_vector .= param_vector .* ncvar.scale .+ ncvar.offset
end
end
end
end
"""
nc_handles::Dict{String, NCDataset{Nothing}}
For each netCDF file that will be opened for writing, store an entry in this Dict from the
absolute path of the file to the NCDataset. This allows us to close the NCDataset if we try
to create it twice in the same session, thus providing a workaround for this issue:
https://github.com/Alexander-Barth/NCDatasets.jl/issues/106
Note that using this will prevent automatic garbage collection and thus closure of the
NCDataset.
"""
const nc_handles = Dict{String,NCDataset{Nothing}}()
"Safely create a netCDF file, even if it has already been opened for creation"
function create_tracked_netcdf(path)
abs_path = abspath(path)
# close existing NCDataset if it exists
if haskey(nc_handles, abs_path)
# fine if it was already closed
close(nc_handles[abs_path])
end
# create directory if needed
mkpath(dirname(path))
ds = NCDataset(path, "c")
nc_handles[abs_path] = ds
return ds
end
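#=
Usage sketch (hypothetical path): calling this twice with the same path in one session
closes the first handle before recreating the file, instead of erroring:

    ds = create_tracked_netcdf("output/run.nc")
    ds = create_tracked_netcdf("output/run.nc")  # first handle closed, fresh dataset returned
=#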
"prepare an output dataset for scalar data"
function setup_scalar_netcdf(
path,
ncvars,
modelmap,
calendar,
time_units,
extra_dim,
config,
float_type = Float32,
)
ds = create_tracked_netcdf(path)
defDim(ds, "time", Inf) # unlimited
defVar(
ds,
"time",
Float64,
("time",),
attrib = ["units" => time_units, "calendar" => calendar],
)
set_extradim_netcdf(ds, extra_dim)
for (nc, netcdfvars) in zip(ncvars, config.netcdf.variable)
# Delft-FEWS requires the attribute :cf_role = "timeseries_id" when a netCDF file
# contains more than one location list
defVar(
ds,
nc.location_dim,
nc.locations,
(nc.location_dim,),
attrib = ["cf_role" => "timeseries_id"],
)
v = param(modelmap, nc.par)
if eltype(v) <: AbstractFloat
defVar(
ds,
nc.var,
float_type,
(nc.location_dim, "time"),
attrib = ["_FillValue" => float_type(NaN)],
)
elseif eltype(v) <: SVector
if haskey(netcdfvars, extra_dim.name)
# `extra_dim.name` as specified in the TOML file is used to index
defVar(
ds,
nc.var,
float_type,
(nc.location_dim, "time"),
attrib = ["_FillValue" => float_type(NaN)],
)
else
defVar(
ds,
nc.var,
float_type,
(nc.location_dim, extra_dim.name, "time"),
attrib = ["_FillValue" => float_type(NaN)],
)
end
else
error("Unsupported output type: ", typeof(v))
end
end
return ds
end
"set extra dimension in output netCDF file"
function set_extradim_netcdf(
ds,
extra_dim::NamedTuple{
(:name, :value),
Tuple{String,Vector{T}},
} where {T<:Union{String,Float64}},
)
# the axis attribute `Z` is required by Delft-FEWS to import this type of 3D data;
# the values of this dimension `extra_dim.value` should be of type Float64
if extra_dim.name == "layer"
attributes =
["long_name" => "layer_index", "standard_name" => "layer_index", "axis" => "Z"]
elseif extra_dim.name == "classes"
attributes = [
"long_name" => extra_dim.name,
"standard_name" => extra_dim.name,
"axis" => "Z",
]
else
error("Unsupported extra dimension name `$(extra_dim.name)`")
end
defVar(ds, extra_dim.name, extra_dim.value, (extra_dim.name,), attrib = attributes)
return nothing
end
set_extradim_netcdf(ds, extra_dim::Nothing) = nothing
"prepare an output dataset for grid data"
function setup_grid_netcdf(
path,
ncx,
ncy,
parameters,
calendar,
time_units,
extra_dim,
sizeinmetres;
float_type = Float32,
deflatelevel = 0,
)
ds = create_tracked_netcdf(path)
defDim(ds, "time", Inf) # unlimited
if sizeinmetres
defVar(
ds,
"x",
ncx,
("x",),
attrib = [
"long_name" => "x coordinate of projection",
"standard_name" => "projection_x_coordinate",
"axis" => "X",
"units" => "m",
],
deflatelevel = deflatelevel,
)
defVar(
ds,
"y",
ncy,
("y",),
attrib = [
"long_name" => "y coordinate of projection",
"standard_name" => "projection_y_coordinate",
"axis" => "Y",
"units" => "m",
],
deflatelevel = deflatelevel,
)
else
defVar(
ds,
"lon",
ncx,
("lon",),
attrib = [
"long_name" => "longitude",
"standard_name" => "longitude",
"axis" => "X",
"units" => "degrees_east",
],
deflatelevel = deflatelevel,
)
defVar(
ds,
"lat",
ncy,
("lat",),
attrib = [
"long_name" => "latitude",
"standard_name" => "latitude",
"axis" => "Y",
"units" => "degrees_north",
],
deflatelevel = deflatelevel,
)
end
set_extradim_netcdf(ds, extra_dim)
defVar(
ds,
"time",
Float64,
("time",),
attrib = ["units" => time_units, "calendar" => calendar],
deflatelevel = deflatelevel,
)
if sizeinmetres
for (key, val) in pairs(parameters)
if eltype(val.vector) <: AbstractFloat
# all floats are saved as `float_type` (Float32 by default)
defVar(
ds,
key,
float_type,
("x", "y", "time"),
attrib = ["_FillValue" => float_type(NaN)],
deflatelevel = deflatelevel,
)
elseif eltype(val.vector) <: SVector
# for SVectors an additional dimension (`extra_dim`) is required
defVar(
ds,
key,
float_type,
("x", "y", extra_dim.name, "time"),
attrib = ["_FillValue" => float_type(NaN)],
deflatelevel = deflatelevel,
)
else
error("Unsupported output type: ", typeof(val.vector))
end
end
else
for (key, val) in pairs(parameters)
if eltype(val.vector) <: AbstractFloat
# all floats are saved as `float_type` (Float32 by default)
defVar(
ds,
key,
float_type,
("lon", "lat", "time"),
attrib = ["_FillValue" => float_type(NaN)],
deflatelevel = deflatelevel,
)
elseif eltype(val.vector) <: SVector
# for SVectors an additional dimension (`extra_dim`) is required
defVar(
ds,
key,
float_type,
("lon", "lat", extra_dim.name, "time"),
attrib = ["_FillValue" => float_type(NaN)],
deflatelevel = deflatelevel,
)
else
error("Unsupported output type: ", typeof(val.vector))
end
end
end
return ds
end
"Add a new time to the unlimited time dimension, and return the index"
function add_time(ds, time)
i = length(ds["time"]) + 1
ds["time"][i] = time
return i
end
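#=
Usage sketch: after the setup functions above created the unlimited "time" variable,
each output step first appends its timestamp, then writes data at the returned index
(the variable name "q" is hypothetical):

    i = add_time(ds, clock.time)
    ds["q"][:, :, i] = data
=#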
struct NCReader{T}
dataset::CFDataset
dataset_times::Vector{T}
cyclic_dataset::Union{NCDataset,Nothing}
cyclic_times::Dict{Tuple{Symbol,Vararg{Symbol}},Vector{Tuple{Int,Int}}}
forcing_parameters::Dict{Tuple{Symbol,Vararg{Symbol}},NamedTuple}
cyclic_parameters::Dict{Tuple{Symbol,Vararg{Symbol}},NamedTuple}
end
struct Writer
dataset::Union{NCDataset,Nothing} # dataset (netCDF) for grid data
parameters::Dict{String,Any} # mapping of netCDF variable names to model parameters (arrays)
nc_path::Union{String,Nothing} # path of netCDF file (grid data)
csv_path::Union{String,Nothing} # path of CSV file
csv_cols::Vector # model parameter (arrays) and associated reducer function for CSV output
csv_io::IO # file handle to CSV file
state_dataset::Union{NCDataset,Nothing} # dataset with model states (netCDF)
state_parameters::Dict{String,Any} # mapping of netCDF variable names to model states (arrays)
state_nc_path::Union{String,Nothing} # path of netCDF file with states
dataset_scalar::Union{NCDataset,Nothing} # dataset (netCDF) for scalar data
nc_scalar::Vector # model parameter (arrays) and associated reducer function for netCDF scalar output
ncvars_dims::Vector # model parameter (String) and associated netCDF variable, location dimension and location name for scalar data
nc_scalar_path::Union{String,Nothing} # path of netCDF file (scalar data)
extra_dim::Union{NamedTuple,Nothing} # name and values for extra dimension (to store SVectors)
end
function prepare_reader(config)
path_forcing = config.input.path_forcing
abspath_forcing = input_path(config, path_forcing)
cyclic_path = input_path(config, config.input.path_static)
@info "Cyclic parameters are provided by `$cyclic_path`."
# absolute paths are not supported, see Glob.jl#2
# the path separator in a glob pattern is always /
if isabspath(path_forcing)
parts = splitpath(path_forcing)
# use the root/drive as the dir, to support * in directory names as well
glob_dir = parts[1]
glob_path = join(parts[2:end], '/')
else
tomldir = dirname(config)
dir_input = get(config, "dir_input", ".")
glob_dir = normpath(tomldir, dir_input)
glob_path = replace(path_forcing, '\\' => '/')
end
@info "Forcing parameters are provided by `$abspath_forcing`."
dynamic_paths = glob(glob_path, glob_dir) # expand "data/forcing-year-*.nc"
if isempty(dynamic_paths)
error("No files found with name '$glob_path' in '$glob_dir'")
end
dataset = NCDataset(dynamic_paths, aggdim = "time", deferopen = false)
if haskey(dataset["time"].attrib, "_FillValue")
@warn "Time dimension contains `_FillValue` attribute, this is not in line with CF conventions."
nctimes = dataset["time"][:]
times_dropped = collect(skipmissing(nctimes))
# check if the length has changed (missings in the time dimension are not allowed),
# and throw an error if the lengths are different
if length(times_dropped) != length(nctimes)
error("Time dimension in `$abspath_forcing` contains missing values")
else
nctimes = times_dropped
nctimes_type = eltype(nctimes)
end
else
nctimes = dataset["time"][:]
nctimes_type = eltype(nctimes)
end
# check for cyclic parameters
do_cyclic = haskey(config.input, "cyclic")
# create map from internal location to netCDF variable name for forcing parameters
forcing_parameters = Dict{Tuple{Symbol,Vararg{Symbol}},NamedTuple}()
for par in config.input.forcing
fields = symbols(par)
ncname, mod = ncvar_name_modifier(param(config.input, fields))
forcing_parameters[fields] =
(name = ncname, scale = mod.scale, offset = mod.offset, value = mod.value)
@info "Set `$par` using netCDF variable `$ncname` as forcing parameter."
end
# create map from internal location to netCDF variable name for cyclic parameters and
# store cyclic times for each internal location (duplicate cyclic times are possible
# this way; keeping track of unique cyclic times does not seem worth the extra
# bookkeeping (memory usage) for now)
if do_cyclic
cyclic_dataset = NCDataset(cyclic_path)
cyclic_parameters = Dict{Tuple{Symbol,Vararg{Symbol}},NamedTuple}()
cyclic_times = Dict{Tuple{Symbol,Vararg{Symbol}},Vector{Tuple{Int,Int}}}()
for par in config.input.cyclic
fields = symbols(par)
ncname, mod = ncvar_name_modifier(param(config.input, fields))
i = findfirst(x -> startswith(x, "time"), dimnames(cyclic_dataset[ncname]))
dimname = dimnames(cyclic_dataset[ncname])[i]
cyclic_nc_times = collect(cyclic_dataset[dimname])
cyclic_times[fields] = timecycles(cyclic_nc_times)
cyclic_parameters[fields] =
(name = ncname, scale = mod.scale, offset = mod.offset)
@info "Set `$par` using netCDF variable `$ncname` as cyclic parameter, with `$(length(cyclic_nc_times))` timesteps."
end
else
cyclic_parameters = Dict{Tuple{Symbol,Vararg{Symbol}},NamedTuple}()
cyclic_dataset = nothing
cyclic_times = Dict{Tuple{Symbol,Vararg{Symbol}},Vector{Tuple{Int,Int}}}()
end
# check if there is overlap
if do_cyclic
forcing_keys = keys(forcing_parameters)
cyclic_keys = keys(cyclic_parameters)
for forcing_key in forcing_keys
if forcing_key in cyclic_keys
error("parameter specified in both forcing and cyclic")
end
end
end
return NCReader{nctimes_type}(
dataset,
nctimes,
cyclic_dataset,
cyclic_times,
forcing_parameters,
cyclic_parameters,
)
end
"Get a Vector of all unique location ids from a 2D map"
function locations_map(ds, mapname, config)
map_2d = ncread(
ds,
config,
mapname;
optional = false,
type = Union{Int,Missing},
allow_missing = true,
)
ids = unique(skipmissing(map_2d))
return ids
end
"Get a Vector{Tuple} with model parameter and associated netCDF variable name, dimension and location names for scalar data"
function nc_variables_dims(nc_variables, dataset, config)
ncvars_dims = []
for nc_var in nc_variables
var = nc_var["name"]::String
par = nc_var["parameter"]::String
if haskey(nc_var, "map")
mapname = nc_var["map"]
ids = string.(locations_map(dataset, mapname, config))
location_dim = string(var, '_', nc_var["map"])
push!(
ncvars_dims,
(par = par, var = var, location_dim = location_dim, locations = ids),
)
else
push!(
ncvars_dims,
(
par = par,
var = var,
location_dim = nc_var["location"],
locations = [nc_var["location"]],
),
)
end
end
return ncvars_dims
end
"Get a Vector{String} of all columns names for the CSV header, except the first, time"
function csv_header(cols, dataset, config)
header = [col["header"] for col in cols]
header = String[]
for col in cols
h = col["header"]::String
if haskey(col, "map")
mapname = col["map"]
ids = locations_map(dataset, mapname, config)
hvec = [string(h, '_', id) for id in ids]
append!(header, hvec)
else
push!(header, h)
end
end
return header
end
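#=
Header expansion, for illustration: given a column such as

    [[csv.column]]
    header = "Q"
    map = "gauges"

with gauge ids [1, 2] in the map, the CSV gains the columns "Q_1" and "Q_2"; a column
without `map` contributes its `header` unchanged.
=#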
"""
Flatten a nested dictionary, keeping track of the full address of the keys.
Useful for converting TOML of the format:
[a.b]
field_of_b = "name_in_output"
[a.d.c]
field_of_c = "other_name_in_output"
to a non-nested format:
Dict(
symbols"a.b.field_of_b" => "name_in_output,
symbols"a.d.c.field_of_c" => "other_name_in_output,
)
"""
function flat!(d, path, el::Dict)
for (k, v) in pairs(el)
flat!(d, string(path, '.', k), v)
end
return d
end
function flat!(d, path, el)
k = symbols(path)
d[k] = el
return d
end
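#=
For illustration, flattening a small nested Dict:

    d = Dict{Tuple{Symbol,Vararg{Symbol}},String}()
    flat!(d, "a.b", Dict("field_of_b" => "name_in_output"))
    # d == Dict((:a, :b, :field_of_b) => "name_in_output")
=#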
"""
ncnames(dict)
Create a flat mapping from internal parameter locations to netCDF variable names.
Ignores top level values in the Dict. This function is used to convert a TOML such as:
```toml
[output]
path = "path/to/file.nc"
[output.vertical]
canopystorage = "my_canopystorage"
[output.lateral.river]
q = "my_q"
```
To a dictionary of the flattened parameter locations and netCDF names. The top level
values are ignored since the output path is not a netCDF name.
```julia
Dict(
(:vertical, :canopystorage) => "my_canopystorage",
(:lateral, :river, :q) => "my_q",
)
```
"""
function ncnames(dict)
ncnames_dict = Dict{Tuple{Symbol,Vararg{Symbol}},String}()
for (k, v) in dict
if v isa Dict # ignore top level values (e.g. output.path)
flat!(ncnames_dict, k, v)
end
end
return ncnames_dict
end
"""
out_map(ncnames_dict, modelmap)
Create a Dict that maps parameter netCDF names to arrays in the Model.
"""
function out_map(ncnames_dict, modelmap)
output_map = Dict{String,Any}()
for (par, ncname) in ncnames_dict
A = param(modelmap, par)
output_map[ncname] = (par = par, vector = A)
end
return output_map
end
function get_reducer_func(col, rev_inds, args...)
parameter = col["parameter"]
if occursin("reservoir", parameter)
reducer_func = reducer(col, rev_inds.reservoir, args...)
elseif occursin("lake", parameter)
reducer_func = reducer(col, rev_inds.lake, args...)
elseif occursin("river", parameter)
reducer_func = reducer(col, rev_inds.river, args...)
elseif occursin("drain", parameter)
reducer_func = reducer(col, rev_inds.drain, args...)
else
reducer_func = reducer(col, rev_inds.land, args...)
end
return reducer_func
end
function prepare_writer(
config,
modelmap,
rev_inds,
x_nc,
y_nc,
nc_static;
extra_dim = nothing,
)
sizeinmetres = get(config.model, "sizeinmetres", false)::Bool
calendar = get(config, "calendar", "standard")::String
time_units = get(config, "time_units", CFTime.DEFAULT_TIME_UNITS)
# create an output netCDF that will hold all timesteps of selected parameters for grid
# data but only if config.output.path has been set
if haskey(config, "output") && haskey(config.output, "path")
nc_path = output_path(config, config.output.path)
deflatelevel = get(config.output, "compressionlevel", 0)::Int
@info "Create an output netCDF file `$nc_path` for grid data, using compression level `$deflatelevel`."
# create a flat mapping from internal parameter locations to netCDF variable names
output_ncnames = ncnames(config.output)
# fill the output_map by mapping parameter netCDF names to arrays
output_map = out_map(output_ncnames, modelmap)
ds = setup_grid_netcdf(
nc_path,
x_nc,
y_nc,
output_map,
calendar,
time_units,
extra_dim,
sizeinmetres,
deflatelevel = deflatelevel,
)
else
nc_path = nothing
output_map = Dict{String,Any}()
ds = nothing
end
# create a separate state output netCDF that will hold the last timestep of all states
# but only if config.state.path_output has been set
if haskey(config, "state") && haskey(config.state, "path_output")
state_ncnames = check_states(config)
state_map = out_map(state_ncnames, modelmap)
nc_state_path = output_path(config, config.state.path_output)
@info "Create a state output netCDF file `$nc_state_path`."
ds_outstate = setup_grid_netcdf(
nc_state_path,
x_nc,
y_nc,
state_map,
calendar,
time_units,
extra_dim,
sizeinmetres;
float_type = Float,
)
else
ds_outstate = nothing
state_map = Dict{String,Any}()
nc_state_path = nothing
end
# create an output netCDF that will hold all timesteps of selected parameters for scalar
# data, but only if config.netcdf.variable has been set.
if haskey(config, "netcdf") && haskey(config.netcdf, "variable")
nc_scalar_path = output_path(config, config.netcdf.path)
@info "Create an output netCDF file `$nc_scalar_path` for scalar data."
# get netCDF info for scalar data (variable name, locationset (dim) and
# location ids)
ncvars_dims = nc_variables_dims(config.netcdf.variable, nc_static, config)
ds_scalar = setup_scalar_netcdf(
nc_scalar_path,
ncvars_dims,
modelmap,
calendar,
time_units,
extra_dim,
config,
)
# create a vector of (parameter, reducer) named tuples which will be used to
# retrieve and reduce data during a model run
nc_scalar = []
for var in config.netcdf.variable
parameter = var["parameter"]