Skip to content

Commit

Permalink
chore: use condition variable to shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Nerixyz committed Nov 20, 2024
1 parent 19f4498 commit 8b36069
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 30 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ set(SOURCE_FILES
util/LayoutHelper.hpp
util/LoadPixmap.cpp
util/LoadPixmap.hpp
util/OnceFlag.cpp
util/OnceFlag.hpp
util/RapidjsonHelpers.cpp
util/RapidjsonHelpers.hpp
util/RatelimitBucket.cpp
Expand Down
47 changes: 33 additions & 14 deletions src/providers/liveupdates/BasicPubSubManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "providers/twitch/PubSubHelpers.hpp"
#include "util/DebugCount.hpp"
#include "util/ExponentialBackoff.hpp"
#include "util/OnceFlag.hpp"
#include "util/RenameThread.hpp"

#include <pajlada/signals/signal.hpp>
Expand Down Expand Up @@ -120,6 +121,11 @@ class BasicPubSubManager
this->work_ = std::make_shared<boost::asio::io_service::work>(
this->websocketClient_.get_io_service());
this->mainThread_.reset(new std::thread([this] {
// make sure we set in any case, even exceptions
auto guard = qScopeGuard([&] {
this->stoppedFlag_.set();
});

runThread();
}));

Expand All @@ -142,22 +148,34 @@ class BasicPubSubManager

this->work_.reset();

if (this->mainThread_->joinable())
if (!this->mainThread_->joinable())
{
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->mainThread_.get());
if (joiner.wait_for(std::chrono::seconds(1)) ==
std::future_status::timeout)
{
qCWarning(chatterinoLiveupdates)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient_.stop();
}
return;
}

// NOTE:
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
if (this->stoppedFlag_.waitFor(std::chrono::seconds{1}))
{
this->mainThread_->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish within 1 second, force-stop the client";
this->websocketClient_.stop();
if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100}))
{
this->mainThread_->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish after stopping, discard it";
// detach the thread so the destructor doesn't attempt any joining
this->mainThread_->detach();
}

protected:
Expand Down Expand Up @@ -394,6 +412,7 @@ class BasicPubSubManager

liveupdates::WebsocketClient websocketClient_;
std::unique_ptr<std::thread> mainThread_;
OnceFlag stoppedFlag_;

const QString host_;

Expand Down
49 changes: 33 additions & 16 deletions src/providers/twitch/PubSubManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <algorithm>
#include <exception>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
Expand Down Expand Up @@ -560,6 +559,11 @@ void PubSub::start()
this->work = std::make_shared<boost::asio::io_service::work>(
this->websocketClient.get_io_service());
this->thread = std::make_unique<std::thread>([this] {
// make sure we set in any case, even exceptions
auto guard = qScopeGuard([&] {
this->stoppedFlag_.set();
});

runThread();
});
renameThread(*this->thread, "PubSub");
Expand All @@ -578,23 +582,36 @@ void PubSub::stop()

this->work.reset();

if (this->thread->joinable())
if (!this->thread->joinable())
{
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->thread.get());
if (joiner.wait_for(1s) == std::future_status::timeout)
{
qCWarning(chatterinoPubSub)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient.stop();
}
return;
}

// NOTE:
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
if (this->stoppedFlag_.waitFor(std::chrono::seconds{1}))
{
this->thread->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish within 1 second, force-stop the client";
this->websocketClient.stop();
if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100}))
{
this->thread->join();
return;
}

qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish after stopping, discard it";
// detach the thread so the destructor doesn't attempt any joining
this->thread->detach();
}

bool PubSub::listenToWhispers()
Expand Down
3 changes: 3 additions & 0 deletions src/providers/twitch/PubSubManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "providers/twitch/PubSubClientOptions.hpp"
#include "providers/twitch/PubSubWebsocket.hpp"
#include "util/ExponentialBackoff.hpp"
#include "util/OnceFlag.hpp"

#include <boost/asio/io_service.hpp>
#include <boost/asio/ssl/context.hpp>
Expand Down Expand Up @@ -267,6 +268,8 @@ class PubSub
const QString host_;
const PubSubClientOptions clientOptions_;

OnceFlag stoppedFlag_;

bool stopping_{false};

#ifdef FRIEND_TEST
Expand Down
33 changes: 33 additions & 0 deletions src/util/OnceFlag.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "util/OnceFlag.hpp"

namespace chatterino {

OnceFlag::OnceFlag() = default;
OnceFlag::~OnceFlag() = default;

void OnceFlag::set()
{
{
std::unique_lock guard(this->mutex);
this->flag = true;
}
this->condvar.notify_all();
}

bool OnceFlag::waitFor(std::chrono::milliseconds ms)
{
std::unique_lock lock(this->mutex);
return this->condvar.wait_for(lock, ms, [this] {
return this->flag;
});
}

void OnceFlag::wait()
{
std::unique_lock lock(this->mutex);
this->condvar.wait(lock, [this] {
return this->flag;
});
}

} // namespace chatterino
41 changes: 41 additions & 0 deletions src/util/OnceFlag.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <chrono>
#include <condition_variable>
#include <mutex>

namespace chatterino {

/// @brief A flag that can only be set once which notifies waiters.
///
/// This can be used to synchronize with other threads. Note that waiting
/// threads will be suspended.
class OnceFlag
{
public:
OnceFlag();
~OnceFlag();

/// Set this flag and notify waiters
void set();

/// @brief Wait for at most `ms` until this flag is set.
///
/// The calling thread will be suspended during the wait.
///
/// @param ms The maximum time to wait for this flag
/// @returns `true` if this flag was set during the wait or before
bool waitFor(std::chrono::milliseconds ms);

/// @brief Wait until this flag is set by another thread
///
/// The calling thread will be suspended during the wait.
void wait();

private:
std::mutex mutex;
std::condition_variable condvar;
bool flag = false;
};

} // namespace chatterino
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(test_SOURCES
${CMAKE_CURRENT_LIST_DIR}/src/Plugins.cpp
${CMAKE_CURRENT_LIST_DIR}/src/TwitchIrc.cpp
${CMAKE_CURRENT_LIST_DIR}/src/IgnoreController.cpp
${CMAKE_CURRENT_LIST_DIR}/src/OnceFlag.cpp
${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.cpp
${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.hpp
# Add your new file above this line!
Expand Down
87 changes: 87 additions & 0 deletions tests/src/OnceFlag.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#include "util/OnceFlag.hpp"

#include "Test.hpp"

#include <thread>

using namespace chatterino;

TEST(OnceFlag, basic)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;

std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{50});
stoppedFlag.set();
});

startedFlag.wait();
startedAckFlag.set();
stoppedFlag.wait();

t.join();
}

TEST(OnceFlag, waitFor)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;

std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();

std::this_thread::sleep_for(std::chrono::milliseconds{100});
stoppedFlag.set();
});

startedFlag.wait();
startedAckFlag.set();

auto start = std::chrono::system_clock::now();
ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{200}));
auto stop = std::chrono::system_clock::now();

ASSERT_LT(stop - start, std::chrono::milliseconds{150});

start = std::chrono::system_clock::now();
ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{1000}));
stop = std::chrono::system_clock::now();

ASSERT_LT(stop - start, std::chrono::milliseconds{10});

start = std::chrono::system_clock::now();
stoppedFlag.wait();
stop = std::chrono::system_clock::now();

ASSERT_LT(stop - start, std::chrono::milliseconds{10});

t.join();
}

TEST(OnceFlag, waitForTimeout)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;

std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{100});
stoppedFlag.set();
});

startedFlag.wait();
startedAckFlag.set();

ASSERT_FALSE(stoppedFlag.waitFor(std::chrono::milliseconds{25}));
stoppedFlag.wait();

t.join();
}

0 comments on commit 8b36069

Please sign in to comment.