Skip to content

Commit

Permalink
use FastBuffer everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
wjwwood committed Jun 25, 2016
1 parent 677cb36 commit 35e9ba2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 90 deletions.
10 changes: 2 additions & 8 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,14 @@ namespace rmw_fastrtps_cpp
}
};

typedef struct Buffer
{
uint32_t length;
char *pointer;
} Buffer;

template <typename MembersType>
class TypeSupport : public eprosima::fastrtps::TopicDataType
{
public:

bool serializeROSmessage(const void *ros_message, Buffer *data);
bool serializeROSmessage(const void *ros_message, eprosima::fastcdr::FastBuffer *data);

bool deserializeROSmessage(const Buffer* data, void *ros_message);
bool deserializeROSmessage(eprosima::fastcdr::FastBuffer *data, void *ros_message);

bool serialize(void *data, SerializedPayload_t *payload);

Expand Down
36 changes: 13 additions & 23 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ template <typename MembersType>
void TypeSupport<MembersType>::deleteData(void* data)
{
assert(data);
free(data);
delete static_cast<eprosima::fastcdr::FastBuffer *>(data);
}

static inline void*
Expand Down Expand Up @@ -832,33 +832,31 @@ size_t TypeSupport<MembersType>::calculateMaxSerializedSize(

template <typename MembersType>
bool TypeSupport<MembersType>::serializeROSmessage(
const void *ros_message, Buffer *buffer)
const void *ros_message, eprosima::fastcdr::FastBuffer *buffer)
{
assert(buffer);
assert(ros_message);

// eprosima::fastcdr::FastBuffer fastbuffer(buffer->pointer, m_typeSize);
eprosima::fastcdr::FastBuffer fastbuffer;
eprosima::fastcdr::Cdr ser(fastbuffer);
// eprosima::fastcdr::FastBuffer fastbuffer;
eprosima::fastcdr::Cdr ser(*buffer);

if(members_->member_count_ != 0)
TypeSupport::serializeROSmessage(ser, members_, ros_message);
else
ser << (uint8_t)0;

buffer->length = (uint32_t)ser.getSerializedDataLength();
return true;
}

template <typename MembersType>
bool TypeSupport<MembersType>::deserializeROSmessage(
const Buffer* buffer, void *ros_message)
eprosima::fastcdr::FastBuffer* buffer, void *ros_message)
{
assert(buffer);
assert(ros_message);

eprosima::fastcdr::FastBuffer fastbuffer(buffer->pointer, buffer->length);
eprosima::fastcdr::Cdr deser(fastbuffer);
eprosima::fastcdr::Cdr deser(*buffer);

if(members_->member_count_ != 0)
TypeSupport::deserializeROSmessage(deser, members_, ros_message, false);
Expand All @@ -875,15 +873,7 @@ bool TypeSupport<MembersType>::deserializeROSmessage(
template <typename MembersType>
void* TypeSupport<MembersType>::createData()
{
Buffer *buffer = static_cast<Buffer*>(malloc(sizeof(Buffer) + m_typeSize));

if(buffer)
{
buffer->length = 0;
buffer->pointer = (char*)(buffer + 1);
}

return buffer;
return new eprosima::fastcdr::FastBuffer();
}

template <typename MembersType>
Expand All @@ -893,10 +883,10 @@ bool TypeSupport<MembersType>::serialize(
assert(data);
assert(payload);

Buffer *buffer = static_cast<Buffer*>(data);
payload->length = buffer->length;
eprosima::fastcdr::FastBuffer *buffer = static_cast<eprosima::fastcdr::FastBuffer*>(data);
payload->length = buffer->getBufferSize();
payload->encapsulation = CDR_LE;
memcpy(payload->data, buffer->pointer, buffer->length);
memcpy(payload->data, buffer->getBuffer(), buffer->getBufferSize());
return true;
}

Expand All @@ -906,9 +896,9 @@ bool TypeSupport<MembersType>::deserialize(SerializedPayload_t *payload, void *d
assert(data);
assert(payload);

Buffer *buffer = static_cast<Buffer*>(data);
buffer->length = payload->length;
memcpy(buffer->pointer, payload->data, payload->length);
eprosima::fastcdr::FastBuffer *buffer = static_cast<eprosima::fastcdr::FastBuffer*>(data);
buffer->resize(payload->length);
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;
}

Expand Down
126 changes: 67 additions & 59 deletions rmw_fastrtps_cpp/src/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void
_delete_typesupport(void * untyped_typesupport, const char* typesupport_identifier)
{
if (using_introspection_c_typesupport(typesupport_identifier)) {
auto typed_typesupport = static_cast<MessageTypeSupport_cpp *>(untyped_typesupport);
auto typed_typesupport = static_cast<MessageTypeSupport_c *>(untyped_typesupport);
if (typed_typesupport != nullptr)
delete typed_typesupport;
} else if (using_introspection_cpp_typesupport(typesupport_identifier)) {
Expand All @@ -261,39 +261,39 @@ _delete_typesupport(void * untyped_typesupport, const char* typesupport_identifi
}
}

rmw_fastrtps_cpp::Buffer *
_create_data(void * untyped_typesupport, const char* typesupport_identifier)
{
if (using_introspection_c_typesupport(typesupport_identifier)) {
auto typed_typesupport = static_cast<TypeSupport_c *>(untyped_typesupport);
return static_cast<rmw_fastrtps_cpp::Buffer *>(typed_typesupport->createData());
} else if (using_introspection_cpp_typesupport(typesupport_identifier)) {
auto typed_typesupport = static_cast<TypeSupport_cpp *>(untyped_typesupport);
return static_cast<rmw_fastrtps_cpp::Buffer *>(typed_typesupport->createData());
}
RMW_SET_ERROR_MSG("Unknown typesupport identifier");
return nullptr;
}

void
_delete_data(
rmw_fastrtps_cpp::Buffer * buffer, void * untyped_typesupport,
const char* typesupport_identifier)
{
if (using_introspection_c_typesupport(typesupport_identifier)) {
auto typed_typesupport = static_cast<TypeSupport_c *>(untyped_typesupport);
typed_typesupport->deleteData(buffer);
} else if (using_introspection_cpp_typesupport(typesupport_identifier)) {
auto typed_typesupport = static_cast<TypeSupport_cpp *>(untyped_typesupport);
typed_typesupport->deleteData(buffer);
} else {
RMW_SET_ERROR_MSG("Unknown typesupport identifier");
}
}
// rmw_fastrtps_cpp::Buffer *
// _create_data(void * untyped_typesupport, const char* typesupport_identifier)
// {
// if (using_introspection_c_typesupport(typesupport_identifier)) {
// auto typed_typesupport = static_cast<TypeSupport_c *>(untyped_typesupport);
// return static_cast<rmw_fastrtps_cpp::Buffer *>(typed_typesupport->createData());
// } else if (using_introspection_cpp_typesupport(typesupport_identifier)) {
// auto typed_typesupport = static_cast<TypeSupport_cpp *>(untyped_typesupport);
// return static_cast<rmw_fastrtps_cpp::Buffer *>(typed_typesupport->createData());
// }
// RMW_SET_ERROR_MSG("Unknown typesupport identifier");
// return nullptr;
// }

// void
// _delete_data(
// rmw_fastrtps_cpp::Buffer * buffer, void * untyped_typesupport,
// const char* typesupport_identifier)
// {
// if (using_introspection_c_typesupport(typesupport_identifier)) {
// auto typed_typesupport = static_cast<TypeSupport_c *>(untyped_typesupport);
// typed_typesupport->deleteData(buffer);
// } else if (using_introspection_cpp_typesupport(typesupport_identifier)) {
// auto typed_typesupport = static_cast<TypeSupport_cpp *>(untyped_typesupport);
// typed_typesupport->deleteData(buffer);
// } else {
// RMW_SET_ERROR_MSG("Unknown typesupport identifier");
// }
// }

bool
_serialize_ros_message(
const void *ros_message, rmw_fastrtps_cpp::Buffer * buffer, void * untyped_typesupport,
const void *ros_message, eprosima::fastcdr::FastBuffer * buffer, void * untyped_typesupport,
const char* typesupport_identifier)
{
if (using_introspection_c_typesupport(typesupport_identifier)) {
Expand All @@ -309,7 +309,7 @@ _serialize_ros_message(

bool
_deserialize_ros_message(
const rmw_fastrtps_cpp::Buffer * buffer, void *ros_message, void * untyped_typesupport,
eprosima::fastcdr::FastBuffer * buffer, void *ros_message, void * untyped_typesupport,
const char* typesupport_identifier)
{
if (using_introspection_c_typesupport(typesupport_identifier)) {
Expand Down Expand Up @@ -346,7 +346,7 @@ typedef struct CustomClientInfo
typedef struct CustomClientResponse
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
rmw_fastrtps_cpp::Buffer *buffer_;
eprosima::fastcdr::FastBuffer *buffer_;

CustomClientResponse() : buffer_(nullptr) {}
} CustomClientResponse;
Expand All @@ -364,7 +364,8 @@ class ClientListener : public SubscriberListener
assert(sub);

CustomClientResponse response;
response.buffer_ = _create_data(info_->response_type_support_, info_->typesupport_identifier_);
// response.buffer_ = _create_data(info_->response_type_support_, info_->typesupport_identifier_);
response.buffer_ = new eprosima::fastcdr::FastBuffer();
SampleInfo_t sinfo;

if(sub->takeNextData(response.buffer_, &sinfo))
Expand Down Expand Up @@ -833,19 +834,20 @@ extern "C"
CustomPublisherInfo *info = (CustomPublisherInfo*)publisher->data;
assert(info);

rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->type_support_, info->typesupport_identifier_);
// rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->type_support_, info->typesupport_identifier_);
eprosima::fastcdr::FastBuffer buffer;

if(_serialize_ros_message(ros_message, buffer, info->type_support_, info->typesupport_identifier_))
if(_serialize_ros_message(ros_message, &buffer, info->type_support_, info->typesupport_identifier_))
{
if(info->publisher_->write((void*)buffer))
if(info->publisher_->write(&buffer))
returnedValue = RMW_RET_OK;
else
RMW_SET_ERROR_MSG("cannot publish data");
}
else
RMW_SET_ERROR_MSG("cannot serialize data");

_delete_data(buffer, info->type_support_, info->typesupport_identifier_);
// _delete_data(buffer, info->type_support_, info->typesupport_identifier_);

return returnedValue;
}
Expand Down Expand Up @@ -1085,21 +1087,22 @@ extern "C"
CustomSubscriberInfo *info = (CustomSubscriberInfo*)subscription->data;
assert(info);

rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->type_support_, info->typesupport_identifier_);
// rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->type_support_, info->typesupport_identifier_);
eprosima::fastcdr::FastBuffer buffer;
SampleInfo_t sinfo;

if(info->subscriber_->takeNextData(buffer, &sinfo))
if(info->subscriber_->takeNextData(&buffer, &sinfo))
{
info->listener_->data_taken();

if(sinfo.sampleKind == ALIVE)
{
_deserialize_ros_message(buffer, ros_message, info->type_support_, info->typesupport_identifier_);
_deserialize_ros_message(&buffer, ros_message, info->type_support_, info->typesupport_identifier_);
*taken = true;
}
}

_delete_data(buffer, info->type_support_, info->typesupport_identifier_);
// _delete_data(buffer, info->type_support_, info->typesupport_identifier_);

return RMW_RET_OK;
}
Expand Down Expand Up @@ -1130,16 +1133,17 @@ extern "C"
CustomSubscriberInfo *info = (CustomSubscriberInfo*)subscription->data;
assert(info);

rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->type_support_, info->typesupport_identifier_);
// rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->type_support_, info->typesupport_identifier_);
eprosima::fastcdr::FastBuffer buffer;
SampleInfo_t sinfo;

if(info->subscriber_->takeNextData(buffer, &sinfo))
if(info->subscriber_->takeNextData(&buffer, &sinfo))
{
info->listener_->data_taken();

if(sinfo.sampleKind == ALIVE)
{
_deserialize_ros_message(buffer, ros_message, info->type_support_, info->typesupport_identifier_);
_deserialize_ros_message(&buffer, ros_message, info->type_support_, info->typesupport_identifier_);
rmw_gid_t * sender_gid = &message_info->publisher_gid;
sender_gid->implementation_identifier = eprosima_fastrtps_identifier;
memset(sender_gid->data, 0, RMW_GID_STORAGE_SIZE);
Expand All @@ -1148,7 +1152,7 @@ extern "C"
}
}

_delete_data(buffer, info->type_support_, info->typesupport_identifier_);
// _delete_data(buffer, info->type_support_, info->typesupport_identifier_);

return RMW_RET_OK;
}
Expand Down Expand Up @@ -1335,7 +1339,7 @@ extern "C"
typedef struct CustomServiceRequest
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
rmw_fastrtps_cpp::Buffer *buffer_;
eprosima::fastcdr::FastBuffer *buffer_;

CustomServiceRequest() : buffer_(nullptr) {}
} CustomServiceRequest;
Expand All @@ -1353,7 +1357,8 @@ extern "C"
assert(sub);

CustomServiceRequest request;
request.buffer_ = _create_data(info_->request_type_support_, info_->typesupport_identifier_);
// request.buffer_ = _create_data(info_->request_type_support_, info_->typesupport_identifier_);
request.buffer_ = new eprosima::fastcdr::FastBuffer();
SampleInfo_t sinfo;

if(sub->takeNextData(request.buffer_, &sinfo))
Expand Down Expand Up @@ -1605,13 +1610,14 @@ extern "C"
CustomClientInfo *info = (CustomClientInfo*)client->data;
assert(info);

rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->request_type_support_, info->typesupport_identifier_);
// rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->request_type_support_, info->typesupport_identifier_);
eprosima::fastcdr::FastBuffer buffer;

if(_serialize_ros_message(ros_request, buffer, info->request_type_support_, info->typesupport_identifier_))
if(_serialize_ros_message(ros_request, &buffer, info->request_type_support_, info->typesupport_identifier_))
{
eprosima::fastrtps::rtps::WriteParams wparams;

if(info->request_publisher_->write((void*)buffer, wparams))
if(info->request_publisher_->write(&buffer, wparams))
{
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 | wparams.sample_identity().sequence_number().low;
Expand All @@ -1622,7 +1628,7 @@ extern "C"
else
RMW_SET_ERROR_MSG("cannot serialize data");

_delete_data(buffer, info->request_type_support_, info->typesupport_identifier_);
// _delete_data(buffer, info->request_type_support_, info->typesupport_identifier_);

return returnedValue;
}
Expand Down Expand Up @@ -1658,7 +1664,7 @@ extern "C"
memcpy(request_header->writer_guid, &request.sample_identity_.writer_guid(), sizeof(eprosima::fastrtps::rtps::GUID_t));
request_header->sequence_number = ((int64_t)request.sample_identity_.sequence_number().high) << 32 | request.sample_identity_.sequence_number().low;

_delete_data(request.buffer_, info->request_type_support_, info->typesupport_identifier_);
// _delete_data(request.buffer_, info->request_type_support_, info->typesupport_identifier_);

*taken = true;
}
Expand Down Expand Up @@ -1697,7 +1703,7 @@ extern "C"

*taken = true;

_delete_data(response.buffer_, info->request_type_support_, info->typesupport_identifier_);
// _delete_data(response.buffer_, info->request_type_support_, info->typesupport_identifier_);
}

return RMW_RET_OK;
Expand All @@ -1722,24 +1728,26 @@ extern "C"
CustomServiceInfo *info = (CustomServiceInfo*)service->data;
assert(info);

rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->response_type_support_, info->typesupport_identifier_);
// rmw_fastrtps_cpp::Buffer *buffer = _create_data(info->response_type_support_, info->typesupport_identifier_);
eprosima::fastcdr::FastBuffer buffer;

if(buffer != nullptr)
// if(buffer != nullptr)
if(true)
{
_serialize_ros_message(ros_response, buffer, info->response_type_support_, info->typesupport_identifier_);
_serialize_ros_message(ros_response, &buffer, info->response_type_support_, info->typesupport_identifier_);
eprosima::fastrtps::rtps::WriteParams wparams;
memcpy(&wparams.related_sample_identity().writer_guid(), request_header->writer_guid, sizeof(eprosima::fastrtps::rtps::GUID_t));
wparams.related_sample_identity().sequence_number().high = (int32_t)((request_header->sequence_number & 0xFFFFFFFF00000000) >> 32);
wparams.related_sample_identity().sequence_number().low = (int32_t)(request_header->sequence_number & 0xFFFFFFFF);

if(info->response_publisher_->write((void*)buffer, wparams))
if(info->response_publisher_->write(&buffer, wparams))
{
returnedValue = RMW_RET_OK;
}
else
RMW_SET_ERROR_MSG("cannot publish data");

_delete_data(buffer, info->response_type_support_, info->typesupport_identifier_);
// _delete_data(buffer, info->response_type_support_, info->typesupport_identifier_);
}


Expand Down

0 comments on commit 35e9ba2

Please sign in to comment.