Skip to content

Commit

Permalink
Merge pull request #1373 from eisenhauer/SstFfsLocal
Browse files Browse the repository at this point in the history
Enable local arrays with FFS marshaling
  • Loading branch information
eisenhauer authored Apr 12, 2019
2 parents 8a84920 + 8db552e commit be7e94f
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 29 deletions.
75 changes: 63 additions & 12 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,25 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
* setup shape of array variable as global (I.E. Count == Shape,
* Start == 0)
*/
for (int i = 0; i < DimCount; i++)
if (Shape)
{
VecShape.push_back(Shape[i]);
VecStart.push_back(0);
VecCount.push_back(Shape[i]);
for (int i = 0; i < DimCount; i++)
{
VecShape.push_back(Shape[i]);
VecStart.push_back(0);
VecCount.push_back(Shape[i]);
}
}
else
{
VecShape = {};
VecStart = {};
for (int i = 0; i < DimCount; i++)
{
VecCount.push_back(Count[i]);
}
}

if (Type == "compound")
{
return (void *)NULL;
Expand Down Expand Up @@ -332,10 +345,29 @@ void SstReader::Init()
{ \
if (m_WriterMarshalMethod == SstMarshalFFS) \
{ \
SstFFSGetDeferred( \
m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
size_t *Start = NULL; \
size_t *Count = NULL; \
size_t DimCount = 0; \
\
if (variable.m_SelectionType == \
adios2::SelectionType::BoundingBox) \
{ \
DimCount = variable.m_Shape.size(); \
Start = variable.m_Start.data(); \
Count = variable.m_Count.data(); \
SstFFSGetDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, Start, \
Count, data); \
} \
else if (variable.m_SelectionType == \
adios2::SelectionType::WriteBlock) \
{ \
DimCount = variable.m_Count.size(); \
Count = variable.m_Count.data(); \
SstFFSGetLocalDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, \
variable.m_BlockID, Count, data); \
} \
SstFFSPerformGets(m_Input); \
} \
if (m_WriterMarshalMethod == SstMarshalBP) \
Expand All @@ -352,10 +384,29 @@ void SstReader::Init()
{ \
if (m_WriterMarshalMethod == SstMarshalFFS) \
{ \
SstFFSGetDeferred( \
m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
size_t *Start = NULL; \
size_t *Count = NULL; \
size_t DimCount = 0; \
\
if (variable.m_SelectionType == \
adios2::SelectionType::BoundingBox) \
{ \
DimCount = variable.m_Shape.size(); \
Start = variable.m_Start.data(); \
Count = variable.m_Count.data(); \
SstFFSGetDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, Start, \
Count, data); \
} \
else if (variable.m_SelectionType == \
adios2::SelectionType::WriteBlock) \
{ \
DimCount = variable.m_Count.size(); \
Count = variable.m_Count.data(); \
SstFFSGetLocalDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, \
variable.m_BlockID, Count, data); \
} \
} \
if (m_WriterMarshalMethod == SstMarshalBP) \
{ \
Expand Down
22 changes: 19 additions & 3 deletions source/adios2/engine/sst/SstWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,26 @@ void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values)

if (m_MarshalMethod == SstMarshalFFS)
{
size_t *Shape = NULL;
size_t *Start = NULL;
size_t *Count = NULL;
size_t DimCount = 0;

if (variable.m_ShapeID == ShapeID::GlobalArray)
{
DimCount = variable.m_Shape.size();
Shape = variable.m_Shape.data();
Start = variable.m_Start.data();
Count = variable.m_Count.data();
}
else if (variable.m_ShapeID == ShapeID::LocalArray)
{
DimCount = variable.m_Count.size();
Count = variable.m_Count.data();
}
SstFFSMarshal(m_Output, (void *)&variable, variable.m_Name.c_str(),
variable.m_Type.c_str(), variable.m_ElementSize,
variable.m_Shape.size(), variable.m_Shape.data(),
variable.m_Count.data(), variable.m_Start.data(), values);
variable.m_Type.c_str(), variable.m_ElementSize, DimCount,
Shape, Count, Start, values);
}
else if (m_MarshalMethod == SstMarshalBP)
{
Expand Down
7 changes: 2 additions & 5 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm)
writer_data_t ReturnData;
struct _ReaderActivateMsg Msg;
struct timeval Start, Stop, Diff;
int i;
char *Filename = strdup(Name);
CMConnection rank0_to_rank0_conn = NULL;

Expand Down Expand Up @@ -427,7 +426,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm)
}
}

for (i = 0; i < ReturnData->WriterCohortSize; i++)
for (int i = 0; i < ReturnData->WriterCohortSize; i++)
{
attr_list attrs =
attr_list_from_string(ReturnData->CP_WriterInfo[i]->ContactInfo);
Expand Down Expand Up @@ -933,14 +932,13 @@ static void FreeTimestep(SstStream Stream, long Timestep)

static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep)
{
struct _TimestepMetadataList *Next;
TSMetadataList FoundTS = NULL;
pthread_mutex_lock(&Stream->DataLock);
Next = Stream->Timesteps;
CP_verbose(Stream, "Wait for next metadata after last timestep %d\n",
LastTimestep);
while (1)
{
struct _TimestepMetadataList *Next;
Next = Stream->Timesteps;
while (Next)
{
Expand Down Expand Up @@ -1090,7 +1088,6 @@ static void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg,

extern void SstReleaseStep(SstStream Stream)
{
long MaxTimestep;
long Timestep = Stream->ReaderTimestep;
struct _ReleaseTimestepMsg Msg;

Expand Down
82 changes: 76 additions & 6 deletions source/adios2/toolkit/sst/cp/ffs_marshal.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable,
// Build request structure and enter it into requests list
FFSArrayRequest Req = malloc(sizeof(*Req));
Req->VarRec = Var;
Req->RequestType = Global;
// make a copy of Start and Count request
Req->Start = malloc(sizeof(Start[0]) * Var->DimCount);
memcpy(Req->Start, Start, sizeof(Start[0]) * Var->DimCount);
Expand All @@ -729,8 +730,49 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable,
}
}

extern void SstFFSGetLocalDeferred(SstStream Stream, void *Variable,
const char *Name, size_t DimCount,
const int BlockID, const size_t *Count,
void *Data)
{
struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData;
int GetFromWriter = 0;
FFSVarRec Var = LookupVarByKey(Stream, Variable);

// if Variable is in Metadata (I.E. DimCount == 0), move incoming data to
// Data area
if (DimCount == 0)
{
void *IncomingDataBase =
((char *)Info->MetadataBaseAddrs[GetFromWriter]) +
Var->PerWriterMetaFieldDesc[GetFromWriter]->field_offset;
memcpy(Data, IncomingDataBase,
Var->PerWriterMetaFieldDesc[GetFromWriter]->field_size);
}
else
{
// Build request structure and enter it into requests list
FFSArrayRequest Req = malloc(sizeof(*Req));
memset(Req, 0, sizeof(*Req));
Req->VarRec = Var;
Req->RequestType = Local;
Req->NodeID = BlockID;
// make a copy of Count request
Req->Count = malloc(sizeof(Count[0]) * Var->DimCount);
memcpy(Req->Count, Count, sizeof(Count[0]) * Var->DimCount);
Req->Data = Data;
Req->Next = Info->PendingVarRequests;
Info->PendingVarRequests = Req;
}
}

static int NeedWriter(FFSArrayRequest Req, int i)
{
if (Req->RequestType == Local)
{
return (Req->NodeID == i);
}
// else Global case
for (int j = 0; j < Req->VarRec->DimCount; j++)
{
size_t SelOffset = Req->Start[j];
Expand Down Expand Up @@ -1168,6 +1210,20 @@ static void FillReadRequests(SstStream Stream, FFSArrayRequest Reqs)
size_t IncomingSize = Reqs->VarRec->PerWriterIncomingSize[i];
int FreeIncoming = 0;

if (Reqs->RequestType == Local)
{
RankOffset = calloc(DimCount, sizeof(RankOffset[0]));
GlobalDimensions =
calloc(DimCount, sizeof(GlobalDimensions[0]));
if (SelOffset == NULL)
{
SelOffset = calloc(DimCount, sizeof(RankOffset[0]));
}
for (int i = 0; i < DimCount; i++)
{
GlobalDimensions[i] = RankSize[i];
}
}
if ((Stream->WriterConfigParams->CompressionMethod ==
SstCompressZFP) &&
ZFPcompressionPossible(Type, DimCount))
Expand Down Expand Up @@ -1244,9 +1300,17 @@ extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep)
if (!Info->MetaFormat)
{
struct FFSFormatBlock *Block = malloc(sizeof(*Block));
FMFormat Format = FMregister_simple_format(
Info->LocalFMContext, "MetaData", Info->MetaFields,
FMstruct_size_field_list(Info->MetaFields, sizeof(char *)));
FMStructDescRec struct_list[4] = {
{NULL, NULL, 0, NULL},
{"complex4", fcomplex_field_list, sizeof(fcomplex_struct), NULL},
{"complex8", dcomplex_field_list, sizeof(dcomplex_struct), NULL},
{NULL, NULL, 0, NULL}};
struct_list[0].format_name = "MetaData";
struct_list[0].field_list = Info->MetaFields;
struct_list[0].struct_size =
FMstruct_size_field_list(Info->MetaFields, sizeof(char *));
FMFormat Format =
register_data_format(Info->LocalFMContext, &struct_list[0]);
Info->MetaFormat = Format;
Block->FormatServerRep =
get_server_rep_FMformat(Format, &Block->FormatServerRepLen);
Expand Down Expand Up @@ -1667,7 +1731,7 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData,
VarRec->ElementSize = ElementSize;
VarRec->Variable = Stream->ArraySetupUpcall(
Stream->SetupUpcallReader, ArrayName, Type, meta_base->Dims,
meta_base->Shape, meta_base->Count, meta_base->Offsets);
meta_base->Shape, meta_base->Offsets, meta_base->Count);
}
if (WriterRank == 0)
{
Expand Down Expand Up @@ -1813,9 +1877,15 @@ extern void SstFFSMarshal(SstStream Stream, void *Variable, const char *Name,

/* handle metadata */
MetaEntry->Dims = DimCount;
MetaEntry->Shape = CopyDims(DimCount, Shape);
if (Shape)
MetaEntry->Shape = CopyDims(DimCount, Shape);
else
MetaEntry->Shape = NULL;
MetaEntry->Count = CopyDims(DimCount, Count);
MetaEntry->Offsets = CopyDims(DimCount, Offsets);
if (Offsets)
MetaEntry->Offsets = CopyDims(DimCount, Offsets);
else
MetaEntry->Offsets = NULL;

if ((Stream->ConfigParams->CompressionMethod == SstCompressZFP) &&
ZFPcompressionPossible(Type, DimCount))
Expand Down
8 changes: 8 additions & 0 deletions source/adios2/toolkit/sst/cp/ffs_marshal.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ typedef struct FFSVarRec
size_t *PerWriterIncomingSize; // important for compression
} * FFSVarRec;

enum FFSRequestTypeEnum
{
Global = 0,
Local = 1
};

typedef struct FFSArrayRequest
{
FFSVarRec VarRec;
enum FFSRequestTypeEnum RequestType;
size_t NodeID;
size_t *Start;
size_t *Count;
void *Data;
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/toolkit/sst/sst.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable,
const char *Name, size_t DimCount,
const size_t *Start, const size_t *Count,
void *Data);
extern void SstFFSGetLocalDeferred(SstStream Stream, void *Variable,
const char *Name, size_t DimCount,
const int BlockID, const size_t *Count,
void *Data);

extern SstStatusValue SstFFSPerformGets(SstStream Stream);

Expand Down
3 changes: 0 additions & 3 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ list(FILTER SST_TESTS EXCLUDE REGEX "Fto.*FFS.*")
# remove Fto anything tests that use CommMin because we can't spec it
list(FILTER SST_TESTS EXCLUDE REGEX "Fto.*CommMin.*")

# remove Local tests that use FFS, this doesn't work yet
list(FILTER SST_TESTS EXCLUDE REGEX "Local.*FFS")

list( LENGTH SST_TESTS afterlistlen )
message (STATUS "Staging tests list before was ${beforelistlen}, after is ${afterlistlen}")

Expand Down
2 changes: 2 additions & 0 deletions testing/adios2/engine/staging-common/TestCommonReadLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8)

writerSize = var_time.Shape()[0];

// std::cout << "Writer size is " << writerSize << std::endl;

int rankToRead = mpiRank;
if (writerSize < mpiSize)
{
Expand Down

0 comments on commit be7e94f

Please sign in to comment.