Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the buffer copy logic outside operators #3514

Merged
merged 6 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/adios2/core/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ Dims Operator::ConvertDims(const Dims &dimensions, const DataType type,
return ret;
}

size_t Operator::GetHeaderSize() const { return 0; }

// PRIVATE
void Operator::CheckCallbackType(const std::string type) const
{
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/core/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class Operator
const std::string &, const size_t, const Dims &,
const Dims &, const Dims &) const;

virtual size_t GetHeaderSize() const;

/**
* @param dataIn
* @param blockStart
Expand Down
10 changes: 10 additions & 0 deletions source/adios2/helper/adiosMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ namespace adios2
namespace helper
{

size_t CopyMemoryWithOpHeader(const char *src, const Dims &blockCount,
const DataType type, char *dest,
size_t destOffset, const MemorySpace memSpace)
{
const size_t sizeIn = GetTotalSize(blockCount, GetDataTypeSize(type));
CopyContiguousMemory(src, sizeIn, dest + destOffset,
/* endianReverse */ false, memSpace);
return destOffset + sizeIn;
}

namespace
{

Expand Down
4 changes: 4 additions & 0 deletions source/adios2/helper/adiosMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ void ClipContiguousMemory(T *dest, const Dims &destStart, const Dims &destCount,
const bool endianReverse = false,
const MemorySpace memSpace = MemorySpace::Host);

size_t CopyMemoryWithOpHeader(const char *src, const Dims &blockCount,
const DataType type, char *dest,
size_t headerSize, const MemorySpace memSpace);

template <class T>
void CopyContiguousMemory(const char *src, const size_t stride, T *dest,
const bool endianReverse = false,
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/helper/adiosMemory.inl
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,11 @@ template <class T>
void CopyContiguousMemory(const char *src, const size_t payloadStride, T *dest,
const bool endianReverse, const MemorySpace memSpace)
{
if (payloadStride == 0)
{
return;
}

#ifdef ADIOS2_HAVE_GPU_SUPPORT
if (memSpace == MemorySpace::GPU)
{
Expand Down
12 changes: 10 additions & 2 deletions source/adios2/operator/OperatorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,23 @@ std::shared_ptr<Operator> MakeOperator(const std::string &type,
}

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut,
std::shared_ptr<Operator> op)
MemorySpace memSpace, std::shared_ptr<Operator> op)
{
Operator::OperatorType compressorType;
std::memcpy(&compressorType, bufferIn, 1);
if (op == nullptr || op->m_TypeEnum != compressorType)
{
op = MakeOperator(OperatorTypeToString(compressorType), {});
}
return op->InverseOperate(bufferIn, sizeIn, dataOut);
size_t sizeOut = op->InverseOperate(bufferIn, sizeIn, dataOut);
if (sizeOut == 0) // the inverse operator was not applied
{
size_t headerSize = op->GetHeaderSize();
sizeOut = sizeIn - headerSize;
helper::CopyContiguousMemory(bufferIn + headerSize, sizeOut, dataOut,
/*endianReverse*/ false, memSpace);
}
return sizeOut;
}

} // end namespace core
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/OperatorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ std::shared_ptr<Operator> MakeOperator(const std::string &type,
const Params &parameters);

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut,
std::shared_ptr<Operator> op = nullptr);
MemorySpace memSpace, std::shared_ptr<Operator> op = nullptr);

} // end namespace core
} // end namespace adios2
23 changes: 17 additions & 6 deletions source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,24 +212,27 @@ size_t CompressBlosc::Operate(const char *dataIn, const Dims &blockStart,
}
}

headerSize = bufferOutOffset;
if (useMemcpy)
{
std::memcpy(bufferOut + bufferOutOffset, dataIn + inputOffset, sizeIn);
bufferOutOffset += sizeIn;
headerPtr->SetNumChunks(0u);
bufferOutOffset = 0; // indicate that the operator was not applied
}

blosc2_destroy();
return bufferOutOffset;
}

size_t CompressBlosc::GetHeaderSize() const { return headerSize; }

size_t CompressBlosc::InverseOperate(const char *bufferIn, const size_t sizeIn,
char *dataOut)
{
size_t bufferInOffset = 1; // skip operator type
const uint8_t bufferVersion =
GetParameter<uint8_t>(bufferIn, bufferInOffset);
bufferInOffset += 2; // skip two reserved bytes
headerSize = bufferInOffset;

if (bufferVersion == 1)
{
Expand Down Expand Up @@ -263,6 +266,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
bool isCompressed = true;

m_VersionInfo =
" Data is compressed using BLOSC Version " +
Expand Down Expand Up @@ -294,18 +298,26 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
DecompressOldFormat(bufferIn + bufferInOffset,
sizeIn - bufferInOffset, dataOut, sizeOut);
}
if (decompressedSize == 0) // the decompression was not applied
{
isCompressed = false;
decompressedSize = bufferDecompressedSize;
}
if (decompressedSize != sizeOut)
{
helper::Throw<std::runtime_error>("Operator", "CompressBlosc",
"DecompressV1", m_VersionInfo);
}
headerSize += sizeIn - sizeOut;
if (!isCompressed)
return 0; // indicate that the inverse operator was not applied
return sizeOut;
}

size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
const size_t sizeIn,
char *dataOut,
const size_t sizeOut) const
const size_t sizeOut)
{
const DataHeader *dataPtr = reinterpret_cast<const DataHeader *>(bufferIn);
uint32_t num_chunks = dataPtr->GetNumChunks();
Expand Down Expand Up @@ -386,9 +398,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
}
else
{
std::memcpy(dataOut, inputDataBuff, inputDataSize);
currentOutputSize = inputDataSize;
inputOffset += inputDataSize;
bufferDecompressedSize = inputDataSize;
return 0; // the inverse operator was not applied
}

assert(currentOutputSize == uncompressedSize);
Expand Down
6 changes: 5 additions & 1 deletion source/adios2/operator/compress/CompressBlosc.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ class CompressBlosc : public Operator

bool IsDataTypeValid(const DataType type) const final;

size_t GetHeaderSize() const;

private:
using bloscSize_t = int32_t;
size_t headerSize = 0;
size_t bufferDecompressedSize = 0;

/** Decompress chunked data */
size_t DecompressChunkedFormat(const char *bufferIn, const size_t sizeIn,
char *dataOut, const size_t sizeOut) const;
char *dataOut, const size_t sizeOut);

/** Decompress data written before ADIOS2 supported large variables larger
* 2GiB. */
Expand Down
49 changes: 38 additions & 11 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ size_t CompressMGARD::Operate(const char *dataIn, const Dims &blockStart,
double s = 0.0;
auto errorBoundType = mgard_x::error_bound_type::REL;

// input size under this bound will not compress
size_t thresholdSize = 100000;

auto itThreshold = m_Parameters.find("threshold");
if (itThreshold != m_Parameters.end())
{
thresholdSize = std::stod(itThreshold->second);
}
auto itAccuracy = m_Parameters.find("accuracy");
if (itAccuracy != m_Parameters.end())
{
Expand Down Expand Up @@ -142,15 +150,26 @@ size_t CompressMGARD::Operate(const char *dataIn, const Dims &blockStart,
// let mgard know the output buffer size
size_t sizeOut =
helper::GetTotalSize(blockCount, helper::GetDataTypeSize(type));

if (sizeOut < thresholdSize)
{
/* disable compression and add marker in the header*/
PutParameter(bufferOut, bufferOutOffset, false);
headerSize = bufferOutOffset;
return 0;
}

PutParameter(bufferOut, bufferOutOffset, true);
void *compressedData = bufferOut + bufferOutOffset;
mgard_x::compress(mgardDim, mgardType, mgardCount, tolerance, s,
errorBoundType, dataIn, compressedData, sizeOut, true);

bufferOutOffset += sizeOut;

return bufferOutOffset;
}

size_t CompressMGARD::GetHeaderSize() const { return headerSize; }

size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut)
{
Expand All @@ -175,6 +194,8 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

const bool isCompressed = GetParameter<bool>(bufferIn, bufferInOffset);

size_t sizeOut =
helper::GetTotalSize(blockCount, helper::GetDataTypeSize(type));

Expand All @@ -183,19 +204,24 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,
sizeOut /= 2;
}

try
{
void *dataOutVoid = dataOut;
mgard_x::decompress(bufferIn + bufferInOffset, sizeIn - bufferInOffset,
dataOutVoid, true);
}
catch (...)
if (isCompressed)
{
helper::Throw<std::runtime_error>("Operator", "CompressMGARD",
"DecompressV1", m_VersionInfo);
try
{
void *dataOutVoid = dataOut;
mgard_x::decompress(bufferIn + bufferInOffset,
sizeIn - bufferInOffset, dataOutVoid, true);
}
catch (...)
{
helper::Throw<std::runtime_error>("Operator", "CompressMGARD",
"DecompressV1", m_VersionInfo);
}
return sizeOut;
}

return sizeOut;
headerSize += bufferInOffset;
return 0;
}

size_t CompressMGARD::InverseOperate(const char *bufferIn, const size_t sizeIn,
Expand All @@ -205,6 +231,7 @@ size_t CompressMGARD::InverseOperate(const char *bufferIn, const size_t sizeIn,
const uint8_t bufferVersion =
GetParameter<uint8_t>(bufferIn, bufferInOffset);
bufferInOffset += 2; // skip two reserved bytes
headerSize = bufferInOffset;

if (bufferVersion == 1)
{
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/operator/compress/CompressMGARD.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class CompressMGARD : public Operator

bool IsDataTypeValid(const DataType type) const final;

size_t GetHeaderSize() const;

private:
size_t headerSize = 0;
/**
* Decompress function for V1 buffer. Do NOT remove even if the buffer
* version is updated. Data might be still in lagacy formats. This function
Expand Down
9 changes: 9 additions & 0 deletions source/adios2/operator/compress/CompressMGARDPlus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart,
CompressMGARD mgard(m_Parameters);
size_t mgardBufferSize = mgard.Operate(dataIn, blockStart, blockCount, type,
bufferOut + bufferOutOffset);
if (mgardBufferSize == 0)
{
headerSize += (bufferOutOffset + mgard.GetHeaderSize());
return 0;
}

if (*reinterpret_cast<OperatorType *>(bufferOut + bufferOutOffset) ==
COMPRESS_MGARD)
Expand Down Expand Up @@ -90,6 +95,8 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart,
return bufferOutOffset;
}

size_t CompressMGARDPlus::GetHeaderSize() const { return headerSize; }

size_t CompressMGARDPlus::DecompressV1(const char *bufferIn,
const size_t sizeIn, char *dataOut)
{
Expand All @@ -114,6 +121,7 @@ size_t CompressMGARDPlus::DecompressV1(const char *bufferIn,
// sizeOut. Here you may want to do your magic to change the decompressed
// data somehow to improve its accuracy :)

headerSize += (bufferInOffset + mgard.GetHeaderSize());
return sizeOut;
}

Expand All @@ -124,6 +132,7 @@ size_t CompressMGARDPlus::InverseOperate(const char *bufferIn,
const uint8_t bufferVersion =
GetParameter<uint8_t>(bufferIn, bufferInOffset);
bufferInOffset += 2; // skip two reserved bytes
headerSize = bufferInOffset;

if (bufferVersion == 1)
{
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/operator/compress/CompressMGARDPlus.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ class CompressMGARDPlus : public Operator

bool IsDataTypeValid(const DataType type) const final;

size_t GetHeaderSize() const;

private:
size_t headerSize = 0;

/**
* Decompress function for V1 buffer. Do NOT remove even if the buffer
* version is updated. Data might be still in lagacy formats. This function
Expand Down
8 changes: 7 additions & 1 deletion source/adios2/toolkit/format/bp/BPSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,17 @@ void BPSerializer::PutOperationPayloadInBuffer(
const core::Variable<T> &variable,
const typename core::Variable<T>::BPInfo &blockInfo)
{
const size_t outputSize = blockInfo.Operations[0]->Operate(
size_t outputSize = blockInfo.Operations[0]->Operate(
reinterpret_cast<char *>(blockInfo.Data), blockInfo.Start,
blockInfo.Count, variable.m_Type,
m_Data.m_Buffer.data() + m_Data.m_Position);

if (outputSize == 0) // the operator was not applied
outputSize = helper::CopyMemoryWithOpHeader(
reinterpret_cast<char *>(blockInfo.Data), blockInfo.Count,
variable.m_Type, m_Data.m_Buffer.data() + m_Data.m_Position,
blockInfo.Operations[0]->GetHeaderSize(), blockInfo.MemSpace);

m_Data.m_Position += outputSize;
m_Data.m_AbsolutePosition += outputSize;

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ void BP3Deserializer::PostDataRead(
}
}
core::Decompress(postOpData, blockOperationInfo.PayloadSize,
preOpData, op);
preOpData, blockInfo.MemSpace, op);

// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ void BP4Deserializer::PostDataRead(
}
}
core::Decompress(postOpData, blockOperationInfo.PayloadSize,
preOpData, op);
preOpData, blockInfo.MemSpace, op);

// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,7 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr)
core::Decompress(IncomingData,
((MetaArrayRecOperator *)writer_meta_base)
->DataBlockSize[Read.BlockID],
decompressBuffer.data());
decompressBuffer.data(), Req.MemSpace);
}
IncomingData = decompressBuffer.data();
VirtualIncomingData = IncomingData;
Expand Down
Loading