Fix parallel handling of data in repeated trajectory runs for 'rotate' and 'average' (#1113)

* Add test for handling generated data set vs read-in dataset

* Test that a generated set can be used as a read-in set once processing
is complete.

* Add broadcast function

* Ensure set is properly broadcast if it was previously generated but we
are using it in a new trajectory processing run

* Enable parallel dataset handling test

* Add broadcast to the coords frames data set. Make sure we use long int when broadcasting dataset size.

* Add data broadcast to frames coords set

* No need for a broadcast; all data is on disk

* Add broadcast

* Add broadcast for string set

* Add Sync and Bcast to tensor dataset

* Add bcast

* Add broadcast for vector data

* Add bcast

* Ensure 'average' broadcasts the final average coords set to non-master
ranks for future use. Addresses #1096.

* Version 6.29.7. Revision bump for fixed broadcasting of data created
during trajectory processing in a subsequent trajectory processing run
for rotate and average commands.

---------

Co-authored-by: Daniel R. Roe <daniel.roe@nih.gov>
drroe and Daniel R. Roe authored Nov 5, 2024
1 parent b41f221 commit 65a2fbe
Showing 37 changed files with 613 additions and 15 deletions.
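
Most of the per-class Bcast() methods added in the files below follow the same master-to-all pattern: broadcast the set size as a long int, resize the container on the non-master ranks, then broadcast the payload, and finish with a collective error check. The following is a minimal consolidated sketch of that pattern, not code from this commit; the container name Data_ and the free-function form are illustrative, while MasterBcast and CheckError are used exactly as in the hunks below. A set generated during an earlier trajectory processing run would then be refreshed on every rank with something along the lines of ds->Bcast( trajComm_ ) before the next run (the actual call sites are not part of this excerpt).

#include <vector>
#include <mpi.h>
#include "Parallel.h" // cpptraj MPI wrapper providing Parallel::Comm (assumed include)

/// Sketch of the broadcast pattern implemented by the DataSet classes below.
/// Assumes all data currently resides on the master rank of commIn.
static int bcast_sketch(std::vector<double>& Data_, Parallel::Comm const& commIn) {
  if (commIn.Size() == 1) return 0;            // Nothing to do in serial
  long int totalSize = (long int)Data_.size(); // long int avoids overflow for very large sets
  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
  if (!commIn.Master())
    Data_.resize( totalSize );                 // Non-master ranks allocate space first
  err += commIn.MasterBcast( &Data_[0], totalSize, MPI_DOUBLE );
  return commIn.CheckError( err );             // Collective check so all ranks agree on failure
}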
14 changes: 14 additions & 0 deletions src/Action_Average.cpp
@@ -145,6 +145,20 @@ int Action_Average::SyncAction() {
AvgFrame_.SumToMaster(trajComm_);
if (trajComm_.Master())
Nframes_ = total_frames;
// If setting up a COORDS set, do it here for non-master ranks since Print()
// is only called by the master.
if (crdset_ != 0) {
trajComm_.MasterBcast( &Nframes_, 1, MPI_INT );
AvgFrame_.BcastFrame( trajComm_ );
// Do non-master rank setup of the DataSet here. Master will still do it
// in Print().
if (!trajComm_.Master()) {
AvgFrame_.Divide( (double)Nframes_ );
DataSet_Coords_REF& ref = static_cast<DataSet_Coords_REF&>( *crdset_ );
ref.CoordsSetup( AvgParm_, CoordinateInfo() ); // Coords Only
ref.AddFrame( AvgFrame_ );
}
}
return 0;
}
#endif
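
In the hunk above, the MasterBcast and BcastFrame calls sit outside the trajComm_.Master() check because they are collective over trajComm_: every rank has to enter them, and only the follow-up DataSet setup is rank-specific. A minimal illustration of that distinction, with Parallel::Comm used as in the hunk and everything else illustrative:

#include <mpi.h>
#include "Parallel.h" // cpptraj MPI wrapper (assumed include)

// Illustration only: MasterBcast is collective, so every rank must call it.
static void sync_setup_example(Parallel::Comm const& comm, int& nframes) {
  comm.MasterBcast( &nframes, 1, MPI_INT ); // executed by master and non-master ranks alike
  if (!comm.Master()) {
    // Non-master-only setup that uses the broadcast value goes here.
  }
  // Moving the MasterBcast itself inside "if (!comm.Master())" would hang,
  // because the master would never enter the matching collective call.
}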
6 changes: 5 additions & 1 deletion src/Action_Rotate.cpp
@@ -38,7 +38,8 @@ int Action_Rotate::Get3x3Set(DataSetList const& DSL, std::string const& dsname)
mprinterr("Error: No 3x3 matrices data set '%s'\n", dsname.c_str());
return 1;
}
parallelNum_.SetForParallel( rmatrices_ );
if (parallelNum_.SetForParallel( rmatrices_ ))
return 1;
return 0;
}

@@ -77,6 +78,9 @@ int Action_Rotate::SetupOutputSets(DataSetList& DSL, std::string const& dsname,
/** Initialize action. */
Action::RetType Action_Rotate::Init(ArgList& actionArgs, ActionInit& init, int debugIn)
{
# ifdef MPI
parallelNum_.SetComm( init.TrajComm() );
# endif
double xrot = 0.0, yrot = 0.0, zrot = 0.0;
DataFile* outfile = 0;
std::string output_setname;
4 changes: 2 additions & 2 deletions src/DataSet_Coords_CRD.cpp
@@ -183,8 +183,8 @@ int DataSet_Coords_CRD::Sync(size_t total, std::vector<int> const& rank_frames,
int DataSet_Coords_CRD::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_INT );
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
frames_.Resize( totalSize );
17 changes: 17 additions & 0 deletions src/DataSet_Coords_FRM.cpp
@@ -34,6 +34,23 @@ int DataSet_Coords_FRM::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data to all processes.
*/
int DataSet_Coords_FRM::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
frames_.resize( totalSize );
}
// Broadcast each frame separately
for (unsigned int idx = 0; idx < Size(); idx++)
err += frames_[idx].BcastFrame( commIn );
return commIn.CheckError( err );
}
#endif

/** Add a single element. */
2 changes: 2 additions & 0 deletions src/DataSet_Coords_FRM.h
@@ -16,6 +16,8 @@ class DataSet_Coords_FRM : public DataSet_Coords {
# ifdef MPI
/// Synchronize all data to the master process
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
/// Print info to stdout
void Info() const { return; }
9 changes: 9 additions & 0 deletions src/DataSet_Coords_REF.cpp
@@ -126,3 +126,12 @@ int DataSet_Coords_REF::StripRef(AtomMask const& stripMask) {
delete stripParm; // OK to free, parm has been copied by CoordsSetup.
return 0;
}

#ifdef MPI
/** Broadcast the frame to all processes.
*/
int DataSet_Coords_REF::Bcast(Parallel::Comm const& commIn) {
int err = frame_.BcastFrame( commIn );
return commIn.CheckError( err );
}
#endif /*MPI*/
2 changes: 2 additions & 0 deletions src/DataSet_Coords_REF.h
@@ -13,6 +13,8 @@ class DataSet_Coords_REF : public DataSet_Coords {
size_t Size() const { if (!frame_.empty()) return 1; else return 0; }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&) { return 1; }
/// Ensure each process has the frame
int Bcast(Parallel::Comm const&);
# endif
void Info() const;
void Add( size_t, const void* ) { return; }
2 changes: 2 additions & 0 deletions src/DataSet_Coords_TRJ.h
@@ -18,6 +18,8 @@ class DataSet_Coords_TRJ : public DataSet_Coords {
size_t Size() const { return IDX_.MaxFrames(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&) { return 1; }
/// All data is on disk, no need to broadcast
int Bcast(Parallel::Comm const&) { return 0; }
# endif
void Info() const;
void Add( size_t, const void* ) { return; }
19 changes: 19 additions & 0 deletions src/DataSet_Mat3x3.cpp
@@ -45,4 +45,23 @@ int DataSet_Mat3x3::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Mat3x3::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Resize( totalSize );
}
// Broadcast each matrix separately
for (unsigned int idx = 0; idx < Size(); idx++)
err += commIn.MasterBcast( data_[idx].Dptr(), 9, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_Mat3x3.h
@@ -12,6 +12,8 @@ class DataSet_Mat3x3 : public DataSet {
size_t Size() const { return data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
38 changes: 36 additions & 2 deletions src/DataSet_Tensor.cpp
@@ -7,9 +7,43 @@ DataSet_Tensor::DataSet_Tensor() :
{}

#ifdef MPI
int DataSet_Tensor::Sync(size_t, std::vector<int> const&, Parallel::Comm const&)
int DataSet_Tensor::Sync(size_t total, std::vector<int> const& rank_frames,
Parallel::Comm const& commIn)
{
return 1;
if (commIn.Size()==1) return 0;
if (commIn.Master()) {
// Resize to accept data from other ranks.
Data_.resize( total );
int midx = rank_frames[0]; // Index on master
for (int rank = 1; rank < commIn.Size(); rank++) {
for (int ridx = 0; ridx != rank_frames[rank]; ridx++, midx++)
// TODO: Consolidate to 1 send/recv via arrays?
commIn.SendMaster( Data_[midx].Ptr(), 6, rank, MPI_DOUBLE );
}
} else { // Send data to master
for (unsigned int ridx = 0; ridx != Data_.size(); ++ridx)
commIn.SendMaster( Data_[ridx].Ptr(), 6, commIn.Rank(), MPI_DOUBLE );
}
return 0;
}

/** Broadcast data to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Tensor::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast each tensor separately
for (unsigned int idx = 0; idx < Size(); idx++)
err += commIn.MasterBcast( Data_[idx].Ptr(), 6, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif

2 changes: 2 additions & 0 deletions src/DataSet_Tensor.h
@@ -25,6 +25,8 @@ class DataSet_Tensor : public DataSet {
size_t Size() const { return Data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
29 changes: 29 additions & 0 deletions src/DataSet_Vector.cpp
@@ -265,4 +265,33 @@ int DataSet_Vector::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data in given array to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Vector::bcast_Varray(Varray& vecs, Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = vecs.size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
vecs.resize( totalSize );
}
// Broadcast each vector separately
for (unsigned int idx = 0; idx < vecs.size(); idx++)
err += commIn.MasterBcast( vecs[idx].Dptr(), 3, MPI_DOUBLE );
return err;
}

/** Broadcast data to all processes. */
int DataSet_Vector::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
int err = bcast_Varray( vectors_, commIn );
if (!origins_.empty())
err += bcast_Varray( origins_, commIn );
return commIn.CheckError( err );
}

#endif
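
The NOTE comments above flag the per-element broadcasts as a candidate for consolidation. One possible approach, not part of this commit, is to pack the Vec3 components into a single contiguous buffer and issue one MasterBcast for the whole array; the sketch below assumes Vec3::Dptr() returns a writable pointer to three doubles, as it is used in the hunk above.

#include <algorithm>
#include <vector>
#include <mpi.h>
#include "Parallel.h" // cpptraj MPI wrapper (assumed include)
#include "Vec3.h"     // cpptraj 3-vector class (assumed include)

/// Hypothetical consolidated broadcast: one data MasterBcast instead of one per vector.
static int bcast_Varray_packed(std::vector<Vec3>& vecs, Parallel::Comm const& commIn) {
  if (commIn.Size() == 1) return 0;
  long int totalSize = (long int)vecs.size();
  int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
  if (!commIn.Master()) vecs.resize( totalSize );
  std::vector<double> buf( totalSize * 3 );
  if (commIn.Master())                       // Pack on the master rank
    for (long int i = 0; i < totalSize; i++)
      std::copy( vecs[i].Dptr(), vecs[i].Dptr() + 3, &buf[i*3] );
  err += commIn.MasterBcast( &buf[0], totalSize * 3, MPI_DOUBLE );
  if (!commIn.Master())                      // Unpack on the non-master ranks
    for (long int i = 0; i < totalSize; i++)
      std::copy( &buf[i*3], &buf[i*3] + 3, vecs[i].Dptr() );
  return commIn.CheckError( err );
}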
6 changes: 6 additions & 0 deletions src/DataSet_Vector.h
@@ -16,6 +16,8 @@ class DataSet_Vector : public DataSet {
size_t Size() const { return vectors_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
@@ -64,6 +66,10 @@ class DataSet_Vector : public DataSet {
/// \return Constant for normalization via spherical harmonics addition theorem.
static double SphericalHarmonicsNorm(int);
private:
# ifdef MPI
/// Used to broadcast a Varray
static int bcast_Varray(Varray&, Parallel::Comm const&);
# endif
int order_; ///< Order for spherical harmonics calculations
Varray vectors_;
Varray origins_;
22 changes: 22 additions & 0 deletions src/DataSet_Vector_Scalar.cpp
@@ -77,4 +77,26 @@ int DataSet_Vector_Scalar::Sync(size_t total, std::vector<int> const& rank_frame
}
return 0;
}

/** Broadcast data to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Vector_Scalar::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
vecs_.resize( totalSize );
vals_.resize( totalSize );
}
// Broadcast each vector separately
for (unsigned int idx = 0; idx < vecs_.size(); idx++)
err += commIn.MasterBcast( vecs_[idx].Dptr(), 3, MPI_DOUBLE );
// Broadcast data
err += commIn.MasterBcast( &vals_[0], totalSize, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_Vector_Scalar.h
@@ -20,6 +20,8 @@ class DataSet_Vector_Scalar : public DataSet {
size_t MemUsageInBytes() const;
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
// -------------------------------------------
Vec3 const& Vec(unsigned int i) const { return vecs_[i]; }
16 changes: 16 additions & 0 deletions src/DataSet_double.cpp
@@ -80,4 +80,20 @@ int DataSet_double::Sync(size_t total, std::vector<int> const& rank_frames,
commIn.SendMaster( &(Data_[0]), Data_.size(), commIn.Rank(), MPI_DOUBLE );
return 0;
}

/** Broadcast data to all processes.
*/
int DataSet_double::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast data
err += commIn.MasterBcast( &Data_[0], totalSize, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_double.h
@@ -24,6 +24,8 @@ class DataSet_double : public DataSet_1D {
size_t Size() const { return Data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
15 changes: 15 additions & 0 deletions src/DataSet_float.cpp
@@ -78,4 +78,19 @@ int DataSet_float::Sync(size_t total, std::vector<int> const& rank_frames,
commIn.SendMaster( &(Data_[0]), Data_.size(), commIn.Rank(), MPI_FLOAT );
return 0;
}
/** Broadcast data to all processes.
*/
int DataSet_float::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast data
err += commIn.MasterBcast( &Data_[0], totalSize, MPI_FLOAT );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_float.h
@@ -18,6 +18,8 @@ class DataSet_float : public DataSet_1D {
size_t Size() const { return Data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
2 changes: 2 additions & 0 deletions src/DataSet_integer_disk.h
@@ -12,6 +12,8 @@ class DataSet_integer_disk : public DataSet_integer {
size_t Size() const { return nvals_; }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// No need to broadcast, all data on disk
int Bcast(Parallel::Comm const&) { return 0; }
# endif
void Info() const;
int Allocate(SizeArray const&);
16 changes: 16 additions & 0 deletions src/DataSet_integer_mem.cpp
@@ -99,4 +99,20 @@ const
//rprintf("DEBUG: Send %zu frames toRank %i tag %i\n", Data_.size(), toRank, tag);
return commIn.Send( (void*)&(Data_[0]), Data_.size(), MPI_INT, toRank, tag );
}

/** Broadcast data to all processes.
*/
int DataSet_integer_mem::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast data
err += commIn.MasterBcast( &Data_[0], totalSize, MPI_INT );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_integer_mem.h
@@ -27,6 +27,8 @@ class DataSet_integer_mem : public DataSet_integer {
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
int Recv(size_t, unsigned int, int, int, int, Parallel::Comm const&);
int Send(int, int, Parallel::Comm const&) const;
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);