From 622e08d8c6b8260cebaa6bee5705d11004bef25b Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 5 Dec 2024 12:41:54 -0500 Subject: [PATCH] Update UDP output to support a "blocking" mode to slow down the output a bit. Make that the default as very few shows would be large enough to need the non-blocking mode. --- src/channeloutput/UDPOutput.cpp | 72 ++++++++++++++++++++++----------- src/channeloutput/UDPOutput.h | 1 + src/util/GPIOUtils.cpp | 6 ++- www/co-universes.php | 9 ++++- www/js/fpp.js | 8 ++-- 5 files changed, 65 insertions(+), 31 deletions(-) diff --git a/src/channeloutput/UDPOutput.cpp b/src/channeloutput/UDPOutput.cpp index 79f2b1570..6286e5959 100644 --- a/src/channeloutput/UDPOutput.cpp +++ b/src/channeloutput/UDPOutput.cpp @@ -232,7 +232,8 @@ UDPOutput::UDPOutput(unsigned int startChannel, unsigned int channelCount) : doneWorkCount(0), numWorkThreads(0), runWorkThreads(true), - useThreadedOutput(true) { + useThreadedOutput(true), + blockingOutput(false) { INSTANCE = this; } UDPOutput::~UDPOutput() { @@ -284,9 +285,10 @@ int UDPOutput::Init(Json::Value config) { break; } } - if (config.isMember("threaded")) { - useThreadedOutput = config["threaded"].asInt() ? true : false; + int style = config["threaded"].asInt(); + useThreadedOutput = style == 1 || style == 3; + blockingOutput = style == 0 || style == 1; } if (config.isMember("interface")) { outInterface = config["interface"].asString(); @@ -432,7 +434,6 @@ void UDPOutput::GetRequiredChannelRanges(const std::function& ad void UDPOutput::addOutput(UDPOutputData* out) { outputs.push_back(out); } - int UDPOutput::SendMessages(unsigned int socketKey, SendSocketInfo* socketInfo, std::vector& sendmsgs) { errno = 0; struct mmsghdr* msgs = &sendmsgs[0]; @@ -444,30 +445,43 @@ int UDPOutput::SendMessages(unsigned int socketKey, SendSocketInfo* socketInfo, int newSockKey = socketKey; int sendSocket = socketInfo->sockets[socketInfo->curSocket]; errno = 0; - int oc = sendmmsg(sendSocket, msgs, msgCount, MSG_DONTWAIT); + // uint64_t st = GetTimeMicros(); + int outputCount = 0; - if (oc > 0) { - outputCount = oc; - } - if (outputCount != msgCount) { - // in many cases, a simple thread yield will allow the network stack - // to flush some data and free up space, give that a chance first - std::this_thread::yield(); - oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT); + if (blockingOutput) { + for (int x = 0; x < msgCount; x++) { + ssize_t s = sendmsg(sendSocket, &msgs[x].msg_hdr, 0); + if (s != -1) { + ++outputCount; + } else { + // didn't send, we'll yield and re-send + --x; + std::this_thread::yield(); + } + } + } else { + int oc = sendmmsg(sendSocket, msgs, msgCount, MSG_DONTWAIT); if (oc > 0) { outputCount += oc; } + if (outputCount != msgCount) { + // in many cases, a simple thread yield will allow the network stack + // to flush some data and free up space, give that a chance first + std::this_thread::sleep_for(std::chrono::microseconds(100)); + oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT); + while (oc > 0) { + outputCount += oc; + std::this_thread::sleep_for(std::chrono::microseconds(100)); + oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT); + } + } } + // uint64_t ed = GetTimeMicros(); + // uint64_t tm = ed - st; + // printf("MSG: %d/%d %d \n", outputCount, msgCount, (int)tm); + int errCount = 0; while (outputCount != msgCount) { - if (errno != 0) { - LogErr(VB_CHANNELOUT, "sendmmsg() failed for UDP output (key: %X Socket: %d output count: %d/%d) with error: %d %s\n", - socketKey, sendSocket, - outputCount, msgCount, - errno, - strerror(errno)); - } - if (errno == EAGAIN || errno == EWOULDBLOCK) { if (socketKey != BROADCAST_MESSAGES_KEY) { ++socketInfo->curSocket; @@ -490,12 +504,19 @@ int UDPOutput::SendMessages(unsigned int socketKey, SendSocketInfo* socketInfo, } ++errCount; if (errCount >= 10) { + LogErr(VB_CHANNELOUT, "sendmmsg() failed for UDP output (key: %X Socket: %d output count: %d/%d) with error: %d %s\n", + socketKey, sendSocket, + outputCount, msgCount, + errno, + strerror(errno)); return outputCount; } errno = 0; int oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT); - if (oc > 0) { + while (oc > 0) { outputCount += oc; + std::this_thread::sleep_for(std::chrono::microseconds(100)); + oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT); } } return outputCount; @@ -703,6 +724,9 @@ void UDPOutput::PingControllers(bool failedOnly) { } void UDPOutput::DumpConfig() { ChannelOutput::DumpConfig(); + LogDebug(VB_CHANNELOUT, " Interface : %s\n", outInterface.c_str()); + LogDebug(VB_CHANNELOUT, " Threaded : %d\n", useThreadedOutput); + LogDebug(VB_CHANNELOUT, " Blocking : %d\n", blockingOutput); for (auto u : outputs) { u->DumpConfig(); } @@ -756,8 +780,8 @@ int UDPOutput::createSocket(int port, bool broadCast) { close(sendSocket); return -1; } - // make sure the send buffer is actually set to a reasonable size - int bufSize = 512 * 1024; + // make sure the send buffer is actually set to a reasonable size for non-blocking mode + int bufSize = 1024 * blockingOutput ? 4 : 512; setsockopt(sendSocket, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize)); // these sockets are for sending only, don't need a large receive buffer so // free some memory by setting to just a single page diff --git a/src/channeloutput/UDPOutput.h b/src/channeloutput/UDPOutput.h index 6492ced06..8a70d63db 100644 --- a/src/channeloutput/UDPOutput.h +++ b/src/channeloutput/UDPOutput.h @@ -166,4 +166,5 @@ class UDPOutput : public ChannelOutput { std::atomic_int numWorkThreads; volatile bool runWorkThreads; bool useThreadedOutput; + bool blockingOutput; }; diff --git a/src/util/GPIOUtils.cpp b/src/util/GPIOUtils.cpp index 98bf2cfcd..34a707896 100644 --- a/src/util/GPIOUtils.cpp +++ b/src/util/GPIOUtils.cpp @@ -91,7 +91,11 @@ static const std::set PLATFORM_IGNORES{ "gpio-brcmstb@107d508520", "gpio-brcmstb@107d517c00", "gpio-brcmstb@107d517c20", - "pinctrl-rp1" // Pi5's external GPIO chip + "pinctrl-rp1", // Pi5's external GPIO chip + "tps65219-gpio", // AM62x + "4201000.gpio", + "600000.gpio", + "601000.gpio" }; // No platform information on how to control pins static std::string PROCESS_NAME = "FPPD"; diff --git a/www/co-universes.php b/www/co-universes.php index 200471c80..999b9bcc7 100644 --- a/www/co-universes.php +++ b/www/co-universes.php @@ -162,9 +162,14 @@ function PopulateInterfaces()
style="display:none;" > -
Multi-Threaded:
+
Sending:
- +
diff --git a/www/js/fpp.js b/www/js/fpp.js index 1602ee3cd..d334b148c 100644 --- a/www/js/fpp.js +++ b/www/js/fpp.js @@ -3454,7 +3454,7 @@ function populateUniverseData (data, reload, input) { $('#selE131interfaces').val(channelData.interface).prop('selected', true); } if (channelData.hasOwnProperty('threaded')) { - $('#E131ThreadedOutput').prop('checked', channelData.threaded); + $('#E131ThreadedOutput').val(channelData.threaded).prop('selected', true); } } UniverseCount = channelData.universes.length; @@ -3972,9 +3972,9 @@ function postUniverseJSON (input) { if (!input) { // output only properties output.interface = document.getElementById('selE131interfaces').value; - output.threaded = document.getElementById('E131ThreadedOutput').checked - ? 1 - : 0; + output.threaded = parseInt( + document.getElementById('E131ThreadedOutput').value + ); } else { // input only properties output.timeout = parseInt(document.getElementById('bridgeTimeoutMS').value);