diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 15f29beeac46..040d35e3d3a9 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -4,6 +4,7 @@ #include "httpserver.h" +#include "init.h" #include "chainparamsbase.h" #include "compat.h" #include "util.h" @@ -20,7 +21,6 @@ #include #include #include -#include #include #include @@ -37,6 +37,10 @@ #endif #endif +#include +#include +#include + /** Maximum size of http request (request line + headers) */ static const size_t MAX_HEADERS_SIZE = 8192; @@ -73,34 +77,13 @@ class WorkQueue std::deque> queue; bool running; size_t maxDepth; - int numThreads; - - /** RAII object to keep track of number of running worker threads */ - class ThreadCounter - { - public: - WorkQueue &wq; - ThreadCounter(WorkQueue &w): wq(w) - { - std::lock_guard lock(wq.cs); - wq.numThreads += 1; - } - ~ThreadCounter() - { - std::lock_guard lock(wq.cs); - wq.numThreads -= 1; - wq.cond.notify_all(); - } - }; public: WorkQueue(size_t _maxDepth) : running(true), - maxDepth(_maxDepth), - numThreads(0) + maxDepth(_maxDepth) { } - /** Precondition: worker threads have all stopped - * (call WaitExit) + /** Precondition: worker threads have all stopped (they have been joined). */ ~WorkQueue() { @@ -119,7 +102,6 @@ class WorkQueue /** Thread function */ void Run() { - ThreadCounter count(*this); while (true) { std::unique_ptr i; { @@ -141,14 +123,6 @@ class WorkQueue running = false; cond.notify_all(); } - /** Wait for worker threads to exit */ - void WaitExit() - { - std::unique_lock lock(cs); - while (numThreads > 0){ - cond.wait(lock); - } - } }; struct HTTPPathHandler @@ -450,20 +424,17 @@ bool UpdateHTTPServerLogging(bool enable) { } std::thread threadHTTP; -std::future threadResult; +static std::vector g_thread_http_workers; bool StartHTTPServer() { LogPrint(BCLog::HTTP, "Starting HTTP server\n"); int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); - std::packaged_task task(ThreadHTTP); - threadResult = task.get_future(); - threadHTTP = std::thread(std::move(task), eventBase, eventHTTP); + threadHTTP = std::thread(ThreadHTTP, eventBase, eventHTTP); for (int i = 0; i < rpcThreads; i++) { - std::thread rpc_worker(HTTPWorkQueueRun, workQueue); - rpc_worker.detach(); + g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue); } return true; } @@ -472,10 +443,6 @@ void InterruptHTTPServer() { LogPrint(BCLog::HTTP, "Interrupting HTTP server\n"); if (eventHTTP) { - // Unlisten sockets - for (evhttp_bound_socket *socket : boundSockets) { - evhttp_del_accept_socket(eventHTTP, socket); - } // Reject requests on current connections evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); } @@ -488,27 +455,21 @@ void StopHTTPServer() LogPrint(BCLog::HTTP, "Stopping HTTP server\n"); if (workQueue) { LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); -#ifndef WIN32 - // ToDo: Disabling WaitExit() for Windows platforms is an ugly workaround for the wallet not - // closing during a repair-restart. It doesn't hurt, though, because threadHTTP.timed_join - // below takes care of this and sends a loopbreak. - workQueue->WaitExit(); -#endif + for (auto& thread: g_thread_http_workers) { + thread.join(); + } + g_thread_http_workers.clear(); delete workQueue; workQueue = nullptr; } + // Unlisten sockets, these are what make the event loop running, which means + // that after this and all connections are closed the event loop will quit. + for (evhttp_bound_socket *socket : boundSockets) { + evhttp_del_accept_socket(eventHTTP, socket); + } + boundSockets.clear(); if (eventBase) { LogPrint(BCLog::HTTP, "Waiting for HTTP event thread to exit\n"); - // Give event loop a few seconds to exit (to send back last RPC responses), then break it - // Before this was solved with event_base_loopexit, but that didn't work as expected in - // at least libevent 2.0.21 and always introduced a delay. In libevent - // master that appears to be solved, so in the future that solution - // could be used again (if desirable). - // (see discussion in https://github.com/bitcoin/bitcoin/pull/6990) - if (threadResult.valid() && threadResult.wait_for(std::chrono::milliseconds(2000)) == std::future_status::timeout) { - LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n"); - event_base_loopbreak(eventBase); - } threadHTTP.join(); } if (eventHTTP) { @@ -613,6 +574,9 @@ void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value) void HTTPRequest::WriteReply(int nStatus, const std::string& strReply) { assert(!replySent && req); + if (ShutdownRequested()) { + WriteHeader("Connection", "close"); + } // Send event to main http thread to send reply message struct evbuffer* evb = evhttp_request_get_output_buffer(req); assert(evb); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 94caa6b6b0dc..38d2439943bc 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -182,6 +182,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "echojson", 7, "arg7" }, { "echojson", 8, "arg8" }, { "echojson", 9, "arg9" }, + { "stop", 0, "wait" }, }; class CRPCConvertTable diff --git a/src/rpc/server.cpp b/src/rpc/server.cpp index 7ca85307a9c9..d72d8d87d54a 100644 --- a/src/rpc/server.cpp +++ b/src/rpc/server.cpp @@ -293,6 +293,9 @@ UniValue help(const JSONRPCRequest& jsonRequest) UniValue stop(const JSONRPCRequest& jsonRequest) { // Accept the deprecated and ignored 'detach' boolean argument + // Also accept the hidden 'wait' integer argument (milliseconds) + // For instance, 'stop 1000' makes the call wait 1 second before returning + // to the client (intended for testing) if (jsonRequest.fHelp || jsonRequest.params.size() > 1) throw std::runtime_error( "stop\n" @@ -300,6 +303,9 @@ UniValue stop(const JSONRPCRequest& jsonRequest) // Event loop will exit after current HTTP requests have been handled, so // this reply will get back to the client. StartShutdown(); + if (jsonRequest.params[0].isNum()) { + MilliSleep(jsonRequest.params[0].get_int()); + } return "Dash Core server stopping"; } @@ -327,7 +333,7 @@ static const CRPCCommand vRPCCommands[] = // --------------------- ------------------------ ----------------------- ------ ---------- /* Overall control/query calls */ { "control", "help", &help, true, {"command"} }, - { "control", "stop", &stop, true, {} }, + { "control", "stop", &stop, true, {"wait"} }, { "control", "uptime", &uptime, true, {} }, }; diff --git a/test/functional/feature_shutdown.py b/test/functional/feature_shutdown.py new file mode 100755 index 000000000000..b633fabb1fac --- /dev/null +++ b/test/functional/feature_shutdown.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# Copyright (c) 2018 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test bitcoind shutdown.""" + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal, get_rpc_proxy +from threading import Thread + +def test_long_call(node): + block = node.waitfornewblock() + assert_equal(block['height'], 0) + +class ShutdownTest(BitcoinTestFramework): + + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + + def run_test(self): + node = get_rpc_proxy(self.nodes[0].url, 1, timeout=600, coveragedir=self.nodes[0].coverage_dir) + Thread(target=test_long_call, args=(node,)).start() + # wait 1 second to ensure event loop waits for current connections to close + self.stop_node(0, wait=1000) + +if __name__ == '__main__': + ShutdownTest().main() diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index ecd0822b881e..3d592aa0ce04 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -280,16 +280,16 @@ def start_nodes(self, extra_args=None, stderr=None): for node in self.nodes: coverage.write_all_rpc_commands(self.options.coveragedir, node.rpc) - def stop_node(self, i): + def stop_node(self, i, wait=0): """Stop a dashd test node""" - self.nodes[i].stop_node() + self.nodes[i].stop_node(wait=wait) self.nodes[i].wait_until_stopped() - def stop_nodes(self): + def stop_nodes(self, wait=0): """Stop multiple dashd test nodes""" for node in self.nodes: # Issue RPC to stop nodes - node.stop_node() + node.stop_node(wait=wait) for node in self.nodes: # Wait for nodes to stop diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index bf8cab88dd35..97d55a63f76b 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -119,13 +119,13 @@ def get_wallet_rpc(self, wallet_name): wallet_path = "wallet/%s" % wallet_name return self.rpc / wallet_path - def stop_node(self): + def stop_node(self, wait=0): """Stop the node.""" if not self.running: return self.log.debug("Stopping node") try: - self.stop() + self.stop(wait=wait) except http.client.CannotSendRequest: self.log.exception("Unable to stop node.") del self.p2ps[:] diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 441d7d1d7dff..4efd057935dc 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -137,6 +137,7 @@ 'resendwallettransactions.py', 'minchainwork.py', 'p2p-acceptblock.py', # NOTE: needs dash_hash to pass + 'feature_shutdown.py', ] EXTENDED_SCRIPTS = [