Skip to content

Commit

Permalink
[MessageControl] Fixing UDP output and allowing it to store more than…
Browse files Browse the repository at this point in the history
… one message (#315)

* Reviving the UDP functionality of MessageControl plugin

* Removing unnecessary comments and adding the size as a static constexpr
  • Loading branch information
VeithMetro authored Sep 20, 2024
1 parent 3977a3c commit 216520c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion MessageControl/MessageControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace Thunder {
Announce(new Publishers::FileOutput(abbreviate, _config.FileName.Value()));
}
if ((_config.Remote.Binding.Value().empty() == false) && (_config.Remote.Port.Value() != 0)) {
Announce(new Publishers::UDPOutput(Core::NodeId(_config.Remote.NodeId())));
Announce(new Publishers::UDPOutput(abbreviate, Core::NodeId(_config.Remote.NodeId())));
}

_webSocketExporter.Initialize(service, _config.MaxExportConnections.Value());
Expand Down
24 changes: 13 additions & 11 deletions MessageControl/MessageOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,32 +142,34 @@ namespace Publishers {
{
}

void UDPOutput::Channel::Output(const Core::Messaging::Metadata& metadata, const Core::Messaging::IEvent* message)
void UDPOutput::Channel::Output(const string& text)
{
_adminLock.Lock();

uint16_t length = 0;
ASSERT(metadata.Type() != Core::Messaging::Metadata::INVALID);
ASSERT((_loaded + text.length() + 1) < sizeof(_sendBuffer));

length += metadata.Serialize(_sendBuffer + length, sizeof(_sendBuffer) - length);
length += message->Serialize(_sendBuffer + length, sizeof(_sendBuffer) - length);
_loaded = length;
if ((_loaded + text.length() + 1) < sizeof(_sendBuffer)) {
Core::FrameType<0> frame(_sendBuffer + _loaded, sizeof(_sendBuffer) - _loaded, sizeof(_sendBuffer) - _loaded);
Core::FrameType<0>::Writer frameWriter(frame, 0);
frameWriter.NullTerminatedText(text);
_loaded += frameWriter.Offset();
}

_adminLock.Unlock();

Trigger();
}

UDPOutput::UDPOutput(const Core::NodeId& nodeId)
: _output(nodeId) {
UDPOutput::UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId)
: _convertor(abbreviate)
, _output(nodeId)
{
_output.Open(0);
}

void UDPOutput::Message(const Core::Messaging::MessageInfo& metadata, const string& text) /* override */
{
//yikes, recreating stuff from received pieces
Messaging::TextMessage textMessage(text);
_output.Output(metadata, &textMessage);
_output.Output(_convertor.Convert(metadata, text));
}

} // namespace Publishers
Expand Down
9 changes: 6 additions & 3 deletions MessageControl/MessageOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ namespace Publishers {

class UDPOutput : public IPublish {
private:
static constexpr uint16_t UDPBufferSize = 4 * 1024;

class Channel : public Core::SocketDatagram {
public:
Channel() = delete;
Expand All @@ -318,15 +320,15 @@ namespace Publishers {
explicit Channel(const Core::NodeId& nodeId);
~Channel() override;

void Output(const Core::Messaging::Metadata& metadata, const Core::Messaging::IEvent* message);
void Output(const string& text);

private:
uint16_t SendData(uint8_t* dataFrame, const uint16_t maxSendSize) override;
// Unused
uint16_t ReceiveData(uint8_t*, const uint16_t) override;
void StateChange() override;

uint8_t _sendBuffer[Messaging::MessageUnit::TempDataBufferSize];
uint8_t _sendBuffer[UDPBufferSize];
uint16_t _loaded;
Core::CriticalSection _adminLock;
};
Expand All @@ -336,12 +338,13 @@ namespace Publishers {
UDPOutput(const UDPOutput&) = delete;
UDPOutput& operator=(const UDPOutput&) = delete;

explicit UDPOutput(const Core::NodeId& nodeId);
explicit UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId);
~UDPOutput() = default;

void Message(const Core::Messaging::MessageInfo& metadata, const string& text);

private:
Text _convertor;
Channel _output;
};

Expand Down

0 comments on commit 216520c

Please sign in to comment.