! DART software - Copyright UCAR. This open source software is provided
! by UCAR, "as is", without charge, subject to all terms of use at
! http://www.image.ucar.edu/DAReS/DART/DART_download
!> A collection of interfaces to the MPI (Message Passing Interface)
!> multi-processor communication library routines.
!>
!> This file and its companion file, null_mpi_utilities_mod.f90,
!> are the only modules in the DART system that should be calling
!> the MPI library routines directly. This isolates the MPI calls
!> and allows programs to swap in the null version to compile the
!> same source files into a serial program.
!>
!> The names of these routines were intentionally picked to be
!> more descriptive to someone who doesn't know the MPI interfaces.
!> e.g. MPI_AllReduce() may not immediately tell a user what
!> it does, but broadcast_minmax() is hopefully more helpful.
!>
!> If you add any routines or change any arguments in this file
!> you must make the same changes in the null version. These two
!> modules have the same module name and must have identical
!> public routines and calling formats.
!>
!> All MPI routines are called from here. There is a companion
!> file which has the same module name and entry points but all
!> routines are stubs. This allows a single-task version of the
!> code to be compiled without the MPI libraries.
module mpi_utilities_mod
use types_mod, only : i8, r8, digits12
use utilities_mod, only : error_handler, &
E_ERR, E_WARN, E_MSG, E_DBG, get_unit, close_file, &
set_output, set_tasknum, initialize_utilities, &
finalize_utilities, &
nmlfileunit, do_output, do_nml_file, do_nml_term, &
find_namelist_in_file, check_namelist_read
use time_manager_mod, only : time_type, get_time, set_time
! BUILD TIP 1
! Many MPI installations have an MPI module; if one is present, use it.
! ('use mpi')
! If not, there will be an MPI include file which defines the parameters.
! ('include "mpif.h"')
! Use one but not both. The 'use' line must be before the 'implicit none'
! and 'private' lines, 'include' must come after. Go figure.
! For more help on compiling a module which uses MPI see the
! $DART/developer_tests/mpi_utilities/tests/README
use mpi
! the NAG compiler needs these special definitions enabled
! !!NAG_BLOCK_EDIT START COMMENTED_OUT
! !#ifdef __NAG__
! use F90_unix_proc, only : sleep, system, exit
!! block for NAG compiler
! PURE SUBROUTINE SLEEP(SECONDS,SECLEFT)
! INTEGER,INTENT(IN) :: SECONDS
! INTEGER,OPTIONAL,INTENT(OUT) :: SECLEFT
!
! SUBROUTINE SYSTEM(STRING,STATUS,ERRNO)
! CHARACTER*(*),INTENT(IN) :: STRING
! INTEGER,OPTIONAL,INTENT(OUT) :: STATUS,ERRNO
!
!!also used in exit_all outside this module
! SUBROUTINE EXIT(STATUS)
! INTEGER,OPTIONAL :: STATUS
!! end block
! !#endif
! !!NAG_BLOCK_EDIT END COMMENTED_OUT
implicit none
private
!include "mpif.h"
! BUILD TIP 2
! Some compilers require an interface block for the system() function;
! some fail if you define one. If you get an error at link time (something
! like 'undefined symbol _system_') try running the fixsystem script in
! this directory. It is a sed script that comments in and out the interface
! block below. Please leave the BLOCK comment lines unchanged.
! !!SYSTEM_BLOCK_EDIT START COMMENTED_OUT
! !#if .not. defined (__GFORTRAN__) .and. .not. defined(__NAG__)
! ! interface block for getting return code back from system() routine
! interface
! function system(string)
! character(len=*) :: string
! integer :: system
! end function system
! end interface
! ! end block
! !#endif
! !!SYSTEM_BLOCK_EDIT END COMMENTED_OUT
! allow global sum to be computed for integers, r4, and r8s
interface sum_across_tasks
module procedure sum_across_tasks_int4
module procedure sum_across_tasks_int8
module procedure sum_across_tasks_real
end interface
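! A minimal calling sketch for the generic interface above (hedged: the
! dummy-argument order of the specific routines is assumed to be
! (local value, global sum), and 'nobs'/'total_obs' are hypothetical names):
!   integer :: nobs, total_obs
!   call sum_across_tasks(nobs, total_obs)   ! resolves to sum_across_tasks_int4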
! ---- private data for mpi_utilities ----
integer :: myrank = -1 ! my mpi number
integer :: total_tasks = -1 ! total mpi tasks/procs
integer :: my_local_comm = 0 ! duplicate communicator private to this file
integer :: datasize = 0 ! which MPI type corresponds to our r8 definition
integer :: longinttype = 0 ! create an MPI type corresponding to our i8 definition
public :: initialize_mpi_utilities, finalize_mpi_utilities, &
task_count, my_task_id, block_task, restart_task, &
task_sync, array_broadcast, send_to, receive_from, iam_task0, &
broadcast_send, broadcast_recv, shell_execute, sleep_seconds, &
sum_across_tasks, get_dart_mpi_comm, datasize, send_minmax_to, &
get_from_fwd, get_from_mean, broadcast_minmax, broadcast_flag, &
start_mpi_timer, read_mpi_timer, send_sum_to, get_global_max, &
all_reduce_min_max ! deprecated, replaced by broadcast_minmax
character(len=*), parameter :: source = 'mpi_utilities_mod.f90'
logical :: module_initialized = .false.
character(len = 256) :: saved_progname = ''
character(len = 128) :: shell_name = '' ! if needed, add ksh, tcsh, bash, etc
integer :: head_task = 0 ! def 0, but N-1 if reverse_task_layout true
logical :: print4status = .true. ! minimal messages for async4 handshake
logical :: given_communicator = .false. ! if communicator passed in, use it
character(len = 256) :: errstring, errstring1
! for broadcasts, pack small messages into larger ones. remember that the
! byte size will be this count * 8 because we only communicate r8s. (unless
! the code is compiled with r8 redefined as r4, in which case it's * 4).
integer, parameter :: PACKLIMIT = 512
! also for broadcasts, make sure message size is not too large. if so,
! split a single request into two or more broadcasts. i know 2G is really
! 2 * 1024 * 1024 * 1024, but err on the conservative side here.
integer, parameter :: BCAST_MAXSIZE = 2 * 1000 * 1000 * 1000
! option for simple send/recvs to limit max single message size.
! split a single request into two or more broadcasts. i know 2G is really
! 2 * 1024 * 1024 * 1024, but err on the conservative side here.
integer, parameter :: SNDRCV_MAXSIZE = 2 * 1000 * 1000 * 1000
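! For example, with this limit a single send_to() or receive_from() of
! 5,000,000,000 r8 items is issued as multiple MPI calls of at most
! 2,000,000,000 items each (see the chunking loops in those routines below).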
! this turns on trace messages for most MPI communications
logical :: verbose = .false. ! very very very verbose, use with care
logical :: async2_verbose = .false. ! messages only for system() in async2
logical :: async4_verbose = .false. ! messages only for block/restart async4
! if your batch system does the task layout backwards, set this to true
! so the last task will communicate with the script in async 4 mode.
! as of now, mpich and mvapich do it forward, openmpi does it backwards.
logical :: reverse_task_layout = .false. ! task 0 on head node; task N-1 if .true.
logical :: separate_node_sync = .false. ! true if tasks & script do not share nodes
logical :: create_local_comm = .true. ! make a private communicator
! for large numbers of MPI tasks, you will get replicated messages, one
! per task, if this is set to true. however, for debugging if you need
! messages from tasks which aren't 0, this will elicit them. error messages
! from any task will print regardless of this setting.
logical :: all_tasks_print = .false. ! by default only msgs from 0 print
! make local array copy for send/recv/bcast. was needed on an old, buggy version
! of the mpi libs but seems unneeded now.
logical :: make_copy_before_sendrecv = .false. ! should not be needed; .true. is very slow
logical :: make_copy_before_broadcast = .false. ! should not be needed; .true. is very slow
! NAMELIST: change the following from .false. to .true. to enable
! the reading of this namelist. This is the only place you need
! to make this change.
logical :: read_namelist = .false.
namelist /mpi_utilities_nml/ reverse_task_layout, all_tasks_print, &
verbose, async2_verbose, async4_verbose, &
shell_name, separate_node_sync, create_local_comm, &
make_copy_before_sendrecv, make_copy_before_broadcast
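! If read_namelist is changed to .true., a sketch of the corresponding
! input.nml entry (values shown are the defaults declared above) is:
!   &mpi_utilities_nml
!      reverse_task_layout = .false.
!      all_tasks_print     = .false.
!      verbose             = .false.
!   /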
contains
!-----------------------------------------------------------------------------
! mpi cover routines
!-----------------------------------------------------------------------------
!-----------------------------------------------------------------------------
!> Initialize MPI and query it for global information. Make a duplicate
!> communicator so that any user code which wants to call MPI will not
!> interfere with any outstanding asynchronous requests, accidental tag
!> matches, etc. This routine must be called before any other routine in
!> this file, and it should not be called more than once (but it does have
!> defensive code in case that happens.)
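!>
!> A typical calling sequence is sketched below ('my_prog' is a
!> hypothetical program name, not something defined in this module):
!>   call initialize_mpi_utilities('my_prog')
!>   ...              ! sends, receives, broadcasts, etc.
!>   call finalize_mpi_utilities()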
subroutine initialize_mpi_utilities(progname, alternatename, communicator)
character(len=*), intent(in), optional :: progname
character(len=*), intent(in), optional :: alternatename
integer, intent(in), optional :: communicator
integer :: errcode, iunit
logical :: already
if ( module_initialized ) then
! return without calling the code below multiple times
write(errstring, *) 'initialize_mpi_utilities has already been called'
call error_handler(E_WARN,'initialize_mpi_utilities', errstring, source)
return
endif
! prevent any other code from calling into this init
! routine and causing overlapping code execution
module_initialized = .true.
! some implementations of mpich need this to happen before any I/O is done.
! this makes the startup sequence very tricky.
! still, allow for the possibility that the user code has already initialized
! the MPI libs and do not try to call initialize again.
errcode = -999
call MPI_Initialized(already, errcode)
if (errcode /= MPI_SUCCESS) then
write(*, *) 'MPI_Initialized returned error code ', errcode
call exit(-99)
endif
if (.not.already) then
call MPI_Init(errcode)
if (errcode /= MPI_SUCCESS) then
write(*, *) 'MPI_Init returned error code ', errcode
call exit(-99)
endif
endif
if (.not. present(communicator)) then
! give this a temporary initial value, in case we call the abort code.
! later we will dup the world comm and use a private comm for our comms.
my_local_comm = MPI_COMM_WORLD
else
my_local_comm = communicator
given_communicator = .true.
endif
call MPI_Comm_rank(my_local_comm, myrank, errcode)
if (errcode /= MPI_SUCCESS) then
write(*, *) 'MPI_Comm_rank returned error code ', errcode
call exit(-99)
endif
! pass the arguments through so the utilities can log the program name
! only PE0 gets to output, whenever possible.
if (myrank == 0) then
call initialize_utilities(progname, alternatename, .true.)
else
call initialize_utilities(progname, alternatename, .false.)
endif
! save a copy of the initial program name for use in finalize.
! make sure it is not too long to fit into our variable.
if (present(progname)) then
if (len_trim(progname) <= len(saved_progname)) then
saved_progname = trim(progname)
else
saved_progname = progname(1:len(saved_progname))
endif
endif
! if logging, add this info to the log.
! (this must come AFTER the standard utilities are initialized.)
! Read the DART namelist for the mpi_utilities.
if (read_namelist) then
call find_namelist_in_file('input.nml', 'mpi_utilities_nml', iunit)
read(iunit, nml = mpi_utilities_nml, iostat = errcode)
call check_namelist_read(iunit, errcode, "mpi_utilities_nml")
else
errstring = ' !must edit assimilation_code/modules/utilities/mpi_utilities_mod.f90 to enable this namelist'
if (do_nml_file()) write(nmlfileunit, '(A)') trim(errstring)
if (do_nml_term()) write( * , '(A)') trim(errstring)
endif
! Record the namelist values used for the run
if (do_nml_file()) write(nmlfileunit, nml=mpi_utilities_nml)
if (do_nml_term()) write( * , nml=mpi_utilities_nml)
! duplicate the world communicator to isolate us from any other user
! calls to MPI. All subsequent mpi calls here will use the local communicator
! and not the global world comm.
if (.not. given_communicator .and. create_local_comm) then
call MPI_Comm_dup(MPI_COMM_WORLD, my_local_comm, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Comm_dup returned error code ', errcode
call error_handler(E_ERR,'initialize_mpi_utilities', errstring, source)
endif
endif
! find out who we are (0 to N-1).
call MPI_Comm_rank(my_local_comm, myrank, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Comm_rank returned error code ', errcode
call error_handler(E_ERR,'initialize_mpi_utilities', errstring, source)
endif
! number of tasks (if 10, returns 10. task id numbers go from 0-9)
call MPI_Comm_size(my_local_comm, total_tasks, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Comm_size returned error code ', errcode
call error_handler(E_ERR,'initialize_mpi_utilities', errstring, source)
endif
! tell the utilities module what task number we are.
call set_tasknum(myrank)
! Turn off non-critical log messages from all but task 0, for performance
! and for sanity (e.g. only one copy of informational messages). can be
! overridden by namelist if desired.
if (all_tasks_print) then
call set_output(.true.) ! everyone gets to talk
else
if (myrank /= 0) call set_output(.false.) ! process 0 speaks for all
endif
! Users have the option of redefining the DART r8 kind to be the same size
! as r4. But when we call the MPI routines we have to know what MPI type
! to use. The internal dart kind 'digits12' is always defined to be *8, so
! if the parameters r8 and digits12 are the same value, use *8. Otherwise
! assume r4 and r8 were defined to be the same and use *4. (Some users
! do this to cut down on the space requirements.)
if (r8 == digits12) then
datasize = MPI_REAL8
!print *, "using real * 8 for datasize of r8"
else
datasize = MPI_REAL4
if (myrank == 0) then
write(errstring, *) "Using real * 4 for datasize of r8"
call error_handler(E_MSG,'initialize_mpi_utilities: ',errstring, source)
endif
endif
! create a type we can use for integer(i8) calls
longinttype = MPI_INTEGER8
! or:
!call MPI_Type_Create_F90_Integer(15, longinttype, errcode)
!if (errcode /= MPI_SUCCESS) then
! write(errstring, '(a,i8)') 'MPI_Type_Create_F90_Integer returned error code ', errcode
! call error_handler(E_ERR,'initialize_mpi_utilities', errstring, source)
!endif
! in async 4 mode, where the controlling job (usually filter) and the
! model are both mpi tasks and they handshake via named pipes, the tasks
! writing and reading the named pipes must be on the same node as the
! one running the start script. many MPI implementations (mpich, mvapich)
! put task 0 on the first node, with the script. some (openmpi)
! lay them out in reverse order, and task N-1 is on the same node
! as the script. if this is wrong, things will hang when the
! filter task tries to advance the model - it won't be able to
! write the 'go ahead' message to the pipe. set this via namelist
! to match what works on your system.
if (reverse_task_layout) then
head_task = total_tasks - 1
else
head_task = 0 ! normal case
endif
! MPI successfully initialized. Log for the record how many tasks.
if (verbose) write(*,*) "PE", myrank, ": MPI successfully initialized"
if (myrank == 0) then
write(errstring, *) 'Running with ', total_tasks, ' MPI processes.'
call error_handler(E_MSG,'initialize_mpi_utilities: ',errstring, source)
endif
end subroutine initialize_mpi_utilities
!-----------------------------------------------------------------------------
!> Shut down MPI cleanly. This must be done before the program exits; on
!> some implementations of MPI the final I/O flushes are not done until this
!> is called. The optional argument can prevent us from calling MPI_Finalize,
!> so that user code can continue to use MPI after this returns. Calling other
!> routines in this file after calling finalize will invalidate your warranty.
subroutine finalize_mpi_utilities(callfinalize, async)
logical, intent(in), optional :: callfinalize
integer, intent(in), optional :: async
integer :: errcode
logical :: dofinalize
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'finalize_mpi_utilities', errstring, source)
endif
! give the async=4 case a chance to tell the script to shut down.
if (present(async)) then
call finished_task(async)
endif
! close the log files and write a timestamp
if (saved_progname /= '') then
call finalize_utilities(saved_progname)
else
call finalize_utilities()
endif
! For the SGI implementation of MPI in particular, sync before shutting
! down MPI. Once MPI is shut down, no other output can be written; it causes
! tasks to hang if they try. Make sure all tasks are here and ready to
! close down at the same time.
call task_sync()
if (.not. given_communicator) then
! Release the private communicator we created at init time.
if (my_local_comm /= MPI_COMM_WORLD) then
call MPI_Comm_free(my_local_comm, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Comm_free returned error code ', errcode
call error_handler(E_ERR,'finalize_mpi_utilities', errstring, source)
endif
endif
my_local_comm = MPI_COMM_WORLD
endif
! If the optional argument is not given, or is given and is true,
! shut down mpi. Only if the argument is specified and is false do we
! skip the finalization.
if (.not.present(callfinalize)) then
dofinalize = .TRUE.
else
dofinalize = callfinalize
endif
! Normally we shut down MPI here. If the user tells us not to shut down MPI
! they must call this routine from their own code before exiting.
if (dofinalize) then
if (verbose) write(*,*) "PE", myrank, ": MPI finalize being called now"
call MPI_Finalize(errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Finalize returned error code ', errcode
call error_handler(E_ERR,'finalize_mpi_utilities', errstring, source)
endif
endif
! NO I/O after calling MPI_Finalize. on some MPI implementations
! this can hang the job.
end subroutine finalize_mpi_utilities
!-----------------------------------------------------------------------------
!> Return the total number of MPI tasks. e.g. if the number of tasks is 4,
!> it returns 4. (The actual task numbers are 0-3.)
function task_count()
integer :: task_count
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'task_count', errstring, source)
endif
task_count = total_tasks
end function task_count
!-----------------------------------------------------------------------------
!> Return my unique task id. Values run from 0 to N-1 (where N is the
!> total number of MPI tasks.)
function my_task_id()
integer :: my_task_id
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'my_task_id', errstring, source)
endif
my_task_id = myrank
end function my_task_id
!-----------------------------------------------------------------------------
!> Synchronize all tasks. This subroutine does not return until all tasks
!> execute this line of code.
subroutine task_sync()
integer :: errcode
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'task_sync', errstring, source)
endif
if (verbose) write(*,*) "PE", myrank, ": waiting at MPI Barrier"
call MPI_Barrier(my_local_comm, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Barrier returned error code ', errcode
call error_handler(E_ERR,'task_sync', errstring, source)
endif
if (verbose) write(*,*) "PE", myrank, ": MPI Barrier released"
end subroutine task_sync
!-----------------------------------------------------------------------------
!> Send the srcarray to the destination id.
!> If time is specified, it is also sent in a separate communications call.
!> This is a synchronous call; it will not return until the destination has
!> called receive to accept the data. If the send_to/receive_from calls are
!> not paired correctly the code will hang.
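!>
!> A sketch of a correctly paired exchange (task ids and the array name
!> 'statevec' are hypothetical): task 0 executes
!>   call send_to(1, statevec)
!> while task 1 executes
!>   call receive_from(0, statevec)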
subroutine send_to(dest_id, srcarray, time, label)
integer, intent(in) :: dest_id
real(r8), intent(in) :: srcarray(:)
type(time_type), intent(in), optional :: time
character(len=*), intent(in), optional :: label
integer :: tag, errcode
integer :: itime(2)
integer :: itemcount, offset, nextsize
real(r8), allocatable :: tmpdata(:)
if (verbose) write(*,*) "PE", myrank, ": start of send_to "
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'send_to', errstring, source)
endif
! simple idiotproofing
if ((dest_id < 0) .or. (dest_id >= total_tasks)) then
write(errstring, '(a,i8,a,i8)') "destination task id ", dest_id, &
"must be >= 0 and < ", total_tasks
call error_handler(E_ERR,'send_to', errstring, source)
endif
itemcount = size(srcarray)
if (present(label)) then
write(*,*) trim(label)//" PE", myrank, ": send_to itemsize ", itemcount, " dest ", dest_id
else if (verbose) then
write(*,*) "PE", myrank, ": send_to itemsize ", itemcount, " dest ", dest_id
endif
! use my task id as the tag; unused at this point.
tag = myrank
if (make_copy_before_sendrecv) allocate(tmpdata(min(itemcount, SNDRCV_MAXSIZE)))
if (itemcount <= SNDRCV_MAXSIZE) then
if (verbose) write(*,*) "PE", myrank, ": send_to ", itemcount, " dest ", dest_id
if (.not. make_copy_before_sendrecv) then
call MPI_Ssend(srcarray, itemcount, datasize, dest_id, tag, &
my_local_comm, errcode)
else
! this copy should be unneeded, but on the intel fortran 9.0 compiler and mpich
! on one platform, calling this subroutine with an array section resulted in some
! apparently random memory corruption. making a copy of the data into a local,
! contiguous buffer before send and receive fixed the corruption. this shouldn't
! have been needed, and is a performance/memory sink.
tmpdata = srcarray
call MPI_Ssend(tmpdata, itemcount, datasize, dest_id, tag, &
my_local_comm, errcode)
endif
else
! there are a few places in the code where we send/receive a full state vector.
! as these get really large, they may exceed the limits of the MPI library.
! break large messages up into smaller chunks if needed.
offset = 1
do while (offset <= itemcount)
if (itemcount-offset >= SNDRCV_MAXSIZE) then
nextsize = SNDRCV_MAXSIZE
else if (itemcount-offset == 0) then
nextsize = 1
else
nextsize = itemcount - offset
endif
if (verbose) write(*,*) 'sending array items ', offset, ' thru ' , offset + nextsize - 1
if (.not. make_copy_before_sendrecv) then
call MPI_Ssend(srcarray(offset:offset+nextsize-1), nextsize, datasize, dest_id, tag, &
my_local_comm, errcode)
else
tmpdata = srcarray(offset:offset+nextsize-1)
call MPI_Ssend(tmpdata, nextsize, datasize, dest_id, tag, &
my_local_comm, errcode)
endif
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Ssend returned error code ', errcode
call error_handler(E_ERR,'send_to', errstring, source)
endif
offset = offset + nextsize
enddo
endif
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Ssend returned error code ', errcode
call error_handler(E_ERR,'send_to', errstring, source)
endif
! if time specified, call MPI again to send the 2 time ints.
if (present(time)) then
if (verbose) write(*,*) "PE", myrank, ": time present"
call get_time(time, itime(1), itime(2))
call MPI_Ssend(itime, 2, MPI_INTEGER, dest_id, tag*2, my_local_comm, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Ssend returned error code ', errcode
call error_handler(E_ERR,'send_to', errstring, source)
endif
if (verbose) write(*,*) "PE", myrank, ": sent time to ", dest_id
endif
if (make_copy_before_sendrecv) deallocate(tmpdata)
if (verbose) write(*,*) "PE", myrank, ": end of send_to "
end subroutine send_to
!-----------------------------------------------------------------------------
!> Receive data into the destination array from the src task.
!> If time is specified, it is received in a separate communications call.
!> This is a synchronous call; it will not return until the source has
!> sent the data. If the send_to/receive_from calls are not paired correctly
!> the code will hang.
subroutine receive_from(src_id, destarray, time, label)
integer, intent(in) :: src_id
real(r8), intent(inout) :: destarray(:)
type(time_type), intent(out), optional :: time
character(len=*), intent(in), optional :: label
integer :: tag, errcode
integer :: itime(2)
integer :: status(MPI_STATUS_SIZE)
integer :: itemcount, offset, nextsize
real(r8), allocatable :: tmpdata(:)
if (verbose) write(*,*) "PE", myrank, ": start of receive_from "
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'receive_from', errstring, source)
endif
! simple idiotproofing
if ((src_id < 0) .or. (src_id >= total_tasks)) then
write(errstring, '(a,i8,a,i8)') "source task id ", src_id, &
"must be >= 0 and < ", total_tasks
call error_handler(E_ERR,'receive_from', errstring, source)
endif
itemcount = size(destarray)
if (present(label)) then
write(*,*) trim(label)//" PE", myrank, ": receive_from itemsize ", itemcount, " src ", src_id
else if (verbose) then
write(*,*) "PE", myrank, ": receive_from itemsize ", itemcount, " src ", src_id
endif
! send_to uses its own id as the tag.
tag = src_id
if (verbose) write(*,*) "PE", myrank, ": receive ", itemcount, " src ", src_id
if (make_copy_before_sendrecv) allocate(tmpdata(min(itemcount,SNDRCV_MAXSIZE)))
if (itemcount <= SNDRCV_MAXSIZE) then
if (.not. make_copy_before_sendrecv) then
call MPI_Recv(destarray, itemcount, datasize, src_id, MPI_ANY_TAG, &
my_local_comm, status, errcode)
else
! this copy should be unneeded, but on the intel fortran 9.0 compiler and mpich
! on one platform, calling this subroutine with an array section resulted in some
! apparently random memory corruption. making a copy of the data into a local,
! contiguous buffer before send and receive fixed the corruption. this shouldn't
! have been needed, and is a performance/memory sink.
call MPI_Recv(tmpdata, itemcount, datasize, src_id, MPI_ANY_TAG, &
my_local_comm, status, errcode)
destarray = tmpdata
endif
else
! there are a few places in the code where we send/receive a full state vector.
! as these get really large, they may exceed the limits of the MPI library.
! break large messages up into smaller chunks if needed.
offset = 1
do while (offset <= itemcount)
if (itemcount-offset >= SNDRCV_MAXSIZE) then
nextsize = SNDRCV_MAXSIZE
else if (itemcount-offset == 0) then
nextsize = 1
else
nextsize = itemcount - offset
endif
if (verbose) write(*,*) 'recving array items ', offset, ' thru ' , offset + nextsize - 1
if (.not. make_copy_before_sendrecv) then
call MPI_Recv(destarray(offset:offset+nextsize-1), nextsize, datasize, src_id, MPI_ANY_TAG, &
my_local_comm, status, errcode)
else
call MPI_Recv(tmpdata, itemcount, datasize, src_id, MPI_ANY_TAG, &
my_local_comm, status, errcode)
destarray(offset:offset+nextsize-1) = tmpdata
endif
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Recv returned error code ', errcode
call error_handler(E_ERR,'receive_from', errstring, source)
endif
offset = offset + nextsize
end do
endif
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Recv returned error code ', errcode
call error_handler(E_ERR,'receive_from', errstring, source)
endif
if (verbose) write(*,*) "PE", myrank, ": received from ", src_id
! if time specified, call MPI again to send the 2 time ints.
if (present(time)) then
if (verbose) write(*,*) "PE", myrank, ": time present"
call MPI_Recv(itime, 2, MPI_INTEGER, src_id, tag*2, &
my_local_comm, status, errcode)
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Recv returned error code ', errcode
call error_handler(E_ERR,'receive_from', errstring, source)
endif
if (itime(2) < 0) then
write(errstring, '(a,i8)') 'seconds in settime were < 0; using 0'
call error_handler(E_MSG,'receive_from', errstring, source)
time = set_time(itime(1), 0)
else
time = set_time(itime(1), itime(2))
endif
if (verbose) write(*,*) "PE", myrank, ": received time from ", src_id
endif
if (make_copy_before_sendrecv) deallocate(tmpdata)
if (verbose) write(*,*) "PE", myrank, ": end of receive_from "
end subroutine receive_from
!-----------------------------------------------------------------------------
! NOTE: so far we only seem to be sending real data (not integer), and only 1D arrays.
! this could be overloaded to send 2D arrays, and ints if needed.
!> The data array values on the root task will be broadcast to every other
!> task. When this routine returns, all tasks will have the contents of the
!> root array in their own arrays. Thus 'array' is intent(in) on root, and
!> intent(out) on all other tasks.
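!>
!> Sketch of a call (the array name 'ens_mean' is hypothetical): every task
!> in the communicator executes
!>   call array_broadcast(ens_mean, 0)
!> and on return all tasks hold task 0's copy of ens_mean.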
subroutine array_broadcast(array, root, icount)
real(r8), intent(inout) :: array(:)
integer, intent(in) :: root
integer, intent(in), optional :: icount
integer :: itemcount, errcode, offset, nextsize
real(r8), allocatable :: tmpdata(:)
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'array_broadcast', errstring, source)
endif
! simple idiotproofing
if ((root < 0) .or. (root >= total_tasks)) then
write(errstring, '(a,i8,a,i8)') "root task id ", root, &
"must be >= 0 and < ", total_tasks
call error_handler(E_ERR,'array_broadcast', errstring, source)
endif
! if the actual data to be sent is shorter than the size of 'array',
! there are performance advantages to sending only the actual data
! and not the full array size. (performance tested on an ibm machine.)
! calling code must determine if this is the case and pass in a length
! shorter than the array size.
if (present(icount)) then
if (icount > size(array)) then
write(errstring, '(a,i12)') "number of items to broadcast: ", icount
write(errstring1, '(a,i12)') "cannot be larger than the array size: ", size(array)
call error_handler(E_ERR,'array_broadcast', errstring, source, &
text2=errstring1)
endif
itemcount = icount
else
itemcount = size(array)
endif
if (verbose .and. myrank == root) write(*,*) "PE", myrank, ": bcast itemsize from here ", itemcount
! comment this in only if you really have to. it will flood the output with
! messages from every task except one.
!if (verbose .and. myrank /= root) write(*,*) "PE", myrank, ": bcast itemsize ", itemcount, " root ", root
if (make_copy_before_broadcast) allocate(tmpdata(min(itemcount,BCAST_MAXSIZE)))
! at least one user has run into a limit in the MPI implementation where you
! cannot broadcast too large an array. if the size of this array is too large,
! broadcast it in chunks until all the data has been processed
if (itemcount <= BCAST_MAXSIZE) then
if (.not. make_copy_before_broadcast) then
call MPI_Bcast(array, itemcount, datasize, root, my_local_comm, errcode)
else
if (my_task_id() == root) tmpdata = array
call MPI_Bcast(tmpdata, itemcount, datasize, root, my_local_comm, errcode)
if (my_task_id() /= root) array = tmpdata
endif
else
offset = 1
do while (offset <= itemcount)
if (itemcount-offset >= BCAST_MAXSIZE) then
nextsize = BCAST_MAXSIZE
else if (itemcount-offset == 0) then
nextsize = 1
else
nextsize = itemcount - offset
endif
if (verbose) write(*,*) 'bcasting array items ', offset, ' thru ' , offset + nextsize - 1
! test this - are array sections going to cause array temps to be created?
if (.not. make_copy_before_broadcast) then
call MPI_Bcast(array(offset:offset+nextsize-1), nextsize, datasize, root, my_local_comm, errcode)
else
if (my_task_id() == root) tmpdata = array(offset:offset+nextsize-1)
call MPI_Bcast(tmpdata, nextsize, datasize, root, my_local_comm, errcode)
if (my_task_id() /= root) array(offset:offset+nextsize-1) = tmpdata
endif
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Bcast returned error code ', errcode
call error_handler(E_ERR,'array_broadcast', errstring, source)
endif
offset = offset + nextsize
end do
endif
if (errcode /= MPI_SUCCESS) then
write(errstring, '(a,i8)') 'MPI_Bcast returned error code ', errcode
call error_handler(E_ERR,'array_broadcast', errstring, source)
endif
if (make_copy_before_broadcast) deallocate(tmpdata)
end subroutine array_broadcast
!-----------------------------------------------------------------------------
! DART-specific cover utilities
!-----------------------------------------------------------------------------
!> Return .TRUE. if my local task id is 0, .FALSE. otherwise.
!> (Task numbers in MPI start at 0, contrary to the rules of polite fortran.)
function iam_task0()
logical :: iam_task0
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'iam_task0', errstring, source)
endif
iam_task0 = (myrank == 0)
end function iam_task0
!-----------------------------------------------------------------------------
!> this must be paired with the same number of broadcast_recv()s on all
!> other tasks. it will not return until all tasks in the communications
!> group have made the call.
!>
!> cover routine for array broadcast. one additional sanity check -- make
!> sure the 'from' matches my local task id. also, these arrays are
!> conceptually intent(in) here, but they are passed to a routine whose
!> arguments are intent(inout), so they must be declared intent(inout) here too.
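!>
!> A sketch of the matched call pattern (the names 'owner' and 'state'
!> are hypothetical):
!>   if (my_task_id() == owner) then
!>      call broadcast_send(owner, state)
!>   else
!>      call broadcast_recv(owner, state)
!>   endif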
subroutine broadcast_send(from, array1, array2, array3, array4, array5, &
scalar1, scalar2, scalar3, scalar4, scalar5)
integer, intent(in) :: from
! arrays are really only intent(in) here, but must match array_broadcast() call.
real(r8), intent(inout) :: array1(:)
real(r8), intent(inout), optional :: array2(:), array3(:), array4(:), array5(:)
real(r8), intent(inout), optional :: scalar1, scalar2, scalar3, scalar4, scalar5
real(r8) :: packbuf(PACKLIMIT)
real(r8) :: local(5)
logical :: doscalar, morethanone
integer :: itemcount
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'
call error_handler(E_ERR,'broadcast_send', errstring, source)
endif
! simple idiotproofing
if (from /= myrank) then
write(errstring, '(a,i8,a,i8)') "'from' task id ", from, &
"must be same as current task id ", myrank
call error_handler(E_ERR,'broadcast_send', errstring, source)
endif
! for relatively small array sizes, pack them into a single send/recv pair.
call countup(array1, array2, array3, array4, array5, &
scalar1, scalar2, scalar3, scalar4, scalar5, &
itemcount, morethanone, doscalar)
if (itemcount <= PACKLIMIT .and. morethanone) then
call packit(packbuf, array1, array2, array3, array4, array5, doscalar, &
scalar1, scalar2, scalar3, scalar4, scalar5)
call array_broadcast(packbuf, from, itemcount)
else
call array_broadcast(array1, from)
if (morethanone) then
if (present(array2)) call array_broadcast(array2, from)
if (present(array3)) call array_broadcast(array3, from)
if (present(array4)) call array_broadcast(array4, from)
if (present(array5)) call array_broadcast(array5, from)
if (doscalar) then
call packscalar(local, scalar1, scalar2, scalar3, scalar4, scalar5)
call array_broadcast(local, from)
endif
endif
endif
end subroutine broadcast_send
!-----------------------------------------------------------------------------
!> this must be paired with a single broadcast_send() on one other task, and
!> broadcast_recv() on all other tasks, and it must match exactly the number
!> of args in the sending call.
!> it will not return until all tasks in the communications group have
!> made the call.
!>
!> cover routine for array broadcast. one additional sanity check -- make
!> sure the 'from' is not the same as my local task id. these arrays are
!> conceptually intent(out) here, but they are passed to a routine whose
!> arguments are intent(inout), so they must be declared intent(inout) here too.
subroutine broadcast_recv(from, array1, array2, array3, array4, array5, &
scalar1, scalar2, scalar3, scalar4, scalar5)
integer, intent(in) :: from
! arrays are really only intent(out) here, but must match array_broadcast() call.
real(r8), intent(inout) :: array1(:)
real(r8), intent(inout), optional :: array2(:), array3(:), array4(:), array5(:)
real(r8), intent(inout), optional :: scalar1, scalar2, scalar3, scalar4, scalar5
real(r8) :: packbuf(PACKLIMIT)
real(r8) :: local(5)
logical :: doscalar, morethanone
integer :: itemcount
if ( .not. module_initialized ) then
write(errstring, *) 'initialize_mpi_utilities() must be called first'