Skip to content

Commit

Permalink
[core] intrdoduce new SServiceIDResponse, ServiceIDResponseVecT, Resp…
Browse files Browse the repository at this point in the history
…onseIDCallbackT for v6 using SServiceMethodId for service/method unique attributed instead single elements (see old SServiceResponse) (#1881)

fixing error (assigning m_service_id to sid, EntityID -> std::string) without conversion
  • Loading branch information
rex-schilasky authored Jan 9, 2025
1 parent 7a18429 commit c35f4dc
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 129 deletions.
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/ecal_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ namespace eCAL
* @return True if all calls were successful.
**/
ECAL_API_EXPORTED_MEMBER
bool CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_, ServiceResponseVecT& service_response_vec_) const;
bool CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_, ServiceIDResponseVecT& service_response_vec_) const;

/**
* @brief Blocking call (with timeout) of a service method for all existing service instances, using callback
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/ecal_client_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace eCAL
* @return success state and service response
**/
ECAL_API_EXPORTED_MEMBER
std::pair<bool, SServiceResponse> CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_ = -1);
std::pair<bool, SServiceIDResponse> CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_ = -1);

/**
* @brief Blocking call of a service method, using callback
Expand Down
17 changes: 15 additions & 2 deletions ecal/core/include/ecal/ecal_service_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
namespace eCAL
{
/**
* @brief Service response struct containing the (responding) server informations and the response itself.
* @brief Service response struct containing the (responding) server informations and the response itself. (deprecated)
**/
struct SServiceResponse
{
Expand All @@ -55,6 +55,19 @@ namespace eCAL
};
using ServiceResponseVecT = std::vector<SServiceResponse>; //!< vector of multiple service responses (deprecated)

/**
* @brief Service response struct containing the (responding) server informations and the response itself.
**/
struct SServiceIDResponse
{
Registration::SServiceMethodId service_method_id; //!< service method information (service id (entity id, process id, host name), service name, method name)
std::string error_msg; //!< human readable error message
int ret_state = 0; //!< return state of the called service method
eCallState call_state = call_state_none; //!< call state (see eCallState)
std::string response; //!< service response
};
using ServiceIDResponseVecT = std::vector<SServiceIDResponse>; //!< vector of multiple service responses

/**
* @brief Service method callback function type (low level server interface).
*
Expand All @@ -79,7 +92,7 @@ namespace eCAL
* @param entity_id_ Unique service id (entity id, process id, host name, service name, method name)
* @param service_response_ Service response struct containing the (responding) server informations and the response itself.
**/
using ResponseIDCallbackT = std::function<void (const Registration::SEntityId& entity_id_, const struct SServiceResponse& service_response_)>;
using ResponseIDCallbackT = std::function<void (const Registration::SEntityId& entity_id_, const struct SServiceIDResponse& service_response_)>;

/**
* @brief Map of <method name, method information (like request type, reponse type)>.
Expand Down
10 changes: 5 additions & 5 deletions ecal/core/src/service/ecal_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ namespace eCAL
return instances;
}

bool CServiceClient::CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_, ServiceResponseVecT& service_response_vec_) const
bool CServiceClient::CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_, ServiceIDResponseVecT& service_response_vec_) const
{
auto instances = GetClientInstances();
size_t num_instances = instances.size();

// vector to hold futures for the return values and responses
std::vector<std::future<std::pair<bool, SServiceResponse>>> futures;
std::vector<std::future<std::pair<bool, SServiceIDResponse>>> futures;
futures.reserve(num_instances);

// launch asynchronous calls for each instance
Expand All @@ -106,9 +106,9 @@ namespace eCAL
try
{
// explicitly unpack the pair
std::pair<bool, SServiceResponse> result = future.get();
std::pair<bool, SServiceIDResponse> result = future.get();
bool success = result.first;
SServiceResponse response = result.second;
SServiceIDResponse response = result.second;

// add response to the vector
service_response_vec_.emplace_back(response);
Expand All @@ -119,7 +119,7 @@ namespace eCAL
catch (const std::exception& e)
{
// handle exceptions and add an error response
SServiceResponse error_response;
SServiceIDResponse error_response;
error_response.error_msg = e.what();
error_response.call_state = call_state_failed;
service_response_vec_.emplace_back(error_response);
Expand Down
146 changes: 79 additions & 67 deletions ecal/core/src/service/ecal_service_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,56 +47,18 @@ namespace
return request_shared_ptr;
}

// DeSerializes the response string into a service response
eCAL::SServiceResponse DeserializedResponse(const std::string& response_pb_)
{
eCAL::SServiceResponse service_reponse;
eCAL::Service::Response response;
if (eCAL::DeserializeFromBuffer(response_pb_.c_str(), response_pb_.size(), response))
{
const auto& response_header = response.header;
service_reponse.host_name = response_header.hname;
service_reponse.service_name = response_header.sname;
service_reponse.service_id = response_header.sid;
service_reponse.method_name = response_header.mname;
service_reponse.error_msg = response_header.error;
service_reponse.ret_state = static_cast<int>(response.ret_state);

switch (response_header.state)
{
case eCAL::Service::eMethodCallState::executed:
service_reponse.call_state = call_state_executed;
break;
case eCAL::Service::eMethodCallState::failed:
service_reponse.call_state = call_state_failed;
break;
default:
break;
}

service_reponse.response = std::string(response.response.data(), response.response.size());
}
else
{
service_reponse.error_msg = "Could not parse server response";
service_reponse.ret_state = 0;
service_reponse.call_state = eCallState::call_state_failed;
service_reponse.response = "";
}

return service_reponse;
}

eCAL::SServiceResponse CreateErrorResponse(const eCAL::Registration::SEntityId& entity_id_, const std::string& service_name_, const std::string& method_name_, const std::string& error_message_)
eCAL::SServiceIDResponse CreateErrorResponse(const eCAL::Registration::SEntityId& entity_id_, const std::string& service_name_, const std::string& method_name_, const std::string& error_message_)
{
eCAL::SServiceResponse error_response;
error_response.host_name = entity_id_.host_name;
error_response.service_name = service_name_;
error_response.service_id = entity_id_.entity_id;
error_response.method_name = method_name_;
error_response.error_msg = error_message_;
error_response.ret_state = 0;
error_response.call_state = call_state_failed;
eCAL::SServiceIDResponse error_response;
// service/method id
error_response.service_method_id.service_id = entity_id_;
error_response.service_method_id.service_name = service_name_;
error_response.service_method_id.method_name = method_name_;

// error message, return state and call state
error_response.error_msg = error_message_;
error_response.ret_state = 0;
error_response.call_state = call_state_failed;
return error_response;
}

Expand Down Expand Up @@ -200,7 +162,7 @@ namespace eCAL
}

// Calls a service method synchronously, blocking until a response is received or timeout occurs
std::pair<bool, SServiceResponse> CServiceClientImpl::CallWithCallback(
std::pair<bool, SServiceIDResponse> CServiceClientImpl::CallWithCallback(
const Registration::SEntityId& entity_id_, const std::string& method_name_,
const std::string& request_, int timeout_ms_, const ResponseIDCallbackT& response_callback_)
{
Expand All @@ -212,7 +174,7 @@ namespace eCAL
if (!GetClientByEntity(entity_id_, client))
{
eCAL::Logging::Log(log_level_warning, "CServiceClientImpl::CallWithCallback: Failed to find client for entity ID: " + entity_id_.entity_id);
return { false, SServiceResponse() };
return { false, SServiceIDResponse() };
}

auto response = CallMethodWithTimeout(entity_id_, client, method_name_, request_, std::chrono::milliseconds(timeout_ms_));
Expand Down Expand Up @@ -269,7 +231,7 @@ namespace eCAL
auto response_data = PrepareInitialResponse(client, method_name_);

// Create the response callback
auto response = [response_data, entity_id_, response_callback_](const eCAL::service::Error& error, const std::shared_ptr<std::string>& response_)
auto response = [client, response_data, entity_id_, response_callback_](const eCAL::service::Error& error, const std::shared_ptr<std::string>& response_)
{
const std::lock_guard<std::mutex> lock(*response_data->mutex);
if (!*response_data->block_modifying_response)
Expand All @@ -288,7 +250,7 @@ namespace eCAL
eCAL::Logging::Log(log_level_debug1, "CServiceClientImpl::CallWithCallbackAsync: Asynchronous call succeded");
#endif
response_data->response->first = true;
response_data->response->second = DeserializedResponse(*response_);
response_data->response->second = DeserializedResponse(client, *response_);
}
}
*response_data->finished = true;
Expand Down Expand Up @@ -455,21 +417,21 @@ namespace eCAL
}

// Blocking call to a service with a specified timeout
std::pair<bool, SServiceResponse> CServiceClientImpl::CallMethodWithTimeout(
std::pair<bool, SServiceIDResponse> CServiceClientImpl::CallMethodWithTimeout(
const Registration::SEntityId& entity_id_, SClient& client_, const std::string& method_name_,
const std::string& request_, std::chrono::nanoseconds timeout_)
{
if (method_name_.empty())
return { false, SServiceResponse() };
return { false, SServiceIDResponse() };

// Serialize the request
auto request_shared_ptr = SerializeRequest(method_name_, request_);
if (!request_shared_ptr)
return { false, SServiceResponse() };
return { false, SServiceIDResponse() };

// Prepare response data
auto response_data = PrepareInitialResponse(client_, method_name_);
auto response_callback = CreateResponseCallback(response_data);
auto response_data = PrepareInitialResponse(client_, method_name_);
auto response_callback = CreateResponseCallback(client_, response_data);

// Send the service call
const bool call_success = client_.client_session->async_call_service(request_shared_ptr, response_callback);
Expand Down Expand Up @@ -559,21 +521,25 @@ namespace eCAL
m_event_callback(service_id, callback_data);
}

std::shared_ptr<CServiceClientImpl::SResponseData> CServiceClientImpl::PrepareInitialResponse(SClient& client_, const std::string& method_name_)
std::shared_ptr<CServiceClientImpl::SResponseData> CServiceClientImpl::PrepareInitialResponse(const SClient& client_, const std::string& method_name_)
{
auto data = std::make_shared<SResponseData>();
data->response->first = false;
data->response->second.host_name = client_.service_attr.hname;
data->response->second.service_name = client_.service_attr.sname;
data->response->second.service_id = client_.service_attr.key;
data->response->second.method_name = method_name_;
data->response->second.call_state = eCallState::call_state_none;

data->response->second.service_method_id.service_id.entity_id = client_.service_attr.sid;
data->response->second.service_method_id.service_id.process_id = client_.service_attr.pid;
data->response->second.service_method_id.service_id.host_name = client_.service_attr.hname;

data->response->second.service_method_id.service_name = client_.service_attr.sname;
data->response->second.service_method_id.method_name = method_name_;

data->response->second.call_state = eCallState::call_state_none;
return data;
}

eCAL::service::ClientResponseCallbackT CServiceClientImpl::CreateResponseCallback(const std::shared_ptr<SResponseData>& response_data_)
eCAL::service::ClientResponseCallbackT CServiceClientImpl::CreateResponseCallback(const SClient& client_, const std::shared_ptr<SResponseData>& response_data_)
{
return [response_data_](const eCAL::service::Error& error, const std::shared_ptr<std::string>& response_)
return [client_, response_data_](const eCAL::service::Error& error, const std::shared_ptr<std::string>& response_)
{
const std::lock_guard<std::mutex> lock(*response_data_->mutex);
if (!*response_data_->block_modifying_response)
Expand All @@ -588,11 +554,57 @@ namespace eCAL
else
{
response_data_->response->first = true;
response_data_->response->second = DeserializedResponse(*response_);
response_data_->response->second = DeserializedResponse(client_, *response_);
}
}
*response_data_->finished = true;
response_data_->condition_variable->notify_all();
};
}

// DeSerializes the response string into a service response
eCAL::SServiceIDResponse CServiceClientImpl::DeserializedResponse(const SClient& client_, const std::string& response_pb_)
{
eCAL::SServiceIDResponse service_reponse;
eCAL::Service::Response response;
if (eCAL::DeserializeFromBuffer(response_pb_.c_str(), response_pb_.size(), response))
{
const auto& response_header = response.header;
// service/method id
service_reponse.service_method_id.service_id.entity_id = client_.service_attr.sid;
service_reponse.service_method_id.service_id.process_id = client_.service_attr.pid;
service_reponse.service_method_id.service_id.host_name = response_header.hname;

// service and method name
service_reponse.service_method_id.service_name = response_header.sname;
service_reponse.service_method_id.method_name = response_header.mname;

// error message and return state
service_reponse.error_msg = response_header.error;
service_reponse.ret_state = static_cast<int>(response.ret_state);

switch (response_header.state)
{
case eCAL::Service::eMethodCallState::executed:
service_reponse.call_state = call_state_executed;
break;
case eCAL::Service::eMethodCallState::failed:
service_reponse.call_state = call_state_failed;
break;
default:
break;
}

service_reponse.response = std::string(response.response.data(), response.response.size());
}
else
{
service_reponse.error_msg = "Could not parse server response";
service_reponse.ret_state = 0;
service_reponse.call_state = eCallState::call_state_failed;
service_reponse.response = "";
}

return service_reponse;
}
}
24 changes: 13 additions & 11 deletions ecal/core/src/service/ecal_service_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ namespace eCAL
// Retrieve service IDs of all matching services
std::vector<Registration::SEntityId> GetServiceIDs();

// Blocking call to a specific service; returns response as pair<bool, SServiceResponse>
// Blocking call to a specific service; returns response as pair<bool, SServiceIDResponse>
// if a callback is provided call the callback as well
std::pair<bool, SServiceResponse> CallWithCallback(
std::pair<bool, SServiceIDResponse> CallWithCallback(
const Registration::SEntityId& entity_id_, const std::string& method_name_,
const std::string& request_, int timeout_ms_, const ResponseIDCallbackT& response_callback_ = nullptr);

Expand Down Expand Up @@ -107,7 +107,7 @@ namespace eCAL
bool GetClientByEntity(const Registration::SEntityId& entity_id_, SClient& client_);

// Blocking call to a specific service method with timeout
std::pair<bool, SServiceResponse> CallMethodWithTimeout(const Registration::SEntityId& entity_id_, SClient& client_,
std::pair<bool, SServiceIDResponse> CallMethodWithTimeout(const Registration::SEntityId& entity_id_, SClient& client_,
const std::string& method_name_, const std::string& request_, std::chrono::nanoseconds timeout_);

// Update the connection states for client sessions
Expand All @@ -122,23 +122,25 @@ namespace eCAL
// SResponseData struct for handling response callbacks
struct SResponseData
{
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> condition_variable;
std::shared_ptr<std::pair<bool, SServiceResponse>> response;
std::shared_ptr<bool> block_modifying_response;
std::shared_ptr<bool> finished;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> condition_variable;
std::shared_ptr<std::pair<bool, SServiceIDResponse>> response;
std::shared_ptr<bool> block_modifying_response;
std::shared_ptr<bool> finished;

SResponseData() :
mutex(std::make_shared<std::mutex>()),
condition_variable(std::make_shared<std::condition_variable>()),
response(std::make_shared<std::pair<bool, SServiceResponse>>(false, SServiceResponse())),
response(std::make_shared<std::pair<bool, SServiceIDResponse>>(false, SServiceIDResponse())),
block_modifying_response(std::make_shared<bool>(false)),
finished(std::make_shared<bool>(false))
{}
};

static std::shared_ptr<SResponseData> PrepareInitialResponse(SClient& client_, const std::string& method_name_);
static eCAL::service::ClientResponseCallbackT CreateResponseCallback(const std::shared_ptr<SResponseData>& response_data_);
static std::shared_ptr<SResponseData> PrepareInitialResponse(const SClient& client_, const std::string& method_name_);
static eCAL::service::ClientResponseCallbackT CreateResponseCallback(const SClient& client_, const std::shared_ptr<SResponseData>& response_data_);

static SServiceIDResponse DeserializedResponse(const SClient& client_, const std::string& response_pb_);

// Client version (incremented for protocol or functionality changes)
static constexpr int m_client_version = 1;
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/service/ecal_service_client_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace eCAL
assert(m_service_client_impl && "service_client_id_impl_ must not be null");
}

std::pair<bool, SServiceResponse> CClientInstance::CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_)
std::pair<bool, SServiceIDResponse> CClientInstance::CallWithResponse(const std::string& method_name_, const std::string& request_, int timeout_)
{
return m_service_client_impl->CallWithCallback(m_entity_id, method_name_, request_, timeout_);
}
Expand Down
Loading

0 comments on commit c35f4dc

Please sign in to comment.