Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leak if client reponse is never taken #201

Merged
merged 3 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <atomic>
#include <list>
#include <memory>
#include <utility>

#include "fastcdr/FastBuffer.h"

Expand All @@ -43,10 +45,7 @@ typedef struct CustomClientInfo
typedef struct CustomClientResponse
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
eprosima::fastcdr::FastBuffer * buffer_;

CustomClientResponse()
: buffer_(nullptr) {}
std::unique_ptr<eprosima::fastcdr::FastBuffer> buffer_;
} CustomClientResponse;

class ClientListener : public eprosima::fastrtps::SubscriberListener
Expand All @@ -63,10 +62,11 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
assert(sub);

CustomClientResponse response;
response.buffer_ = new eprosima::fastcdr::FastBuffer();
// Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19
response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(response.buffer_, &sinfo)) {
if (sub->takeNextData(response.buffer_.get(), &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
response.sample_identity_ = sinfo.related_sample_identity;

Expand All @@ -75,44 +75,43 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
list.push_back(response);
list.emplace_back(std::move(response));
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
list_has_data_.store(true);
clock.unlock();
conditionVariable_->notify_one();
} else {
list.push_back(response);
list.emplace_back(std::move(response));
list_has_data_.store(true);
}
}
}
}
}

CustomClientResponse
getResponse()
bool
getResponse(CustomClientResponse & response)
{
std::lock_guard<std::mutex> lock(internalMutex_);
CustomClientResponse response;

auto pop_response = [this](CustomClientResponse & response) -> bool
{
if (!list.empty()) {
response = std::move(list.front());
list.pop_front();
list_has_data_.store(!list.empty());
return true;
}
return false;
};

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
if (!list.empty()) {
response = list.front();
list.pop_front();
list_has_data_.store(!list.empty());
}
} else {
if (!list.empty()) {
response = list.front();
list.pop_front();
list_has_data_.store(!list.empty());
}
return pop_response(response);
}

return response;
return pop_response(response);
}

void
Expand Down
8 changes: 3 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ rmw_take_response(
auto info = static_cast<CustomClientInfo *>(client->data);
assert(info);

CustomClientResponse response = info->listener_->getResponse();
CustomClientResponse response;

if (response.buffer_ != nullptr) {
_deserialize_ros_message(response.buffer_, ros_response, info->response_type_support_,
if (info->listener_->getResponse(response)) {
_deserialize_ros_message(response.buffer_.get(), ros_response, info->response_type_support_,
info->typesupport_identifier_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;

delete response.buffer_;

*taken = true;
}

Expand Down