Skip to content

Commit

Permalink
net+transport: use std::array
Browse files Browse the repository at this point in the history
Change-Id: If5e73eeca86d2a568effb8a9ada79551a056ca07
  • Loading branch information
Pesa committed Jun 8, 2024
1 parent 2be774b commit 945d929
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 38 deletions.
10 changes: 6 additions & 4 deletions ndn-cxx/net/impl/netlink-socket.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2013-2023 Regents of the University of California.
* Copyright (c) 2013-2024 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
Expand Down Expand Up @@ -29,6 +29,8 @@
#include <linux/genetlink.h>
#include <sys/socket.h>

#include <array>

#ifndef SOL_NETLINK
#define SOL_NETLINK 270
#endif
Expand Down Expand Up @@ -247,14 +249,14 @@ NetlinkSocket::receiveAndValidate()
iovec iov{};
iov.iov_base = m_buffer.data();
iov.iov_len = m_buffer.size();
uint8_t cmsgBuffer[CMSG_SPACE(sizeof(nl_pktinfo))];
std::array<uint8_t, CMSG_SPACE(sizeof(nl_pktinfo))> cmsgBuffer{};
msghdr msg{};
msg.msg_name = &sender;
msg.msg_namelen = sizeof(sender);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsgBuffer;
msg.msg_controllen = sizeof(cmsgBuffer);
msg.msg_control = cmsgBuffer.data();
msg.msg_controllen = cmsgBuffer.size();

ssize_t nBytesRead = ::recvmsg(m_sock->native_handle(), &msg, 0);
if (nBytesRead < 0) {
Expand Down
60 changes: 26 additions & 34 deletions ndn-cxx/transport/detail/stream-transport-impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <boost/asio/write.hpp>
#include <boost/lexical_cast.hpp>

#include <array>
#include <list>
#include <queue>

Expand Down Expand Up @@ -108,7 +109,7 @@ class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportI
{
if (m_transport.getState() == Transport::State::PAUSED) {
m_transport.setState(Transport::State::RUNNING);
m_inputBufferSize = 0;
m_rxBufferSize = 0;
asyncReceive();
}
}
Expand Down Expand Up @@ -183,8 +184,8 @@ class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportI
void
asyncReceive()
{
m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
m_socket.async_receive(boost::asio::buffer(m_rxBuffer.data() + m_rxBufferSize,
m_rxBuffer.size() - m_rxBufferSize),
// capture a copy of the shared_ptr to "this" to prevent deallocation
[this, self = this->shared_from_this()] (const auto& error, size_t nBytesRecvd) {
if (error) {
Expand All @@ -196,52 +197,43 @@ class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportI
NDN_THROW(Transport::Error(error, "socket read error"));
}

m_inputBufferSize += nBytesRecvd;
// do magic
m_rxBufferSize += nBytesRecvd;
auto unparsedBytes = span(m_rxBuffer).first(m_rxBufferSize);
while (!unparsedBytes.empty()) {
auto [isOk, element] = Block::fromBuffer(unparsedBytes);
if (!isOk) {
break;
}
unparsedBytes = unparsedBytes.subspan(element.size());
m_transport.m_receiveCallback(element);
}

std::size_t offset = 0;
bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
if (unparsedBytes.empty()) {
// nothing left in the receive buffer
m_rxBufferSize = 0;
}
else if (unparsedBytes.data() != m_rxBuffer.data()) {
// move remaining unparsed bytes to the beginning of the receive buffer
std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_rxBuffer.begin());
m_rxBufferSize = unparsedBytes.size();
}
else if (unparsedBytes.size() == m_rxBuffer.size()) {
m_transport.close();
NDN_THROW(Transport::Error("receive buffer full, but a valid TLV cannot be decoded"));
}

if (offset > 0) {
if (offset != m_inputBufferSize) {
std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
m_inputBufferSize -= offset;
}
else {
m_inputBufferSize = 0;
}
}

asyncReceive();
});
}

bool
processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
{
while (offset < nBytesAvailable) {
auto [isOk, element] = Block::fromBuffer({buffer + offset, nBytesAvailable - offset});
if (!isOk) {
return false;
}
m_transport.m_receiveCallback(element);
offset += element.size();
}
return true;
}

protected:
BaseTransport& m_transport;
typename Protocol::endpoint m_endpoint;
typename Protocol::socket m_socket;
boost::asio::steady_timer m_connectTimer;
TransmissionQueue m_transmissionQueue;
size_t m_inputBufferSize = 0;
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
size_t m_rxBufferSize = 0;
std::array<uint8_t, MAX_NDN_PACKET_SIZE> m_rxBuffer;
};

} // namespace ndn::detail
Expand Down

0 comments on commit 945d929

Please sign in to comment.