From 92016f8532ef34f294ace65aa38bde9a51079e3a Mon Sep 17 00:00:00 2001 From: Marcos Bento Date: Wed, 4 Sep 2024 16:32:14 +0100 Subject: [PATCH] Ensure single thread replying to HTTP request Re ECFLOW-1957 --- libs/base/src/ecflow/base/HttpClient.cpp | 28 ++++++++++----- libs/base/src/ecflow/base/HttpClient.hpp | 12 +++---- .../src/ecflow/client/ClientInvoker.cpp | 9 +++++ libs/core/src/ecflow/core/Ecf.cpp | 18 ++-------- libs/core/src/ecflow/core/Ecf.hpp | 35 ++++++++++++++----- libs/rest/test/TestApiV1.cpp | 1 + libs/server/src/ecflow/server/Server.hpp | 15 +++++++- 7 files changed, 79 insertions(+), 39 deletions(-) diff --git a/libs/base/src/ecflow/base/HttpClient.cpp b/libs/base/src/ecflow/base/HttpClient.cpp index 3b0184297..590777c61 100644 --- a/libs/base/src/ecflow/base/HttpClient.cpp +++ b/libs/base/src/ecflow/base/HttpClient.cpp @@ -18,8 +18,7 @@ #include "ecflow/core/Converter.hpp" HttpClient::HttpClient(Cmd_ptr cmd_ptr, const std::string& host, const std::string& port, int timeout) - : stopped_(false), - host_(host), + : host_(host), port_(port), client_(host, ecf::convert_to(port)) { @@ -34,16 +33,29 @@ void HttpClient::run() { std::string outbound; ecf::save_as_string(outbound, outbound_request_); - auto result = client_.Post("/v1/ecflow", outbound, "application/json"); - auto response = result.value(); - - ecf::restore_from_string(response.body, inbound_response_); -}; + auto result = client_.Post("/v1/ecflow", outbound, "application/json"); + if (result) { + auto response = result.value(); + ecf::restore_from_string(response.body, inbound_response_); + } + else { + status_ = result.error(); + reason_ = httplib::to_string(status_); + } +} bool HttpClient::handle_server_response(ServerReply& server_reply, bool debug) const { if (debug) { std::cout << " Client::handle_server_response" << std::endl; } server_reply.set_host_port(host_, port_); // client context, needed by some commands, ie. SServerLoadCmd - return inbound_response_.handle_server_response(server_reply, outbound_request_.get_cmd(), debug); + + if (status_ == httplib::Error::Success) { + return inbound_response_.handle_server_response(server_reply, outbound_request_.get_cmd(), debug); + } + else { + std::stringstream ss; + ss << "HttpClient::handle_server_response: Error: " << status_ << " " << reason_; + throw std::runtime_error(ss.str()); + } } diff --git a/libs/base/src/ecflow/base/HttpClient.hpp b/libs/base/src/ecflow/base/HttpClient.hpp index 500b2f204..62055a160 100644 --- a/libs/base/src/ecflow/base/HttpClient.hpp +++ b/libs/base/src/ecflow/base/HttpClient.hpp @@ -18,11 +18,7 @@ #include "ecflow/base/ServerToClientResponse.hpp" /// -/// \brief This class acts as the client part. ( in client/server architecture) -/// -/// \note The plug command can move a node to another server hence the server -/// itself will NEED to ACT as a client. This is why client lives in Base and -/// not the Client project +/// \brief This class acts as an HTTP client /// class HttpClient { @@ -38,10 +34,14 @@ class HttpClient { bool handle_server_response(ServerReply&, bool debug) const; private: - bool stopped_; std::string host_; /// the servers name std::string port_; /// the port on the server httplib::Client client_; + + httplib::Response response_; + httplib::Error status_ = httplib::Error::Success; + std::string reason_ = ""; + ClientToServerRequest outbound_request_; /// The request we will send to the server ServerToClientResponse inbound_response_; /// The response we get back from the server }; diff --git a/libs/client/src/ecflow/client/ClientInvoker.cpp b/libs/client/src/ecflow/client/ClientInvoker.cpp index 3360b098a..962409a66 100644 --- a/libs/client/src/ecflow/client/ClientInvoker.cpp +++ b/libs/client/src/ecflow/client/ClientInvoker.cpp @@ -84,6 +84,15 @@ #include "ecflow/base/HttpClient.hpp" +#if defined(ADD) +// undefine to avoid conflict with /usr/include/arpa/nameser_compat.h #define ADD ns_uop_add + #undef ADD +#endif +#if defined(STATUS) +// undefine to avoid conflict with /usr/include/arpa/nameser_compat.h #define STATUS ns_o_status + #undef STATUS +#endif + using namespace std; using namespace ecf; using namespace boost::posix_time; diff --git a/libs/core/src/ecflow/core/Ecf.cpp b/libs/core/src/ecflow/core/Ecf.cpp index 4aabeb762..6bcb4d451 100644 --- a/libs/core/src/ecflow/core/Ecf.cpp +++ b/libs/core/src/ecflow/core/Ecf.cpp @@ -13,8 +13,8 @@ bool Ecf::server_ = false; bool Ecf::debug_equality_ = false; unsigned int Ecf::debug_level_ = 0; -thread_local Ecf::atomic_counter_t Ecf::state_change_no_ = 0; -thread_local Ecf::atomic_counter_t Ecf::modify_change_no_ = 0; +Ecf::atomic_counter_t Ecf::state_change_no_ = 0; +Ecf::atomic_counter_t Ecf::modify_change_no_ = 0; bool DebugEquality::ignore_server_variables_ = false; const char* Ecf::SERVER_NAME() { @@ -80,20 +80,6 @@ const std::string& Ecf::URL() { return URL; } -Ecf::counter_t Ecf::incr_state_change_no() { - if (server_) { - return ++state_change_no_; - } - return state_change_no_; -} - -Ecf::counter_t Ecf::incr_modify_change_no() { - if (server_) { - return ++modify_change_no_; - } - return modify_change_no_; -} - // ======================================================= EcfPreserveChangeNo::EcfPreserveChangeNo() diff --git a/libs/core/src/ecflow/core/Ecf.hpp b/libs/core/src/ecflow/core/Ecf.hpp index 69f94f6ff..d940d7168 100644 --- a/libs/core/src/ecflow/core/Ecf.hpp +++ b/libs/core/src/ecflow/core/Ecf.hpp @@ -16,6 +16,7 @@ /// #include +#include #include /** @@ -47,14 +48,32 @@ class Ecf { const Ecf& operator=(const Ecf&) = delete; /// Increment and then return state change no - static counter_t incr_state_change_no(); - static counter_t state_change_no() { return state_change_no_; } - static void set_state_change_no(counter_t x) { state_change_no_ = x; } + static counter_t incr_state_change_no() { + if (server_) { + ++state_change_no_; + } + return state_change_no_; + } + static counter_t state_change_no() { + return state_change_no_; + } + static void set_state_change_no(counter_t x) { + state_change_no_ = x; + } /// The modify_change_no_ is used for node addition and deletion and re-ordering - static counter_t incr_modify_change_no(); - static counter_t modify_change_no() { return modify_change_no_; } - static void set_modify_change_no(counter_t x) { modify_change_no_ = x; } + static counter_t incr_modify_change_no() { + if (server_) { + ++modify_change_no_; + } + return modify_change_no_; + } + static counter_t modify_change_no() { + return modify_change_no_; + } + static void set_modify_change_no(counter_t x) { + modify_change_no_ = x; + } /// Returns true if we are on the server side. /// Only in server side do we increment state/modify numbers @@ -91,8 +110,8 @@ class Ecf { static bool server_; static bool debug_equality_; static unsigned int debug_level_; - static thread_local atomic_counter_t state_change_no_; - static thread_local atomic_counter_t modify_change_no_; + static atomic_counter_t state_change_no_; + static atomic_counter_t modify_change_no_; }; /// Make sure the Ecf number don't change diff --git a/libs/rest/test/TestApiV1.cpp b/libs/rest/test/TestApiV1.cpp index f2e925ac8..fbcef67f1 100644 --- a/libs/rest/test/TestApiV1.cpp +++ b/libs/rest/test/TestApiV1.cpp @@ -12,6 +12,7 @@ #define CPPHTTPLIB_OPENSSL_SUPPORT #endif +#define CPPHTTPLIB_THREAD_POOL_COUNT 1 #include #include diff --git a/libs/server/src/ecflow/server/Server.hpp b/libs/server/src/ecflow/server/Server.hpp index cf10de47b..18f617f3f 100644 --- a/libs/server/src/ecflow/server/Server.hpp +++ b/libs/server/src/ecflow/server/Server.hpp @@ -11,6 +11,8 @@ #ifndef ecflow_server_Server_HPP #define ecflow_server_Server_HPP +#define CPPHTTPLIB_THREAD_POOL_COUNT 1 + #include #include "ecflow/base/stc/PreAllocatedReply.hpp" @@ -67,7 +69,7 @@ class HttpServer : public BaseServer { { // See what kind of message we got from the client if (serverEnv_.debug()) { - std::cout << " TcpBaseServer::handle_request : client request " << inbound_request << std::endl; + std::cout << " HTTPServer::handle_request : client request " << inbound_request << std::endl; } try { @@ -87,8 +89,19 @@ class HttpServer : public BaseServer { std::string outbound; ecf::save_as_string(outbound, outbound_response); + outbound_response.cleanup(); + // 4) ship response response.set_content(outbound, "text/plain"); + + if (inbound_request.terminateRequest()) { + if (serverEnv_.debug()) + std::cout << " <-- HttpServer::handle_terminate_request exiting server via terminate() port " + << serverEnv_.port() << std::endl; + // terminate(); + + // TODO: Terminate the server! + } }); try {