diff --git a/rtb/messaging/communicator.hpp b/rtb/messaging/communicator.hpp index c2f6c52..c3be2a2 100644 --- a/rtb/messaging/communicator.hpp +++ b/rtb/messaging/communicator.hpp @@ -31,7 +31,9 @@ namespace vanilla { namespace messaging { - +using data_segment = std::pair; +using data_segments_vector = std::vector; + template std::string serialize( Serializable && data ) { std::stringstream ss(std::ios_base::out|std::ios_base::binary); @@ -169,7 +171,6 @@ class sender : ConnectionPolicy { public: using data_type = std::array ; - template sender(boost::asio::io_service& io_service, const unsigned short port, IPAddress && ...addresses) : socket_{io_service}, @@ -192,6 +193,17 @@ class sender : ConnectionPolicy [](const boost::system::error_code&, std::size_t) { }); } + + void send_async(data_segments_vector const & data) { + size_t total_size{}; + std::for_each(data.begin(), data.end(), [&total_size](auto &it) { total_size += it.second;}); + out_data_.reserve(total_size); + std::for_each(data.begin(), data.end(), [&](auto &it) { out_data_.append(it.first, it.second);}); + socket_.async_send_to( + boost::asio::buffer(out_data_), to_endpoint_, + [](const boost::system::error_code&, std::size_t) {} + ); + } template void receive_async(Handler handler) { @@ -273,6 +285,13 @@ class communicator { } return *this; } + + self_type & distribute_segments(data_segments_vector const & data) { + if(distributor_) { + distributor_->send_async(data); + } + return *this; + } template self_type & process(Handler handler) {