Skip to content

Commit

Permalink
Enable WF to instruct db-debug creation
Browse files Browse the repository at this point in the history
  • Loading branch information
koparasy committed Jun 7, 2024
1 parent 570c1cb commit 8cfafc4
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 68 deletions.
47 changes: 34 additions & 13 deletions src/AMSlib/wf/basedb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "AMS.h"
#include "debug.h"
#include "wf/debug.h"
#include "wf/resource_manager.hpp"
#include "wf/utils.hpp"
Expand Down Expand Up @@ -127,7 +128,7 @@ class BaseDB

virtual bool updateModel() { return false; }

virtual bool storePredicate() { return false; }
virtual bool storePredicate() const { return false; }
};

/**
Expand Down Expand Up @@ -279,7 +280,7 @@ class csvDB final : public FileDB
bool* predicate = nullptr) override
{
CFATAL(CSV,
predicate == nullptr,
predicate != nullptr,
"CSV database does not support storing uq-predicates")

_store(num_elements, inputs, outputs);
Expand All @@ -292,7 +293,7 @@ class csvDB final : public FileDB
{

CFATAL(CSV,
predicate == nullptr,
predicate != nullptr,
"CSV database does not support storing uq-predicates")

_store(num_elements, inputs, outputs);
Expand Down Expand Up @@ -347,6 +348,8 @@ class hdf5DB final : public FileDB
/** @brief the dataset descriptor of the predicates */
hid_t pSet;

const bool predicateStore;

/** @brief create or get existing hdf5 dataset with the provided name
* storing data as chunked pieces. The Chunk value controls the chunking
* performed by HDF5 and thus controls the write performance
Expand Down Expand Up @@ -412,7 +415,10 @@ class hdf5DB final : public FileDB
* @param[in] rId a unique Id for each process taking part in a distributed
* execution (rank-id)
*/
hdf5DB(std::string path, std::string fn, uint64_t rId);
hdf5DB(std::string path,
std::string fn,
uint64_t rId,
bool predicate = false);

/**
* @brief Destroys the object and closes the file
Expand Down Expand Up @@ -467,7 +473,7 @@ class hdf5DB final : public FileDB
* @brief Returns whether the DB can also store predicate information for debug
* purposes
*/
bool storePredicate() override { return true; }
bool storePredicate() const override { return predicateStore; }
};
#endif

Expand Down Expand Up @@ -589,7 +595,7 @@ class RedisDB : public BaseDB<TypeValue>
{

CFATAL(REDIS,
predicate == nullptr,
predicate != nullptr,
"REDIS database does not support storing uq-predicates")

const size_t num_in = inputs.size();
Expand Down Expand Up @@ -2269,7 +2275,7 @@ class RabbitMQDB final : public BaseDB
bool* predicate = nullptr) override
{
CFATAL(RMQDB,
predicate == nullptr,
predicate != nullptr,
"RMQ database does not support storing uq-predicates")

interface.publish(appDomain, num_elements, inputs, outputs);
Expand All @@ -2281,7 +2287,7 @@ class RabbitMQDB final : public BaseDB
bool* predicate = nullptr) override
{
CFATAL(RMQDB,
predicate == nullptr,
predicate != nullptr,
"RMQ database does not support storing uq-predicates")

interface.publish(appDomain, num_elements, inputs, outputs);
Expand Down Expand Up @@ -2328,11 +2334,12 @@ class FilesystemInterface
{
std::string dbPath;
bool connected;
bool debug;

public:
FilesystemInterface() : connected(false) {}
FilesystemInterface() : connected(false), debug(false) {}

bool connect(std::string& path)
bool connect(std::string& path, bool is_debug)
{
connected = true;
fs::path Path(path);
Expand All @@ -2349,11 +2356,13 @@ class FilesystemInterface
}

dbPath = path;
debug = is_debug;

return true;
}

/** @brief Returns the `connected` flag (false after construction, set by connect()). */
bool isConnected() const { return connected; }
/** @brief Returns the debug flag stored by connect() (false after construction). */
bool isDebug() const { return debug; }
/** @brief Returns a mutable reference to the stored database path. */
std::string& path() { return dbPath; }
};

Expand Down Expand Up @@ -2414,6 +2423,8 @@ class DBManager
return fs_interface.isConnected() || rmq_interface.isConnected();
}

/** @brief Returns the debug flag held by the filesystem interface. */
bool isDebug() const { return fs_interface.isDebug(); }

/**
* @brief Create an object of the respective database.
* This should never be used for large scale simulations as txt/csv format will
Expand Down Expand Up @@ -2443,7 +2454,10 @@ class DBManager
return std::make_shared<csvDB>(fs_interface.path(), domainName, rId);
#ifdef __ENABLE_HDF5__
case AMSDBType::AMS_HDF5:
return std::make_shared<hdf5DB>(fs_interface.path(), domainName, rId);
return std::make_shared<hdf5DB>(fs_interface.path(),
domainName,
rId,
isDebug());
#endif
#ifdef __ENABLE_RMQ__
case AMSDBType::AMS_RMQ:
Expand Down Expand Up @@ -2507,7 +2521,9 @@ class DBManager
return db;
}

void instantiate_fs_db(AMSDBType type, std::string db_path)
void instantiate_fs_db(AMSDBType type,
std::string db_path,
bool is_debug = false)
{
CWARNING(DBManager,
isInitialized(),
Expand All @@ -2519,7 +2535,12 @@ class DBManager
dbType != AMSDBType::AMS_NONE,
"Setting DBManager default DB when already set")
dbType = type;
if (dbType != AMSDBType::AMS_NONE) fs_interface.connect(db_path);

CWARNING(DBManager,
(is_debug && dbType != AMSDBType::AMS_HDF5),
"Only HDF5 supports debug")

if (dbType != AMSDBType::AMS_NONE) fs_interface.connect(db_path, is_debug);
}
};

Expand Down
14 changes: 7 additions & 7 deletions src/AMSlib/wf/hdf5db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,10 @@ void hdf5DB::_store(size_t num_elements,
createDataSets(num_elements, num_in, num_out);
}

if (HDIsets.size() != num_in || HDOsets.size() != num_out) {
std::cerr << "The data dimensionality is different than the one in the "
"DB\n";
exit(-1);
}
CFATAL(HDF5DB,
(HDIsets.size() != num_in || HDOsets.size() != num_out),
"The data dimensionality is different than the one in the "
"DB")

writeDataToDataset(HDIsets, inputs, num_elements);
writeDataToDataset(HDOsets, outputs, num_elements);
Expand All @@ -175,8 +174,9 @@ void hdf5DB::_store(size_t num_elements,
}


hdf5DB::hdf5DB(std::string path, std::string fn, uint64_t rId)
: FileDB(path, fn, ".h5", rId)
hdf5DB::hdf5DB(std::string path, std::string fn, uint64_t rId, bool predicate)
: FileDB(path, fn, predicate ? ".debug.h5" : ".h5", rId),
predicateStore(predicate)
{
std::error_code ec;
bool exists = fs::exists(this->fn);
Expand Down
104 changes: 56 additions & 48 deletions src/AMSlib/wf/workflow.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#ifdef __ENABLE_MPI__
#include <mpi.h>

#include "wf/redist_load.hpp"
#endif

Expand Down Expand Up @@ -101,7 +102,8 @@ class AMSWorkflow
*/
void Store(size_t num_elements,
std::vector<FPTypeValue *> &inputs,
std::vector<FPTypeValue *> &outputs)
std::vector<FPTypeValue *> &outputs,
bool *predicate = nullptr)
{
// 1 MB of buffer size;
// TODO: Fix magic number
Expand All @@ -116,54 +118,53 @@ class AMSWorkflow
if (!DB) return;

std::vector<FPTypeValue *> hInputs, hOutputs;
bool *hPredicate = nullptr;

if (appDataLoc == AMSResourceType::AMS_HOST)
return DB->store(num_elements, inputs, outputs);

// Compute number of elements that fit inside the buffer
size_t bElements = bSize / sizeof(FPTypeValue);
FPTypeValue *pPtr =
rm.allocate<FPTypeValue>(bElements, AMSResourceType::AMS_PINNED);
// Total inner vector dimensions (inputs and outputs)
size_t totalDims = inputs.size() + outputs.size();
// Compute number of elements of each outer dimension that fit in buffer
size_t elPerDim = static_cast<int>(floor(bElements / totalDims));

for (int i = 0; i < inputs.size(); i++)
hInputs.push_back(&pPtr[i * elPerDim]);

for (int i = 0; i < outputs.size(); i++)
hOutputs.push_back(&pPtr[(i + inputs.size()) * elPerDim]);

// Iterate over all chunks
for (int i = 0; i < num_elements; i += elPerDim) {
size_t actualElems = std::min(elPerDim, num_elements - i);
// Copy input data to host
for (int k = 0; k < numIn; k++) {
rm.copy(&inputs[k][i],
AMSResourceType::AMS_DEVICE,
hInputs[k],
AMSResourceType::AMS_HOST,
actualElems);
}
return DB->store(num_elements, inputs, outputs, predicate);

// Copy output data to host
for (int k = 0; k < numIn; k++) {
rm.copy(&outputs[k][i],
AMSResourceType::AMS_DEVICE,
hOutputs[k],
AMSResourceType::AMS_HOST,
actualElems);
}
for (int i = 0; i < inputs.size(); i++) {
FPTypeValue *pPtr =
rm.allocate<FPTypeValue>(num_elements, AMSResourceType::AMS_HOST);
rm.copy(inputs[i], AMS_DEVICE, pPtr, AMS_HOST, num_elements);
hInputs.push_back(pPtr);
}

// Store to database
DB->store(actualElems, hInputs, hOutputs);
for (int i = 0; i < outputs.size(); i++) {
FPTypeValue *pPtr =
rm.allocate<FPTypeValue>(num_elements, AMSResourceType::AMS_HOST);
rm.copy(outputs[i], AMS_DEVICE, pPtr, AMS_HOST, num_elements);
hOutputs.push_back(pPtr);
}
rm.deallocate(pPtr, AMSResourceType::AMS_PINNED);

if (predicate) {
hPredicate = rm.allocate<bool>(num_elements, AMSResourceType::AMS_HOST);
rm.copy(predicate, AMS_DEVICE, hPredicate, AMS_HOST, num_elements);
}

// Store to database
DB->store(num_elements, hInputs, hOutputs, hPredicate);
rm.deallocate(hInputs, AMSResourceType::AMS_HOST);
rm.deallocate(hOutputs, AMSResourceType::AMS_HOST);
if (predicate) rm.deallocate(hPredicate, AMSResourceType::AMS_HOST);

return;
}

/**
 * @brief Overload of Store() for callers whose inputs are pointers to const
 * data.
 *
 * Builds a vector of non-const pointers from @p inputs and forwards it,
 * together with the remaining arguments, to the Store() overload that takes
 * `std::vector<FPTypeValue *> &`.
 *
 * @param[in] num_elements Forwarded unchanged to the non-const overload.
 * @param[in] inputs Pointers to const input data; each pointer is
 * const_cast and forwarded.
 * @param[in] outputs Forwarded unchanged to the non-const overload.
 * @param[in] predicate Optional predicate pointer (nullptr by default),
 * forwarded unchanged.
 */
void Store(size_t num_elements,
           std::vector<const FPTypeValue *> &inputs,
           std::vector<FPTypeValue *> &outputs,
           bool *predicate = nullptr)
{
  std::vector<FPTypeValue *> mInputs;
  // The final size is known up front: allocate once instead of regrowing
  // on every push_back.
  mInputs.reserve(inputs.size());
  for (const FPTypeValue *input : inputs) {
    // const is dropped only to match the callee's parameter type.
    // NOTE(review): presumably the callee only reads through these
    // pointers -- confirm before relying on it.
    mInputs.push_back(const_cast<FPTypeValue *>(input));
  }

  Store(num_elements, mInputs, outputs, predicate);
}


public:
AMSWorkflow()
: AppCall(nullptr),
Expand Down Expand Up @@ -320,14 +321,14 @@ class AMSWorkflow
}

// The predicate with which we will split the data on a later step
bool *p_ml_acceptable = rm.allocate<bool>(totalElements, appDataLoc);
bool *predicate = rm.allocate<bool>(totalElements, appDataLoc);

// -------------------------------------------------------------
// STEP 1: call the UQ module to look at input uncertainties
// to decide if making a ML inference makes sense
// -------------------------------------------------------------
CALIPER(CALI_MARK_BEGIN("UQ_MODULE");)
UQModel->evaluate(totalElements, origInputs, origOutputs, p_ml_acceptable);
UQModel->evaluate(totalElements, origInputs, origOutputs, predicate);
CALIPER(CALI_MARK_END("UQ_MODULE");)

DBG(Workflow, "Computed Predicates")
Expand All @@ -343,7 +344,6 @@ class AMSWorkflow

DBG(Workflow, "Allocated input resources")

bool *predicate = p_ml_acceptable;

// -----------------------------------------------------------------
// STEP 3: call physics module only where predicate = false
Expand Down Expand Up @@ -410,10 +410,18 @@ class AMSWorkflow

if (DB) {
CALIPER(CALI_MARK_BEGIN("DBSTORE");)
DBG(Workflow,
"Storing data (#elements = %d) to database",
packedElements);
Store(packedElements, packedInputs, packedOutputs);
if (!DB->storePredicate()) {
DBG(Workflow,
"Storing data (#elements = %d) to database",
packedElements);
Store(packedElements, packedInputs, packedOutputs);
} else {
DBG(Workflow,
"Storing data (#elements = %d) to database",
totalElements);
Store(totalElements, origInputs, origOutputs, predicate);
}

CALIPER(CALI_MARK_END("DBSTORE");)
}

Expand All @@ -425,7 +433,7 @@ class AMSWorkflow
for (int i = 0; i < outputDim; i++)
rm.deallocate(packedOutputs[i], appDataLoc);

rm.deallocate(p_ml_acceptable, appDataLoc);
rm.deallocate(predicate, appDataLoc);

DBG(Workflow, "Finished AMSExecution")
CINFO(Workflow,
Expand Down

0 comments on commit 8cfafc4

Please sign in to comment.