Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic Xrootd remote fixes #4167

Merged
merged 4 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/hello/hdf5SubFile/hdf5SubFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
#include <iostream> //std::cout
#include <mpi.h>
#include <stdexcept> //std::invalid_argument std::exception
#ifndef _MSC_VER
#include <unistd.h>
#endif
anagainaru marked this conversation as resolved.
Show resolved Hide resolved
#include <vector>

void writeMe(adios2::IO &hdf5IO, int rank, int size, const char *testFileName)
Expand All @@ -34,8 +36,8 @@ void writeMe(adios2::IO &hdf5IO, int rank, int size, const char *testFileName)
const std::size_t Nx = 1024;
const std::size_t Ny = 1024 * scale;

std::vector<float> myFloats(Nx * Ny, 0.1 * rank);
std::vector<int> myInts(Nx * Ny, 1 + rank);
std::vector<float> myFloats(Nx * Ny, 0.1f * rank);
std::vector<int> myInts(Nx * Ny, (int)(1 + rank));

hdf5IO.SetParameter("IdleH5Writer",
"true"); // set this if not all ranks are writting
Expand Down Expand Up @@ -101,7 +103,7 @@ void ReadVarData(adios2::IO h5IO, adios2::Engine &h5Reader, const std::string &n

if (var)
{
int nDims = var.Shape().size();
int nDims = (int)var.Shape().size();
size_t totalSize = 1;
for (int i = 0; i < nDims; i++)
{
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void BP5Reader::PerformGets()
if (getenv("DoXRootD"))
{
m_Remote = std::unique_ptr<XrootdRemote>(new XrootdRemote());
m_Remote->Open("localhost", 1049, m_Name, m_OpenMode, RowMajorOrdering);
m_Remote->Open("localhost", 1094, m_Name, m_OpenMode, RowMajorOrdering);
anagainaru marked this conversation as resolved.
Show resolved Hide resolved
}
else
#endif
Expand Down Expand Up @@ -535,7 +535,7 @@ void BP5Reader::Init()
// Don't try to open the remote file when we open local metadata. Do that on demand.
if (!m_Parameters.RemoteDataPath.empty())
m_dataIsRemote = true;
if (getenv("DoRemote"))
if (getenv("DoRemote") || getenv("DoXRootD"))
m_dataIsRemote = true;
}

Expand Down
6 changes: 3 additions & 3 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t Blo
GetMsg.Dest = dest;
CMwrite(m_conn, ev_state.GetRequestFormat, &GetMsg);
CMCondition_wait(ev_state.cm, GetMsg.GetResponseCondition);
return GetMsg.GetResponseCondition;
return (Remote::GetHandle)(intptr_t)GetMsg.GetResponseCondition;
}

EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
Expand All @@ -186,12 +186,12 @@ EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest
ReadMsg.Dest = Dest;
CMwrite(m_conn, ev_state.ReadRequestFormat, &ReadMsg);
CMCondition_wait(ev_state.cm, ReadMsg.ReadResponseCondition);
return ReadMsg.ReadResponseCondition;
return (Remote::GetHandle)(intptr_t)ReadMsg.ReadResponseCondition;
}

bool EVPathRemote::WaitForGet(GetHandle handle)
{
return CMCondition_wait(ev_state.cm, (int)handle);
return CMCondition_wait(ev_state.cm, (int)(intptr_t)handle);
}
#else

Expand Down
2 changes: 0 additions & 2 deletions source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ class EVPathRemote : public Remote

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

typedef int GetHandle;

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);

bool WaitForGet(GetHandle handle);
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Remote::GetHandle Remote::Get(char *VarName, size_t Step, size_t BlockID, Dims &
void *dest)
{
ThrowUp("RemoteGet");
return 0;
return (Remote::GetHandle)(intptr_t)0;
};

bool Remote::WaitForGet(GetHandle handle)
Expand All @@ -49,7 +49,7 @@ bool Remote::WaitForGet(GetHandle handle)
Remote::GetHandle Remote::Read(size_t Start, size_t Size, void *Dest)
{
ThrowUp("RemoteRead");
return 0;
return (Remote::GetHandle)0;
};
Remote::~Remote() {}
Remote::Remote() {}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Remote
virtual void OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename);

typedef int GetHandle;
typedef void *GetHandle;

virtual GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
void *dest);
Expand Down
45 changes: 31 additions & 14 deletions source/adios2/toolkit/remote/XrootdRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* accompanying file Copyright.txt for details.
* ganyushin@gmail.com
*/
#include <future>

#include "XrootdRemote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/helper/adiosLog.h"
Expand Down Expand Up @@ -100,6 +102,8 @@ class myRequest : public XrdSsiRequest
static myRequest *currentRequest;
char *responseBuffer;
int responseBufferLen;
char *dest;
std::promise<void> *promise;

private:
XrdSsiResource rSpec;
Expand Down Expand Up @@ -130,8 +134,9 @@ void myRequest::Alert(XrdSsiRespInfoMsg &aMsg)

// Print what we received
//
fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %d bytes alert: '%s'\n", rName, GetEndPoint().c_str(),
theMsz, theMsg);
// fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %d bytes alert: '%s'\n", rName,
// GetEndPoint().c_str(),
// theMsz, theMsg);

// Recycle the message
//
Expand Down Expand Up @@ -199,15 +204,15 @@ bool myRequest::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo
// theMD = GetMetadata(theML);
if (rInfo.mdlen)
{
fprintf(XrdSsiCl::outFile,
"%s@%s: Rcvd %s response "
"with %d metabytes '%s'\n",
rName, GetEndPoint().c_str(), rInfo.State(), rInfo.mdlen, rInfo.mdata);
// fprintf(XrdSsiCl::outFile,
// "%s@%s: Rcvd %s response "
// "with %d metabytes '%s'\n",
// rName, GetEndPoint().c_str(), rInfo.State(), rInfo.mdlen, rInfo.mdata);
}
else
{
fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %s response\n", rName, GetEndPoint().c_str(),
rInfo.State());
// fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %s response\n", rName, GetEndPoint().c_str(),
// rInfo.State());
}

// While a response can have one of several forms a good response can only be
Expand All @@ -232,6 +237,7 @@ void myRequest::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int
"%s@%s; %s\n",
rName, GetEndPoint().c_str(), eInfo.Get().c_str());
Finished();
promise->set_value();
delete this;
return;
}
Expand All @@ -256,12 +262,12 @@ void myRequest::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int

// End with new line character
//
fprintf(XrdSsiCl::outFile, "\nReceived %d bytes from %s@%s\n", totbytes, rName,
GetEndPoint().c_str());
memcpy(dest, responseBuffer, responseBufferLen);

// We are done with our request. We avoid calling Finished if we got here
// because we were cancelled.
//
promise->set_value();
Finished();
// delete this;
}
Expand Down Expand Up @@ -325,11 +331,19 @@ void XrootdRemote::Open(const std::string hostname, const int32_t port, const st
return;
}

bool XrootdRemote::WaitForGet(GetHandle handle)
{
std::promise<void> *p = (std::promise<void> *)handle;
p->get_future().wait();
delete p;
return true;
}

Remote::GetHandle XrootdRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
{
#ifdef ADIOS2_HAVE_XROOTD
char rName[512] = "/adios";
char rName[512] = "/home/eisen/xroot/data";
XrdSsiResource rSpec((std::string)rName);
myRequest *reqP;
std::string reqData = "get " + fileName + " " + std::string(VarName);
Expand Down Expand Up @@ -366,15 +380,18 @@ Remote::GetHandle XrootdRemote::Get(char *VarName, size_t Step, size_t BlockID,
char *reqDataStr = strdup(reqData.c_str());
reqP = new myRequest(clUI, rName, GetReqID(), reqDataStr, reqLen);
reqP->SetResource(rSpec);
reqP->dest = (char *)dest;
reqP->promise = new std::promise<void>();
// We simply hand off the request to the service to deal with it. When a
// response is ready or an error occured our callback is invoked.
//
clUI.ssiService->ProcessRequest(*reqP, rSpec);
// thread synchronization
sleep(1);
memcpy(dest, reqP->responseBuffer, reqP->responseBufferLen);
WaitForGet((void *)(reqP->promise));
return (intptr_t)0;
#else
return (intptr_t)0;
#endif
return 0;
}

XrootdRemote::GetHandle XrootdRemote::Read(size_t Start, size_t Size, void *Dest)
Expand Down
3 changes: 1 addition & 2 deletions source/adios2/toolkit/remote/XrootdRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@ class XrootdRemote : public Remote
void Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering);

typedef int GetHandle;

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);

GetHandle Read(size_t Start, size_t Size, void *Dest);
bool WaitForGet(GetHandle handle);
};

} // end namespace adios2
Expand Down
9 changes: 4 additions & 5 deletions source/utils/xrootd-plugin/XrdSsiSvService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
m_engine = m_io.Open(reqData, adios2::Mode::ReadRandomAccess);
std::string VarName = requestParams[0];
auto var = m_io.InquireVariable(VarName);

adios2::DataType TypeOfVar = m_io.InquireVariableType(VarName);
try
{
Expand All @@ -469,7 +468,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
adios2::Variable<T> var = m_io.InquireVariable<T>(VarName); \
std::vector<T> resBuffer; \
size_t step = std::stoi(requestParams[1]); \
var.SetStepSelection({step, step + 1}); \
var.SetStepSelection({step, 1}); \
size_t paramLength = (requestParams.size() - 3) / 2; \
adios2::Dims s(paramLength); \
adios2::Dims c(paramLength); \
Expand All @@ -482,9 +481,9 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
var.SetSelection(varSel); \
m_engine.Get(var, resBuffer, adios2::Mode::Sync); \
size_t responseSize = resBuffer.size(); \
responseBuffer = new char[responseSize * sizeof(float)]; \
responseBufferSize = responseSize * sizeof(float); \
memcpy(responseBuffer, resBuffer.data(), responseSize * sizeof(float)); \
responseBuffer = new char[responseSize * sizeof(T)]; \
responseBufferSize = responseSize * sizeof(T); \
memcpy(responseBuffer, resBuffer.data(), responseSize * sizeof(T)); \
XrdSysThread::Run(&tid, SvAdiosGet, (void *)this, 0, "get"); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(GET)
Expand Down
Loading