Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
OTA-4174: IP Secondary TCP server refactoring
Browse files Browse the repository at this point in the history
- improve the TCP server implementation
- add tests for the TCP server and the RPC (serialization/deserialization)

Signed-off-by: Mike Sul <ext-mykhaylo.sul@here.com>
  • Loading branch information
Mike Sul committed Jan 15, 2020
1 parent 4fd855c commit 770e25d
Show file tree
Hide file tree
Showing 17 changed files with 321 additions and 157 deletions.
7 changes: 5 additions & 2 deletions src/aktualizr_secondary/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(AKTUALIZR_SECONDARY_LIB_SRC
aktualizr_secondary_factory.cc
aktualizr_secondary_config.cc
aktualizr_secondary_metadata.cc
socket_server.cc
secondary_tcp_server.cc
)

# do not link tests with libaktualizr
Expand Down Expand Up @@ -50,7 +50,7 @@ set(ALL_AKTUALIZR_SECONDARY_HEADERS
aktualizr_secondary_factory.h
aktualizr_secondary_config.h
aktualizr_secondary_metadata.h
socket_server.h
secondary_tcp_server.h
)

include(AddAktualizrTest)
Expand All @@ -61,6 +61,9 @@ list(INSERT TEST_LIBS 0 aktualizr_secondary_lib)
add_aktualizr_test(NAME aktualizr_secondary_config
SOURCES aktualizr_secondary_config_test.cc PROJECT_WORKING_DIRECTORY LIBRARIES aktualizr_secondary_lib)

add_aktualizr_test(NAME secondary_tcp_server
SOURCES secondary_tcp_server_test.cc PROJECT_WORKING_DIRECTORY)

if(BUILD_OSTREE)
add_aktualizr_test(NAME aktualizr_secondary_ostree
SOURCES aktualizr_secondary_ostree_test.cc
Expand Down
1 change: 0 additions & 1 deletion src/aktualizr_secondary/aktualizr_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#ifdef BUILD_OSTREE
#include "package_manager/ostreemanager.h" // TODO: Hide behind PackageManagerInterface
#endif
#include "socket_server.h"
#include "utilities/utils.h"

AktualizrSecondary::AktualizrSecondary(const AktualizrSecondaryConfig& config,
Expand Down
4 changes: 1 addition & 3 deletions src/aktualizr_secondary/aktualizr_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "uptane/secondaryinterface.h"

#include "crypto/keymanager.h"
#include "socket_server.h"
#include "storage/invstorage.h"
#include "utilities/types.h"
#include "utilities/utils.h"
Expand All @@ -24,8 +23,7 @@ class AktualizrSecondary : public Uptane::SecondaryInterface {
using Ptr = std::shared_ptr<AktualizrSecondary>;

public:
AktualizrSecondary(const AktualizrSecondaryConfig& config,
const std::shared_ptr<INvStorage>& storage,
AktualizrSecondary(const AktualizrSecondaryConfig& config, const std::shared_ptr<INvStorage>& storage,
const std::shared_ptr<KeyManager>& key_mngr,
const std::shared_ptr<PackageManagerInterface>& pacman);

Expand Down
1 change: 0 additions & 1 deletion src/aktualizr_secondary/aktualizr_secondary_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ AktualizrSecondary::Ptr AktualizrSecondaryFactory::create(const AktualizrSeconda

AktualizrSecondary::Ptr AktualizrSecondaryFactory::create(const AktualizrSecondaryConfig& config,
const std::shared_ptr<INvStorage>& storage) {

auto key_mngr = std::make_shared<KeyManager>(storage, config.keymanagerConfig());
std::shared_ptr<PackageManagerInterface> pacman =
PackageManagerFactory::makePackageManager(config.pacman, config.bootloader, storage, nullptr);
Expand Down
3 changes: 2 additions & 1 deletion src/aktualizr_secondary/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "utilities/utils.h"

#include "logging/logging.h"
#include "secondary_tcp_server.h"

namespace bpo = boost::program_options;

Expand Down Expand Up @@ -87,7 +88,7 @@ int main(int argc, char *argv[]) {
LOG_DEBUG << "Current directory: " << boost::filesystem::current_path().string();

auto secondary = AktualizrSecondaryFactory::create(config);
// secondary->run();
SecondaryTcpServer(*secondary, config.network.primary_ip, config.network.primary_port, config.network.port).run();

} catch (std::runtime_error &exc) {
LOG_ERROR << "Error: " << exc.what();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,57 @@
#include "socket_server.h"

#include <future>
#include "secondary_tcp_server.h"

#include "AKIpUptaneMes.h"
#include "asn1/asn1_message.h"
#include "logging/logging.h"
#include "uptane/secondaryinterface.h"
#include "utilities/dequeue_buffer.h"
#include "utilities/sockaddr_io.h"

#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>

void SocketServer::Run() {
if (listen(*socket_, SOMAXCONN) < 0) {
SecondaryTcpServer::SecondaryTcpServer(Uptane::SecondaryInterface &secondary, const std::string &primary_ip,
in_port_t primary_port, in_port_t port)
: SecondaryTcpServer(secondary, port) {
ConnectionSocket conn_socket(primary_ip, primary_port, listen_socket_.port());
if (conn_socket.connect() == 0) {
LOG_INFO << "Connected to Primary, sending info about this secondary...";
HandleOneConnection(*conn_socket);
} else {
LOG_INFO << "Failed to connect to Primary";
}
}

void SecondaryTcpServer::run() {
if (listen(*listen_socket_, SOMAXCONN) < 0) {
throw std::system_error(errno, std::system_category(), "listen");
}
LOG_INFO << "Listening on " << Utils::ipGetSockaddr(*socket_);
LOG_INFO << "Secondary TCP server listens on " << listen_socket_.toString();

while (true) {
while (keep_running_.load()) {
int con_fd;
sockaddr_storage peer_sa{};
socklen_t peer_sa_size = sizeof(sockaddr_storage);

LOG_DEBUG << "Waiting for connection from client...";
if ((con_fd = accept(*socket_, reinterpret_cast<sockaddr *>(&peer_sa), &peer_sa_size)) == -1) {
if ((con_fd = accept(*listen_socket_, reinterpret_cast<sockaddr *>(&peer_sa), &peer_sa_size)) == -1) {
LOG_INFO << "Socket accept failed. aborting";
break;
}
LOG_DEBUG << "Connected...";
HandleOneConnection(con_fd);
LOG_DEBUG << "Client disconnected";
}
LOG_INFO << "Secondary TCP server exit";
}

void SocketServer::HandleOneConnection(int socket) {
void SecondaryTcpServer::stop() {
keep_running_ = false;
// unblock accept
ConnectionSocket("localhost", listen_socket_.port()).connect();
}

in_port_t SecondaryTcpServer::port() const { return listen_socket_.port(); }

void SecondaryTcpServer::HandleOneConnection(int socket) {
// Outside the message loop, because one recv() may have parts of 2 messages
// Note that one recv() call returning 2+ messages doesn't work at the
// moment. This shouldn't be a problem until we have messages that aren't
Expand Down Expand Up @@ -66,9 +83,9 @@ void SocketServer::HandleOneConnection(int socket) {
Asn1Message::Ptr resp = Asn1Message::Empty();
switch (msg->present()) {
case AKIpUptaneMes_PR_getInfoReq: {
Uptane::EcuSerial serial = impl_->getSerial();
Uptane::HardwareIdentifier hw_id = impl_->getHwId();
PublicKey pk = impl_->getPublicKey();
Uptane::EcuSerial serial = impl_.getSerial();
Uptane::HardwareIdentifier hw_id = impl_.getHwId();
PublicKey pk = impl_.getPublicKey();
resp->present(AKIpUptaneMes_PR_getInfoResp);
auto r = resp->getInfoResp();
SetString(&r->ecuSerial, serial.ToString());
Expand All @@ -77,7 +94,7 @@ void SocketServer::HandleOneConnection(int socket) {
SetString(&r->key, pk.Value());
} break;
case AKIpUptaneMes_PR_manifestReq: {
std::string manifest = Utils::jsonToStr(impl_->getManifest());
std::string manifest = Utils::jsonToStr(impl_.getManifest());
resp->present(AKIpUptaneMes_PR_manifestResp);
auto r = resp->manifestResp();
r->manifest.present = manifest_PR_json;
Expand All @@ -103,7 +120,7 @@ void SocketServer::HandleOneConnection(int socket) {
}
bool ok;
try {
ok = impl_->putMetadata(meta_pack);
ok = impl_.putMetadata(meta_pack);
} catch (Uptane::SecurityException &e) {
LOG_WARNING << "Rejected metadata push because of security failure" << e.what();
ok = false;
Expand All @@ -115,10 +132,10 @@ void SocketServer::HandleOneConnection(int socket) {
case AKIpUptaneMes_PR_sendFirmwareReq: {
auto fw = msg->sendFirmwareReq();
auto fw_data = std::make_shared<std::string>(ToString(fw->firmware));
auto fut = std::async(std::launch::async, &Uptane::SecondaryInterface::sendFirmware, impl_, fw_data);
auto result = impl_.sendFirmware(fw_data);
resp->present(AKIpUptaneMes_PR_sendFirmwareResp);
auto r = resp->sendFirmwareResp();
r->result = fut.get() ? AKInstallationResult_success : AKInstallationResult_failure;
r->result = result ? AKInstallationResult_success : AKInstallationResult_failure;
} break;
default:
LOG_ERROR << "Unrecognised message type:" << msg->present();
Expand All @@ -145,34 +162,3 @@ void SocketServer::HandleOneConnection(int socket) {
// write error => Shutdown the socket
// Timeout on write => shutdown
}

SocketHandle SocketFromPort(in_port_t port) {
// manual socket creation
int socket_fd = socket(AF_INET6, SOCK_STREAM, 0);
if (socket_fd < 0) {
throw std::runtime_error("socket creation failed");
}
SocketHandle hdl(new int(socket_fd));
sockaddr_in6 sa{};

memset(&sa, 0, sizeof(sa));
sa.sin6_family = AF_INET6;
sa.sin6_port = htons(port);
sa.sin6_addr = IN6ADDR_ANY_INIT;

int v6only = 0;
if (setsockopt(*hdl, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) < 0) {
throw std::system_error(errno, std::system_category(), "setsockopt(IPV6_V6ONLY)");
}

int reuseaddr = 1;
if (setsockopt(*hdl, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) < 0) {
throw std::system_error(errno, std::system_category(), "setsockopt(SO_REUSEADDR)");
}

if (bind(*hdl, reinterpret_cast<const sockaddr *>(&sa), sizeof(sa)) < 0) {
throw std::system_error(errno, std::system_category(), "bind");
}

return hdl;
}
44 changes: 44 additions & 0 deletions src/aktualizr_secondary/secondary_tcp_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#ifndef AKTUALIZR_SECONDARY_TCP_SERVER_H_
#define AKTUALIZR_SECONDARY_TCP_SERVER_H_

#include "utilities/utils.h"

#include <atomic>

namespace Uptane {
class SecondaryInterface;
} // namespace Uptane
/**
* Listens on a socket, decodes calls (ASN.1) and forwards them to an Uptane Secondary
* implementation
*/
class SecondaryTcpServer {
public:
SecondaryTcpServer(Uptane::SecondaryInterface& secondary, const std::string& primary_ip, in_port_t primary_port,
in_port_t port = 0);

SecondaryTcpServer(Uptane::SecondaryInterface& secondary, in_port_t port = 0)
: keep_running_(true), impl_(secondary), listen_socket_(port) {}

SecondaryTcpServer(const SecondaryTcpServer&) = delete;
SecondaryTcpServer& operator=(const SecondaryTcpServer&) = delete;

public:
/**
* Accept connections on the socket, decode requests and respond using the secondary implementation
*/
void run();
void stop();

in_port_t port() const;

private:
void HandleOneConnection(int socket);

private:
std::atomic<bool> keep_running_;
Uptane::SecondaryInterface& impl_;
ListenSocket listen_socket_;
};

#endif // AKTUALIZR_SECONDARY_TCP_SERVER_H_
Loading

0 comments on commit 770e25d

Please sign in to comment.