Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
OTA-4174: IP Secondary TCP server refactoring
Browse files Browse the repository at this point in the history
- improve the TCP server implementation
- add tests for the TCP server and the RPC (serialization/deserialization)

Signed-off-by: Mike Sul <ext-mykhaylo.sul@here.com>
  • Loading branch information
Mike Sul committed Jan 15, 2020
1 parent 4fd855c commit 27db9c8
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 156 deletions.
7 changes: 5 additions & 2 deletions src/aktualizr_secondary/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(AKTUALIZR_SECONDARY_LIB_SRC
aktualizr_secondary_factory.cc
aktualizr_secondary_config.cc
aktualizr_secondary_metadata.cc
socket_server.cc
secondary_tcp_server.cc
)

# do not link tests with libaktualizr
Expand Down Expand Up @@ -50,7 +50,7 @@ set(ALL_AKTUALIZR_SECONDARY_HEADERS
aktualizr_secondary_factory.h
aktualizr_secondary_config.h
aktualizr_secondary_metadata.h
socket_server.h
secondary_tcp_server.h
)

include(AddAktualizrTest)
Expand All @@ -61,6 +61,9 @@ list(INSERT TEST_LIBS 0 aktualizr_secondary_lib)
add_aktualizr_test(NAME aktualizr_secondary_config
SOURCES aktualizr_secondary_config_test.cc PROJECT_WORKING_DIRECTORY LIBRARIES aktualizr_secondary_lib)

add_aktualizr_test(NAME secondary_tcp_server
SOURCES secondary_tcp_server_test.cc PROJECT_WORKING_DIRECTORY)

if(BUILD_OSTREE)
add_aktualizr_test(NAME aktualizr_secondary_ostree
SOURCES aktualizr_secondary_ostree_test.cc
Expand Down
1 change: 0 additions & 1 deletion src/aktualizr_secondary/aktualizr_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#ifdef BUILD_OSTREE
#include "package_manager/ostreemanager.h" // TODO: Hide behind PackageManagerInterface
#endif
#include "socket_server.h"
#include "utilities/utils.h"

AktualizrSecondary::AktualizrSecondary(const AktualizrSecondaryConfig& config,
Expand Down
4 changes: 1 addition & 3 deletions src/aktualizr_secondary/aktualizr_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "uptane/secondaryinterface.h"

#include "crypto/keymanager.h"
#include "socket_server.h"
#include "storage/invstorage.h"
#include "utilities/types.h"
#include "utilities/utils.h"
Expand All @@ -24,8 +23,7 @@ class AktualizrSecondary : public Uptane::SecondaryInterface {
using Ptr = std::shared_ptr<AktualizrSecondary>;

public:
AktualizrSecondary(const AktualizrSecondaryConfig& config,
const std::shared_ptr<INvStorage>& storage,
AktualizrSecondary(const AktualizrSecondaryConfig& config, const std::shared_ptr<INvStorage>& storage,
const std::shared_ptr<KeyManager>& key_mngr,
const std::shared_ptr<PackageManagerInterface>& pacman);

Expand Down
1 change: 0 additions & 1 deletion src/aktualizr_secondary/aktualizr_secondary_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ AktualizrSecondary::Ptr AktualizrSecondaryFactory::create(const AktualizrSeconda

AktualizrSecondary::Ptr AktualizrSecondaryFactory::create(const AktualizrSecondaryConfig& config,
const std::shared_ptr<INvStorage>& storage) {

auto key_mngr = std::make_shared<KeyManager>(storage, config.keymanagerConfig());
std::shared_ptr<PackageManagerInterface> pacman =
PackageManagerFactory::makePackageManager(config.pacman, config.bootloader, storage, nullptr);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,57 @@
#include "socket_server.h"

#include <future>
#include "secondary_tcp_server.h"

#include "AKIpUptaneMes.h"
#include "asn1/asn1_message.h"
#include "logging/logging.h"
#include "uptane/secondaryinterface.h"
#include "utilities/dequeue_buffer.h"
#include "utilities/sockaddr_io.h"

#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>

void SocketServer::Run() {
if (listen(*socket_, SOMAXCONN) < 0) {
SecondaryTcpServer::SecondaryTcpServer(Uptane::SecondaryInterface &secondary, const std::string &primary_ip,
in_port_t primary_port, in_port_t port)
: SecondaryTcpServer(secondary, port) {
ConnectionSocket conn_socket(primary_ip, primary_port, listen_socket_.port());
if (conn_socket.connect() == 0) {
LOG_INFO << "Connected to Primary, sending info about this secondary...";
HandleOneConnection(*conn_socket);
} else {
LOG_INFO << "Failed to connect to Primary";
}
}

void SecondaryTcpServer::run() {
if (listen(*listen_socket_, SOMAXCONN) < 0) {
throw std::system_error(errno, std::system_category(), "listen");
}
LOG_INFO << "Listening on " << Utils::ipGetSockaddr(*socket_);
LOG_INFO << "Secondary TCP server listens on " << listen_socket_.toString();

while (true) {
while (keep_running_.load()) {
int con_fd;
sockaddr_storage peer_sa{};
socklen_t peer_sa_size = sizeof(sockaddr_storage);

LOG_DEBUG << "Waiting for connection from client...";
if ((con_fd = accept(*socket_, reinterpret_cast<sockaddr *>(&peer_sa), &peer_sa_size)) == -1) {
if ((con_fd = accept(*listen_socket_, reinterpret_cast<sockaddr *>(&peer_sa), &peer_sa_size)) == -1) {
LOG_INFO << "Socket accept failed. aborting";
break;
}
LOG_DEBUG << "Connected...";
HandleOneConnection(con_fd);
LOG_DEBUG << "Client disconnected";
}
LOG_INFO << "Secondary TCP server exit";
}

void SocketServer::HandleOneConnection(int socket) {
void SecondaryTcpServer::stop() {
keep_running_ = false;
// unblock accept
ConnectionSocket("localhost", listen_socket_.port()).connect();
}

in_port_t SecondaryTcpServer::port() const { return listen_socket_.port(); }

void SecondaryTcpServer::HandleOneConnection(int socket) {
// Outside the message loop, because one recv() may have parts of 2 messages
// Note that one recv() call returning 2+ messages doesn't work at the
// moment. This shouldn't be a problem until we have messages that aren't
Expand Down Expand Up @@ -66,9 +83,9 @@ void SocketServer::HandleOneConnection(int socket) {
Asn1Message::Ptr resp = Asn1Message::Empty();
switch (msg->present()) {
case AKIpUptaneMes_PR_getInfoReq: {
Uptane::EcuSerial serial = impl_->getSerial();
Uptane::HardwareIdentifier hw_id = impl_->getHwId();
PublicKey pk = impl_->getPublicKey();
Uptane::EcuSerial serial = impl_.getSerial();
Uptane::HardwareIdentifier hw_id = impl_.getHwId();
PublicKey pk = impl_.getPublicKey();
resp->present(AKIpUptaneMes_PR_getInfoResp);
auto r = resp->getInfoResp();
SetString(&r->ecuSerial, serial.ToString());
Expand All @@ -77,7 +94,7 @@ void SocketServer::HandleOneConnection(int socket) {
SetString(&r->key, pk.Value());
} break;
case AKIpUptaneMes_PR_manifestReq: {
std::string manifest = Utils::jsonToStr(impl_->getManifest());
std::string manifest = Utils::jsonToStr(impl_.getManifest());
resp->present(AKIpUptaneMes_PR_manifestResp);
auto r = resp->manifestResp();
r->manifest.present = manifest_PR_json;
Expand All @@ -103,7 +120,7 @@ void SocketServer::HandleOneConnection(int socket) {
}
bool ok;
try {
ok = impl_->putMetadata(meta_pack);
ok = impl_.putMetadata(meta_pack);
} catch (Uptane::SecurityException &e) {
LOG_WARNING << "Rejected metadata push because of security failure" << e.what();
ok = false;
Expand All @@ -115,10 +132,10 @@ void SocketServer::HandleOneConnection(int socket) {
case AKIpUptaneMes_PR_sendFirmwareReq: {
auto fw = msg->sendFirmwareReq();
auto fw_data = std::make_shared<std::string>(ToString(fw->firmware));
auto fut = std::async(std::launch::async, &Uptane::SecondaryInterface::sendFirmware, impl_, fw_data);
auto result = impl_.sendFirmware(fw_data);
resp->present(AKIpUptaneMes_PR_sendFirmwareResp);
auto r = resp->sendFirmwareResp();
r->result = fut.get() ? AKInstallationResult_success : AKInstallationResult_failure;
r->result = result ? AKInstallationResult_success : AKInstallationResult_failure;
} break;
default:
LOG_ERROR << "Unrecognised message type:" << msg->present();
Expand All @@ -145,34 +162,3 @@ void SocketServer::HandleOneConnection(int socket) {
// write error => Shutdown the socket
// Timeout on write => shutdown
}

SocketHandle SocketFromPort(in_port_t port) {
// manual socket creation
int socket_fd = socket(AF_INET6, SOCK_STREAM, 0);
if (socket_fd < 0) {
throw std::runtime_error("socket creation failed");
}
SocketHandle hdl(new int(socket_fd));
sockaddr_in6 sa{};

memset(&sa, 0, sizeof(sa));
sa.sin6_family = AF_INET6;
sa.sin6_port = htons(port);
sa.sin6_addr = IN6ADDR_ANY_INIT;

int v6only = 0;
if (setsockopt(*hdl, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) < 0) {
throw std::system_error(errno, std::system_category(), "setsockopt(IPV6_V6ONLY)");
}

int reuseaddr = 1;
if (setsockopt(*hdl, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) < 0) {
throw std::system_error(errno, std::system_category(), "setsockopt(SO_REUSEADDR)");
}

if (bind(*hdl, reinterpret_cast<const sockaddr *>(&sa), sizeof(sa)) < 0) {
throw std::system_error(errno, std::system_category(), "bind");
}

return hdl;
}
44 changes: 44 additions & 0 deletions src/aktualizr_secondary/secondary_tcp_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#ifndef AKTUALIZR_SECONDARY_TCP_SERVER_H_
#define AKTUALIZR_SECONDARY_TCP_SERVER_H_

#include "utilities/utils.h"

#include <atomic>

namespace Uptane {
class SecondaryInterface;
} // namespace Uptane
/**
* Listens on a socket, decodes calls (ASN.1) and forwards them to an Uptane Secondary
* implementation
*/
class SecondaryTcpServer {
public:
SecondaryTcpServer(Uptane::SecondaryInterface& secondary, const std::string& primary_ip, in_port_t primary_port,
in_port_t port = 0);

SecondaryTcpServer(Uptane::SecondaryInterface& secondary, in_port_t port = 0)
: keep_running_(true), impl_(secondary), listen_socket_(port) {}

SecondaryTcpServer(const SecondaryTcpServer&) = delete;
SecondaryTcpServer& operator=(const SecondaryTcpServer&) = delete;

public:
/**
* Accept connections on the socket, decode requests and respond using the secondary implementation
*/
void run();
void stop();

in_port_t port() const;

private:
void HandleOneConnection(int socket);

private:
std::atomic<bool> keep_running_;
Uptane::SecondaryInterface& impl_;
ListenSocket listen_socket_;
};

#endif // AKTUALIZR_SECONDARY_TCP_SERVER_H_
130 changes: 130 additions & 0 deletions src/aktualizr_secondary/secondary_tcp_server_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include <gtest/gtest.h>

#include "ipuptanesecondary.h"
#include "logging/logging.h"
#include "secondary_tcp_server.h"
#include "test_utils.h"
#include "uptane/secondaryinterface.h"

class SecondaryMock : public Uptane::SecondaryInterface {
public:
SecondaryMock(const Uptane::EcuSerial& serial, const Uptane::HardwareIdentifier& hdw_id, const PublicKey& pub_key,
const Json::Value& manifest)
: _serial(serial), _hdw_id(hdw_id), _pub_key(pub_key), _manifest(manifest) {}

public:
virtual Uptane::EcuSerial getSerial() { return _serial; }
virtual Uptane::HardwareIdentifier getHwId() { return _hdw_id; }
virtual PublicKey getPublicKey() { return _pub_key; }
virtual Json::Value getManifest() { return _manifest; }
virtual bool putMetadata(const Uptane::RawMetaPack& meta_pack) {
_metapack = meta_pack;
return true;
}
virtual int32_t getRootVersion(bool director) {
(void)director;
return 0;
}
virtual bool putRoot(const std::string& root, bool director) {
(void)root;
(void)director;
return true;
}

virtual bool sendFirmware(const std::shared_ptr<std::string>& data) {
_data = *data;
return true;
}

public:
const Uptane::EcuSerial _serial;
const Uptane::HardwareIdentifier _hdw_id;
const PublicKey _pub_key;
const Json::Value _manifest;

Uptane::RawMetaPack _metapack;
std::string _data;
};

bool operator==(const Uptane::RawMetaPack& lhs, const Uptane::RawMetaPack& rhs) {
return (lhs.director_root == rhs.director_root) && (lhs.image_root == rhs.image_root) &&
(lhs.director_targets == rhs.director_targets) && (lhs.image_snapshot == rhs.image_snapshot) &&
(lhs.image_timestamp == rhs.image_timestamp) && (lhs.image_targets == rhs.image_targets);
}

// Test the serialization/deserialization and the TCP/IP communication implementation
// that occurs during communication between Primary and IP Secondary
TEST(SecondaryTcpServer, TestIpSecondaryRPC) {
// secondary object on Secondary ECU
SecondaryMock secondary(Uptane::EcuSerial("serial"), Uptane::HardwareIdentifier("hardware-id"),
PublicKey("pub-key", KeyType::kED25519), Json::Value::null);

// create Secondary on Secondary ECU, and run it in a dedicated thread
SecondaryTcpServer secondary_server{secondary};
std::thread secondary_server_thread{[&secondary_server]() { secondary_server.run(); }};

// create Secondary on Primary ECU, try it a few times since the secondary thread
// might not be ready at the moment of the first try
const int max_try = 5;
std::shared_ptr<Uptane::SecondaryInterface> ip_secondary;
for (int ii = 0; ii < max_try && ip_secondary == nullptr; ++ii) {
auto secondary_res = Uptane::IpUptaneSecondary::connectAndCreate("localhost", secondary_server.port());
if (secondary_res.first) {
ip_secondary = secondary_res.second;
break;
}
}

ASSERT_TRUE(ip_secondary != nullptr) << "Failed to create IP Secondary";
EXPECT_EQ(ip_secondary->getSerial(), secondary.getSerial());
EXPECT_EQ(ip_secondary->getHwId(), secondary.getHwId());
EXPECT_EQ(ip_secondary->getPublicKey(), secondary.getPublicKey());
EXPECT_EQ(ip_secondary->getManifest(), secondary.getManifest());

Uptane::RawMetaPack meta_pack{"director-root", "director-target", "image_root",
"image_targets", "image_timestamp", "image_snapshot"};

EXPECT_TRUE(ip_secondary->putMetadata(meta_pack));
EXPECT_TRUE(meta_pack == secondary._metapack);

std::string firmware = "firmware";
EXPECT_TRUE(ip_secondary->sendFirmware(std::make_shared<std::string>(firmware)));
EXPECT_EQ(firmware, secondary._data);

secondary_server.stop();
secondary_server_thread.join();
}

TEST(SecondaryTcpServer, TestIpSecondaryIfSecondaryIsNotRunning) {
in_port_t secondary_port = TestUtils::getFreePortAsInt();
std::shared_ptr<Uptane::SecondaryInterface> ip_secondary;

// trying to connect to a non-running Secondary and create a corresponding instance on Primary
auto secondary_res = Uptane::IpUptaneSecondary::connectAndCreate("localhost", secondary_port);
if (secondary_res.first) {
ip_secondary = secondary_res.second;
}
EXPECT_EQ(ip_secondary, nullptr);

// create Primary's secondary without connecting to Secondary
ip_secondary = std::make_shared<Uptane::IpUptaneSecondary>("localhost", secondary_port, Uptane::EcuSerial("serial"),
Uptane::HardwareIdentifier("hwid"),
PublicKey("key", KeyType::kED25519));

Uptane::RawMetaPack meta_pack{"director-root", "director-target", "image_root",
"image_targets", "image_timestamp", "image_snapshot"};

// expect failures since the secondary is not running
EXPECT_EQ(ip_secondary->getManifest(), Json::Value());
EXPECT_FALSE(ip_secondary->sendFirmware(std::make_shared<std::string>("firmware")));
EXPECT_FALSE(ip_secondary->putMetadata(meta_pack));
}

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);

logger_init();
logger_set_threshold(boost::log::trivial::info);

return RUN_ALL_TESTS();
}
Loading

0 comments on commit 27db9c8

Please sign in to comment.