Skip to content

Commit

Permalink
async data write, unified connection
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Feb 20, 2024
1 parent 2ff5a35 commit bcd5d90
Show file tree
Hide file tree
Showing 23 changed files with 1,432 additions and 1,437 deletions.
5 changes: 5 additions & 0 deletions src/fdb5/api/FDBStats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace fdb5 {

FDBStats::FDBStats() :
numArchive_(0),
numLocation_(0),
numFlush_(0),
numRetrieve_(0),
bytesArchive_(0),
Expand All @@ -48,6 +49,7 @@ FDBStats::~FDBStats() {}

FDBStats& FDBStats::operator+=(const FDBStats& rhs) {
numArchive_ += rhs.numArchive_;
numLocation_ += rhs.numLocation_;
numFlush_ += rhs.numFlush_;
numRetrieve_ += rhs.numRetrieve_;
bytesArchive_ += rhs.bytesArchive_;
Expand Down Expand Up @@ -81,6 +83,9 @@ void FDBStats::addArchive(size_t length, eckit::Timer& timer, size_t nfields) {
<< ", total: " << Seconds(elapsedArchive_) << std::endl;
}

void FDBStats::addLocation(size_t nfields) {
numLocation_ += nfields;
}

void FDBStats::addRetrieve(size_t length, eckit::Timer& timer) {

Expand Down
3 changes: 3 additions & 0 deletions src/fdb5/api/FDBStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class FDBStats : public eckit::Statistics {
~FDBStats();

size_t numArchive() const { return numArchive_; }
size_t numLocation() const { return numLocation_; }
size_t numFlush() const { return numFlush_; }

void addArchive(size_t length, eckit::Timer& timer, size_t nfields=1);
void addLocation(size_t nfields=1);
void addRetrieve(size_t length, eckit::Timer& timer);
void addFlush(eckit::Timer& timer);

Expand All @@ -49,6 +51,7 @@ class FDBStats : public eckit::Statistics {
private: // members

size_t numArchive_;
size_t numLocation_;
size_t numFlush_;
size_t numRetrieve_;

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/api/RemoteFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ void RemoteFDB::print(std::ostream& s) const {
}

// Client
bool RemoteFDB::handle(remote::Message message, uint32_t requestID) {
bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID) {

switch (message) {
case fdb5::remote::Message::Complete: {
Expand Down Expand Up @@ -273,7 +273,7 @@ bool RemoteFDB::handle(remote::Message message, uint32_t requestID) {
return false;
}
}
bool RemoteFDB::handle(remote::Message message, uint32_t requestID, eckit::Buffer&& payload) {
bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) {

switch (message) {
case fdb5::remote::Message::Blob: {
Expand Down
5 changes: 3 additions & 2 deletions src/fdb5/api/RemoteFDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RemoteFDB : public LocalFDB, public remote::Client {
public: // method

RemoteFDB(const eckit::Configuration& config, const std::string& name);
~RemoteFDB() {}

ListIterator inspect(const metkit::mars::MarsRequest& request) override;

Expand Down Expand Up @@ -77,8 +78,8 @@ class RemoteFDB : public LocalFDB, public remote::Client {

// Client

bool handle(remote::Message message, uint32_t requestID) override;
bool handle(remote::Message message, uint32_t requestID, eckit::Buffer&& payload) override;
bool handle(remote::Message message, bool control, uint32_t requestID) override;
bool handle(remote::Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override;
void handleException(std::exception_ptr e) override;

private: // members
Expand Down
9 changes: 7 additions & 2 deletions src/fdb5/database/Archiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ Archiver::Archiver(const Config& dbConfig) :
store_(nullptr) {}

Archiver::~Archiver() {

flush(); // certify that all sessions are flushed before closing them

databases_.clear(); //< explicitly delete the DBs before schemas are destroyed
// for (auto it = databases_.begin(); it != databases_.end(); it++) {
// databases_.erase(it);
// }

// std::cout << databases_.size();

// databases_.clear(); //< explicitly delete the DBs before schemas are destroyed
}

void Archiver::archive(const Key &key, const void* data, size_t len) {
Expand Down
18 changes: 8 additions & 10 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void Connection::readUnsafe(bool control, void* data, size_t length) {
} else {
read = dataSocket().read(data, length);
}
if (!exit_ && length != read) {
if (length != read) {
std::stringstream ss;
ss << "Read error. Expected " << length << " bytes, read " << read;
throw TCPException(ss.str(), Here());
Expand All @@ -59,7 +59,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {

readUnsafe(control, &hdr, sizeof(hdr));

std::cout << "READ [" << "endpoint=" << ((control || single_) ? controlSocket() : dataSocket()).remotePort() << ",message=" << hdr.message << ",clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;
// std::cout << "READ [" << "endpoint=" << ((control || single_) ? controlSocket() : dataSocket()).remotePort() << ",message=" << hdr.message << ",clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

ASSERT(hdr.marker == StartMarker);
ASSERT(hdr.version == CurrentVersion);
Expand All @@ -74,14 +74,14 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
ASSERT(tail == EndMarker);

if (hdr.message == Message::Error) {
std::cout << "ERROR while reading: ";
// std::cout << "ERROR while reading: ";

char msg[hdr.payloadSize+1];
if (hdr.payloadSize) {
char msg[hdr.payloadSize+1];
std::cout << static_cast<char*>(payload.data());
// std::cout << static_cast<char*>(payload.data());
}
std::cout << std::endl;
// std::cout << std::endl;
}

return payload;
Expand All @@ -106,11 +106,9 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin

MessageHeader message{msg, control, clientID, requestID, payloadLength};

std::cout << "WRITE [" << "endpoint=" << ((control || single_) ? controlSocket() : dataSocket()).remotePort() <<
",message=" << message.message << ",clientID=" << message.clientID() << ",requestID=" << message.requestID << ",payload=" << message.payloadSize << "]" << std::endl;
// std::cout << "WRITE [" << "endpoint=" << ((control || single_) ? controlSocket() : dataSocket()).remotePort() << ",message=" << message.message << ",clientID=" << message.clientID() << ",requestID=" << message.requestID << ",payload=" << message.payloadSize << "]" << std::endl;

eckit::Log::debug<LibFdb5>() << "Connection::write [endpoint=" << ((control || single_) ? controlSocket() : dataSocket()) <<
",message=" << msg << ",clientID=" << message.clientID() << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;
eckit::Log::debug<LibFdb5>() << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;

std::lock_guard<std::mutex> lock((control || single_) ? controlMutex_ : dataMutex_);
writeUnsafe(control, &message, sizeof(message));
Expand All @@ -133,7 +131,7 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin
// write(msg, false, clientID, requestID, data);
// }
void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) {
write(Message::Error, true, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{msg.c_str(), msg.length()}});
write(Message::Error, false, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{msg.c_str(), msg.length()}});
}
// void Connection::error(const std::string& msg, const Handler& clientID, uint32_t requestID) {
// write(Message::Error, true, clientID.clientId(), requestID, std::vector<std::pair<const void*, uint32_t>>{{msg.c_str(), msg.length()}});
Expand Down
11 changes: 2 additions & 9 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@ namespace fdb5::remote {
class Connection : eckit::NonCopyable {

public: // methods
Connection() : single_(false), exit_(false) {}
Connection() : single_(false) {}
virtual ~Connection() {}

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length);
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data = {});
// void write(Message msg, bool control, const Handler& clientID, uint32_t requestID, const void* data, uint32_t length);
// void write(Message msg, bool control, const Handler& clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data = {});
// void writeControl(Message msg, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length);
// void writeControl(Message msg, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data = {});
// void writeData(Message msg, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length);
// void writeData(Message msg, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data = {});

void error(const std::string& msg, uint32_t clientID, uint32_t requestID);
// void error(const std::string& msg, const Handler& clientID, uint32_t requestID);

eckit::Buffer readControl(MessageHeader& hdr);
eckit::Buffer readData(MessageHeader& hdr);
Expand All @@ -58,7 +51,7 @@ class Connection : eckit::NonCopyable {
protected: // members

bool single_;
bool exit_;
// bool exit_;

private: // members

Expand Down
Loading

0 comments on commit bcd5d90

Please sign in to comment.