From 65a2fbe69f4d03658503f92beeb3a8bba616a569 Mon Sep 17 00:00:00 2001
From: "Daniel R. Roe"
Date: Tue, 5 Nov 2024 10:57:10 -0500
Subject: [PATCH] Fix parallel handling of data in repeated trajectory runs for
 'rotate' and 'average' (#1113)

* Add test for handling generated data set vs read-in dataset

* Test that a generated set can be used as a read-in set once processing is
  complete.

* Add broadcast function

* Ensure set is properly broadcast if it was previously generated but we are
  using it in a new trajectory processing run

* Enable parallel dataset handling test

* Add broadcast to the coords frames data set. Make sure we use long int when
  broadcasting dataset size.

* Add data broadcast to frames coords set

* No need for a broadcast, all data is on the disk

* Add broadcast

* Add broadcast for string set

* Add Sync and Bcast to tensor dataset

* Add bcast

* Add broadcast for vector data

* Add bcast

* Ensure 'average' broadcasts the final average coords set to non-master ranks
  for future use. Addresses #1096.

* Version 6.29.7. Revision bump for fixed broadcasting of data created during
  trajectory processing in a subsequent trajectory processing run for rotate
  and average commands.

---------

Co-authored-by: Daniel R. Roe
---
 src/Action_Average.cpp                        |  14 +++
 src/Action_Rotate.cpp                         |   6 +-
 src/DataSet_Coords_CRD.cpp                    |   4 +-
 src/DataSet_Coords_FRM.cpp                    |  17 +++
 src/DataSet_Coords_FRM.h                      |   2 +
 src/DataSet_Coords_REF.cpp                    |   9 ++
 src/DataSet_Coords_REF.h                      |   2 +
 src/DataSet_Coords_TRJ.h                      |   2 +
 src/DataSet_Mat3x3.cpp                        |  19 ++++
 src/DataSet_Mat3x3.h                          |   2 +
 src/DataSet_Tensor.cpp                        |  38 ++++++-
 src/DataSet_Tensor.h                          |   2 +
 src/DataSet_Vector.cpp                        |  29 +++++
 src/DataSet_Vector.h                          |   6 ++
 src/DataSet_Vector_Scalar.cpp                 |  22 ++++
 src/DataSet_Vector_Scalar.h                   |   2 +
 src/DataSet_double.cpp                        |  16 +++
 src/DataSet_double.h                          |   2 +
 src/DataSet_float.cpp                         |  15 +++
 src/DataSet_float.h                           |   2 +
 src/DataSet_integer_disk.h                    |   2 +
 src/DataSet_integer_mem.cpp                   |  16 +++
 src/DataSet_integer_mem.h                     |   2 +
 src/DataSet_string.cpp                        |  34 ++++++
 src/DataSet_string.h                          |   2 +
 src/DataSet_unsignedInt.cpp                   |  16 +++
 src/DataSet_unsignedInt.h                     |   2 +
 src/Frame.cpp                                 |  22 ++++
 src/Frame.h                                   |   1 +
 src/ParallelSetFrameNum.cpp                   |  44 ++++++--
 src/ParallelSetFrameNum.h                     |  15 ++-
 src/SymmetricTensor.h                         |   1 +
 src/Version.h                                 |   2 +-
 test/Makefile                                 |   1 +
 test/Test_Parallel_Dataset/RunTest.sh         |  55 ++++++++++
 test/Test_Parallel_Dataset/TZ2.RM.dat.save    | 101 ++++++++++++++++++
 .../Test_Parallel_Dataset/TZ2.rotate.dat.save | 101 ++++++++++++++++++
 37 files changed, 613 insertions(+), 15 deletions(-)
 create mode 100755 test/Test_Parallel_Dataset/RunTest.sh
 create mode 100644 test/Test_Parallel_Dataset/TZ2.RM.dat.save
 create mode 100644 test/Test_Parallel_Dataset/TZ2.rotate.dat.save
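Note on the pattern used in the diffs below: every DataSet Bcast() routine added
by this patch follows the same three steps. First the set size is broadcast from
the master rank as a long int, then the receiving arrays are resized on the
non-master ranks, then the actual values are broadcast. What follows is a
minimal standalone sketch of that pattern written against raw MPI rather than
cpptraj's Parallel::Comm wrapper; the helper name bcast_double_array and the use
of MPI_COMM_WORLD are illustrative assumptions and do not appear in the patch.

    #include <mpi.h>
    #include <vector>
    #include <cstdio>

    // Broadcast a double array that only rank 0 has filled, so every rank ends
    // up holding an identical copy (roughly mirrors DataSet_double::Bcast below).
    static int bcast_double_array(std::vector<double>& data, MPI_Comm comm) {
      int rank;
      MPI_Comm_rank(comm, &rank);
      // Step 1: broadcast the size as a long so very large sets are not truncated.
      long int totalSize = (rank == 0) ? (long int)data.size() : 0L;
      int err = MPI_Bcast(&totalSize, 1, MPI_LONG, 0, comm);
      // Step 2: non-master ranks allocate space for the incoming data.
      if (rank != 0) data.resize(totalSize);
      // Step 3: broadcast the values themselves.
      if (totalSize > 0)
        err += MPI_Bcast(&data[0], (int)totalSize, MPI_DOUBLE, 0, comm);
      return err;
    }

    int main(int argc, char** argv) {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      std::vector<double> set;
      if (rank == 0) set.assign(10, 4.2); // Only the master has data initially.
      bcast_double_array(set, MPI_COMM_WORLD);
      std::printf("Rank %d now holds %zu values\n", rank, set.size());
      MPI_Finalize();
      return 0;
    }

The same size-then-data shape is what ParallelSetFrameNum::SetForParallel relies
on when it detects that a generated set is only present on the master rank.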
diff --git a/src/Action_Average.cpp b/src/Action_Average.cpp
index 86b0eb789f..5cc282e787 100644
--- a/src/Action_Average.cpp
+++ b/src/Action_Average.cpp
@@ -145,6 +145,20 @@ int Action_Average::SyncAction() {
   AvgFrame_.SumToMaster(trajComm_);
   if (trajComm_.Master()) Nframes_ = total_frames;
+  // If setting up a COORDS set, do it here for non-master ranks since Print()
+  // is only called by the master.
+  if (crdset_ != 0) {
+    trajComm_.MasterBcast( &Nframes_, 1, MPI_INT );
+    AvgFrame_.BcastFrame( trajComm_ );
+    // Do non-master rank setup of the DataSet here. Master will still do it
+    // in Print().
+    if (!trajComm_.Master()) {
+      AvgFrame_.Divide( (double)Nframes_ );
+      DataSet_Coords_REF& ref = static_cast<DataSet_Coords_REF&>( *crdset_ );
+      ref.CoordsSetup( AvgParm_, CoordinateInfo() ); // Coords Only
+      ref.AddFrame( AvgFrame_ );
+    }
+  }
   return 0;
 }
 #endif
diff --git a/src/Action_Rotate.cpp b/src/Action_Rotate.cpp
index 786b9c925d..7900575fcd 100644
--- a/src/Action_Rotate.cpp
+++ b/src/Action_Rotate.cpp
@@ -38,7 +38,8 @@ int Action_Rotate::Get3x3Set(DataSetList const& DSL, std::string const& dsname)
     mprinterr("Error: No 3x3 matrices data set '%s'\n", dsname.c_str());
     return 1;
   }
-  parallelNum_.SetForParallel( rmatrices_ );
+  if (parallelNum_.SetForParallel( rmatrices_ ))
+    return 1;
   return 0;
 }
 
@@ -77,6 +78,9 @@ int Action_Rotate::SetupOutputSets(DataSetList& DSL, std::string const& dsname,
 /** Initialize action. */
 Action::RetType Action_Rotate::Init(ArgList& actionArgs, ActionInit& init, int debugIn)
 {
+# ifdef MPI
+  parallelNum_.SetComm( init.TrajComm() );
+# endif
   double xrot = 0.0, yrot = 0.0, zrot = 0.0;
   DataFile* outfile = 0;
   std::string output_setname;
diff --git a/src/DataSet_Coords_CRD.cpp b/src/DataSet_Coords_CRD.cpp
index 3fda40148d..4ff93f488f 100644
--- a/src/DataSet_Coords_CRD.cpp
+++ b/src/DataSet_Coords_CRD.cpp
@@ -183,8 +183,8 @@ int DataSet_Coords_CRD::Sync(size_t total, std::vector<int> const& rank_frames,
 int DataSet_Coords_CRD::Bcast(Parallel::Comm const& commIn) {
   if (commIn.Size() == 1) return 0;
   // Assume all data is currently on the master process.
-  int totalSize = Size();
-  int err = commIn.MasterBcast( &totalSize, 1, MPI_INT );
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
   if (!commIn.Master()) {
     //rprintf("DEBUG: Resizing array to %i\n", totalSize);
     frames_.Resize( totalSize );
diff --git a/src/DataSet_Coords_FRM.cpp b/src/DataSet_Coords_FRM.cpp
index ec9edea873..505c78b3db 100644
--- a/src/DataSet_Coords_FRM.cpp
+++ b/src/DataSet_Coords_FRM.cpp
@@ -34,6 +34,23 @@ int DataSet_Coords_FRM::Sync(size_t total, std::vector<int> const& rank_frames,
   }
   return 0;
 }
+
+/** Broadcast data to all processes.
+  */
+int DataSet_Coords_FRM::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    frames_.resize( totalSize );
+  }
+  // Broadcast each matrix separately
+  for (unsigned int idx = 0; idx < Size(); idx++)
+    err += frames_[idx].BcastFrame( commIn );
+  return commIn.CheckError( err );
+}
 #endif
 
 /** Add a single element. */
diff --git a/src/DataSet_Coords_FRM.h b/src/DataSet_Coords_FRM.h
index 53e89a67f1..edc178ac39 100644
--- a/src/DataSet_Coords_FRM.h
+++ b/src/DataSet_Coords_FRM.h
@@ -16,6 +16,8 @@ class DataSet_Coords_FRM : public DataSet_Coords {
 # ifdef MPI
     /// Synchronize all data to the master process
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     /// Print info to stdout
     void Info() const { return; }
diff --git a/src/DataSet_Coords_REF.cpp b/src/DataSet_Coords_REF.cpp
index 10abce711f..87922859c9 100644
--- a/src/DataSet_Coords_REF.cpp
+++ b/src/DataSet_Coords_REF.cpp
@@ -126,3 +126,12 @@ int DataSet_Coords_REF::StripRef(AtomMask const& stripMask) {
   delete stripParm; // OK to free, parm has been copied by CoordsSetup.
   return 0;
 }
+
+#ifdef MPI
+/** Broadcast the frame to all processes.
+  */
+int DataSet_Coords_REF::Bcast(Parallel::Comm const& commIn) {
+  int err = frame_.BcastFrame( commIn );
+  return commIn.CheckError( err );
+}
+#endif /*MPI*/
diff --git a/src/DataSet_Coords_REF.h b/src/DataSet_Coords_REF.h
index 04af48205b..deaa3b6886 100644
--- a/src/DataSet_Coords_REF.h
+++ b/src/DataSet_Coords_REF.h
@@ -13,6 +13,8 @@ class DataSet_Coords_REF : public DataSet_Coords {
     size_t Size() const { if (!frame_.empty()) return 1; else return 0; }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&) { return 1; }
+    /// Ensure each process has the frame
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const;
     void Add( size_t, const void* ) { return; }
diff --git a/src/DataSet_Coords_TRJ.h b/src/DataSet_Coords_TRJ.h
index d2b3d01da2..341b9c04c5 100644
--- a/src/DataSet_Coords_TRJ.h
+++ b/src/DataSet_Coords_TRJ.h
@@ -18,6 +18,8 @@ class DataSet_Coords_TRJ : public DataSet_Coords {
     size_t Size() const { return IDX_.MaxFrames(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&) { return 1; }
+    /// All data is on disk, no need to broadcast
+    int Bcast(Parallel::Comm const&) { return 0; }
 # endif
     void Info() const;
     void Add( size_t, const void* ) { return; }
diff --git a/src/DataSet_Mat3x3.cpp b/src/DataSet_Mat3x3.cpp
index 306eaef9d7..61b9d30dde 100644
--- a/src/DataSet_Mat3x3.cpp
+++ b/src/DataSet_Mat3x3.cpp
@@ -45,4 +45,23 @@ int DataSet_Mat3x3::Sync(size_t total, std::vector<int> const& rank_frames,
   }
   return 0;
 }
+
+/** Broadcast data to all processes.
+  * NOTE: For now, do multiple separate broadcasts for each element.
+  * In the future this should probably be consolidated.
+  */
+int DataSet_Mat3x3::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Resize( totalSize );
+  }
+  // Broadcast each matrix separately
+  for (unsigned int idx = 0; idx < Size(); idx++)
+    err += commIn.MasterBcast( data_[idx].Dptr(), 9, MPI_DOUBLE );
+  return commIn.CheckError( err );
+}
 #endif
diff --git a/src/DataSet_Mat3x3.h b/src/DataSet_Mat3x3.h
index 063c7b9195..5f3c521982 100644
--- a/src/DataSet_Mat3x3.h
+++ b/src/DataSet_Mat3x3.h
@@ -12,6 +12,8 @@ class DataSet_Mat3x3 : public DataSet {
     size_t Size() const { return data_.size(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_Tensor.cpp b/src/DataSet_Tensor.cpp
index b572e63d5b..e7454424b2 100644
--- a/src/DataSet_Tensor.cpp
+++ b/src/DataSet_Tensor.cpp
@@ -7,9 +7,43 @@ DataSet_Tensor::DataSet_Tensor() :
 {}
 
 #ifdef MPI
-int DataSet_Tensor::Sync(size_t, std::vector<int> const&, Parallel::Comm const&)
+int DataSet_Tensor::Sync(size_t total, std::vector<int> const& rank_frames,
+                         Parallel::Comm const& commIn)
 {
-  return 1;
+  if (commIn.Size()==1) return 0;
+  if (commIn.Master()) {
+    // Resize to accept data from other ranks.
+    Data_.resize( total );
+    int midx = rank_frames[0]; // Index on master
+    for (int rank = 1; rank < commIn.Size(); rank++) {
+      for (int ridx = 0; ridx != rank_frames[rank]; ridx++, midx++)
+        // TODO: Consolidate to 1 send/recv via arrays?
+        commIn.SendMaster( Data_[midx].Ptr(), 6, rank, MPI_DOUBLE );
+    }
+  } else { // Send data to master
+    for (unsigned int ridx = 0; ridx != Data_.size(); ++ridx)
+      commIn.SendMaster( Data_[ridx].Ptr(), 6, commIn.Rank(), MPI_DOUBLE );
+  }
+  return 0;
+}
+
+/** Broadcast data to all processes.
+  * NOTE: For now, do multiple separate broadcasts for each element.
+  * In the future this should probably be consolidated.
+  */
+int DataSet_Tensor::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Data_.resize( totalSize );
+  }
+  // Broadcast each tensor separately
+  for (unsigned int idx = 0; idx < Size(); idx++)
+    err += commIn.MasterBcast( Data_[idx].Ptr(), 6, MPI_DOUBLE );
+  return commIn.CheckError( err );
 }
 #endif
diff --git a/src/DataSet_Tensor.h b/src/DataSet_Tensor.h
index e22956392f..3250fc32e0 100644
--- a/src/DataSet_Tensor.h
+++ b/src/DataSet_Tensor.h
@@ -25,6 +25,8 @@ class DataSet_Tensor : public DataSet {
     size_t Size() const { return Data_.size(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_Vector.cpp b/src/DataSet_Vector.cpp
index 0041b31127..7e5ac85eef 100644
--- a/src/DataSet_Vector.cpp
+++ b/src/DataSet_Vector.cpp
@@ -265,4 +265,33 @@ int DataSet_Vector::Sync(size_t total, std::vector<int> const& rank_frames,
   }
   return 0;
 }
+
+/** Broadcast data in given array to all processes.
+  * NOTE: For now, do multiple separate broadcasts for each element.
+  * In the future this should probably be consolidated.
+  */
+int DataSet_Vector::bcast_Varray(Varray& vecs, Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = vecs.size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    vecs.resize( totalSize );
+  }
+  // Broadcast each vector separately
+  for (unsigned int idx = 0; idx < vecs.size(); idx++)
+    err += commIn.MasterBcast( vecs[idx].Dptr(), 3, MPI_DOUBLE );
+  return err;
+}
+
+/** Broadcast data to all processes. */
+int DataSet_Vector::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  int err = bcast_Varray( vectors_, commIn );
+  if (!origins_.empty())
+    err += bcast_Varray( origins_, commIn );
+  return commIn.CheckError( err );
+}
+
 #endif
diff --git a/src/DataSet_Vector.h b/src/DataSet_Vector.h
index 768eaf2303..3a8b0e4c78 100644
--- a/src/DataSet_Vector.h
+++ b/src/DataSet_Vector.h
@@ -16,6 +16,8 @@ class DataSet_Vector : public DataSet {
     size_t Size() const { return vectors_.size(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
@@ -64,6 +66,10 @@ class DataSet_Vector : public DataSet {
     /// \return Constant for normalization via spherical harmonics addition theorem.
    static double SphericalHarmonicsNorm(int);
   private:
+# ifdef MPI
+    /// Used to broadcast a Varray
+    static int bcast_Varray(Varray&, Parallel::Comm const&);
+# endif
     int order_; ///< Order for spherical harmonics calculations
     Varray vectors_;
     Varray origins_;
diff --git a/src/DataSet_Vector_Scalar.cpp b/src/DataSet_Vector_Scalar.cpp
index 351eaf2cec..8070f50d8f 100644
--- a/src/DataSet_Vector_Scalar.cpp
+++ b/src/DataSet_Vector_Scalar.cpp
@@ -77,4 +77,26 @@ int DataSet_Vector_Scalar::Sync(size_t total, std::vector<int> const& rank_frame
   }
   return 0;
 }
+
+/** Broadcast data to all processes.
+  * NOTE: For now, do multiple separate broadcasts for each element.
+  * In the future this should probably be consolidated.
+  */
+int DataSet_Vector_Scalar::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    vecs_.resize( totalSize );
+    vals_.resize( totalSize );
+  }
+  // Broadcast each vector separately
+  for (unsigned int idx = 0; idx < vecs_.size(); idx++)
+    err += commIn.MasterBcast( vecs_[idx].Dptr(), 3, MPI_DOUBLE );
+  // Broadcast data
+  err += commIn.MasterBcast( &vals_[0], totalSize, MPI_DOUBLE );
+  return commIn.CheckError( err );
+}
 #endif
diff --git a/src/DataSet_Vector_Scalar.h b/src/DataSet_Vector_Scalar.h
index a9e00b0729..70b030cfdc 100644
--- a/src/DataSet_Vector_Scalar.h
+++ b/src/DataSet_Vector_Scalar.h
@@ -20,6 +20,8 @@ class DataSet_Vector_Scalar : public DataSet {
     size_t MemUsageInBytes() const;
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     // -------------------------------------------
     Vec3 const& Vec(unsigned int i) const { return vecs_[i]; }
diff --git a/src/DataSet_double.cpp b/src/DataSet_double.cpp
index 7f0d65173b..6675452e51 100644
--- a/src/DataSet_double.cpp
+++ b/src/DataSet_double.cpp
@@ -80,4 +80,20 @@ int DataSet_double::Sync(size_t total, std::vector<int> const& rank_frames,
     commIn.SendMaster( &(Data_[0]), Data_.size(), commIn.Rank(), MPI_DOUBLE );
   return 0;
 }
+
+/** Broadcast data to all processes.
+  */
+int DataSet_double::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Data_.resize( totalSize );
+  }
+  // Broadcast data
+  err += commIn.MasterBcast( &Data_[0], totalSize, MPI_DOUBLE );
+  return commIn.CheckError( err );
+}
 #endif
diff --git a/src/DataSet_double.h b/src/DataSet_double.h
index 5b358c4b0a..5f6af48d10 100644
--- a/src/DataSet_double.h
+++ b/src/DataSet_double.h
@@ -24,6 +24,8 @@ class DataSet_double : public DataSet_1D {
     size_t Size() const { return Data_.size(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_float.cpp b/src/DataSet_float.cpp
index 4ff1ee7ba4..d9817b7015 100644
--- a/src/DataSet_float.cpp
+++ b/src/DataSet_float.cpp
@@ -78,4 +78,19 @@ int DataSet_float::Sync(size_t total, std::vector<int> const& rank_frames,
     commIn.SendMaster( &(Data_[0]), Data_.size(), commIn.Rank(), MPI_FLOAT );
   return 0;
 }
+/** Broadcast data to all processes.
+  */
+int DataSet_float::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Data_.resize( totalSize );
+  }
+  // Broadcast data
+  err += commIn.MasterBcast( &Data_[0], totalSize, MPI_FLOAT );
+  return commIn.CheckError( err );
+}
 #endif
diff --git a/src/DataSet_float.h b/src/DataSet_float.h
index 5d52905bd0..fb7e3f2bd3 100644
--- a/src/DataSet_float.h
+++ b/src/DataSet_float.h
@@ -18,6 +18,8 @@ class DataSet_float : public DataSet_1D {
     size_t Size() const { return Data_.size(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_integer_disk.h b/src/DataSet_integer_disk.h
index 1adeee04c7..d048e7c9d5 100644
--- a/src/DataSet_integer_disk.h
+++ b/src/DataSet_integer_disk.h
@@ -12,6 +12,8 @@ class DataSet_integer_disk : public DataSet_integer {
     size_t Size() const { return nvals_; }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// No need to broadcast, all data on disk
+    int Bcast(Parallel::Comm const&) { return 0; }
 # endif
     void Info() const;
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_integer_mem.cpp b/src/DataSet_integer_mem.cpp
index bac4eeca8e..d6ff57e7f1 100644
--- a/src/DataSet_integer_mem.cpp
+++ b/src/DataSet_integer_mem.cpp
@@ -99,4 +99,20 @@ const
   //rprintf("DEBUG: Send %zu frames toRank %i tag %i\n", Data_.size(), toRank, tag);
   return commIn.Send( (void*)&(Data_[0]), Data_.size(), MPI_INT, toRank, tag );
 }
+
+/** Broadcast data to all processes.
+  */
+int DataSet_integer_mem::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Data_.resize( totalSize );
+  }
+  // Broadcast data
+  err += commIn.MasterBcast( &Data_[0], totalSize, MPI_INT );
+  return commIn.CheckError( err );
+}
 #endif
diff --git a/src/DataSet_integer_mem.h b/src/DataSet_integer_mem.h
index 3d0ac06657..7490c11fce 100644
--- a/src/DataSet_integer_mem.h
+++ b/src/DataSet_integer_mem.h
@@ -27,6 +27,8 @@ class DataSet_integer_mem : public DataSet_integer {
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
     int Recv(size_t, unsigned int, int, int, int, Parallel::Comm const&);
     int Send(int, int, Parallel::Comm const&) const;
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_string.cpp b/src/DataSet_string.cpp
index 36265c188e..c260060a0e 100644
--- a/src/DataSet_string.cpp
+++ b/src/DataSet_string.cpp
@@ -148,4 +148,38 @@ int DataSet_string::Sync(size_t total, std::vector<int> const& rank_frames,
   }
   return 0;
 }
+
+/** Broadcast data to all processes.
+  */
+int DataSet_string::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Broadcast the format width
+  int maxWidth = format_.Width();
+  int err = commIn.MasterBcast( &maxWidth, 1, MPI_INT );
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  err += commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Data_.resize( totalSize );
+    format_.SetWidth( maxWidth );
+  }
+  std::vector<int> strSizes( totalSize, 0 );
+  if (commIn.Master()) {
+    // On master rank, first send other ranks all string sizes
+    for (unsigned int idx = 0; idx != Size(); idx++)
+      strSizes[idx] = Data_[idx].size();
+    commIn.MasterBcast( &strSizes[0], totalSize, MPI_INT );
+  } else {
+    // On non master ranks, first receive sizes broadcast from master
+    commIn.MasterBcast( &strSizes[0], totalSize, MPI_INT );
+    // Resize each string
+    for (unsigned int idx = 0; idx != Size(); idx++)
+      Data_[idx].resize( strSizes[idx] );
+  }
+  // Now broadcast each string
+  for (unsigned int idx = 0; idx < Size(); idx++)
+    err += commIn.MasterBcast( &(Data_[idx][0]), strSizes[idx], MPI_CHAR );
+  return err;
+}
 #endif
diff --git a/src/DataSet_string.h b/src/DataSet_string.h
index ae2a6fbc6d..cd349d264a 100644
--- a/src/DataSet_string.h
+++ b/src/DataSet_string.h
@@ -22,6 +22,8 @@ class DataSet_string : public DataSet {
     size_t Size() const { return Data_.size(); }
 # ifdef MPI
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/DataSet_unsignedInt.cpp b/src/DataSet_unsignedInt.cpp
index 4fb6cc0ac9..84f0aa427b 100644
--- a/src/DataSet_unsignedInt.cpp
+++ b/src/DataSet_unsignedInt.cpp
@@ -97,4 +97,20 @@ const
   //rprintf("DEBUG: Send %zu frames toRank %i tag %i\n", Data_.size(), toRank, tag);
   return commIn.Send( (void*)&(Data_[0]), Data_.size(), MPI_UNSIGNED, toRank, tag );
 }
+
+/** Broadcast data to all processes.
+  */
+int DataSet_unsignedInt::Bcast(Parallel::Comm const& commIn) {
+  if (commIn.Size() == 1) return 0;
+  // Assume all data is currently on the master process.
+  long int totalSize = Size();
+  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
+  if (!commIn.Master()) {
+    //rprintf("DEBUG: Resizing array to %i\n", totalSize);
+    Data_.resize( totalSize );
+  }
+  // Broadcast data
+  err += commIn.MasterBcast( &Data_[0], totalSize, MPI_UNSIGNED );
+  return commIn.CheckError( err );
+}
 #endif
diff --git a/src/DataSet_unsignedInt.h b/src/DataSet_unsignedInt.h
index fee4f2708c..b9f544c0bf 100644
--- a/src/DataSet_unsignedInt.h
+++ b/src/DataSet_unsignedInt.h
@@ -29,6 +29,8 @@ class DataSet_unsignedInt : public DataSet_1D {
     int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
     int Recv(size_t, unsigned int, int, int, int, Parallel::Comm const&);
     int Send(int, int, Parallel::Comm const&) const;
+    /// Ensure each process has all frames
+    int Bcast(Parallel::Comm const&);
 # endif
     void Info() const { return; }
     int Allocate(SizeArray const&);
diff --git a/src/Frame.cpp b/src/Frame.cpp
index efae5a7473..7e3e69363c 100644
--- a/src/Frame.cpp
+++ b/src/Frame.cpp
@@ -1498,6 +1498,28 @@ int Frame::RecvFrame(int sendrank, Parallel::Comm const& commIn) {
   return 0;
 }
 
+/** Broadcast all data from master. */
+int Frame::BcastFrame(Parallel::Comm const& commIn) {
+  int err = commIn.MasterBcast( X_, ncoord_, MPI_DOUBLE );
+  if (V_ != 0)
+    err += commIn.MasterBcast( V_, ncoord_, MPI_DOUBLE );
+  if (F_ != 0)
+    err += commIn.MasterBcast( F_, ncoord_, MPI_DOUBLE );
+  err += box_.BroadcastBox( commIn );
+  err += commIn.MasterBcast( &T_, 1, MPI_DOUBLE);
+  err += commIn.MasterBcast( &pH_, 1, MPI_DOUBLE);
+  err += commIn.MasterBcast( &redox_, 1, MPI_DOUBLE);
+  err += commIn.MasterBcast( &time_, 1, MPI_DOUBLE);
+  int remdIndicesSize = remd_indices_.size();
+  err += commIn.MasterBcast( &remdIndicesSize, 1, MPI_INT );
+  remd_indices_.resize( remdIndicesSize );
+  err += commIn.MasterBcast( &remd_indices_[0], remd_indices_.size(), MPI_INT );
+  err += commIn.MasterBcast( &repidx_, 1, MPI_INT);
+  err += commIn.MasterBcast( &crdidx_, 1, MPI_INT);
+  err += commIn.MasterBcast( &step_, 1, MPI_INT);
+  return 0;
+}
+
 /** Sum across all ranks, store in master. */
 int Frame::SumToMaster(Parallel::Comm const& commIn) {
   if (commIn.Master()) {
diff --git a/src/Frame.h b/src/Frame.h
index 2247b78806..e83ab145eb 100644
--- a/src/Frame.h
+++ b/src/Frame.h
@@ -280,6 +280,7 @@ class Frame {
     // ----- Parallel Routines -------------------
     int SendFrame(int, Parallel::Comm const&) const;
     int RecvFrame(int, Parallel::Comm const&);
+    int BcastFrame(Parallel::Comm const&);
     int SumToMaster(Parallel::Comm const&);
 # endif
   private:
diff --git a/src/ParallelSetFrameNum.cpp b/src/ParallelSetFrameNum.cpp
index 53e841bba5..4a3dd447f4 100644
--- a/src/ParallelSetFrameNum.cpp
+++ b/src/ParallelSetFrameNum.cpp
@@ -3,19 +3,49 @@
 #include "DataSet.h"
 
 using namespace Cpptraj;
-void ParallelSetFrameNum::SetForParallel(DataSet const* setIn) {
+int ParallelSetFrameNum::SetForParallel(DataSet* setIn) {
   set_ = setIn;
 # ifdef MPI
-  // In parallel, number to use will depend on whether the set exists or not
-  if (set_->Size() > 0)
-    exists_ = true;
-  else
+  // In parallel, number to use will depend on whether the set exists or not.
+  // Also check if the set needs to be synced. If it does, then it really
+  // does not exist yet but will be generated.
+  if (set_->Size() == 0 && set_->NeedsSync())
     exists_ = false;
+  else
+    exists_ = true;
+  // In parallel if the set exists it must be the same on all processes.
+  if (exists_) {
+    if (trajComm_.IsNull()) {
+      mprinterr("Internal Error: ParallelSetFrameNum::SetForParallel(): Null communicator.\n");
+      return 1;
+    }
+    long int mysize = set_->Size();
+    std::vector<long int> sizeOnRank( trajComm_.Size() );
+    trajComm_.AllGather( &mysize, 1, MPI_LONG, &sizeOnRank[0] );
+    bool set_needs_bcast = false;
+    for (int rank = 1; rank < trajComm_.Size(); rank++) {
+      if (sizeOnRank[rank] != sizeOnRank[0]) {
+# ifdef DEBUG_CPPTRAJ_PARALLELSETFRAMENUM
+        mprintf("DEBUG: Size on rank (%li) does not match rank 0 (%li)\n", sizeOnRank[rank], sizeOnRank[0]);
+# endif
+        set_needs_bcast = true;
+        break;
+      }
+    }
+    if (set_needs_bcast) {
+      int err = set_->Bcast(trajComm_);
+      if (trajComm_.CheckError(err)) {
+        mprinterr("Internal Error: ParallelSetFrameNum::SetForParallel(): DataSet broadcast error.\n");
+        return 1;
+      }
+    }
+  }
 # ifdef DEBUG_CPPTRAJ_PARALLELSETFRAMENUM
   if (!exists_)
-    rprintf("DEBUG: Set '%s' does not yet exist.\n", set_->legend());
+    rprintf("DEBUG: Set '%s' does not yet exist (needs sync %i).\n", set_->legend(), (int)set_->NeedsSync());
   else
-    rprintf("DEBUG: Set '%s' exists with size %zu.\n", set_->legend(), set_->Size());
+    rprintf("DEBUG: Set '%s' exists with size %zu (needs sync %i).\n", set_->legend(), set_->Size(), (int)set_->NeedsSync());
 # endif
 # endif /*MPI */
+  return 0;
 }
diff --git a/src/ParallelSetFrameNum.h b/src/ParallelSetFrameNum.h
index 535980dce8..2b652d856d 100644
--- a/src/ParallelSetFrameNum.h
+++ b/src/ParallelSetFrameNum.h
@@ -1,5 +1,8 @@
 #ifndef INC_PARALLELSETFRAMENUM_H
 #define INC_PARALLELSETFRAMENUM_H
+#ifdef MPI
+# include "Parallel.h"
+#endif
 class DataSet;
 namespace Cpptraj {
 /// Used to pick the correct frame from a data set in parallel
@@ -10,13 +13,18 @@ namespace Cpptraj {
  * In case 1, the data set should be accessed using ActionFrame::TrajoutNum().
  * In case 2, the data set should be accessed using the frameNum variable
  * since the set has not yet been synced.
+ * The SetForParallel() routine will also ensure the data set is broadcast
+ * to all processes if it needs to be.
  */
 class ParallelSetFrameNum {
   public:
     /// CONSTRUCTOR
     ParallelSetFrameNum() : set_(0), exists_(false) {}
+# ifdef MPI
+    void SetComm(Parallel::Comm const& commIn) { trajComm_ = commIn; }
+# endif
     /// Initialize with DataSet, record if it has any data yet.
-    void SetForParallel(DataSet const*);
+    int SetForParallel(DataSet*);
     /// \return Correct frame number in parallel based on whether set exists or is being generated
     int ParallelFrameNum(int frameNum, int trajoutNum) const {
 # ifdef MPI
@@ -31,8 +39,11 @@ class ParallelSetFrameNum {
 # endif
     }
   private:
-    DataSet const* set_; ///< The DataSet in question
+    DataSet* set_; ///< The DataSet in question
     bool exists_; ///< True if set exists, false if it is being generated.
+# ifdef MPI
+    Parallel::Comm trajComm_; ///< The communicator associated with the set
+# endif
 };
 } // END namespace Cpptraj
 #endif
diff --git a/src/SymmetricTensor.h b/src/SymmetricTensor.h
index b6c10b7640..c6dbc006b6 100644
--- a/src/SymmetricTensor.h
+++ b/src/SymmetricTensor.h
@@ -29,6 +29,7 @@ template <typename T> class SymmetricTensor {
     T const& U23() const { return U_[5]; }
 
     T const* Ptr() const { return U_; }
+    T* Ptr() { return U_; }
   private:
     T U_[6];
 };
diff --git a/src/Version.h b/src/Version.h
index b8aecef7f6..dbf3238cbc 100644
--- a/src/Version.h
+++ b/src/Version.h
@@ -12,7 +12,7 @@
  * Whenever a number that precedes is incremented, all subsequent
  * numbers should be reset to 0.
  */
-#define CPPTRAJ_INTERNAL_VERSION "V6.29.6"
+#define CPPTRAJ_INTERNAL_VERSION "V6.29.7"
 /// PYTRAJ relies on this
 #define CPPTRAJ_VERSION_STRING CPPTRAJ_INTERNAL_VERSION
 #endif
diff --git a/test/Makefile b/test/Makefile
index 2931fb985c..8c9754e1a4 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -336,6 +336,7 @@ test.rotatedihedral:
 
 test.dataset:
 	@-cd Test_DataSetCmd && ./RunTest.sh $(OPT)
+	@-cd Test_Parallel_Dataset && ./RunTest.sh $(OPT)
 
 test.remap:
 	@-cd Test_Remap && ./RunTest.sh $(OPT)
diff --git a/test/Test_Parallel_Dataset/RunTest.sh b/test/Test_Parallel_Dataset/RunTest.sh
new file mode 100755
index 0000000000..d52e9ee538
--- /dev/null
+++ b/test/Test_Parallel_Dataset/RunTest.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+TESTNAME='Test handling of read-in vs generated data sets'
+
+# The purpose of these tests is to make sure that CPPTRAJ consistently
+# handles read-in data sets and generated data sets the same way in
+# both serial and parallel.
+
+. ../MasterTest.sh
+
+CleanFiles cpptraj.in TZ2.RM.dat TZ2.rotate.dat TZ2.rotate.fromset.dat TZ2.rotate2.dat
+
+INPUT='-i cpptraj.in'
+
+UNITNAME="Handling of generated dataset with 'rotate' command"
+CheckFor netcdf
+if [ $? -eq 0 ] ; then
+  cat > cpptraj.in < cpptraj.in <