-
Notifications
You must be signed in to change notification settings - Fork 145
/
Copy pathensemble_manager_mod.f90
1950 lines (1449 loc) · 69.8 KB
/
ensemble_manager_mod.f90
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
! DART software - Copyright UCAR. This open source software is provided
! by UCAR, "as is", without charge, subject to all terms of use at
! http://www.image.ucar.edu/DAReS/DART/DART_download
module ensemble_manager_mod
! Manages a data structure that is composed of copies of a vector of variables.
! The general data structure simply represents this two-dimensional array but
! provides tools for storing it with one dimension (copies) or the other
! (variables) complete on each process of a multiprocess implementation. Some
! operations that are specifically aimed at supporting ensemble filter applications
! have been placed here for efficiency even though they might be more
! appropriately abstracted at a higher level of code.
use types_mod, only : r8, i4, i8, MISSING_R8
use utilities_mod, only : do_nml_file, do_nml_term, &
error_handler, E_ERR, E_MSG, do_output, &
nmlfileunit, find_namelist_in_file, &
check_namelist_read, timestamp, set_output
use time_manager_mod, only : time_type, set_time
use mpi_utilities_mod, only : task_count, my_task_id, send_to, receive_from, &
task_sync, broadcast_send, broadcast_recv
use sort_mod, only : index_sort
implicit none
private
public :: copies_in_window, mean_row, set_num_extra_copies, &
get_allow_transpose
public :: init_ensemble_manager, end_ensemble_manager, get_ensemble_time, &
ensemble_type, duplicate_ens, get_var_owner_index, &
get_my_num_copies, get_my_copies, get_my_num_vars, &
get_my_vars, compute_copy_mean, compute_copy_mean_sd, &
get_copy, put_copy, all_vars_to_all_copies, &
all_copies_to_all_vars, allocate_vars, deallocate_vars, &
compute_copy_mean_var, get_copy_owner_index, set_ensemble_time, &
broadcast_copy, prepare_to_write_to_vars, prepare_to_write_to_copies, &
prepare_to_read_from_vars, prepare_to_read_from_copies, prepare_to_update_vars, &
prepare_to_update_copies, print_ens_handle, set_current_time, &
map_task_to_pe, map_pe_to_task, get_current_time, &
allocate_single_copy, put_single_copy, get_single_copy, &
deallocate_single_copy
character(len=*), parameter :: source = 'ensemble_manager_mod.f90'
type ensemble_type
!>@todo update documentation with regard to 'single_restart_file_[in,out]'
!>@todo FIXME the rule here should be that we only access %copies and %vars for efficiency
!>but every other part of this structure should go through accessor routines.
!DIRECT ACCESS INTO STORAGE IS USED TO REDUCE COPYING: BE CAREFUL
!!!private
integer(i8) :: num_vars
integer :: num_copies, my_num_copies, my_num_vars
integer, allocatable :: my_copies(:)
integer(i8), allocatable :: my_vars(:)
! Storage in next line is to be used when each pe has all copies of subset of vars
real(r8), allocatable :: copies(:, :) ! Dimensioned (num_copies, my_num_vars)
! Storage on next line is used when each pe has subset of copies of all vars
real(r8), allocatable :: vars(:, :) ! Dimensioned (num_vars, my_num_copies)
! Time is only related to var complete
type(time_type), allocatable :: time(:)
integer :: distribution_type
integer :: valid ! copies modified last, vars modified last, both same
integer :: id_num
integer, allocatable :: task_to_pe_list(:), pe_to_task_list(:) ! List of tasks
! Flexible my_pe, layout_type which allows different task layouts for different ensemble handles
integer :: my_pe
integer :: layout_type
integer :: transpose_type
integer :: num_extras
type(time_type) :: current_time ! The current time, constant across the ensemble
end type ensemble_type
!PAR other storage option control can be implemented here. In particular, want to find
!PAR some way, either allocating or multiple addressing, to use same chunk of storage
!PAR for both copy and var complete representations.
! track if copies modified last, vars modified last, both are in sync
! (and therefore both valid to be used r/o), or unknown.
integer, parameter :: VALID_UNKNOWN = -1
integer, parameter :: VALID_BOTH = 0 ! vars & copies have same data
integer, parameter :: VALID_VARS = 1 ! vars(:,:) modified last
integer, parameter :: VALID_COPIES = 2 ! copies(:,:) modified last
! unique counter per ensemble handle
integer :: global_counter = 1
! Logical flag for initialization of module
logical :: module_initialized = .false.
! Module storage for writing error messages
character(len = 255) :: msgstring
! Module storage for pe information for this process avoids recomputation
integer :: num_pes
! Control order of communication loops in the transpose routines
logical :: use_copy2var_send_loop = .true.
logical :: use_var2copy_rec_loop = .true.
!-----------------------------------------------------------------
!
! namelist with default values
! Complain if unneeded transposes are done
!>@todo remove all things related to this
! logical :: flag_unneeded_transposes = .false.
! Communication configuration:
! 1 = usual default, 2 - 4 are valid and depend on the machine, ensemble count, and task count
integer :: communication_configuration = 1
! task layout options:
integer :: layout = 1 ! default to my_pe = my_task_id(). Layout2 assumes that the user knows the correct tasks_per_node
integer :: tasks_per_node = 1 ! default to 1 if the user does not specify a number of tasks per node.
logical :: debug = .false.
namelist / ensemble_manager_nml / communication_configuration, &
layout, tasks_per_node, &
debug
!-----------------------------------------------------------------
contains
!-----------------------------------------------------------------
subroutine init_ensemble_manager(ens_handle, num_copies, &
num_vars, distribution_type_in, layout_type, transpose_type_in)
type(ensemble_type), intent(out) :: ens_handle
integer, intent(in) :: num_copies
integer(i8), intent(in) :: num_vars
integer, intent(in), optional :: distribution_type_in
integer, intent(in), optional :: layout_type
integer, intent(in), optional :: transpose_type_in ! no vars, transposable, transpose and duplicate
integer :: iunit, io
integer :: transpose_type
! Distribution type controls pe layout of storage; Default is 1. 1 is only one implemented.
if(.not. present(distribution_type_in)) then
ens_handle%distribution_type = 1
else
! Check for error: only type 1 implemented for now
if(distribution_type_in /= 1) call error_handler(E_ERR, 'init_ensemble_manager', &
'only distribution_type 1 is implemented', source)
ens_handle%distribution_type = distribution_type_in
endif
! First call to init_ensemble_manager must initialize module and read namelist
if ( .not. module_initialized ) then
! Initialize the module with utilities
module_initialized = .true.
! Read the namelist entry
call find_namelist_in_file("input.nml", "ensemble_manager_nml", iunit)
read(iunit, nml = ensemble_manager_nml, iostat = io)
call check_namelist_read(iunit, io, "ensemble_manager_nml")
if (do_nml_file()) write(nmlfileunit, nml=ensemble_manager_nml)
if (do_nml_term()) write( * , nml=ensemble_manager_nml)
! Get mpi information for this process; it's stored in module storage
num_pes = task_count()
endif
! Optional layout_type argument to assign how my_pe is related to my_task_id
! layout_type can be set individually for each ensemble handle. It is not advisable to do this
! because get_obs_ens assumes that the layout is the same for each ensemble handle.
if(.not. present(layout_type) ) then
ens_handle%layout_type = layout ! namelist option
else
ens_handle%layout_type = layout_type
endif
! Check for error: only layout_types 1 and 2 are implemented
if (ens_handle%layout_type /= 1 .and. ens_handle%layout_type /=2 ) then
call error_handler(E_ERR, 'init_ensemble_manager', &
'only layout values 1 (standard), 2 (round-robin) allowed ', source)
endif
! Optional transpose type:
! 1 not transposable - always copy complete
! 2 transposable - has a vars array
! 3 duplicatable - really only 1 copy, but this gets duplicated as vars array on every task during a transpose
if (.not. present(transpose_type_in)) then
transpose_type = 1
else
transpose_type = transpose_type_in
endif
ens_handle%transpose_type = transpose_type
allocate(ens_handle%task_to_pe_list(num_pes))
allocate(ens_handle%pe_to_task_list(num_pes))
call assign_tasks_to_pes(ens_handle, num_copies, ens_handle%layout_type)
ens_handle%my_pe = map_task_to_pe(ens_handle, my_task_id())
! Set the global storage bounds for the number of copies and variables
ens_handle%num_copies = num_copies
ens_handle%num_vars = num_vars
! For debugging, error checking
ens_handle%id_num = global_counter
global_counter = global_counter + 1
! This controls which way the two transpose routines order their
! loops: single sender with multiple receivers, or single receiver
! with multiple senders. For small problems it doesn't matter;
! for very large ensemble sizes and very large MPI task counts you
! will want to profile each combination and pick the fastest.
if (communication_configuration == 1) then
use_copy2var_send_loop = .true.
use_var2copy_rec_loop = .true.
else if (communication_configuration == 2) then
use_copy2var_send_loop = .false.
use_var2copy_rec_loop = .true.
else if (communication_configuration == 3) then
use_copy2var_send_loop = .true.
use_var2copy_rec_loop = .false.
else if (communication_configuration == 4) then
use_copy2var_send_loop = .false.
use_var2copy_rec_loop = .false.
else
write(msgstring, *) 'communication_configuration is', communication_configuration
call error_handler(E_ERR, 'init_ensemble_manager', &
'communication_configuration must be between 1 and 4', &
source, text2=msgstring)
endif
! initially no data
ens_handle%valid = VALID_BOTH
if(debug .and. my_task_id()==0) then
print*, 'pe_to_task_list', ens_handle%pe_to_task_list
print*, 'task_to_pe_list', ens_handle%task_to_pe_list
endif
! Figure out how the ensemble copies are partitioned
call set_up_ens_distribution(ens_handle)
ens_handle%num_extras = 0 ! This can be changed by calling set_num_extra_copies
if (debug) call print_ens_handle(ens_handle)
end subroutine init_ensemble_manager
!-----------------------------------------------------------------
subroutine get_copy(receiving_pe, ens_handle, copy, vars, mtime)
! The requested copy of the vars from the ensemble is stored into
! vars on the receiving_pe process. Only the receiving pe and the pe
! that actually stores the copy do anything. The vars storage only need be
! allocated on the receiving_pe. Time only transferred if mtime present.
! An example application of this routine would be for writing state
! space diagnostics. The filter only uses a single writer to an open file,
! currently process 0. To write out the ensemble mean state to the
! prior and posterior netcdf state space diagnostic files, process 0
! would need to request and receive the copy containing the ensemble
! mean from whatever process stores it when processes have a subset
! of copies of all variables.
integer, intent(in) :: receiving_pe
type(ensemble_type), intent(in) :: ens_handle
integer, intent(in) :: copy
real(r8), intent(out) :: vars(:)
type(time_type), intent(out), optional :: mtime
integer :: owner, owners_index
! Error checking
if (ens_handle%valid /= VALID_VARS .and. ens_handle%valid /= VALID_BOTH) then
call error_handler(E_ERR, 'get_copy', 'last access not var-complete', source)
endif
! Verify that requested copy exists
if(copy < 1 .or. copy > ens_handle%num_copies) then
write(msgstring, *) 'Requested copy: ', copy, ' is > maximum copy: ', ens_handle%num_copies
call error_handler(E_ERR,'get_copy', msgstring, source)
endif
! Make sure that vars has enough space to handle the answer
if(ens_handle%my_pe == receiving_pe) then !HK I think only the receiver needs the space
if(size(vars) < ens_handle%num_vars) then
write(msgstring, *) 'Size of vars: ', size(vars), ' Must be at least ', ens_handle%num_vars
call error_handler(E_ERR,'get_copy', msgstring, source)
endif
endif
! Figure out which PE stores this copy and what its local storage index is
call get_copy_owner_index(ens_handle, copy, owner, owners_index)
!----------- Block of code that must be done by receiving pe -----------------------------
if(ens_handle%my_pe == receiving_pe) then
! If PE that stores is the same, just copy and return
if(ens_handle%my_pe == owner) then
vars = ens_handle%vars(:, owners_index)
if(present(mtime)) mtime = ens_handle%time(owners_index)
! If I'm the receiving PE and also the owner, I'm all finished; return
return
endif
! Otherwise, must wait to receive vars and time from storing pe
call receive_from(map_pe_to_task(ens_handle, owner), vars, mtime)
endif
!----- Block of code that must be done by PE that stores the copy IF it is NOT receiver -----
if(ens_handle%my_pe == owner) then
! Send copy to receiving pe
if(present(mtime)) then
call send_to(map_pe_to_task(ens_handle, receiving_pe), ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
else
call send_to(map_pe_to_task(ens_handle, receiving_pe), ens_handle%vars(:, owners_index))
endif
endif
!------ End of block ---------------------------------------------------------------------
end subroutine get_copy
!-----------------------------------------------------------------
subroutine put_copy(sending_pe, ens_handle, copy, vars, mtime)
! The vars on the sending_pe is stored on the pe
! that stores this copy. Only the sending_pe and storing pe do anything. The
! vars storage only needs to be allocated on the sending_pe. An example
! use is when process 0 in the filter reads the observation keys from the
! obs_sequence file and then needs to place this as the appropriate copy
! in the obs_ensemble file. This copy is stored by some process that may
! not be 0.
integer, intent(in) :: sending_pe
type(ensemble_type), intent(inout) :: ens_handle
integer, intent(in) :: copy
real(r8), intent(in) :: vars(:)
type(time_type), intent(in), optional :: mtime
integer :: owner, owners_index
! Error checking
if (ens_handle%valid /= VALID_VARS .and. ens_handle%valid /= VALID_BOTH) then
call error_handler(E_ERR, 'put_copy', 'last access not var-complete', source)
endif
if(copy < 1 .or. copy > ens_handle%num_copies) then
write(msgstring, *) 'Requested copy: ', copy, ' is > maximum copy: ', ens_handle%num_copies
call error_handler(E_ERR,'put_copy', msgstring, source)
endif
! Make sure that num_vars has enough space to handle the answer
if(ens_handle%num_vars < size(vars)) then
write(msgstring, *) 'Size of vars: ', size(vars), ' Cannot be more than ', ens_handle%num_vars
call error_handler(E_ERR,'put_copy', msgstring, source)
endif
! What PE stores this copy and what is its local storage index
call get_copy_owner_index(ens_handle, copy, owner, owners_index)
! Block of code that must be done by PE that is to send the copy
if(ens_handle%my_pe == sending_pe) then
! If PE that stores is the same, just copy and return
if(ens_handle%my_pe == owner) then
ens_handle%vars(:, owners_index) = vars
if(present(mtime)) ens_handle%time(owners_index) = mtime
! If I'm the sending PE and also the owner, I'm all finished; return
return
endif
! Otherwise, must send vars and possibly time to storing pe
call send_to(map_pe_to_task(ens_handle, owner), vars, mtime)
endif
! Block of code that must be done by PE that stores the copy IF it is NOT sender
if(ens_handle%my_pe == owner) then
! Need to receive copy from sending_pe
if(present(mtime)) then
call receive_from(map_pe_to_task(ens_handle, sending_pe), ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
else
call receive_from(map_pe_to_task(ens_handle, sending_pe), ens_handle%vars(:, owners_index))
endif
endif
ens_handle%valid = VALID_VARS
end subroutine put_copy
!-----------------------------------------------------------------
subroutine broadcast_copy(ens_handle, copy, arraydata)
! find which PE has the global copy number and have it broadcast
! that copy to all the other PEs. arraydata is an output on
! all PEs, even on the PE which is the owner it is separate
! storage from the vars array in the ensemble handle.
type(ensemble_type), intent(in) :: ens_handle
integer, intent(in) :: copy
real(r8), intent(out) :: arraydata(:)
integer :: owner, owners_index
! Error checking
if (ens_handle%valid /= VALID_VARS .and. ens_handle%valid /= VALID_BOTH) then
call error_handler(E_ERR, 'broadcast_copy', 'last access not var-complete', source)
endif
if(copy < 1 .or. copy > ens_handle%num_copies) then
write(msgstring, *) 'Requested copy: ', copy, ' is > maximum copy: ', ens_handle%num_copies
call error_handler(E_ERR,'broadcast_copy', msgstring, source)
endif
! Make sure that arraydata has enough space to handle the answer
if(size(arraydata) < ens_handle%num_vars) then
write(msgstring, *) 'Size of arraydata: ', size(arraydata), ' must be at least ', ens_handle%num_vars
call error_handler(E_ERR,'broadcast_copy', msgstring, source)
endif
! What PE stores this copy and what is its local storage index
call get_copy_owner_index(ens_handle, copy, owner, owners_index)
! First block of code that must be done by PE that is to send the copy
if(ens_handle%my_pe == owner) then
arraydata = ens_handle%vars(:, owners_index)
call broadcast_send(map_pe_to_task(ens_handle, owner), arraydata)
else
call broadcast_recv(map_pe_to_task(ens_handle, owner), arraydata)
endif
end subroutine broadcast_copy
!-----------------------------------------------------------------
subroutine prepare_to_write_to_vars(ens_handle)
! Warn ens manager that we're going to directly update the %vars array
type(ensemble_type), intent(inout) :: ens_handle
!ens_handle%valid = VALID_VARS
end subroutine prepare_to_write_to_vars
!-----------------------------------------------------------------
subroutine prepare_to_write_to_copies(ens_handle)
! Warn ens manager that we're going to directly update the %copies array
type(ensemble_type), intent(inout) :: ens_handle
!ens_handle%valid = VALID_COPIES
end subroutine prepare_to_write_to_copies
!-----------------------------------------------------------------
subroutine prepare_to_read_from_vars(ens_handle)
! Check to be sure that the vars array is current
type(ensemble_type), intent(in) :: ens_handle
!if (ens_handle%valid /= VALID_VARS .and. ens_handle%valid /= VALID_BOTH) then
! call error_handler(E_ERR, 'prepare_to_read_from_vars', &
! 'last access not var-complete', source)
!endif
end subroutine prepare_to_read_from_vars
!-----------------------------------------------------------------
subroutine prepare_to_read_from_copies(ens_handle)
! Check to be sure that the copies array is current
type(ensemble_type), intent(in) :: ens_handle
!if (ens_handle%valid /= VALID_COPIES .and. ens_handle%valid /= VALID_BOTH) then
! call error_handler(E_ERR, 'prepare_to_read_from_copies', &
! 'last access not copy-complete', source)
!endif
end subroutine prepare_to_read_from_copies
!-----------------------------------------------------------------
subroutine prepare_to_update_vars(ens_handle)
! We need read/write access, so it has to start valid for vars or both,
! and then is going to be vars only going out.
type(ensemble_type), intent(inout) :: ens_handle
!if (ens_handle%valid /= VALID_VARS .and. ens_handle%valid /= VALID_BOTH) then
! call error_handler(E_ERR, 'prepare_to_update_vars', &
! 'last access not var-complete', source)
!endif
!ens_handle%valid = VALID_VARS
end subroutine prepare_to_update_vars
!-----------------------------------------------------------------
subroutine prepare_to_update_copies(ens_handle)
! We need read/write access, so it has to start valid for copies or both,
! and then is going to be copies only going out.
type(ensemble_type), intent(inout) :: ens_handle
!if (ens_handle%valid /= VALID_COPIES .and. ens_handle%valid /= VALID_BOTH) then
! call error_handler(E_ERR, 'prepare_to_update_copies', &
! 'last access not copy-complete', source)
!endif
!ens_handle%valid = VALID_COPIES
end subroutine prepare_to_update_copies
!-----------------------------------------------------------------
subroutine set_ensemble_time(ens_handle, indx, mtime)
! Sets the time of an ensemble member indexed by local storage on this pe.
type(ensemble_type), intent(inout) :: ens_handle
integer, intent(in) :: indx
type(time_type), intent(in) :: mtime
if(indx < 1 .or. indx > ens_handle%my_num_copies) then
write(msgstring, *) 'indx: ', indx, ' cannot exceed ', ens_handle%my_num_copies
call error_handler(E_ERR,'set_ensemble_time', msgstring, source)
endif
ens_handle%time(indx) = mtime
end subroutine set_ensemble_time
!-----------------------------------------------------------------
subroutine get_ensemble_time(ens_handle, indx, mtime)
! Returns the time of an ensemble member indexed by local storage on this pe.
type(ensemble_type), intent(in) :: ens_handle
integer, intent(in) :: indx
type(time_type), intent(out) :: mtime
if(indx < 1 .or. indx > ens_handle%my_num_copies) then
write(msgstring, *) 'indx: ', indx, ' cannot exceed ', ens_handle%my_num_copies
call error_handler(E_ERR,'get_ensemble_time', msgstring, source)
endif
mtime = ens_handle%time(indx)
end subroutine get_ensemble_time
!-----------------------------------------------------------------
subroutine end_ensemble_manager(ens_handle)
! Frees up allocated storage for an ensemble handle
type(ensemble_type), intent(inout) :: ens_handle
! Free up the allocated storage
deallocate(ens_handle%my_copies, ens_handle%time, ens_handle%my_vars, &
ens_handle%copies, ens_handle%task_to_pe_list, ens_handle%pe_to_task_list)
if(allocated(ens_handle%vars)) deallocate(ens_handle%vars)
end subroutine end_ensemble_manager
!-----------------------------------------------------------------
subroutine duplicate_ens(ens1, ens2, duplicate_time)
type(ensemble_type), intent(in) :: ens1
type(ensemble_type), intent(inout) :: ens2
logical, intent(in) :: duplicate_time
! Name was changed from copy_ens to avoid confusion in naming.
! Should only be used if the ens1%vars storage is current (each pe has subset of
! copies of all vars).
! If duplicate_time is true, also copies the time information from ens1 to ens2.
! If duplicate_time is false, the times in ens2 are left unchanged.
! Error checking
if (ens1%valid /= VALID_VARS .and. ens1%valid /= VALID_BOTH) then
call error_handler(E_ERR, 'duplicate_ens', &
'last access not var-complete for source ensemble', source)
endif
! Check to make sure that the ensembles are compatible
if(ens1%num_copies /= ens2%num_copies) then
write(msgstring, *) 'num_copies ', ens1%num_copies, ' and ', ens2%num_copies, &
'must be equal'
call error_handler(E_ERR,'duplicate_ens', msgstring, source)
endif
if(ens1%num_vars /= ens2%num_vars) then
write(msgstring, *) 'num_vars ', ens1%num_vars, ' and ', ens2%num_vars, &
'must be equal'
call error_handler(E_ERR,'duplicate_ens', msgstring, source)
endif
if(ens1%distribution_type /= ens2%distribution_type) then
write(msgstring, *) 'distribution_type ', ens1%distribution_type, ' and ', &
ens2%distribution_type, 'must be equal'
call error_handler(E_ERR,'duplicate_ens', msgstring, source)
endif
! Duplicate each copy that is stored locally on this process.
ens2%vars = ens1%vars
ens2%valid = VALID_VARS
! Duplicate time if requested
if(duplicate_time) ens2%time = ens1%time
end subroutine duplicate_ens
!-----------------------------------------------------------------
function get_my_num_copies(ens_handle)
! Returns the number of copies stored on my processor when var complete.
! Same as num_copies if single process is in use.
integer :: get_my_num_copies
type (ensemble_type), intent(in) :: ens_handle
get_my_num_copies = ens_handle%my_num_copies
end function get_my_num_copies
!-----------------------------------------------------------------
subroutine get_my_copies(ens_handle, copies)
! Returns a list of all copies stored on this processor when var complete.
! Requires copies to be dimensioned with at least my_num_copies.
! Returns 1...num_copies for single process.
type (ensemble_type), intent(in) :: ens_handle
integer, intent(out) :: copies(:)
if(size(copies) < ens_handle%my_num_copies) then
write(msgstring, *) 'Array copies only has size ', size(copies), &
' but must be at least ', ens_handle%my_num_copies
call error_handler(E_ERR, 'get_my_copies', msgstring, source)
endif
copies(1:ens_handle%my_num_copies) = ens_handle%my_copies(1:ens_handle%my_num_copies)
end subroutine get_my_copies
!-----------------------------------------------------------------
function get_my_num_vars(ens_handle)
! Returns the number of vars stored on my processor when copy complete.
! Same as num_vars if single process is in use.
integer :: get_my_num_vars
type (ensemble_type), intent(in) :: ens_handle
get_my_num_vars = ens_handle%my_num_vars
end function get_my_num_vars
!-----------------------------------------------------------------
subroutine get_my_vars(ens_handle, vars)
! Returns a list of all vars stored on this processor when copy complete.
! Requires vars to be dimensioned with at least my_num_vars.
! Returns 1...num_vars for single process.
type (ensemble_type), intent(in) :: ens_handle
integer(i8), intent(out) :: vars(:)
if(size(vars) < ens_handle%my_num_vars) then
write(msgstring, *) 'Array vars only has size ', size(vars), &
' but must be at least ', ens_handle%my_num_vars
call error_handler(E_ERR,'get_my_vars', msgstring, source)
endif
vars(1:ens_handle%my_num_vars) = ens_handle%my_vars(1:ens_handle%my_num_vars)
end subroutine get_my_vars
!-----------------------------------------------------------------
subroutine set_up_ens_distribution(ens_handle)
! Figures out how to lay-out the copy complete and vars complete
! distributions. The distribution_type identifies
! different options. Only distribution_type 1 is implemented.
! This puts every nth var or copy on a given processor where n is the
! total number of processes.
type (ensemble_type), intent(inout) :: ens_handle
integer :: num_per_pe_below, num_left_over, i
integer(i8) :: per_pe, suggest_pes
! Check that there are enough pes for the state
per_pe = ens_handle%num_vars / num_pes
if (per_pe >= (huge(i)-100) ) then
suggest_pes = ( ens_handle%num_vars / (huge(i)) ) * 2
write(msgstring, '(A,I5,1X,A)') &
'not enough MPI tasks for the model size, suggest at least ' , &
suggest_pes, 'tasks'
call error_handler(E_ERR, 'set_up_ens_distribution', msgstring, source)
endif
! Option 1: Maximum separation for both vars and copies
! Compute the total number of copies I'll get for var complete
num_per_pe_below = ens_handle%num_copies / num_pes
num_left_over = ens_handle%num_copies - num_per_pe_below * num_pes
if(num_left_over >= (ens_handle%my_pe + 1)) then
ens_handle%my_num_copies = num_per_pe_below + 1
else
ens_handle%my_num_copies = num_per_pe_below
endif
! Do the same thing for copy complete: figure out which vars I get
num_per_pe_below = ens_handle%num_vars / num_pes
num_left_over = ens_handle%num_vars - num_per_pe_below * num_pes
if(num_left_over >= (ens_handle%my_pe + 1)) then
ens_handle%my_num_vars = num_per_pe_below + 1
else
ens_handle%my_num_vars = num_per_pe_below
endif
!Allocate the storage for copies and vars all at once
allocate(ens_handle%my_copies(ens_handle%my_num_copies), &
ens_handle%time (ens_handle%my_num_copies), &
ens_handle%my_vars (ens_handle%my_num_vars), &
ens_handle%copies (ens_handle%num_copies, ens_handle%my_num_vars))
if(ens_handle%transpose_type == 2) then
allocate(ens_handle%vars(ens_handle%num_vars, ens_handle%my_num_copies))
ens_handle%vars = MISSING_R8
endif
if(ens_handle%transpose_type == 3) then
allocate(ens_handle%vars(ens_handle%num_vars,1))
ens_handle%vars = MISSING_R8
endif
! Set everything to missing value
ens_handle%copies = MISSING_R8
! Fill out the number of my members
call get_copy_list(ens_handle, ens_handle%num_copies, ens_handle%my_pe, ens_handle%my_copies, i)
! Initialize times to missing
! This is only initializing times for pes that have ensemble copies
do i = 1, ens_handle%my_num_copies
ens_handle%time(i) = set_time(0, 0)
end do
! Fill out the number of my vars
call get_var_list(ens_handle, ens_handle%num_vars, ens_handle%my_pe, ens_handle%my_vars, i)
end subroutine set_up_ens_distribution
!-----------------------------------------------------------------
subroutine get_copy_owner_index(ens_handle, copy_number, owner, owners_index)
! Given the copy number, returns which PE stores it when var complete
! and its index in that pes local storage. Depends on distribution_type
! with only option 1 currently implemented.
type (ensemble_type), intent(in) :: ens_handle
integer, intent(in) :: copy_number
integer, intent(out) :: owner, owners_index
integer :: div
! Asummes distribution type 1
div = (copy_number - 1) / num_pes
owner = copy_number - div * num_pes - 1
owners_index = div + 1
end subroutine get_copy_owner_index
!-----------------------------------------------------------------
subroutine get_var_owner_index(ens_handle, var_number, owner, owners_index)
! Given the var number, returns which PE stores it when copy complete
! and its index in that pes local storage. Depends on distribution_type
! with only option 1 currently implemented.
! Assumes that all tasks are used in the ensemble
type (ensemble_type), intent(in) :: ens_handle
integer(i8), intent(in) :: var_number
integer, intent(out) :: owner
integer, intent(out) :: owners_index
integer :: div
! Asummes distribution type 1
div = (var_number - 1) / num_pes
owner = var_number - div * num_pes - 1
owners_index = div + 1
end subroutine get_var_owner_index
!-----------------------------------------------------------------
function get_max_num_vars(ens_handle, num_vars)
!!!function get_max_num_vars(num_vars, distribution_type)
! Returns the largest number of vars that are on any pe when copy complete.
! Depends on distribution_type with only option 1 currently implemented.
! Used to get size for creating storage to receive a list of the vars on a pe.
type (ensemble_type), intent(in) :: ens_handle
integer(i8), intent(in) :: num_vars
integer :: get_max_num_vars
!!!integer, intent(in) :: distribution_type
!could this be instead:
!
! get_max_num_vars = num_vars / num_pes ! integer math rounds down
! if (get_max_num_vars * num_pes /= num_vars) &
! get_max_num_vars = get_max_num_vars + 1
!
! if num_vars divides evenly into the num_pes we use
! the exact size. otherwise if uneven we add one.
! this number has to be the same on all PEs because it
! sets the send/recv size. it doesn't matter which pes
! have extra values, just that is any of them do then
! everyone uses the larger number.
get_max_num_vars = num_vars / num_pes + 1
end function get_max_num_vars
!-----------------------------------------------------------------
function get_max_num_copies(ens_handle, num_copies)
!!!function get_max_num_copies(num_copies, distribution_type)
! Returns the largest number of copies that are on any pe when var complete.
! Depends on distribution_type with only option 1 currently implemented.
! Used to get size for creating storage to receive a list of the copies on a pe.
type (ensemble_type), intent(in) :: ens_handle
integer :: get_max_num_copies
integer, intent(in) :: num_copies
!!!integer, intent(in) :: distribution_type
get_max_num_copies = num_copies / num_pes + 1
end function get_max_num_copies
!-----------------------------------------------------------------
subroutine get_var_list(ens_handle, num_vars, pe, var_list, pes_num_vars)
!!!subroutine get_var_list(num_vars, pe, var_list, pes_num_vars, distribution_type)
! Returns a list of the vars stored by process pe when copy complete
! and the number of these vars.
! var_list must be dimensioned large enough to hold all vars.
! Depends on distribution_type with only option 1 currently implemented.
type (ensemble_type), intent(in) :: ens_handle
integer(i8), intent(in) :: num_vars
integer, intent(in) :: pe
integer(i8), intent(out) :: var_list(:)
integer, intent(out) :: pes_num_vars
!!!integer, intent(in) :: distribution_type
integer :: num_per_pe_below, num_left_over, i
! Figure out number of vars stored by pe
num_per_pe_below = num_vars / num_pes
num_left_over = num_vars - num_per_pe_below * num_pes
if(num_left_over >= (pe + 1)) then
pes_num_vars = num_per_pe_below + 1
else
pes_num_vars = num_per_pe_below
endif
! Fill out the pe's vars
do i = 1, pes_num_vars
var_list(i) = (pe + 1) + (i - 1) * num_pes
end do
end subroutine get_var_list
!-----------------------------------------------------------------
subroutine get_copy_list(ens_handle, num_copies, pe, copy_list, pes_num_copies)
!!!subroutine get_copy_list(num_copies, pe, copy_list, pes_num_copies, distribution_type)
! Returns a list of the copies stored by process pe when var complete.
! copy_list must be dimensioned large enough to hold all copies.
! Depends on distribution_type with only option 1 currently implemented.
type (ensemble_type), intent(in) :: ens_handle
integer, intent(in) :: num_copies, pe
integer, intent(out) :: copy_list(:), pes_num_copies
!!!integer, intent(in) :: distribution_type
integer :: num_per_pe_below, num_left_over, i
! Figure out which copies stored by pe
num_per_pe_below = num_copies / num_pes
num_left_over = num_copies - num_per_pe_below * num_pes
if(num_left_over >= (pe + 1)) then
pes_num_copies = num_per_pe_below + 1
else
pes_num_copies = num_per_pe_below
endif
! Fill out the pe's copies
do i = 1, pes_num_copies
copy_list(i) = (pe + 1) + (i - 1) * num_pes
end do
end subroutine get_copy_list
!-----------------------------------------------------------------
!> accessor function
function get_allow_transpose(ens_handle)
type(ensemble_type), intent(in) :: ens_handle
logical :: get_allow_transpose
if (ens_handle%transpose_type == 2 .or. ens_handle%transpose_type == 3) then
get_allow_transpose = .true.
else
get_allow_transpose = .false.
endif
end function get_allow_transpose
!--------------------------------------------------------------------------------
!> Return the physical task for my_pe
function map_pe_to_task(ens_handle, p)
type(ensemble_type), intent(in) :: ens_handle
integer, intent(in) :: p !! pe number
integer :: map_pe_to_task !! physical task number
map_pe_to_task = ens_handle%pe_to_task_list(p + 1)
end function map_pe_to_task
!--------------------------------------------------------------------------------
!> return the number of actual ensemble members (not extra copies)
function copies_in_window(ens_handle)
type(ensemble_type), intent(in) :: ens_handle
integer :: copies_in_window
integer :: ens_size, i
ens_size = ens_handle%num_copies - ens_handle%num_extras
! Counting up the 'real' ensemble copies a task has. Don't
! want the extras (mean, etc.)
if (ens_handle%transpose_type == 1) then ! distibuted (all tasks have all copies)
copies_in_window = ens_size
elseif (ens_handle%transpose_type == 2) then ! var complete (only some tasks have data)
copies_in_window = 0
do i = 1, ens_handle%my_num_copies
if (ens_handle%my_copies(i) <= ens_size) then
copies_in_window = copies_in_window + 1
endif
enddo
elseif(ens_handle%transpose_type == 3)then ! mean copy on each process
copies_in_window = 1
endif
end function copies_in_window
!--------------------------------------------------------------------------------
!> return the index of the mean row
!> mean row is the row in state_ens_handle%copies(:,:) which is the mean. Typically