diff --git a/.gitignore b/.gitignore index a21df2e8e0ff..65b239a68ff8 100644 --- a/.gitignore +++ b/.gitignore @@ -243,3 +243,5 @@ website/package-lock.json # ccls cache /.ccls-cache + +/compile_commands.json diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index 6cc9759aba17..c44868ddd32d 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -1029,25 +1029,56 @@ class Client : public Poco::Util::Application InterruptListener interrupt_listener; bool cancelled = false; + // TODO: get the poll_interval from commandline. + const auto receive_timeout = connection->getTimeouts().receive_timeout; + constexpr size_t default_poll_interval = 1000000; /// in microseconds + constexpr size_t min_poll_interval = 5000; /// in microseconds + const size_t poll_interval + = std::max(min_poll_interval, std::min(receive_timeout.totalMicroseconds(), default_poll_interval)); + while (true) { - /// Has the Ctrl+C been pressed and thus the query should be cancelled? - /// If this is the case, inform the server about it and receive the remaining packets - /// to avoid losing sync. - if (!cancelled) + Stopwatch receive_watch(CLOCK_MONOTONIC_COARSE); + + while (true) { - if (interrupt_listener.check()) + /// Has the Ctrl+C been pressed and thus the query should be cancelled? + /// If this is the case, inform the server about it and receive the remaining packets + /// to avoid losing sync. + if (!cancelled) { - connection->sendCancel(); - cancelled = true; - if (is_interactive) - std::cout << "Cancelling query." << std::endl; + auto cancelQuery = [&] { + connection->sendCancel(); + cancelled = true; + if (is_interactive) + std::cout << "Cancelling query." << std::endl; + + /// Pressing Ctrl+C twice results in shut down. + interrupt_listener.unblock(); + }; - /// Pressing Ctrl+C twice results in shut down. - interrupt_listener.unblock(); + if (interrupt_listener.check()) + { + cancelQuery(); + } + else + { + double elapsed = receive_watch.elapsedSeconds(); + if (elapsed > receive_timeout.totalSeconds()) + { + std::cout << "Timeout exceeded while receiving data from server." + << " Waited for " << static_cast(elapsed) << " seconds," + << " timeout is " << receive_timeout.totalSeconds() << " seconds." << std::endl; + + cancelQuery(); + } + } } - else if (!connection->poll(1000000)) - continue; /// If there is no new data, continue checking whether the query was cancelled after a timeout. + + /// Poll for changes after a cancellation check, otherwise it never reached + /// because of progress updates from server. + if (connection->poll(poll_interval)) + break; } if (!receiveAndProcessPacket()) @@ -1303,7 +1334,11 @@ class Client : public Poco::Util::Application void onProgress(const Progress & value) { - progress.incrementPiecewiseAtomically(value); + if (!progress.incrementPiecewiseAtomically(value)) + { + // Just a keep-alive update. + return; + } if (block_out_stream) block_out_stream->onProgress(value); writeProgress(); @@ -1648,9 +1683,12 @@ class Client : public Poco::Util::Application } /// Extract settings from the options. -#define EXTRACT_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) \ - if (options.count(#NAME)) \ - context.setSetting(#NAME, options[#NAME].as()); +#define EXTRACT_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) \ + if (options.count(#NAME)) \ + { \ + context.setSetting(#NAME, options[#NAME].as()); \ + config().setString(#NAME, options[#NAME].as()); \ + } APPLY_FOR_SETTINGS(EXTRACT_SETTING) #undef EXTRACT_SETTING diff --git a/dbms/programs/client/ConnectionParameters.h b/dbms/programs/client/ConnectionParameters.h index 67fd7b030fff..d1c055129eb5 100644 --- a/dbms/programs/client/ConnectionParameters.h +++ b/dbms/programs/client/ConnectionParameters.h @@ -81,8 +81,8 @@ struct ConnectionParameters timeouts = ConnectionTimeouts( Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0), - Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0), Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0), + Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0), Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0)); } }; diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index 62e32c5df6dd..a0fe83b0501c 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -302,10 +302,10 @@ void TCPHandler::runImpl() void TCPHandler::readData(const Settings & global_settings) { - auto receive_timeout = query_context.getSettingsRef().receive_timeout.value; + const auto receive_timeout = query_context.getSettingsRef().receive_timeout.value; /// Poll interval should not be greater than receive_timeout - size_t default_poll_interval = global_settings.poll_interval.value * 1000000; + const size_t default_poll_interval = global_settings.poll_interval.value * 1000000; size_t current_poll_interval = static_cast(receive_timeout.totalMicroseconds()); constexpr size_t min_poll_interval = 5000; // 5 ms size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval)); @@ -409,7 +409,7 @@ void TCPHandler::processOrdinaryQuery() } else { - if (state.progress.rows && after_send_progress.elapsed() / 1000 >= query_context.getSettingsRef().interactive_delay) + if (after_send_progress.elapsed() / 1000 >= query_context.getSettingsRef().interactive_delay) { /// Some time passed and there is a progress. after_send_progress.restart(); diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index 5185cafe86b0..f0ca53690d8e 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -121,6 +121,12 @@ class Connection : private boost::noncopyable UInt16 getPort() const; const String & getDefaultDatabase() const; + /// For proper polling. + inline const auto & getTimeouts() const + { + return timeouts; + } + /// If last flag is true, you need to call sendExternalTablesData after. void sendQuery( const String & query, diff --git a/dbms/src/IO/Progress.h b/dbms/src/IO/Progress.h index 7dca03f03c25..54c1d230455e 100644 --- a/dbms/src/IO/Progress.h +++ b/dbms/src/IO/Progress.h @@ -53,11 +53,16 @@ struct Progress void writeJSON(WriteBuffer & out) const; /// Each value separately is changed atomically (but not whole object). - void incrementPiecewiseAtomically(const Progress & rhs) + bool incrementPiecewiseAtomically(const Progress & rhs) { + if (!rhs.rows) + return false; + rows += rhs.rows; bytes += rhs.bytes; total_rows += rhs.total_rows; + + return true; } void reset()