Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Jan 24, 2025
1 parent 04890ca commit 8864fa1
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/fdb5/database/Inspector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ ListIterator Inspector::inspect(const metkit::mars::MarsRequest& request,
const Schema& schema,
const fdb5::Notifier& notifyee) const {

std::unique_ptr<InspectIterator> iterator(new InspectIterator());
auto iterator = std::make_unique<InspectIterator>();
MultiRetrieveVisitor visitor(notifyee, *iterator, databases_, dbConfig_);

LOG_DEBUG_LIB(LibFdb5) << "Using schema: " << schema << std::endl;
Expand Down
8 changes: 4 additions & 4 deletions src/fdb5/toc/TocCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,14 @@ void TocCatalogueWriter::reconsolidateIndexesAndTocs() {
if (!indexInSubtoc[i]) {
Index& idx(readIndexes[i]);
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildClearRecord(*r, idx)).second;
combinedSize += recordSizes(*r, buildClearRecord(*r, idx)).second;
Log::info() << "Masking index: " << idx.location().uri() << std::endl;
}
}

for (const std::string& subtoc_path : subtocs) {
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r, subtoc_path)).second;
combinedSize += recordSizes(*r, buildSubTocMaskRecord(*r, subtoc_path)).second;
Log::info() << "Masking sub-toc: " << subtoc_path << std::endl;
}

Expand Down Expand Up @@ -401,14 +401,14 @@ void TocCatalogueWriter::compactSubTocIndexes() {

idx.flush();
TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_INDEX);
combinedSize += roundRecord(*r, buildIndexRecord(*r, idx)).second;
combinedSize += recordSizes(*r, buildIndexRecord(*r, idx)).second;
}
}

// And add the masking record for the subtoc

TocRecord* r = new (&buf[combinedSize]) TocRecord(serialisationVersion().used(), TocRecord::TOC_CLEAR);
combinedSize += roundRecord(*r, buildSubTocMaskRecord(*r)).second;
combinedSize += recordSizes(*r, buildSubTocMaskRecord(*r)).second;

// Write all of these records to the toc in one go.

Expand Down
52 changes: 23 additions & 29 deletions src/fdb5/toc/TocHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <fcntl.h>
#include <sys/types.h>
#include <pwd.h>
#include <utility>
#include <cstddef>

#include "eckit/config/Resource.h"
#include "eckit/io/FileHandle.h"
Expand Down Expand Up @@ -343,13 +345,13 @@ void TocHandler::append(TocRecord &r, size_t payloadSize ) {

void TocHandler::appendRound(TocRecord &r, size_t payloadSize) {
// Obtain the rounded size, and set it in the record header.
auto [realSize, roundedSize] = roundRecord(r, payloadSize);
auto [realSize, roundedSize] = recordSizes(r, payloadSize);

eckit::Buffer buf(roundedSize);
buf.zero();
buf.copy(static_cast<const void*>(&r), realSize);

appendRaw(buf, roundedSize);
appendRaw(buf, buf.size());
}

void TocHandler::appendRaw(const void *data, size_t size) {
Expand Down Expand Up @@ -392,7 +394,7 @@ size_t TocHandler::recordRoundSize() {
return fdbRoundTocRecords;
}

std::pair<size_t, size_t> TocHandler::roundRecord(TocRecord &r, size_t payloadSize) {
std::pair<size_t, size_t> TocHandler::recordSizes(TocRecord &r, size_t payloadSize) {

size_t dataSize = sizeof(TocRecord::Header) + payloadSize;
r.header_.size_ = eckit::round(dataSize, recordRoundSize());
Expand Down Expand Up @@ -528,7 +530,7 @@ std::vector<PathName> TocHandler::subTocPaths() const {
openForRead();
TocHandlerCloser close(*this);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

std::vector<eckit::PathName> paths;

Expand Down Expand Up @@ -607,7 +609,7 @@ void TocHandler::allMaskableEntries(Offset startOffset, Offset endOffset,
Offset ret = proxy.seek(startOffset);
ASSERT(ret == startOffset);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

while (proxy.position() < endOffset) {

Expand Down Expand Up @@ -838,8 +840,7 @@ void TocHandler::preloadSubTocs(bool readMasked) const {

eckit::Timer preloadTimer("subtocs.preload", Log::debug<LibFdb5>());
{
std::unique_ptr<TocRecord> r(
new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

// n.b. we call databaseKey() directly, as this preload will normally be called before we have walked
// the toc at all --> TOC_INIT not yet read --> parentKey_ not yet set.
Expand Down Expand Up @@ -885,7 +886,7 @@ void TocHandler::populateMaskedEntriesList() const {

maskedEntries_.clear();

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

size_t countSubTocs = 0;

Expand Down Expand Up @@ -963,7 +964,7 @@ void TocHandler::writeInitRecord(const Key& key) {

TocHandlerCloser closer(*this);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

size_t len = readNext(*r);
if (len == 0) {
Expand Down Expand Up @@ -999,7 +1000,7 @@ void TocHandler::writeInitRecord(const Key& key) {
eckit::LocalPathName::rename(tmp, schemaPath_);
}

std::unique_ptr<TocRecord> r2(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_INIT)); // allocate TocRecord on heap (MARS-779)
auto r2 = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_INIT); // allocate (large) TocRecord on heap not stack (MARS-779)
eckit::MemoryStream s(&r2->payload_[0], r2->maxPayloadSize);
s << key;
s << isSubToc_;
Expand All @@ -1016,14 +1017,14 @@ void TocHandler::writeInitRecord(const Key& key) {

void TocHandler::writeClearRecord(const Index &index) {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_CLEAR); // allocate (large) TocRecord on heap not stack (MARS-779)

appendBlock(*r, buildClearRecord(*r, index));
}

void TocHandler::writeClearAllRecord() {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_CLEAR); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);
s << std::string {"*"};
Expand All @@ -1038,7 +1039,7 @@ void TocHandler::writeSubTocRecord(const TocHandler& subToc) {
openForAppend();
TocHandlerCloser closer(*this);

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_SUB_TOC)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_SUB_TOC); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);

Expand Down Expand Up @@ -1066,7 +1067,7 @@ void TocHandler::writeIndexRecord(const Index& index) {

const TocIndexLocation& location = reinterpret_cast<const TocIndexLocation&>(l);

std::unique_ptr<TocRecord> r(new TocRecord(handler_.serialisationVersion_.used(), TocRecord::TOC_INDEX)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(handler_.serialisationVersion_.used(), TocRecord::TOC_INDEX); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);

Expand Down Expand Up @@ -1117,7 +1118,7 @@ void TocHandler::writeIndexRecord(const Index& index) {

void TocHandler::writeSubTocMaskRecord(const TocHandler &subToc) {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_CLEAR)); // allocate (large) TocRecord on heap not stack (MARS-779)
auto r = std::make_unique<TocRecord>(serialisationVersion_.used(), TocRecord::TOC_CLEAR); // allocate (large) TocRecord on heap not stack (MARS-779)

// We use a relative path to this subtoc if it belongs to the current DB
// but an absolute one otherwise (e.g. for fdb-overlay).
Expand Down Expand Up @@ -1165,8 +1166,7 @@ uid_t TocHandler::dbUID() const {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

while ( readNext(*r) ) {
if (r->header_.tag_ == TocRecord::TOC_INIT) {
Expand All @@ -1182,8 +1182,7 @@ Key TocHandler::databaseKey() {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool walkSubTocs = false;
while ( readNext(*r, walkSubTocs) ) {
Expand All @@ -1203,8 +1202,7 @@ size_t TocHandler::numberOfRecords() const {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool walkSubTocs = true;
bool hideSubTocEntries = false;
Expand Down Expand Up @@ -1244,8 +1242,7 @@ std::vector<Index> TocHandler::loadIndexes(const Catalogue& catalogue, bool sort
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)
count_ = 0;

// A record of all the index entries found (to process later)
Expand Down Expand Up @@ -1408,8 +1405,7 @@ void TocHandler::dump(std::ostream& out, bool simple, bool walkSubTocs) const {
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool hideSubTocEntries = false;
bool hideClearEntries = false;
Expand Down Expand Up @@ -1478,8 +1474,7 @@ void TocHandler::dumpIndexFile(std::ostream& out, const eckit::PathName& indexFi
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

bool walkSubTocs = true;
bool hideSubTocEntries = true;
Expand Down Expand Up @@ -1591,8 +1586,7 @@ void TocHandler::enumerateMasked(const Catalogue& catalogue, std::set<std::pair<
openForRead();
TocHandlerCloser close(*this);

// Allocate (large) TocRecord on heap not stack (MARS-779)
std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used()));
auto r = std::make_unique<TocRecord>(serialisationVersion_.used()); // allocate (large) TocRecord on heap not stack (MARS-779)

while ( readNextInternal(*r) ) {
if (r->header_.tag_ == TocRecord::TOC_INDEX) {
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/toc/TocHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <map>
#include <memory>
#include <utility>

#include "eckit/filesystem/PathName.h"
#include "eckit/filesystem/LocalPathName.h"
Expand Down Expand Up @@ -193,7 +194,7 @@ class TocHandler : public TocCommon, private eckit::NonCopyable {

// Given the payload size, returns the record size

static std::pair<size_t, size_t> roundRecord(TocRecord &r, size_t payloadSize);
static std::pair<size_t, size_t> recordSizes(TocRecord &r, size_t payloadSize);

void appendBlock(const void* data, size_t size);
void appendBlock(TocRecord &r, size_t payloadSize);
Expand Down
44 changes: 22 additions & 22 deletions src/fdb5/toc/TocStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <dirent.h>
#include <fcntl.h>
#include <memory>

#include "eckit/log/Timer.h"

Expand Down Expand Up @@ -128,7 +129,7 @@ std::unique_ptr<const FieldLocation> TocStore::archive(const Key& idxKey, const

ASSERT(len == length);

return std::unique_ptr<const FieldLocation>(new TocFieldLocation(dataPath, position, length, Key()));
return std::make_unique<TocFieldLocation>(dataPath, position, length, Key());
}

size_t TocStore::flush() {
Expand Down Expand Up @@ -165,16 +166,15 @@ void TocStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostre
}

eckit::DataHandle* TocStore::getCachedHandle( const eckit::PathName &path ) const {
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
HandleStore::const_iterator j = handles_.find( path );
if ( j != handles_.end() )
std::lock_guard lock(handlesMutex_);
if (auto j = handles_.find(path); j != handles_.end()) {
return j->second.get();
else
return nullptr;
}
return nullptr;
}

void TocStore::closeDataHandles() {
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
std::lock_guard lock(handlesMutex_);
for (const auto& [p, dh] : handles_) {
dh->close();
}
Expand All @@ -191,14 +191,14 @@ std::unique_ptr<eckit::DataHandle> TocStore::createFileHandle(const eckit::PathN
<< " buffer size " << sizeBuffer
<< std::endl;

return std::unique_ptr<eckit::DataHandle>(new LustreFileHandle<FDBFileHandle>(path, sizeBuffer, stripeDataLustreSettings()));
return std::make_unique<LustreFileHandle<FDBFileHandle>>(path, sizeBuffer, stripeDataLustreSettings());
}

LOG_DEBUG_LIB(LibFdb5) << "Creating FDBFileHandle to " << path
<< " with buffer of " << eckit::Bytes(sizeBuffer)
<< std::endl;

return std::unique_ptr<eckit::DataHandle>(new FDBFileHandle(path, sizeBuffer));
return std::make_unique<FDBFileHandle>(path, sizeBuffer);
}

std::unique_ptr<eckit::DataHandle> TocStore::createAsyncHandle(const eckit::PathName &path) {
Expand All @@ -213,17 +213,18 @@ std::unique_ptr<eckit::DataHandle> TocStore::createAsyncHandle(const eckit::Path
<< " buffer each with " << eckit::Bytes(sizeBuffer)
<< std::endl;

return std::unique_ptr<eckit::DataHandle>(new LustreFileHandle<eckit::AIOHandle>(path, nbBuffers, sizeBuffer, stripeDataLustreSettings()));
return std::make_unique<LustreFileHandle<eckit::AIOHandle>>(path, nbBuffers, sizeBuffer, stripeDataLustreSettings());
}

return std::unique_ptr<eckit::DataHandle>(new eckit::AIOHandle(path, nbBuffers, sizeBuffer));
return std::make_unique<eckit::AIOHandle>(path, nbBuffers, sizeBuffer);
}

std::unique_ptr<eckit::DataHandle> TocStore::createDataHandle(const eckit::PathName &path) {

static bool fdbWriteToNull = eckit::Resource<bool>("fdbWriteToNull;$FDB_WRITE_TO_NULL", false);
if(fdbWriteToNull)
return std::unique_ptr<eckit::DataHandle>(new eckit::EmptyHandle());
if(fdbWriteToNull) {
return std::make_unique<eckit::EmptyHandle>();
}

static bool fdbAsyncWrite = eckit::Resource<bool>("fdbAsyncWrite;$FDB_ASYNC_WRITE", false);
if(fdbAsyncWrite)
Expand All @@ -233,16 +234,15 @@ std::unique_ptr<eckit::DataHandle> TocStore::createDataHandle(const eckit::PathN
}

eckit::DataHandle& TocStore::getDataHandle( const eckit::PathName &path ) {
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
std::lock_guard lock(handlesMutex_);
eckit::DataHandle* dh = getCachedHandle(path);
if ( !dh ) {
auto dataHandle = createDataHandle(path);
ASSERT(dataHandle);
dataHandle->openForAppend(0);
dh = dataHandle.get();
handles_[path] = std::move(dataHandle);
if (dh) {
return *dh;
}
return *dh;
auto dataHandle = createDataHandle(path);
ASSERT(dataHandle);
dataHandle->openForAppend(0);
return *(handles_[path] = std::move(dataHandle));
}

eckit::PathName TocStore::generateDataPath(const Key& key) const {
Expand All @@ -266,7 +266,7 @@ eckit::PathName TocStore::getDataPath(const Key& key) const {
}

void TocStore::flushDataHandles() {
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
std::lock_guard lock(handlesMutex_);
for (const auto& [p, dh] : handles_) {
dh->flush();
}
Expand Down
Loading

0 comments on commit 8864fa1

Please sign in to comment.