Skip to content

Commit

Permalink
FlatBuffers: add support for high performance local sockets
Browse files Browse the repository at this point in the history
Windows/Unix domain sockets offer much higher performance than TCP, can be used by external grabbers
Local server name is: hyperhdr-domain

awawa-dev#215 (comment)
  • Loading branch information
awawa-dev authored and chbartsch committed Nov 29, 2022
1 parent 4424f26 commit 85f30a5
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 97 deletions.
23 changes: 13 additions & 10 deletions include/flatbufserver/FlatBufferConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <QColor>
#include <QImage>
#include <QTcpSocket>
#include <QLocalSocket>
#include <QTimer>
#include <QMap>

Expand All @@ -19,6 +20,8 @@ namespace hyperhdrnet
struct Reply;
}

#define HYPERHDR_DOMAIN_SERVER QStringLiteral("hyperhdr-domain")

///
/// Connection class to setup an connection to the hyperhdr server and execute commands.
///
Expand Down Expand Up @@ -113,22 +116,22 @@ private slots:

private:
/// The TCP-Socket with the connection to the server
QTcpSocket _socket;

QString _origin;
int _priority;
QTcpSocket* _socket;
QLocalSocket* _domain;
QString _origin;
int _priority;

/// Host address
QString _host;
QString _host;

/// Host port
uint16_t _port;
uint16_t _port;

/// buffer for reply
QByteArray _receiveBuffer;

QTimer _timer;
QAbstractSocket::SocketState _prevSocketState;
QByteArray _receiveBuffer;
QTimer _timer;
QAbstractSocket::SocketState _prevSocketState;
QLocalSocket::LocalSocketState _prevLocalState;

Logger* _log;
flatbuffers::FlatBufferBuilder _builder;
Expand Down
16 changes: 11 additions & 5 deletions include/flatbufserver/FlatBufferServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

class BonjourServiceRegister;
class QTcpServer;
class QLocalServer;
class FlatBufferClient;
class NetOrigin;

#define HYPERHDR_DOMAIN_SERVER QStringLiteral("hyperhdr-domain")

///
/// @brief A TcpServer to receive images of different formats with Google Flatbuffer
Expand Down Expand Up @@ -75,13 +77,17 @@ private slots:
///
void loadLutFile();

void setupClient(FlatBufferClient* client);


private:
QTcpServer* _server;
NetOrigin* _netOrigin;
Logger* _log;
int _timeout;
quint16 _port;
QTcpServer* _server;
QLocalServer* _domain;
NetOrigin* _netOrigin;
Logger* _log;
int _timeout;
quint16 _port;

const QJsonDocument _config;
BonjourServiceRegister* _serviceRegister = nullptr;

Expand Down
39 changes: 21 additions & 18 deletions sources/base/MessageForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,27 +254,30 @@ void MessageForwarder::addJsonSlave(const QString& slave)

void MessageForwarder::addFlatbufferSlave(const QString& slave)
{
QStringList parts = slave.split(":");
if (parts.size() != 2)
if (slave != HYPERHDR_DOMAIN_SERVER)
{
Error(_log, "Unable to parse address (%s)", QSTRING_CSTR(slave));
return;
}
QStringList parts = slave.split(":");
if (parts.size() != 2)
{
Error(_log, "Unable to parse address (%s)", QSTRING_CSTR(slave));
return;
}

bool ok;
parts[1].toUShort(&ok);
if (!ok)
{
Error(_log, "Unable to parse port number (%s)", QSTRING_CSTR(parts[1]));
return;
}
bool ok;
parts[1].toUShort(&ok);
if (!ok)
{
Error(_log, "Unable to parse port number (%s)", QSTRING_CSTR(parts[1]));
return;
}

// verify loop with flatbufserver
const QJsonObject& obj = _hyperhdr->getSetting(settings::type::FLATBUFSERVER).object();
if (QHostAddress(parts[0]) == QHostAddress::LocalHost && parts[1].toInt() == obj["port"].toInt())
{
Error(_log, "Loop between Flatbuffer Server and Forwarder! (%s)", QSTRING_CSTR(slave));
return;
// verify loop with flatbufserver
const QJsonObject& obj = _hyperhdr->getSetting(settings::type::FLATBUFSERVER).object();
if (QHostAddress(parts[0]) == QHostAddress::LocalHost && parts[1].toInt() == obj["port"].toInt())
{
Error(_log, "Loop between Flatbuffer Server and Forwarder! (%s)", QSTRING_CSTR(slave));
return;
}
}

if (_forwarder_enabled && !_flatSlaves.contains(slave))
Expand Down
54 changes: 44 additions & 10 deletions sources/flatbufserver/FlatBufferClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,55 @@

// qt
#include <QTcpSocket>
#include <QLocalSocket>
#include <QHostAddress>
#include <QTimer>
#include <QRgb>

// util includes
#include <utils/FrameDecoder.h>

FlatBufferClient::FlatBufferClient(QTcpSocket* socket, int timeout, int hdrToneMappingEnabled, uint8_t* lutBuffer, QObject* parent)
FlatBufferClient::FlatBufferClient(QTcpSocket* socket, QLocalSocket* domain, int timeout, int hdrToneMappingEnabled, uint8_t* lutBuffer, QObject* parent)
: QObject(parent)
, _log(Logger::getInstance("FLATBUFSERVER"))
, _socket(socket)
, _clientAddress("@" + socket->peerAddress().toString())
, _domain(domain)
, _clientAddress("@LocalSocket")
, _timeoutTimer(new QTimer(this))
, _timeout(timeout * 1000)
, _priority()
, _hdrToneMappingMode(hdrToneMappingEnabled)
, _lutBuffer(lutBuffer)
{
if (_socket != nullptr)
_clientAddress = "@" + _socket->peerAddress().toString();

// timer setup
_timeoutTimer->setSingleShot(true);
_timeoutTimer->setInterval(_timeout);
connect(_timeoutTimer, &QTimer::timeout, this, &FlatBufferClient::forceClose);

// connect socket signals
connect(_socket, &QTcpSocket::readyRead, this, &FlatBufferClient::readyRead);
connect(_socket, &QTcpSocket::disconnected, this, &FlatBufferClient::disconnected);
if (_socket != nullptr)
{
connect(_socket, &QTcpSocket::readyRead, this, &FlatBufferClient::readyRead);
connect(_socket, &QTcpSocket::disconnected, this, &FlatBufferClient::disconnected);
}
else if (_domain != nullptr)
{
connect(_domain, &QLocalSocket::readyRead, this, &FlatBufferClient::readyRead);
connect(_domain, &QLocalSocket::disconnected, this, &FlatBufferClient::disconnected);
}
}

void FlatBufferClient::readyRead()
{
_timeoutTimer->start();

_receiveBuffer += _socket->readAll();
if (_socket != nullptr)
_receiveBuffer += _socket->readAll();
else if (_domain != nullptr)
_receiveBuffer += _domain->readAll();

// check if we can read a header
while (_receiveBuffer.size() >= 4)
Expand Down Expand Up @@ -67,7 +83,10 @@ void FlatBufferClient::readyRead()

void FlatBufferClient::forceClose()
{
_socket->close();
if (_socket != nullptr)
_socket->close();
if (_domain != nullptr)
_domain->close();
}

void FlatBufferClient::setHdrToneMappingEnabled(int mode, uint8_t* lutBuffer)
Expand All @@ -79,7 +98,12 @@ void FlatBufferClient::setHdrToneMappingEnabled(int mode, uint8_t* lutBuffer)
void FlatBufferClient::disconnected()
{
Debug(_log, "Socket Closed");
_socket->deleteLater();

if (_socket != nullptr)
_socket->deleteLater();
if (_domain != nullptr)
_domain->deleteLater();

if (_priority != 0 && _priority >= 100 && _priority < 200)
emit clearGlobalInput(_priority);

Expand Down Expand Up @@ -212,9 +236,19 @@ void FlatBufferClient::sendMessage()
auto size = _builder.GetSize();
const uint8_t* buffer = _builder.GetBufferPointer();
uint8_t sizeData[] = { uint8_t(size >> 24), uint8_t(size >> 16), uint8_t(size >> 8), uint8_t(size) };
_socket->write((const char*)sizeData, sizeof(sizeData));
_socket->write((const char*)buffer, size);
_socket->flush();

if (_socket != nullptr)
{
_socket->write((const char*)sizeData, sizeof(sizeData));
_socket->write((const char*)buffer, size);
_socket->flush();
}
else if (_domain != nullptr)
{
_domain->write((const char*)sizeData, sizeof(sizeData));
_domain->write((const char*)buffer, size);
_domain->flush();
}
}

void FlatBufferClient::sendSuccessReply()
Expand Down
16 changes: 9 additions & 7 deletions sources/flatbufserver/FlatBufferClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "hyperhdr_request_generated.h"

class QTcpSocket;
class QLocalSocket;
class QTimer;

///
Expand All @@ -26,7 +27,7 @@ class FlatBufferClient : public QObject
/// @param timeout The timeout when a client is automatically disconnected and the priority unregistered
/// @param parent The parent
///
explicit FlatBufferClient(QTcpSocket* socket, int timeout, int hdrToneMappingEnabled, uint8_t* lutBuffer, QObject* parent = nullptr);
explicit FlatBufferClient(QTcpSocket* socket, QLocalSocket* domain, int timeout, int hdrToneMappingEnabled, uint8_t* lutBuffer, QObject* parent = nullptr);

signals:
///
Expand Down Expand Up @@ -134,12 +135,13 @@ private slots:
void sendErrorReply(const std::string& error);

private:
Logger* _log;
QTcpSocket* _socket;
const QString _clientAddress;
QTimer* _timeoutTimer;
int _timeout;
int _priority;
Logger* _log;
QTcpSocket* _socket;
QLocalSocket* _domain;
QString _clientAddress;
QTimer* _timeoutTimer;
int _timeout;
int _priority;

QByteArray _receiveBuffer;

Expand Down
Loading

0 comments on commit 85f30a5

Please sign in to comment.