Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use condition variable to shutdown websocket pools #5721

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
- Dev: Unified parsing of historic and live IRC messages. (#5678)
- Dev: 7TV's `entitlement.reset` is now explicitly ignored. (#5685)
- Dev: Qt 6.8 and later now default to the GDI fontengine. (#5710)
- Dev: Moved to condition variables when shutting down worker threads. (#5721)

## 2.5.1

Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ set(SOURCE_FILES
util/LayoutHelper.hpp
util/LoadPixmap.cpp
util/LoadPixmap.hpp
util/OnceFlag.cpp
util/OnceFlag.hpp
util/RapidjsonHelpers.cpp
util/RapidjsonHelpers.hpp
util/RatelimitBucket.cpp
Expand Down
48 changes: 34 additions & 14 deletions src/providers/liveupdates/BasicPubSubManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
#include "providers/twitch/PubSubHelpers.hpp"
#include "util/DebugCount.hpp"
#include "util/ExponentialBackoff.hpp"
#include "util/OnceFlag.hpp"
#include "util/RenameThread.hpp"

#include <pajlada/signals/signal.hpp>
#include <QJsonObject>
#include <QScopeGuard>
#include <QString>
#include <QStringBuilder>
#include <websocketpp/client.hpp>
Expand Down Expand Up @@ -120,6 +122,11 @@ class BasicPubSubManager
this->work_ = std::make_shared<boost::asio::io_service::work>(
this->websocketClient_.get_io_service());
this->mainThread_.reset(new std::thread([this] {
// make sure we set in any case, even exceptions
auto guard = qScopeGuard([&] {
this->stoppedFlag_.set();
});

runThread();
}));

Expand All @@ -142,22 +149,34 @@ class BasicPubSubManager

this->work_.reset();

if (this->mainThread_->joinable())
if (!this->mainThread_->joinable())
{
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->mainThread_.get());
if (joiner.wait_for(std::chrono::seconds(1)) ==
std::future_status::timeout)
{
qCWarning(chatterinoLiveupdates)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient_.stop();
}
return;
}

// NOTE:
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
if (this->stoppedFlag_.waitFor(std::chrono::seconds{1}))
{
this->mainThread_->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish within 1 second, force-stop the client";
this->websocketClient_.stop();
if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100}))
{
this->mainThread_->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish after stopping, discard it";
// detach the thread so the destructor doesn't attempt any joining
this->mainThread_->detach();
}

protected:
Expand Down Expand Up @@ -394,6 +413,7 @@ class BasicPubSubManager

liveupdates::WebsocketClient websocketClient_;
std::unique_ptr<std::thread> mainThread_;
OnceFlag stoppedFlag_;

const QString host_;

Expand Down
50 changes: 34 additions & 16 deletions src/providers/twitch/PubSubManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#include "util/RenameThread.hpp"

#include <QJsonArray>
#include <QScopeGuard>

#include <algorithm>
#include <exception>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
Expand Down Expand Up @@ -560,6 +560,11 @@ void PubSub::start()
this->work = std::make_shared<boost::asio::io_service::work>(
this->websocketClient.get_io_service());
this->thread = std::make_unique<std::thread>([this] {
// make sure we set in any case, even exceptions
auto guard = qScopeGuard([&] {
this->stoppedFlag_.set();
});

runThread();
});
renameThread(*this->thread, "PubSub");
Expand All @@ -578,23 +583,36 @@ void PubSub::stop()

this->work.reset();

if (this->thread->joinable())
if (!this->thread->joinable())
{
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->thread.get());
if (joiner.wait_for(1s) == std::future_status::timeout)
{
qCWarning(chatterinoPubSub)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient.stop();
}
return;
}

// NOTE:
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
if (this->stoppedFlag_.waitFor(std::chrono::seconds{1}))
{
this->thread->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish within 1 second, force-stop the client";
this->websocketClient.stop();
if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100}))
{
this->thread->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish after stopping, discard it";
// detach the thread so the destructor doesn't attempt any joining
this->thread->detach();
}

bool PubSub::listenToWhispers()
Expand Down
3 changes: 3 additions & 0 deletions src/providers/twitch/PubSubManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "providers/twitch/PubSubClientOptions.hpp"
#include "providers/twitch/PubSubWebsocket.hpp"
#include "util/ExponentialBackoff.hpp"
#include "util/OnceFlag.hpp"

#include <boost/asio/io_service.hpp>
#include <boost/asio/ssl/context.hpp>
Expand Down Expand Up @@ -267,6 +268,8 @@ class PubSub
const QString host_;
const PubSubClientOptions clientOptions_;

OnceFlag stoppedFlag_;

bool stopping_{false};

#ifdef FRIEND_TEST
Expand Down
33 changes: 33 additions & 0 deletions src/util/OnceFlag.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "util/OnceFlag.hpp"

namespace chatterino {

OnceFlag::OnceFlag() = default;
OnceFlag::~OnceFlag() = default;

void OnceFlag::set()
{
{
std::unique_lock guard(this->mutex);
this->flag = true;
}
this->condvar.notify_all();
}

bool OnceFlag::waitFor(std::chrono::milliseconds ms)
{
std::unique_lock lock(this->mutex);
return this->condvar.wait_for(lock, ms, [this] {
return this->flag;
});
}

void OnceFlag::wait()
{
std::unique_lock lock(this->mutex);
this->condvar.wait(lock, [this] {
return this->flag;
});
}

} // namespace chatterino
41 changes: 41 additions & 0 deletions src/util/OnceFlag.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <chrono>
#include <condition_variable>
#include <mutex>

namespace chatterino {

/// @brief A flag that can only be set once which notifies waiters.
///
/// This can be used to synchronize with other threads. Note that waiting
/// threads will be suspended.
class OnceFlag
{
public:
OnceFlag();
~OnceFlag();

/// Set this flag and notify waiters
void set();

/// @brief Wait for at most `ms` until this flag is set.
///
/// The calling thread will be suspended during the wait.
///
/// @param ms The maximum time to wait for this flag
/// @returns `true` if this flag was set during the wait or before
bool waitFor(std::chrono::milliseconds ms);

/// @brief Wait until this flag is set by another thread
///
/// The calling thread will be suspended during the wait.
void wait();

private:
std::mutex mutex;
std::condition_variable condvar;
bool flag = false;
};

} // namespace chatterino
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(test_SOURCES
${CMAKE_CURRENT_LIST_DIR}/src/Plugins.cpp
${CMAKE_CURRENT_LIST_DIR}/src/TwitchIrc.cpp
${CMAKE_CURRENT_LIST_DIR}/src/IgnoreController.cpp
${CMAKE_CURRENT_LIST_DIR}/src/OnceFlag.cpp
${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.cpp
${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.hpp
# Add your new file above this line!
Expand Down
88 changes: 88 additions & 0 deletions tests/src/OnceFlag.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "util/OnceFlag.hpp"

#include "Test.hpp"

#include <thread>

using namespace chatterino;

// this test shouldn't time out (no assert necessary)
TEST(OnceFlag, basic)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;

std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{50});
stoppedFlag.set();
});

startedFlag.wait();
startedAckFlag.set();
stoppedFlag.wait();

t.join();
}

TEST(OnceFlag, waitFor)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;

std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();

std::this_thread::sleep_for(std::chrono::milliseconds{100});
stoppedFlag.set();
});

startedFlag.wait();
startedAckFlag.set();

auto start = std::chrono::system_clock::now();
ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{200}));
auto stop = std::chrono::system_clock::now();

ASSERT_LT(stop - start, std::chrono::milliseconds{150});

start = std::chrono::system_clock::now();
ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{1000}));
stop = std::chrono::system_clock::now();

ASSERT_LT(stop - start, std::chrono::milliseconds{10});

start = std::chrono::system_clock::now();
stoppedFlag.wait();
stop = std::chrono::system_clock::now();

ASSERT_LT(stop - start, std::chrono::milliseconds{10});

t.join();
}

TEST(OnceFlag, waitForTimeout)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;

std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{100});
stoppedFlag.set();
});

startedFlag.wait();
startedAckFlag.set();

ASSERT_FALSE(stoppedFlag.waitFor(std::chrono::milliseconds{25}));
stoppedFlag.wait();

t.join();
}
Loading