Skip to content

Commit

Permalink
Add coroutine option and rename some files. (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
starrysky9959 committed Sep 26, 2023
1 parent eacb6e1 commit a173e96
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 162 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"name": "(gdb) Launch monograph",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/debug/mongo/mongod",
"program": "${workspaceFolder}/build/Debug/mongo/mongod",
"args": [
"--config",
"src/mongo/db/modules/monograph/test_folder/config/monograph/debug_coroutine.conf"
Expand Down
3 changes: 2 additions & 1 deletion src/mongo/db/server_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ struct ServerGlobalParams {
std::string serviceExecutor;

size_t maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections.
bool enableCoroutine{false};
size_t reservedThreadNum = 8;

int unixSocketPermissions = DEFAULT_UNIX_PERMS; // permissions for the UNIX domain socket

std::string keyFile; // Path to keyfile, or empty if none.
Expand Down
11 changes: 11 additions & 0 deletions src/mongo/db/server_options_server_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ Status addGeneralServerOptions(moe::OptionSection* options) {
.setDefault(moe::Value("synchronous"));

// register options in config file
options->addOptionChaining(
"net.enableCoroutine", "enableCoroutine", moe::Bool, "whether to enable coroutine");
options->addOptionChaining("net.reservedThreadNum",
"reservedThreadNum",
moe::Unsigned,
Expand Down Expand Up @@ -557,6 +559,15 @@ Status storeServerOptions(const moe::Environment& params) {
serverGlobalParams.serviceExecutor = "synchronous";
}

if (params.count("net.enableCoroutine")) {
serverGlobalParams.enableCoroutine = params["net.enableCoroutine"].as<bool>();
if (serverGlobalParams.enableCoroutine &&
serverGlobalParams.serviceExecutor != "adaptive") {
return Status(ErrorCodes::BadValue,
"Coroutine mode can only work with adaptive ServiceExecutor");
}
}

if (params.count("net.reservedThreadNum")) {
serverGlobalParams.reservedThreadNum = params["net.reservedThreadNum"].as<unsigned>();
if (serverGlobalParams.reservedThreadNum < 1) {
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/transport/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ tlEnv.Library(
target='service_executor',
source=[
'service_executor_adaptive.cpp',
'service_executor_reserved.cpp',
'service_executor_coroutine.cpp',
'service_executor_synchronous.cpp',
'thread_idle_callback.cpp',
],
Expand Down
28 changes: 16 additions & 12 deletions src/mongo/transport/service_entry_point_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s

_maxNumConnections = supportedMax;

if (serverGlobalParams.reservedThreadNum) {
_adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
if (serverGlobalParams.enableCoroutine && serverGlobalParams.reservedThreadNum) {
_coroutineExecutor = std::make_unique<transport::ServiceExecutorCoroutine>(
_svcCtx, serverGlobalParams.reservedThreadNum);
}
}

Status ServiceEntryPointImpl::start() {
if (_adminInternalPool)
return _adminInternalPool->start();
else
if (_coroutineExecutor) {
return _coroutineExecutor->start();
} else
return Status::OK();
}

Expand Down Expand Up @@ -111,23 +111,27 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
}
}

// work balance
size_t targetThreadGroupId = connectionCount % serverGlobalParams.reservedThreadNum;
ssm->setThreadGroupId(targetThreadGroupId);
MONGO_LOG(0) << "Current ssm is assigned to thread group " << targetThreadGroupId;
if (serverGlobalParams.enableCoroutine) {
MONGO_LOG(0) << "use coroutine service executor";
ssm->setServiceExecutor(_coroutineExecutor.get());

// work balance
size_t targetThreadGroupId = connectionCount % serverGlobalParams.reservedThreadNum;
ssm->setThreadGroupId(targetThreadGroupId);
MONGO_LOG(0) << "Current ssm is assigned to thread group " << targetThreadGroupId;
}

// Checking if we successfully added a connection above. Separated from the lock so we don't log
// while holding it.
if (connectionCount > _maxNumConnections) {
if (!quiet) {
log() << "connection refused because too many open connections: " << connectionCount;
// log() << "connection refused because too many open connections: " << connectionCount;
log() << "too many open connections: " << connectionCount;
}

// return;
}

ssm->setServiceExecutor(_adminInternalPool.get());
MONGO_LOG(0) << "use reserved service executor";

if (!quiet) {
const auto word = (connectionCount == 1 ? " connection"_sd : " connections"_sd);
Expand Down
4 changes: 2 additions & 2 deletions src/mongo/transport/service_entry_point_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_coroutine.h"
#include "mongo/transport/service_state_machine.h"

namespace mongo {
Expand Down Expand Up @@ -86,7 +86,7 @@ class ServiceEntryPointImpl : public ServiceEntryPoint {
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};

std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
std::unique_ptr<transport::ServiceExecutorCoroutine> _coroutineExecutor;
};

} // namespace mongo
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor;

#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_coroutine.h"
#include "mongo/db/server_parameters.h"
#include "mongo/platform/basic.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_entry_point_utils.h"
#include "mongo/transport/service_executor_task_names.h"
#include "mongo/transport/thread_idle_callback.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"

namespace mongo {
namespace transport {
Expand Down Expand Up @@ -84,15 +81,15 @@ void ThreadGroup::Terminate() {
}


thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
thread_local int ServiceExecutorReserved::_localRecursionDepth = 0;
thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0;
thread_local std::deque<ServiceExecutor::Task> ServiceExecutorCoroutine::_localWorkQueue = {};
thread_local int ServiceExecutorCoroutine::_localRecursionDepth = 0;
thread_local int64_t ServiceExecutorCoroutine::_localThreadIdleCounter = 0;

ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx, size_t reservedThreads)
ServiceExecutorCoroutine::ServiceExecutorCoroutine(ServiceContext* ctx, size_t reservedThreads)
: _name{"coroutine"}, _reservedThreads(reservedThreads), _threadGroups(reservedThreads) {}

Status ServiceExecutorReserved::start() {
MONGO_LOG(0) << "ServiceExecutorReserved::start";
Status ServiceExecutorCoroutine::start() {
MONGO_LOG(0) << "ServiceExecutorCoroutine::start";
{
stdx::unique_lock<stdx::mutex> lk(_mutex);
_stillRunning.store(true, std::memory_order_relaxed);
Expand All @@ -109,7 +106,7 @@ Status ServiceExecutorReserved::start() {
return Status::OK();
}

Status ServiceExecutorReserved::_startWorker(uint16_t groupId) {
Status ServiceExecutorCoroutine::_startWorker(uint16_t groupId) {
MONGO_LOG(0) << "Starting new worker thread for " << _name << " service executor. "
<< " group id: " << groupId;
return launchServiceWorkerThread([this, threadGroupId = groupId] {
Expand All @@ -123,12 +120,9 @@ Status ServiceExecutorReserved::_startWorker(uint16_t groupId) {
_shutdownCondition.notify_one();
});
lk.unlock();
// _numStartingThreads--;
// _numReadyThreads++;

while (_stillRunning.load()) {


if (!_stillRunning.load(std::memory_order_relaxed)) {
break;
}
Expand Down Expand Up @@ -168,47 +162,22 @@ Status ServiceExecutorReserved::_startWorker(uint16_t groupId) {
continue;
}

// _numReadyThreads -= 1;

// bool launchReplacement = false;
// if (_numReadyThreads + _numStartingThreads < _reservedThreads) {
// _numStartingThreads++;
// launchReplacement = true;
// }

// lk.unlock();

// if (launchReplacement) {
// auto threadStartStatus = _startWorker(threadGroupId);
// if (!threadStartStatus.isOK()) {
// warning() << "Could not start new reserve worker thread: " <<
// threadStartStatus;
// }
// }

while (!_localWorkQueue.empty() && _stillRunning.load(std::memory_order_relaxed)) {
_localRecursionDepth = 1;
MONGO_LOG(1) << "thread " << threadGroupId << " do task";
_localWorkQueue.front()();
MONGO_LOG(1) << "thread " << threadGroupId << " do task done";
_localWorkQueue.pop_front();
}

// lk.lock();
// if (_numReadyThreads + 1 > _reservedThreads) {
// break;
// } else {
// _numReadyThreads += 1;
// }
}

LOG(3) << "Exiting worker thread in " << _name << " service executor";
});
}


Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
LOG(3) << "Shutting down reserved executor";
Status ServiceExecutorCoroutine::shutdown(Milliseconds timeout) {
LOG(3) << "Shutting down coroutine executor";

stdx::unique_lock<stdx::mutex> lock(_mutex);
_stillRunning.store(false, std::memory_order_relaxed);
Expand All @@ -225,54 +194,19 @@ Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
return result
? Status::OK()
: Status(ErrorCodes::Error::ExceededTimeLimit,
"reserved executor couldn't shutdown all worker threads within time limit.");
"coroutine executor couldn't shutdown all worker threads within time limit.");
}

Status ServiceExecutorReserved::schedule(Task task,
ScheduleFlags flags,
ServiceExecutorTaskName taskName) {
Status ServiceExecutorCoroutine::schedule(Task task,
ScheduleFlags flags,
ServiceExecutorTaskName taskName) {
return schedule(task, flags, taskName, 0);
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
}

if (!_localWorkQueue.empty()) {
/*
* In perf testing we found that yielding after running each request produced
* a 5% performance boost in microbenchmarks if the number of worker threads
* was greater than the number of available cores.
*/
if (flags & ScheduleFlags::kMayYieldBeforeSchedule) {
if ((_localThreadIdleCounter++ & 0xf) == 0) {
markThreadIdle();
}
}

// Execute task directly (recurse) if allowed by the caller as it produced better
// performance in testing. Try to limit the amount of recursion so we don't blow up the
// stack, even though this shouldn't happen with this executor that uses blocking network
// I/O.
if ((flags & ScheduleFlags::kMayRecurse) &&
(_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
++_localRecursionDepth;
task();
} else {
_localWorkQueue.emplace_back(std::move(task));
}
return Status::OK();
}

stdx::lock_guard<stdx::mutex> lk(_mutex);
_readyTasks.push_back(std::move(task));
_threadWakeup.notify_one();

return Status::OK();
}

Status ServiceExecutorReserved::schedule(Task task,
ScheduleFlags flags,
ServiceExecutorTaskName taskName,
uint16_t thd_group_id) {
Status ServiceExecutorCoroutine::schedule(Task task,
ScheduleFlags flags,
ServiceExecutorTaskName taskName,
uint16_t thd_group_id) {
MONGO_LOG(1) << "schedule with group id: " << thd_group_id;
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
Expand Down Expand Up @@ -309,15 +243,15 @@ Status ServiceExecutorReserved::schedule(Task task,
return Status::OK();
}

std::function<void()> ServiceExecutorReserved::CoroutineResumeFunctor(uint16_t thd_group_id,
Task task) {
std::function<void()> ServiceExecutorCoroutine::CoroutineResumeFunctor(uint16_t thd_group_id,
Task task) {
assert(thd_group_id < _threadGroups.size());
return [thd_group = &_threadGroups[thd_group_id], tsk = std::move(task)]() {
thd_group->ResumeTask(std::move(tsk));
};
}

void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
void ServiceExecutorCoroutine::appendStats(BSONObjBuilder* bob) const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
*bob << kExecutorLabel << kExecutorName << kThreadsRunning
<< static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ThreadGroup {
std::condition_variable _sleep_cv;
std::atomic<bool> _is_terminated{false};

friend class ServiceExecutorReserved;
friend class ServiceExecutorCoroutine;
};

/**
Expand All @@ -63,9 +63,9 @@ class ThreadGroup {
* accept work. When threads exit, they will go back to waiting for work if there are fewer
* than reservedThreads available.
*/
class ServiceExecutorReserved : public ServiceExecutor {
class ServiceExecutorCoroutine : public ServiceExecutor {
public:
explicit ServiceExecutorReserved(ServiceContext* ctx, size_t reservedThreads = 1);
explicit ServiceExecutorCoroutine(ServiceContext* ctx, size_t reservedThreads = 1);

Status start() override;

Expand Down Expand Up @@ -98,11 +98,7 @@ class ServiceExecutorReserved : public ServiceExecutor {
stdx::condition_variable _threadWakeup;
stdx::condition_variable _shutdownCondition;

std::deque<Task> _readyTasks;

AtomicUInt32 _numRunningWorkerThreads{0};
// size_t _numReadyThreads{0};
// size_t _numStartingThreads{0};

const std::string _name;
const size_t _reservedThreads;
Expand Down
Loading

0 comments on commit a173e96

Please sign in to comment.