diff --git a/be/src/env/env.h b/be/src/env/env.h index 5ad360b815eb9d..eb5471c4421c73 100644 --- a/be/src/env/env.h +++ b/be/src/env/env.h @@ -350,4 +350,4 @@ class RandomRWFile { virtual const std::string& filename() const = 0; }; -} +} // namespace doris diff --git a/be/src/env/env_util.cpp b/be/src/env/env_util.cpp index 07bf874ae0a02a..b383439b764cf8 100644 --- a/be/src/env/env_util.cpp +++ b/be/src/env/env_util.cpp @@ -18,6 +18,7 @@ #include "env/env_util.h" #include "env/env.h" +#include "util/faststring.h" using std::shared_ptr; using std::string; @@ -30,21 +31,73 @@ Status open_file_for_write(Env* env, const string& path, shared_ptr *file) { +Status open_file_for_write(const WritableFileOptions& opts, Env* env, const string& path, + shared_ptr* file) { unique_ptr w; RETURN_IF_ERROR(env->new_writable_file(opts, path, &w)); file->reset(w.release()); return Status::OK(); } -Status open_file_for_random(Env *env, const string &path, shared_ptr *file) { +Status open_file_for_random(Env* env, const string& path, shared_ptr* file) { unique_ptr r; RETURN_IF_ERROR(env->new_random_access_file(path, &r)); file->reset(r.release()); return Status::OK(); } +static Status do_write_string_to_file(Env* env, const Slice& data, const std::string& fname, + bool should_sync) { + unique_ptr file; + Status s = env->new_writable_file(fname, &file); + if (!s.ok()) { + return s; + } + s = file->append(data); + if (s.ok() && should_sync) { + s = file->sync(); + } + if (s.ok()) { + s = file->close(); + } + file.reset(); // Will auto-close if we did not close above + if (!s.ok()) { + RETURN_NOT_OK_STATUS_WITH_WARN(env->delete_file(fname), + "Failed to delete partially-written file " + fname); + } + return s; +} + +Status write_string_to_file(Env* env, const Slice& data, const std::string& fname) { + return do_write_string_to_file(env, data, fname, false); +} + +Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname) { + return do_write_string_to_file(env, data, fname, true); +} + +Status read_file_to_string(Env* env, const std::string& fname, faststring* data) { + data->clear(); + unique_ptr file; + Status s = env->new_sequential_file(fname, &file); + if (!s.ok()) { + return s; + } + static const int kBufferSize = 8192; + unique_ptr scratch(new uint8_t[kBufferSize]); + while (true) { + Slice fragment(scratch.get(), kBufferSize); + s = file->read(&fragment); + if (!s.ok()) { + break; + } + data->append(fragment.get_data(), fragment.get_size()); + if (fragment.empty()) { + break; + } + } + return s; +} + } // namespace env_util } // namespace doris diff --git a/be/src/env/env_util.h b/be/src/env/env_util.h index aa2af492a33efd..f8ca6bac37bab4 100644 --- a/be/src/env/env_util.h +++ b/be/src/env/env_util.h @@ -20,6 +20,7 @@ #include #include "common/status.h" +#include "env.h" namespace doris { @@ -30,14 +31,22 @@ struct WritableFileOptions; namespace env_util { -Status open_file_for_write(Env *env, const std::string& path, std::shared_ptr *file); +Status open_file_for_write(Env* env, const std::string& path, std::shared_ptr* file); -Status open_file_for_write(const WritableFileOptions& opts, Env *env, - const std::string& path, std::shared_ptr *file); +Status open_file_for_write(const WritableFileOptions& opts, Env* env, const std::string& path, + std::shared_ptr* file); -Status open_file_for_random(Env *env, const std::string& path, - std::shared_ptr *file); +Status open_file_for_random(Env* env, const std::string& path, + std::shared_ptr* file); + +// A utility routine: write 
"data" to the named file. +extern Status write_string_to_file(Env* env, const Slice& data, const std::string& fname); +// Like above but also fsyncs the new file. + +extern Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname); + +// A utility routine: read contents of named file into *data +extern Status read_file_to_string(Env* env, const std::string& fname, faststring* data); } // namespace env_util } // namespace doris - diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 47fb7c9df3fe79..7edf204a2de4cb 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -245,16 +245,23 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { } else { decimal_str = ((orc::Decimal128VectorBatch*) cvb)->values[_current_line_of_group].toString(); } + + int negative = decimal_str[0] == '-' ? 1 : 0; + int decimal_scale_length = decimal_str.size() - negative; std::string v; - if (decimal_str.size() <= scale) { + if (decimal_scale_length <= scale) { // decimal(5,2) : the integer of 0.01 is 1, so we should fill 0 befor integer - v = "0."; - int fill_zero = scale - decimal_str.size(); + v = std::string(negative ? "-0." : "0."); + int fill_zero = scale - decimal_scale_length; while (fill_zero--) { v += "0"; } - v += decimal_str; + if (negative) { + v += decimal_str.substr(1, decimal_str.length()); + } else { + v += decimal_str; + } } else { //Orc api will fill in 0 at the end, so size must greater than scale v = decimal_str.substr(0, decimal_str.size() - scale) + "." + decimal_str.substr(decimal_str.size() - scale); diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 387de4b45fcf36..1f6187eb01ef3c 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -18,20 +18,17 @@ #include "http/default_path_handlers.h" #include -#include #include #include -#include #include #include "common/configbase.h" -#include "common/logging.h" #include "http/web_page_handler.h" #include "runtime/mem_tracker.h" #include "util/debug_util.h" -#include "util/logging.h" #include "util/pretty_printer.h" +#include "util/thread.h" namespace doris { @@ -104,10 +101,12 @@ void mem_usage_handler(MemTracker* mem_tracker, const WebPageHandler::ArgumentMa } void add_default_path_handlers(WebPageHandler* web_page_handler, MemTracker* process_mem_tracker) { - web_page_handler->register_page("/logs", "Logs", logs_handler, true /* is_on_nav_bar */); + // TODO(yingchun): logs_handler is not implemented yet, so not show it on navigate bar + web_page_handler->register_page("/logs", "Logs", logs_handler, false /* is_on_nav_bar */); web_page_handler->register_page("/varz", "Configs", config_handler, true /* is_on_nav_bar */); web_page_handler->register_page("/memz", "Memory", boost::bind(&mem_usage_handler, process_mem_tracker, _1, _2), true /* is_on_nav_bar */); + start_thread_instrumentation(web_page_handler); } } // namespace doris diff --git a/be/src/http/web_page_handler.cpp b/be/src/http/web_page_handler.cpp index abec965647a2c5..788b028e8e4156 100644 --- a/be/src/http/web_page_handler.cpp +++ b/be/src/http/web_page_handler.cpp @@ -36,6 +36,7 @@ #include "util/debug_util.h" #include "util/disk_info.h" #include "util/mem_info.h" +#include "util/mustache/mustache.h" using strings::Substitute; @@ -44,18 +45,32 @@ namespace doris { static std::string s_html_content_type = "text/html"; WebPageHandler::WebPageHandler(EvHttpServer* server) : _http_server(server) { + 
_www_path = std::string(getenv("DORIS_HOME")) + "/www/"; + // Make WebPageHandler to be static file handler, static files, e.g. css, png, will be handled by WebPageHandler. _http_server->register_static_file_handler(this); - PageHandlerCallback root_callback = + TemplatePageHandlerCallback root_callback = boost::bind(boost::mem_fn(&WebPageHandler::root_handler), this, _1, _2); - register_page("/", "Home", root_callback, false /* is_on_nav_bar */); + register_template_page("/", "Home", root_callback, false /* is_on_nav_bar */); } WebPageHandler::~WebPageHandler() { STLDeleteValues(&_page_map); } +void WebPageHandler::register_template_page(const std::string& path, const string& alias, + const TemplatePageHandlerCallback& callback, bool is_on_nav_bar) { + // Relative path which will be used to find .mustache file in _www_path + string render_path = (path == "/") ? "/home" : path; + auto wrapped_cb = [=](const ArgumentMap& args, std::stringstream* output) { + EasyJson ej; + callback(args, &ej); + render(render_path, ej, true /* is_styled */, output); + }; + register_page(path, alias, wrapped_cb, is_on_nav_bar); +} + void WebPageHandler::register_page(const std::string& path, const string& alias, const PageHandlerCallback& callback, bool is_on_nav_bar) { boost::mutex::scoped_lock lock(_map_lock); @@ -79,7 +94,7 @@ void WebPageHandler::handle(HttpRequest* req) { if (handler == nullptr) { // Try to handle static file request - do_file_response(std::string(getenv("DORIS_HOME")) + "/www/" + req->raw_path(), req); + do_file_response(_www_path + req->raw_path(), req); // Has replied in do_file_response, so we return here. return; } @@ -90,24 +105,22 @@ void WebPageHandler::handle(HttpRequest* req) { bool use_style = (params.find("raw") == params.end()); std::stringstream content; - // Append header - if (use_style) { - bootstrap_page_header(&content); - } - - // Append content handler->callback()(params, &content); - // Append footer + std::string output; if (use_style) { - bootstrap_page_footer(&content); + std::stringstream oss; + render_main_template(content.str(), &oss); + output = oss.str(); + } else { + output = content.str(); } req->add_output_header(HttpHeaders::CONTENT_TYPE, s_html_content_type.c_str()); - HttpChannel::send_reply(req, HttpStatus::OK, content.str()); + HttpChannel::send_reply(req, HttpStatus::OK, output); } -static const std::string PAGE_HEADER = R"( +static const std::string kMainTemplate = R"( @@ -122,9 +135,6 @@ static const std::string PAGE_HEADER = R"( -)"; - -static const std::string NAVIGATION_BAR_PREFIX = R"( -)"; - -static const std::string PAGE_FOOTER = R"( + {{^static_pages_available}} +
+ Static pages not available. Make sure ${DORIS_HOME}/www/ exists and contains web static files. +
+ {{/static_pages_available}} + {{{content}}} + + {{#footer_html}} +
+ {{{.}}} +
+ {{/footer_html}} )"; -void WebPageHandler::bootstrap_page_header(std::stringstream* output) { - boost::mutex::scoped_lock lock(_map_lock); - (*output) << PAGE_HEADER; - (*output) << NAVIGATION_BAR_PREFIX; - for (auto& iter : _page_map) { - (*output) << "
  • " << iter.first << "
  • "; +std::string WebPageHandler::mustache_partial_tag(const std::string& path) const { + return Substitute("{{> $0.mustache}}", path); +} + +bool WebPageHandler::static_pages_available() const { + bool is_dir = false; + return Env::Default()->is_directory(_www_path, &is_dir).ok() && is_dir; +} + +bool WebPageHandler::mustache_template_available(const std::string& path) const { + if (!static_pages_available()) { + return false; } - (*output) << NAVIGATION_BAR_SUFFIX; + return Env::Default()->path_exists(Substitute("$0/$1.mustache", _www_path, path)).ok(); } -void WebPageHandler::bootstrap_page_footer(std::stringstream* output) { - (*output) << PAGE_FOOTER; +void WebPageHandler::render_main_template(const std::string& content, std::stringstream* output) { + static const std::string& footer = std::string("
    ") + get_version_string(true) + std::string("
    "); + + EasyJson ej; + ej["static_pages_available"] = static_pages_available(); + ej["content"] = content; + ej["footer_html"] = footer; + EasyJson path_handlers = ej.Set("path_handlers", EasyJson::kArray); + for (const auto& handler : _page_map) { + if (handler.second->is_on_nav_bar()) { + EasyJson path_handler = path_handlers.PushBack(EasyJson::kObject); + path_handler["path"] = handler.first; + path_handler["alias"] = handler.second->alias(); + } + } + mustache::RenderTemplate(kMainTemplate, _www_path, ej.value(), output); } -void WebPageHandler::root_handler(const ArgumentMap& args, std::stringstream* output) { - // _path_handler_lock already held by MongooseCallback - (*output) << "

    Version

    "; - (*output) << "
    " << get_version_string(false) << "
    " << std::endl; - (*output) << "

    Hardware Info

    "; - (*output) << "
    ";
    -    (*output) << CpuInfo::debug_string();
    -    (*output) << MemInfo::debug_string();
    -    (*output) << DiskInfo::debug_string();
    -    (*output) << "
    "; - - (*output) << "

    Status Pages

    "; - for (auto& iter : _page_map) { - (*output) << "" << iter.first << "
    "; +void WebPageHandler::render(const string& path, const EasyJson& ej, bool use_style, + std::stringstream* output) { + if (mustache_template_available(path)) { + mustache::RenderTemplate(mustache_partial_tag(path), _www_path, ej.value(), output); + } else if (use_style) { + (*output) << "
    " << ej.ToString() << "
    "; + } else { + (*output) << ej.ToString(); } } +void WebPageHandler::root_handler(const ArgumentMap& args, EasyJson* output) { + (*output)["version"] = get_version_string(false); + (*output)["cpuinfo"] = CpuInfo::debug_string(); + (*output)["meminfo"] = MemInfo::debug_string(); + (*output)["diskinfo"] = DiskInfo::debug_string(); +} + } // namespace doris diff --git a/be/src/http/web_page_handler.h b/be/src/http/web_page_handler.h index 62d2f5cfcf53fd..63e505a84fd58f 100644 --- a/be/src/http/web_page_handler.h +++ b/be/src/http/web_page_handler.h @@ -27,6 +27,7 @@ #include #include "http/http_handler.h" +#include "util/easy_json.h" namespace doris { @@ -39,22 +40,46 @@ class WebPageHandler : public HttpHandler { typedef std::map ArgumentMap; typedef boost::function PageHandlerCallback; + typedef boost::function + TemplatePageHandlerCallback; WebPageHandler(EvHttpServer* http_server); virtual ~WebPageHandler(); void handle(HttpRequest *req) override; - // Register a route 'path'. + // Register a route 'path' to be rendered via template. + // The appropriate template to use is determined by 'path'. // If 'is_on_nav_bar' is true, a link to the page will be placed on the navbar // in the header of styled pages. The link text is given by 'alias'. + void register_template_page(const std::string& path, const std::string& alias, + const TemplatePageHandlerCallback& callback, bool is_on_nav_bar); + + // Register a route 'path'. See the register_template_page for details. void register_page(const std::string& path, const std::string& alias, const PageHandlerCallback& callback, bool is_on_nav_bar); private: - void bootstrap_page_header(std::stringstream* output); - void bootstrap_page_footer(std::stringstream* output); - void root_handler(const ArgumentMap& args, std::stringstream* output); + void root_handler(const ArgumentMap& args, EasyJson* output); + + // Returns a mustache tag that renders the partial at path when + // passed to mustache::RenderTemplate. + std::string mustache_partial_tag(const std::string& path) const; + + // Returns whether or not a mustache template corresponding + // to the given path can be found. + bool mustache_template_available(const std::string& path) const; + + // Renders the main HTML template with the pre-rendered string 'content' + // in the main body of the page, into 'output'. + void render_main_template(const std::string& content, std::stringstream* output); + + // Renders the template corresponding to 'path' (if available), using + // fields in 'ej'. + void render(const std::string& path, const EasyJson& ej, bool use_style, + std::stringstream* output); + + bool static_pages_available() const; // Container class for a list of path handler callbacks for a single URL. 
class PathHandler { @@ -85,6 +110,7 @@ class WebPageHandler : public HttpHandler { PageHandlerCallback callback_; }; + std::string _www_path; EvHttpServer* _http_server; // Lock guarding the _path_handlers map boost::mutex _map_lock; diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 5d286919778fb5..5107d7a312a233 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -70,7 +70,8 @@ set(UTIL_FILES null_load_error_hub.cpp time.cpp os_info.cpp -# coding_util.cpp + os_util.cpp + # coding_util.cpp cidr.cpp core_local.cpp uid_util.cpp @@ -96,6 +97,8 @@ set(UTIL_FILES trace.cpp trace_metrics.cpp timezone_utils.cpp + easy_json.cc + mustache/mustache.cc ) if (WITH_MYSQL) diff --git a/be/src/util/easy_json.cc b/be/src/util/easy_json.cc new file mode 100644 index 00000000000000..56876b6344b715 --- /dev/null +++ b/be/src/util/easy_json.cc @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/easy_json.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +// IWYU pragma: no_include + +using rapidjson::SizeType; +using rapidjson::Value; +using std::string; + +namespace doris { + +EasyJson::EasyJson() : alloc_(new EasyJsonAllocator), value_(&alloc_->value()) {} + +EasyJson::EasyJson(EasyJson::ComplexTypeInitializer type) + : alloc_(new EasyJsonAllocator), value_(&alloc_->value()) { + if (type == kObject) { + value_->SetObject(); + } else if (type == kArray) { + value_->SetArray(); + } +} + +EasyJson EasyJson::Get(const string& key) { + if (!value_->IsObject()) { + value_->SetObject(); + } + if (!value_->HasMember(key.c_str())) { + Value key_val(key.c_str(), alloc_->allocator()); + value_->AddMember(key_val, Value().SetNull(), alloc_->allocator()); + } + return EasyJson(&(*value_)[key.c_str()], alloc_); +} + +EasyJson EasyJson::Get(int index) { + if (!value_->IsArray()) { + value_->SetArray(); + } + while (SizeType(index) >= value_->Size()) { + value_->PushBack(Value().SetNull(), alloc_->allocator()); + } + return EasyJson(&(*value_)[index], alloc_); +} + +EasyJson EasyJson::operator[](const string& key) { + return Get(key); +} + +EasyJson EasyJson::operator[](int index) { + return Get(index); +} + +EasyJson& EasyJson::operator=(const string& val) { + value_->SetString(val.c_str(), alloc_->allocator()); + return *this; +} +template +EasyJson& EasyJson::operator=(T val) { + *value_ = val; + return *this; +} +template EasyJson& EasyJson::operator=(bool val); +template EasyJson& EasyJson::operator=(int32_t val); +template EasyJson& EasyJson::operator=(int64_t val); +template EasyJson& EasyJson::operator=(uint32_t val); +template EasyJson& EasyJson::operator=(uint64_t val); +template EasyJson& EasyJson::operator=(double val); +template<> EasyJson& 
EasyJson::operator=(const char* val) { + value_->SetString(val, alloc_->allocator()); + return *this; +} +template<> EasyJson& EasyJson::operator=(EasyJson::ComplexTypeInitializer val) { + if (val == kObject) { + value_->SetObject(); + } else if (val == kArray) { + value_->SetArray(); + } + return (*this); +} + +EasyJson& EasyJson::SetObject() { + if (!value_->IsObject()) { + value_->SetObject(); + } + return *this; +} + +EasyJson& EasyJson::SetArray() { + if (!value_->IsArray()) { + value_->SetArray(); + } + return *this; +} + +EasyJson EasyJson::Set(const string& key, const string& val) { + return (Get(key) = val); +} +template +EasyJson EasyJson::Set(const string& key, T val) { + return (Get(key) = val); +} +template EasyJson EasyJson::Set(const string& key, bool val); +template EasyJson EasyJson::Set(const string& key, int32_t val); +template EasyJson EasyJson::Set(const string& key, int64_t val); +template EasyJson EasyJson::Set(const string& key, uint32_t val); +template EasyJson EasyJson::Set(const string& key, uint64_t val); +template EasyJson EasyJson::Set(const string& key, double val); +template EasyJson EasyJson::Set(const string& key, const char* val); +template EasyJson EasyJson::Set(const string& key, + EasyJson::ComplexTypeInitializer val); + +EasyJson EasyJson::Set(int index, const string& val) { + return (Get(index) = val); +} +template +EasyJson EasyJson::Set(int index, T val) { + return (Get(index) = val); +} +template EasyJson EasyJson::Set(int index, bool val); +template EasyJson EasyJson::Set(int index, int32_t val); +template EasyJson EasyJson::Set(int index, int64_t val); +template EasyJson EasyJson::Set(int index, uint32_t val); +template EasyJson EasyJson::Set(int index, uint64_t val); +template EasyJson EasyJson::Set(int index, double val); +template EasyJson EasyJson::Set(int index, const char* val); +template EasyJson EasyJson::Set(int index, + EasyJson::ComplexTypeInitializer val); + +EasyJson EasyJson::PushBack(const string& val) { + if (!value_->IsArray()) { + value_->SetArray(); + } + Value push_val(val.c_str(), alloc_->allocator()); + value_->PushBack(push_val, alloc_->allocator()); + return EasyJson(&(*value_)[value_->Size() - 1], alloc_); +} +template +EasyJson EasyJson::PushBack(T val) { + if (!value_->IsArray()) { + value_->SetArray(); + } + value_->PushBack(val, alloc_->allocator()); + return EasyJson(&(*value_)[value_->Size() - 1], alloc_); +} +template EasyJson EasyJson::PushBack(bool val); +template EasyJson EasyJson::PushBack(int32_t val); +template EasyJson EasyJson::PushBack(int64_t val); +template EasyJson EasyJson::PushBack(uint32_t val); +template EasyJson EasyJson::PushBack(uint64_t val); +template EasyJson EasyJson::PushBack(double val); +template<> EasyJson EasyJson::PushBack(const char* val) { + if (!value_->IsArray()) { + value_->SetArray(); + } + Value push_val(val, alloc_->allocator()); + value_->PushBack(push_val, alloc_->allocator()); + return EasyJson(&(*value_)[value_->Size() - 1], alloc_); +} +template<> EasyJson EasyJson::PushBack(EasyJson::ComplexTypeInitializer val) { + if (!value_->IsArray()) { + value_->SetArray(); + } + Value push_val; + if (val == kObject) { + push_val.SetObject(); + } else if (val == kArray) { + push_val.SetArray(); + } else { + LOG(FATAL) << "Unknown initializer type"; + } + value_->PushBack(push_val, alloc_->allocator()); + return EasyJson(&(*value_)[value_->Size() - 1], alloc_); +} + +string EasyJson::ToString() const { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + 
value_->Accept(writer); + return buffer.GetString(); +} + +EasyJson::EasyJson(Value* value, scoped_refptr alloc) + : alloc_(std::move(alloc)), value_(value) {} + +} // namespace doris diff --git a/be/src/util/easy_json.h b/be/src/util/easy_json.h new file mode 100644 index 00000000000000..d2eb0702d8738d --- /dev/null +++ b/be/src/util/easy_json.h @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include + +#include + +#include "gutil/ref_counted.h" + +namespace doris { + +// A wrapper around rapidjson Value objects, to simplify usage. +// Intended solely for building json objects, not writing/parsing. +// +// Simplifies code like this: +// +// rapidjson::Document d; +// rapidjson::Value v; +// v.SetObject(); +// rapidjson::Value list; +// list.SetArray(); +// v.AddMember("list", list, d.GetAllocator()); +// v["list"].PushBack(rapidjson::Value().SetString("element"), d.GetAllocator()); +// +// To this: +// +// EasyJson ej; +// ej["list"][0] = "element"; +// +// Client code should build objects as demonstrated above, +// then call EasyJson::value() to obtain a reference to the +// built rapidjson Value. +class EasyJson { + public: + // Used for initializing EasyJson's with complex types. + // For example: + // + // EasyJson array; + // EasyJson nested = array.PushBack(EasyJson::kObject); + // nested["attr"] = "val"; + // // array = [ { "attr": "val" } ] + enum ComplexTypeInitializer { + kObject, + kArray + }; + + EasyJson(); + // Initializes the EasyJson object with the given type. + explicit EasyJson(ComplexTypeInitializer type); + ~EasyJson() = default; + + // Returns the child EasyJson associated with key. + // + // Note: this method can mutate the EasyJson object + // as follows: + // + // If this EasyJson's underlying Value is not an object + // (i.e. !this->value().IsObject()), then its Value is + // coerced to an object, overwriting the old Value. + // If the given key does not exist, a Null-valued + // EasyJson associated with key is created. + EasyJson Get(const std::string& key); + + // Returns the child EasyJson at index. + // + // Note: this method can mutate the EasyJson object + // as follows: + // + // If this EasyJson's underlying Value is not an array + // (i.e. !this->value().IsArray()), then its Value is + // coerced to an array, overwriting the old Value. + // If index >= this->value().Size(), then the underlying + // array's size is increased to index + 1 (new indices + // are filled with Null values). + EasyJson Get(int index); + + // Same as Get(key). + EasyJson operator[](const std::string& key); + // Same as Get(index). + EasyJson operator[](int index); + + // Sets the underlying Value equal to val. + // Returns a reference to the object itself. 
+ // + // 'val' can be a bool, int32_t, int64_t, double, + // char*, string, or ComplexTypeInitializer. + EasyJson& operator=(const std::string& val); + template + EasyJson& operator=(T val); + + // Sets the underlying Value to an object. + // Returns a reference to the object itself. + // + // i.e. after calling SetObject(), + // value().IsObject() == true + EasyJson& SetObject(); + // Sets the underlying Value to an array. + // Returns a reference to the object itself. + // + // i.e. after calling SetArray(), + // value().IsArray() == true + EasyJson& SetArray(); + + // Associates val with key. + // Returns the child object. + // + // If this EasyJson's underlying Value is not an object + // (i.e. !this->value().IsObject()), then its Value is + // coerced to an object, overwriting the old Value. + // If the given key does not exist, a new child entry + // is created with the given value. + EasyJson Set(const std::string& key, const std::string& val); + template + EasyJson Set(const std::string& key, T val); + + // Stores val at index. + // Returns the child object. + // + // If this EasyJson's underlying Value is not an array + // (i.e. !this->value().IsArray()), then its Value is + // coerced to an array, overwriting the old Value. + // If index >= this->value().Size(), then the underlying + // array's size is increased to index + 1 (new indices + // are filled with Null values). + EasyJson Set(int index, const std::string& val); + template + EasyJson Set(int index, T val); + + // Appends val to the underlying array. + // Returns a reference to the new child object. + // + // If this EasyJson's underlying Value is not an array + // (i.e. !this->value().IsArray()), then its Value is + // coerced to an array, overwriting the old Value. + EasyJson PushBack(const std::string& val); + template + EasyJson PushBack(T val); + + // Returns a reference to the underlying Value. + rapidjson::Value& value() const { return *value_; } + + // Returns a string representation of the underlying json. + std::string ToString() const; + + private: + // One instance of EasyJsonAllocator is shared among a root + // EasyJson object and all of its descendants. The allocator + // owns the underlying rapidjson Value, and a rapidjson + // allocator (via a rapidjson::Document). + class EasyJsonAllocator : public RefCounted { + public: + rapidjson::Value& value() { return value_; } + rapidjson::Document::AllocatorType& allocator() { return value_.GetAllocator(); } + private: + friend class RefCounted; + ~EasyJsonAllocator() = default; + + // The underlying rapidjson::Value object (Document is + // a subclass of Value that has its own allocator). + rapidjson::Document value_; + }; + + // Used to instantiate descendant objects. + EasyJson(rapidjson::Value* value, scoped_refptr alloc); + + // One allocator is shared among an EasyJson object and + // all of its descendants. + scoped_refptr alloc_; + + // A pointer to the underlying Value in the object + // tree owned by alloc_. + rapidjson::Value* value_; +}; + +} // namespace doris diff --git a/be/src/util/mustache/mustache.cc b/be/src/util/mustache/mustache.cc new file mode 100644 index 00000000000000..e976677cf4554d --- /dev/null +++ b/be/src/util/mustache/mustache.cc @@ -0,0 +1,448 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "mustache.h" + +#include "rapidjson/stringbuffer.h" +#include +#include "rapidjson/writer.h" + +#include +#include +#include +#include + +#include + +using namespace rapidjson; +using namespace std; +using namespace boost::algorithm; + +namespace mustache { + +// TODO: +// # Handle malformed templates better +// # Better support for reading templates from files + +enum TagOperator { + SUBSTITUTION, + SECTION_START, + NEGATED_SECTION_START, + PREDICATE_SECTION_START, + SECTION_END, + PARTIAL, + COMMENT, + LENGTH, + EQUALITY, + INEQUALITY, + LITERAL, + NONE +}; + +struct OpCtx { + TagOperator op; + string tag_name; + string tag_arg; + bool escaped = false; +}; + +struct ContextStack { + const Value* value; + const ContextStack* parent; +}; + +TagOperator GetOperator(const string& tag) { + if (tag.size() == 0) return SUBSTITUTION; + switch (tag[0]) { + case '#': return SECTION_START; + case '^': return NEGATED_SECTION_START; + case '?': return PREDICATE_SECTION_START; + case '/': return SECTION_END; + case '>': return PARTIAL; + case '!': + if (tag.size() == 1 || tag[1] != '=') return COMMENT; + return INEQUALITY; + case '%': return LENGTH; + case '~': return LITERAL; + case '=': return EQUALITY; + default: return SUBSTITUTION; + } +} + +int EvaluateTag(const string& document, const string& document_root, int idx, + const ContextStack* context, const OpCtx& op_ctx, stringstream* out); + +static bool RenderTemplate(const string& document, const string& document_root, + const ContextStack* stack, stringstream* out); + +void EscapeHtml(const string& in, stringstream *out) { + for (const char& c: in) { + switch (c) { + case '&': (*out) << "&"; + break; + case '"': (*out) << """; + break; + case '\'': (*out) << "'"; + break; + case '<': (*out) << "<"; + break; + case '>': (*out) << ">"; + break; + default: (*out) << c; + break; + } + } +} + +void Dump(const rapidjson::Value& v) { + StringBuffer buffer; + Writer writer(buffer); + v.Accept(writer); + std::cout << buffer.GetString() << std::endl; +} + +// Breaks a dotted path into individual components. One wrinkle, which stops this from +// being a simple split() is that we allow path components to be quoted, e.g.: "foo".bar, +// and any '.' characters inside those quoted sections aren't considered to be +// delimiters. This is to allow Json keys that contain periods. +void FindJsonPathComponents(const string& path, vector* components) { + bool in_quote = false; + bool escape_this_char = false; + int start = 0; + for (int i = start; i < path.size(); ++i) { + if (path[i] == '"' && !escape_this_char) in_quote = !in_quote; + if (path[i] == '.' 
&& !escape_this_char && !in_quote) { + // Current char == delimiter and not escaped and not in a quote pair => found a + // component + if (i - start > 0) { + if (path[start] == '"' && path[(i - 1) - start] == '"') { + if (i - start > 3) { + components->push_back(path.substr(start + 1, i - (start + 2))); + } + } else { + components->push_back(path.substr(start, i - start)); + } + start = i + 1; + } + } + + escape_this_char = (path[i] == '\\' && !escape_this_char); + } + + if (path.size() - start > 0) { + if (path[start] == '"' && path[(path.size() - 1) - start] == '"') { + if (path.size() - start > 3) { + components->push_back(path.substr(start + 1, path.size() - (start + 2))); + } + } else { + components->push_back(path.substr(start, path.size() - start)); + } + } +} + +// Looks up the json entity at 'path' in 'parent_context', and places it in 'resolved'. If +// the entity does not exist (i.e. the path is invalid), 'resolved' will be set to nullptr. +void ResolveJsonContext(const string& path, const ContextStack* stack, + const Value** resolved) { + if (path == ".") { + *resolved = stack->value; + return; + } + vector components; + FindJsonPathComponents(path, &components); + + // At each enclosing level of context, try to resolve the path. + for ( ; stack != nullptr; stack = stack->parent) { + const Value* cur = stack->value; + bool match = true; + for(const string& c: components) { + if (cur->IsObject() && cur->HasMember(c.c_str())) { + cur = &(*cur)[c.c_str()]; + } else { + match = false; + break; + } + } + if (match) { + *resolved = cur; + return; + } + } + *resolved = nullptr; +} + +int FindNextTag(const string& document, int idx, OpCtx* op, stringstream* out) { + op->op = NONE; + while (idx < document.size()) { + if (document[idx] == '{' && idx < (document.size() - 3) && document[idx + 1] == '{') { + if (document[idx + 2] == '{') { + idx += 3; + op->escaped = true; + } else { + op->escaped = false; + idx += 2; // Now at start of template expression + } + stringstream expr; + while (idx < document.size()) { + if (document[idx] != '}') { + expr << document[idx]; + ++idx; + } else { + if (!op->escaped && idx < document.size() - 1 && document[idx + 1] == '}') { + ++idx; + break; + } else if (op->escaped && idx < document.size() - 2 && document[idx + 1] == '}' + && document[idx + 2] == '}') { + idx += 2; + break; + } else { + expr << '}'; + } + } + } + + string key = expr.str(); + trim(key); + if (key != ".") trim_if(key, is_any_of(".")); + if (key.size() == 0) continue; + op->op = GetOperator(key); + if (op->op != SUBSTITUTION) { + int len = op->op == INEQUALITY ? 2 : 1; + key = key.substr(len); + trim(key); + } + if (key.size() == 0) continue; + + if (op->op == EQUALITY || op->op == INEQUALITY) { + // Find an argument + vector components; + split(components, key, is_any_of(" ")); + key = components[0]; + components.erase(components.begin()); + op->tag_arg = join(components, " "); + } + + op->tag_name = key; + return ++idx; + } else { + if (out != nullptr) (*out) << document[idx]; + } + ++idx; + } + return idx; +} + +// Evaluates a [PREDICATE_|NEGATED_]SECTION_START / SECTION_END pair by evaluating the tag +// in 'parent_context'. False or non-existant values cause the entire section to be +// skipped. True values cause the section to be evaluated as though it were a normal +// section, but with the parent context being the root context for that section. 
+// +// If 'is_negation' is true, the behaviour is the opposite of the above: false values +// cause the section to be normally evaluated etc. +int EvaluateSection(const string& document, const string& document_root, int idx, + const ContextStack* context_stack, const OpCtx& op_ctx, stringstream* out) { + // Precondition: idx is the immediate next character after an opening {{ #tag_name }} + const Value* context; + ResolveJsonContext(op_ctx.tag_name, context_stack, &context); + + // If we a) cannot resolve the context from the tag name or b) the context evaluates to + // false, we should skip the contents of the template until a closing {{/tag_name}}. + bool skip_contents = false; + + if (op_ctx.op == NEGATED_SECTION_START || op_ctx.op == PREDICATE_SECTION_START || + op_ctx.op == SECTION_START) { + skip_contents = (context == nullptr || context->IsFalse()); + + // If the tag is a negative block (i.e. {{^tag_name}}), do the opposite: if the + // context exists and is true, skip the contents, else echo them. + if (op_ctx.op == NEGATED_SECTION_START) { + context = context_stack->value; + skip_contents = !skip_contents; + } else if (op_ctx.op == PREDICATE_SECTION_START) { + context = context_stack->value; + } + } else if (op_ctx.op == INEQUALITY || op_ctx.op == EQUALITY) { + skip_contents = (context == nullptr || !context->IsString() || + strcasecmp(context->GetString(), op_ctx.tag_arg.c_str()) != 0); + if (op_ctx.op == INEQUALITY) skip_contents = !skip_contents; + context = context_stack->value; + } + + vector values; + if (!skip_contents && context != nullptr && context->IsArray()) { + for (int i = 0; i < context->Size(); ++i) { + values.push_back(&(*context)[i]); + } + } else { + values.push_back(skip_contents ? nullptr : context); + } + if (values.size() == 0) { + skip_contents = true; + values.push_back(nullptr); + } + + int start_idx = idx; + for(const Value* v: values) { + idx = start_idx; + stack section_starts; + section_starts.push(op_ctx); + while (idx < document.size()) { + OpCtx next_ctx; + idx = FindNextTag(document, idx, &next_ctx, skip_contents ? nullptr : out); + if (skip_contents && (next_ctx.op == SECTION_START || + next_ctx.op == PREDICATE_SECTION_START || + next_ctx.op == NEGATED_SECTION_START)) { + section_starts.push(next_ctx); + } else if (next_ctx.op == SECTION_END) { + if (next_ctx.tag_name != section_starts.top().tag_name) return -1; + section_starts.pop(); + } + if (section_starts.empty()) break; + + // Don't need to evaluate any templates if we're skipping the contents + if (!skip_contents) { + ContextStack new_context = { v, context_stack }; + idx = EvaluateTag(document, document_root, idx, &new_context, next_ctx, out); + } + } + } + return idx; +} + +// Evaluates a SUBSTITUTION tag, by replacing its contents with the value of the tag's +// name in 'parent_context'. 
+int EvaluateSubstitution(const string& document, const int idx, + const ContextStack* context_stack, const OpCtx& op_ctx, stringstream* out) { + const Value* val; + ResolveJsonContext(op_ctx.tag_name, context_stack, &val); + if (val == nullptr) return idx; + if (val->IsString()) { + if (!op_ctx.escaped) { + EscapeHtml(val->GetString(), out); + } else { + // TODO: Triple {{{ means don't escape + (*out) << val->GetString(); + } + } else if (val->IsInt64()) { + (*out) << val->GetInt64(); + } else if (val->IsInt()) { + (*out) << val->GetInt(); + } else if (val->IsDouble()) { + (*out) << val->GetDouble(); + } else if (val->IsBool()) { + (*out) << boolalpha << val->GetBool(); + } + return idx; +} + +// Evaluates a LENGTH tag by replacing its contents with the type-dependent 'size' of the +// value. +int EvaluateLength(const string& document, const int idx, const ContextStack* context_stack, + const string& tag_name, stringstream* out) { + const Value* val; + ResolveJsonContext(tag_name, context_stack, &val); + if (val == nullptr) return idx; + if (val->IsArray()) { + (*out) << val->Size(); + } else if (val->IsString()) { + (*out) << val->GetStringLength(); + }; + + return idx; +} + +int EvaluateLiteral(const string& document, const int idx, const ContextStack* context_stack, + const string& tag_name, stringstream* out) { + const Value* val; + ResolveJsonContext(tag_name, context_stack, &val); + if (val == nullptr) return idx; + if (!val->IsArray() && !val->IsObject()) return idx; + StringBuffer strbuf; + PrettyWriter writer(strbuf); + val->Accept(writer); + (*out) << strbuf.GetString(); + return idx; +} + +// Evaluates a 'partial' template by reading it fully from disk, then rendering it +// directly into the current output with the current context. +// +// TODO: This could obviously be more efficient (and there are lots of file accesses in a +// long list context). +void EvaluatePartial(const string& tag_name, const string& document_root, + const ContextStack* stack, stringstream* out) { + stringstream ss; + ss << document_root << tag_name; + ifstream tmpl(ss.str().c_str()); + if (!tmpl.is_open()) { + ss << ".mustache"; + tmpl.open(ss.str().c_str()); + if (!tmpl.is_open()) return; + } + stringstream file_ss; + file_ss << tmpl.rdbuf(); + RenderTemplate(file_ss.str(), document_root, stack, out); +} + +// Given a tag name, and its operator, evaluate the tag in the given context and write the +// output to 'out'. The heavy-lifting is delegated to specific Evaluate*() +// methods. Returns the new cursor position within 'document', or -1 on error. 
+int EvaluateTag(const string& document, const string& document_root, int idx, + const ContextStack* context, const OpCtx& op_ctx, stringstream* out) { + if (idx == -1) return idx; + switch (op_ctx.op) { + case SECTION_START: + case PREDICATE_SECTION_START: + case NEGATED_SECTION_START: + case EQUALITY: + case INEQUALITY: + return EvaluateSection(document, document_root, idx, context, op_ctx, out); + case SUBSTITUTION: + return EvaluateSubstitution(document, idx, context, op_ctx, out); + case COMMENT: + return idx; // Ignored + case PARTIAL: + EvaluatePartial(op_ctx.tag_name, document_root, context, out); + return idx; + case LENGTH: + return EvaluateLength(document, idx, context, op_ctx.tag_name, out); + case LITERAL: + return EvaluateLiteral(document, idx, context, op_ctx.tag_name, out); + case NONE: + return idx; // No tag was found + case SECTION_END: + return idx; + default: + cout << "Unknown tag: " << op_ctx.op << endl; + return -1; + } +} + +static bool RenderTemplate(const string& document, const string& document_root, + const ContextStack* stack, stringstream* out) { + int idx = 0; + while (idx < document.size() && idx != -1) { + OpCtx op; + idx = FindNextTag(document, idx, &op, out); + idx = EvaluateTag(document, document_root, idx, stack, op, out); + } + + return idx != -1; +} + +bool RenderTemplate(const string& document, const string& document_root, + const Value& context, stringstream* out) { + ContextStack stack = { &context, nullptr }; + return RenderTemplate(document, document_root, &stack, out); +} + +} diff --git a/be/src/util/mustache/mustache.h b/be/src/util/mustache/mustache.h new file mode 100644 index 00000000000000..9a2d9fb101f675 --- /dev/null +++ b/be/src/util/mustache/mustache.h @@ -0,0 +1,27 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rapidjson/document.h" +#include + +// Routines for rendering Mustache (http://mustache.github.io) templates with RapidJson +// (https://code.google.com/p/rapidjson/) documents. +namespace mustache { + +// Render a template contained in 'document' with respect to the json context +// 'context'. Alternately finds a tag and then evaluates it. Returns when an error is +// signalled (TODO: probably doesn't work in all paths), and evaluates that tag. Output is +// accumulated in 'out'. +bool RenderTemplate(const std::string& document, const std::string& document_root, + const rapidjson::Value& context, std::stringstream* out); + +} diff --git a/be/src/util/os_util.cpp b/be/src/util/os_util.cpp new file mode 100644 index 00000000000000..15aed51d180c21 --- /dev/null +++ b/be/src/util/os_util.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/os_util.h" + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include "env/env_util.h" +#include "gutil/macros.h" +#include "gutil/strings/numbers.h" +#include "gutil/strings/split.h" +#include "gutil/strings/stringpiece.h" +#include "gutil/strings/substitute.h" +#include "gutil/strings/util.h" +#include "util/faststring.h" + +using std::string; +using std::vector; +using strings::Split; +using strings::Substitute; + +namespace doris { + +// Ensure that Impala compiles on earlier kernels. If the target kernel does not support +// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1. +#ifndef _SC_CLK_TCK +#define _SC_CLK_TCK 2 +#endif + +static const int64_t kTicksPerSec = sysconf(_SC_CLK_TCK); + +// Offsets into the ../stat file array of per-thread statistics. +// +// They are themselves offset by two because the pid and comm fields of the +// file are parsed separately. +static const int64_t kUserTicks = 13 - 2; +static const int64_t kKernelTicks = 14 - 2; +static const int64_t kIoWait = 41 - 2; + +// Largest offset we are interested in, to check we get a well formed stat file. +static const int64_t kMaxOffset = kIoWait; + +Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats) { + DCHECK(stats != nullptr); + + // The thread name should be the only field with parentheses. But the name + // itself may contain parentheses. + size_t open_paren = buffer.find('('); + size_t close_paren = buffer.rfind(')'); + if (open_paren == string::npos || // '(' must exist + close_paren == string::npos || // ')' must exist + open_paren >= close_paren || // '(' must come before ')' + close_paren + 2 == buffer.size()) { // there must be at least two chars after ')' + return Status::IOError("Unrecognised /proc format"); + } + string extracted_name = buffer.substr(open_paren + 1, close_paren - (open_paren + 1)); + string rest = buffer.substr(close_paren + 2); + vector splits = Split(rest, " ", strings::SkipEmpty()); + if (splits.size() < kMaxOffset) { + return Status::IOError("Unrecognised /proc format"); + } + + int64_t tmp; + if (safe_strto64(splits[kUserTicks], &tmp)) { + stats->user_ns = tmp * (1e9 / kTicksPerSec); + } + if (safe_strto64(splits[kKernelTicks], &tmp)) { + stats->kernel_ns = tmp * (1e9 / kTicksPerSec); + } + if (safe_strto64(splits[kIoWait], &tmp)) { + stats->iowait_ns = tmp * (1e9 / kTicksPerSec); + } + if (name != nullptr) { + *name = extracted_name; + } + return Status::OK(); +} + +Status get_thread_stats(int64_t tid, ThreadStats* stats) { + DCHECK(stats != nullptr); + if (kTicksPerSec <= 0) { + return Status::NotSupported("ThreadStats not supported"); + } + faststring buf; + RETURN_IF_ERROR(env_util::read_file_to_string( + Env::Default(), Substitute("/proc/self/task/$0/stat", tid), &buf)); + + return parse_stat(buf.ToString(), nullptr, stats); +} +void disable_core_dumps() { + struct rlimit lim; + PCHECK(getrlimit(RLIMIT_CORE, &lim) == 0); + lim.rlim_cur = 0; + PCHECK(setrlimit(RLIMIT_CORE, &lim) == 0); + + // Set coredump_filter to not dump any parts of the address space. 
+ // Although the above disables core dumps to files, if core_pattern + // is set to a pipe rather than a file, it's not sufficient. Setting + // this pattern results in piping a very minimal dump into the core + // processor (eg abrtd), thus speeding up the crash. + int f; + RETRY_ON_EINTR(f, open("/proc/self/coredump_filter", O_WRONLY)); + if (f >= 0) { + ssize_t ret; + RETRY_ON_EINTR(ret, write(f, "00000000", 8)); + int close_ret; + RETRY_ON_EINTR(close_ret, close(f)); + } +} + +bool is_being_debugged() { +#ifndef __linux__ + return false; +#else + // Look for the TracerPid line in /proc/self/status. + // If this is non-zero, we are being ptraced, which is indicative of gdb or strace + // being attached. + faststring buf; + Status s = env_util::read_file_to_string(Env::Default(), "/proc/self/status", &buf); + if (!s.ok()) { + LOG(WARNING) << "could not read /proc/self/status: " << s.to_string(); + return false; + } + StringPiece buf_sp(reinterpret_cast(buf.data()), buf.size()); + vector lines = Split(buf_sp, "\n"); + for (const auto& l : lines) { + if (!HasPrefixString(l, "TracerPid:")) continue; + std::pair key_val = Split(l, "\t"); + int64_t tracer_pid = -1; + if (!safe_strto64(key_val.second.data(), key_val.second.size(), &tracer_pid)) { + LOG(WARNING) << "Invalid line in /proc/self/status: " << l; + return false; + } + return tracer_pid != 0; + } + LOG(WARNING) << "Could not find TracerPid line in /proc/self/status"; + return false; +#endif // __linux__ +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/util/os_util.h b/be/src/util/os_util.h new file mode 100644 index 00000000000000..7e0f514521f9ae --- /dev/null +++ b/be/src/util/os_util.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_UTIL_OS_UTIL_H +#define DORIS_BE_UTIL_OS_UTIL_H + +#include +#include +#include + +#include "common/status.h" +#include "env/env.h" + +namespace doris { + +// Utility methods to read interesting values from /proc. +// TODO: Get stats for parent process. + +// Container struct for statistics read from the /proc filesystem for a thread. +struct ThreadStats { + int64_t user_ns; + int64_t kernel_ns; + int64_t iowait_ns; + + // Default constructor zeroes all members in case structure can't be filled by + // GetThreadStats. + ThreadStats() : user_ns(0), kernel_ns(0), iowait_ns(0) {} +}; + +// Populates ThreadStats object using a given buffer. The buffer is expected to +// conform to /proc//task//stat layout; an error will be returned otherwise. +// +// If 'name' is supplied, the extracted thread name will be written to it. +Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats); + +// Populates ThreadStats object for a given thread by reading from +// /proc//task//stat. 
Returns OK unless the file cannot be read or is in an +// unrecognised format, or if the kernel version is not modern enough. +Status get_thread_stats(int64_t tid, ThreadStats* stats); + +// Disable core dumps for this process. +// +// This is useful particularly in tests where we have injected failures and don't +// want to generate a core dump from an "expected" crash. +void disable_core_dumps(); + +// Return true if this process appears to be running under a debugger or strace. +// +// This may return false on unsupported (non-Linux) platforms. +bool is_being_debugged(); + +} // namespace doris + +#endif diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp index c4b5493dd41bb2..1265ed93c59a27 100644 --- a/be/src/util/thread.cpp +++ b/be/src/util/thread.cpp @@ -17,23 +17,30 @@ #include "thread.h" +#include +#include #include + +#include #include #include #include #include #include -#include -#include #include "common/logging.h" #include "gutil/atomicops.h" -#include "gutil/once.h" #include "gutil/dynamic_annotations.h" +#include "gutil/map-util.h" +#include "gutil/once.h" #include "gutil/strings/substitute.h" #include "olap/olap_define.h" +#include "util/easy_json.h" #include "util/mutex.h" +#include "util/os_util.h" #include "util/scoped_cleanup.h" +#include "util/url_coding.h" +#include "http/web_page_handler.h" namespace doris { @@ -55,9 +62,7 @@ static GoogleOnceType once = GOOGLE_ONCE_INIT; // auditing. Used only by Thread. class ThreadMgr { public: - ThreadMgr() - : _threads_started_metric(0), - _threads_running_metric(0) {} + ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {} ~ThreadMgr() { MutexLock lock(&_lock); @@ -74,18 +79,17 @@ class ThreadMgr { // already been removed, this is a no-op. void remove_thread(const pthread_t& pthread_id, const std::string& category); -private: + void start_instrumentation(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const; +private: // Container class for any details we want to capture about a thread // TODO: Add start-time. // TODO: Track fragment ID. class ThreadDescriptor { public: - ThreadDescriptor() { } + ThreadDescriptor() {} ThreadDescriptor(std::string category, std::string name, int64_t thread_id) - : _name(std::move(name)), - _category(std::move(category)), - _thread_id(thread_id) {} + : _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {} const std::string& name() const { return _name; } const std::string& category() const { return _category; } @@ -97,6 +101,8 @@ class ThreadMgr { int64_t _thread_id; }; + void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const; + // A ThreadCategory is a set of threads that are logically related. // TODO: unordered_map is incompatible with pthread_t, but would be more // efficient here. @@ -106,7 +112,7 @@ class ThreadMgr { typedef std::map ThreadCategoryMap; // Protects _thread_categories and thread metrics. 
- Mutex _lock; + mutable Mutex _lock; // All thread categorys that ever contained a thread, even if empty ThreadCategoryMap _thread_categories; @@ -121,7 +127,7 @@ class ThreadMgr { void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) { if (tid == getpid()) { - return ; + return; } int err = prctl(PR_SET_NAME, name.c_str()); if (err < 0 && errno != EPERM) { @@ -169,6 +175,81 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca ANNOTATE_IGNORE_READS_AND_WRITES_END(); } +void ThreadMgr::start_instrumentation(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const { + const auto* category_name = FindOrNull(args, "group"); + if (category_name) { + bool requested_all = (*category_name == "all"); + ej->Set("requested_thread_group", EasyJson::kObject); + (*ej)["group_name"] = escape_for_html_to_string(*category_name); + (*ej)["requested_all"] = requested_all; + + // The critical section is as short as possible so as to minimize the delay + // imposed on new threads that acquire the lock in write mode. + vector descriptors_to_print; + if (!requested_all) { + MutexLock l(&_lock); + const auto* category = FindOrNull(_thread_categories, *category_name); + if (category) { + for (const auto& elem : *category) { + descriptors_to_print.emplace_back(elem.second); + } + } + } else { + MutexLock l(&_lock); + for (const auto& category : _thread_categories) { + for (const auto& elem : category.second) { + descriptors_to_print.emplace_back(elem.second); + } + } + } + + EasyJson found = (*ej).Set("found", EasyJson::kObject); + EasyJson threads = found.Set("threads", EasyJson::kArray); + for (const auto& desc : descriptors_to_print) { + summarize_thread_descriptor(desc, &threads); + } + } else { + // List all thread groups and the number of threads running in each. 
+ vector> thread_categories_info; + uint64_t running; + { + MutexLock l(&_lock); + running = _threads_running_metric; + thread_categories_info.reserve(_thread_categories.size()); + for (const auto& category : _thread_categories) { + thread_categories_info.emplace_back(category.first, category.second.size()); + } + + (*ej)["total_threads_running"] = running; + EasyJson groups = ej->Set("groups", EasyJson::kArray); + for (const auto& elem : thread_categories_info) { + string category_arg; + url_encode(elem.first, &category_arg); + LOG(INFO) << "encode url path: " << category_arg; + EasyJson group = groups.PushBack(EasyJson::kObject); + group["encoded_group_name"] = category_arg; + group["group_name"] = elem.first; + group["threads_running"] = elem.second; + } + } + } +} + +void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc, + EasyJson* ej) const { + ThreadStats stats; + Status status = get_thread_stats(desc.thread_id(), &stats); + if (!status.ok()) { + LOG(WARNING) << "Could not get per-thread statistics: " << status.to_string(); + } + + EasyJson thread = ej->PushBack(EasyJson::kObject); + thread["thread_name"] = desc.name(); + thread["user_sec"] = static_cast(stats.user_ns) / 1e9; + thread["kernel_sec"] = static_cast(stats.kernel_ns) / 1e9; + thread["iowait_sec"] = static_cast(stats.iowait_ns) / 1e9; +} + Thread::~Thread() { if (_joinable) { int ret = pthread_detach(_thread); @@ -201,7 +282,8 @@ const std::string& Thread::category() const { } std::string Thread::to_string() const { - return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category); + return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, + _category); } Thread* Thread::current_thread() { @@ -210,7 +292,7 @@ Thread* Thread::current_thread() { int64_t Thread::unique_thread_id() { return static_cast(pthread_self()); -} +} int64_t Thread::current_thread_id() { return syscall(SYS_gettid); @@ -268,7 +350,7 @@ Status Thread::start_thread(const std::string& category, const std::string& name t->_joinable = true; cleanup.cancel(); - VLOG(3) << "Started thread " << t->tid()<< " - " << category << ":" << name; + VLOG(3) << "Started thread " << t->tid() << " - " << category << ":" << name; return Status::OK(); } @@ -331,10 +413,10 @@ void Thread::init_threadmgr() { } ThreadJoiner::ThreadJoiner(Thread* thr) - : _thread(CHECK_NOTNULL(thr)), - _warn_after_ms(kDefaultWarnAfterMs), - _warn_every_ms(kDefaultWarnEveryMs), - _give_up_after_ms(kDefaultGiveUpAfterMs) {} + : _thread(CHECK_NOTNULL(thr)), + _warn_after_ms(kDefaultWarnAfterMs), + _warn_every_ms(kDefaultWarnEveryMs), + _give_up_after_ms(kDefaultGiveUpAfterMs) {} ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { _warn_after_ms = ms; @@ -352,8 +434,7 @@ ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { } Status ThreadJoiner::join() { - if (Thread::current_thread() && - Thread::current_thread()->tid() == _thread->tid()) { + if (Thread::current_thread() && Thread::current_thread()->tid() == _thread->tid()) { return Status::InvalidArgument("Can't join on own thread", -1, _thread->_name); } @@ -397,8 +478,15 @@ Status ThreadJoiner::join() { } waited_ms += wait_for; } - return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", - waited_ms, _thread->_name)); + return Status::Aborted( + strings::Substitute("Timed out after $0ms joining on $1", waited_ms, _thread->_name)); } +void start_thread_instrumentation(WebPageHandler* web_page_handler) { + 
web_page_handler->register_template_page( + "/threadz", "Threadz", + boost::bind(&ThreadMgr::start_instrumentation, thread_manager.get(), + boost::placeholders::_1, boost::placeholders::_2), + true); +} } // namespace doris diff --git a/be/src/util/thread.h b/be/src/util/thread.h index 1e1b1e99278802..d50ec8ad0151a7 100644 --- a/be/src/util/thread.h +++ b/be/src/util/thread.h @@ -18,27 +18,26 @@ #ifndef DORIS_BE_SRC_UTIL_THREAD_H #define DORIS_BE_SRC_UTIL_THREAD_H -#include #include #include +#include + #include "common/status.h" #include "gutil/ref_counted.h" #include "util/countdown_latch.h" namespace doris { +class WebPageHandler; + class Thread : public RefCountedThreadSafe { public: - enum CreateFlags { - NO_FLAGS = 0, - NO_STACK_WATCHDOG = 1 - }; + enum CreateFlags { NO_FLAGS = 0, NO_STACK_WATCHDOG = 1 }; template static Status create_with_flags(const std::string& category, const std::string& name, - const F& f, uint64_t flags, - scoped_refptr* holder) { + const F& f, uint64_t flags, scoped_refptr* holder) { return start_thread(category, name, f, flags, holder); } @@ -145,17 +144,15 @@ class Thread : public RefCountedThreadSafe { }; // User function to be executed by this thread. - typedef std::function ThreadFunctor; + typedef std::function ThreadFunctor; Thread(const std::string& category, const std::string& name, ThreadFunctor functor) - : _thread(0), - _tid(INVALID_TID), - _functor(std::move(functor)), - _category(std::move(category)), - _name(std::move(name)), - _done(1), - _joinable(false) - {} - + : _thread(0), + _tid(INVALID_TID), + _functor(std::move(functor)), + _category(std::move(category)), + _name(std::move(name)), + _done(1), + _joinable(false) {} // Library-specific thread ID. pthread_t _thread; @@ -172,7 +169,7 @@ class Thread : public RefCountedThreadSafe { int64_t _tid; const ThreadFunctor _functor; - + const std::string _category; const std::string _name; @@ -188,7 +185,7 @@ class Thread : public RefCountedThreadSafe { // Thread local pointer to the current thread of execution. Will be NULL if the current // thread is not a Thread. static __thread Thread* _tls; - + // Wait for the running thread to publish its tid. int64_t wait_for_tid() const; @@ -280,6 +277,8 @@ class ThreadJoiner { DISALLOW_COPY_AND_ASSIGN(ThreadJoiner); }; +// Registers /threadz with the debug webserver. +void start_thread_instrumentation(WebPageHandler* web_page_handler); } //namespace doris diff --git a/be/src/util/url_coding.cpp b/be/src/util/url_coding.cpp index 08a671f0acc871..5cbb4020b067ca 100644 --- a/be/src/util/url_coding.cpp +++ b/be/src/util/url_coding.cpp @@ -298,5 +298,10 @@ void escape_for_html(const std::string& in, std::stringstream* out) { } } } +std::string escape_for_html_to_string(const std::string& in) { + std::stringstream str; + escape_for_html(in, &str); + return str.str(); +} } diff --git a/be/src/util/url_coding.h b/be/src/util/url_coding.h index 7a9457e130a951..2162c9e45135ef 100644 --- a/be/src/util/url_coding.h +++ b/be/src/util/url_coding.h @@ -54,6 +54,8 @@ bool base64_decode(const std::string& in, std::string* out); // judiciously. void escape_for_html(const std::string& in, std::stringstream* out); +// Same as above, but returns a string. 
+std::string escape_for_html_to_string(const std::string& in); } #endif diff --git a/be/test/exec/orc_scanner_test.cpp b/be/test/exec/orc_scanner_test.cpp index 0c5e7d7fe63a91..5e27d74949acab 100644 --- a/be/test/exec/orc_scanner_test.cpp +++ b/be/test/exec/orc_scanner_test.cpp @@ -573,7 +573,7 @@ TEST_F(OrcScannerTest, normal3) { expr.nodes.push_back(cast_expr); expr.nodes.push_back(slot_ref); - params.expr_of_dest_slot.emplace(8 + i, expr); + params.expr_of_dest_slot.emplace(9 + i, expr); params.src_slot_ids.push_back(i); } @@ -606,7 +606,7 @@ TEST_F(OrcScannerTest, normal3) { expr.nodes.push_back(cast_expr); expr.nodes.push_back(slot_ref); - params.expr_of_dest_slot.emplace(13, expr); + params.expr_of_dest_slot.emplace(14, expr); params.src_slot_ids.push_back(5); } @@ -639,7 +639,7 @@ TEST_F(OrcScannerTest, normal3) { expr.nodes.push_back(cast_expr); expr.nodes.push_back(slot_ref); - params.expr_of_dest_slot.emplace(14, expr); + params.expr_of_dest_slot.emplace(15, expr); params.src_slot_ids.push_back(6); } { @@ -671,9 +671,42 @@ TEST_F(OrcScannerTest, normal3) { expr.nodes.push_back(cast_expr); expr.nodes.push_back(slot_ref); - params.expr_of_dest_slot.emplace(15, expr); + params.expr_of_dest_slot.emplace(16, expr); params.src_slot_ids.push_back(7); } + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = decimal_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodecimal"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = decimal_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("cast_to_decimal_val(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::DecimalOperators::cast_to_decimal_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 8; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(17, expr); + params.src_slot_ids.push_back(8); + } + } params.__set_src_tuple_id(0); params.__set_dest_tuple_id(1); @@ -709,6 +742,8 @@ TEST_F(OrcScannerTest, normal3) { TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col7").column_pos(7).build()); src_tuple_builder.add_slot( TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col8").column_pos(8).build()); + src_tuple_builder.add_slot( + TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col9").column_pos(9).build()); src_tuple_builder.build(&dtb); TTupleDescriptorBuilder dest_tuple_builder; @@ -728,6 +763,8 @@ TEST_F(OrcScannerTest, normal3) { TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("col7").column_pos(7).build()); dest_tuple_builder.add_slot( TSlotDescriptorBuilder().type(TYPE_DATE).nullable(true).column_name("col8").column_pos(8).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(27,9).column_name("col9").column_pos(9).build()); dest_tuple_builder.build(&dtb); t_desc_table = dtb.desc_tbl(); @@ -755,7 +792,7 @@ TEST_F(OrcScannerTest, normal3) { bool eof = false; ASSERT_TRUE(scanner.get_next(tuple, &tuple_pool, &eof).ok()); 
ASSERT_EQ(Tuple::to_string(tuple, *_desc_tbl->get_tuple_descriptor(1)), - "(0.123456789 1.12 -1.1234500000 0.12345 0.000 1 2020-01-14 14:12:19 2020-02-10)"); + "(0.123456789 1.12 -1.1234500000 0.12345 0.000 1 2020-01-14 14:12:19 2020-02-10 -0.0014)"); scanner.close(); } diff --git a/be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc b/be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc index 65bc52b7c1cdf8..548fac4f5b0749 100644 Binary files a/be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc and b/be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc differ diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 882921e590e329..9d419bbb75f412 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -64,3 +64,4 @@ ADD_BE_TEST(scoped_cleanup_test) ADD_BE_TEST(thread_test) ADD_BE_TEST(threadpool_test) ADD_BE_TEST(trace_test) +ADD_BE_TEST(easy_json-test) diff --git a/be/test/util/easy_json-test.cpp b/be/test/util/easy_json-test.cpp new file mode 100644 index 00000000000000..616ffb8aadc0a2 --- /dev/null +++ b/be/test/util/easy_json-test.cpp @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include +#include +#include + +#include "gutil/integral_types.h" +#include "util/easy_json.h" + +using rapidjson::SizeType; +using rapidjson::Value; +using std::string; + +namespace doris { + +class EasyJsonTest: public ::testing::Test {}; + +TEST_F(EasyJsonTest, TestNull) { + EasyJson ej; + ASSERT_TRUE(ej.value().IsNull()); +} + +TEST_F(EasyJsonTest, TestBasic) { + EasyJson ej; + ej.SetObject(); + ej.Set("1", true); + ej.Set("2", kint32min); + ej.Set("4", kint64min); + ej.Set("6", 1.0); + ej.Set("7", "string"); + + Value& v = ej.value(); + + ASSERT_EQ(v["1"].GetBool(), true); + ASSERT_EQ(v["2"].GetInt(), kint32min); + ASSERT_EQ(v["4"].GetInt64(), kint64min); + ASSERT_EQ(v["6"].GetDouble(), 1.0); + ASSERT_EQ(string(v["7"].GetString()), "string"); +} + +TEST_F(EasyJsonTest, TestNested) { + EasyJson ej; + ej.SetObject(); + ej.Get("nested").SetObject(); + ej.Get("nested").Set("nested_attr", true); + ASSERT_EQ(ej.value()["nested"]["nested_attr"].GetBool(), true); + + ej.Get("nested_array").SetArray(); + ej.Get("nested_array").PushBack(1); + ej.Get("nested_array").PushBack(2); + ASSERT_EQ(ej.value()["nested_array"][SizeType(0)].GetInt(), 1); + ASSERT_EQ(ej.value()["nested_array"][SizeType(1)].GetInt(), 2); +} + +TEST_F(EasyJsonTest, TestCompactSyntax) { + EasyJson ej; + ej["nested"]["nested_attr"] = true; + ASSERT_EQ(ej.value()["nested"]["nested_attr"].GetBool(), true); + + for (int i = 0; i < 2; i++) { + ej["nested_array"][i] = i + 1; + } + ASSERT_EQ(ej.value()["nested_array"][SizeType(0)].GetInt(), 1); + ASSERT_EQ(ej.value()["nested_array"][SizeType(1)].GetInt(), 2); +} + +TEST_F(EasyJsonTest, TestComplexInitializer) { + EasyJson ej; + ej = EasyJson::kObject; + ASSERT_TRUE(ej.value().IsObject()); + + EasyJson nested_arr = ej.Set("nested_arr", EasyJson::kArray); + ASSERT_TRUE(nested_arr.value().IsArray()); + + EasyJson nested_obj = nested_arr.PushBack(EasyJson::kObject); + ASSERT_TRUE(ej["nested_arr"][0].value().IsObject()); +} + +TEST_F(EasyJsonTest, TestAllocatorLifetime) { + EasyJson* root = new EasyJson; + EasyJson child = (*root)["child"]; + delete root; + + child["child_attr"] = 1; + ASSERT_EQ(child.value()["child_attr"].GetInt(), 1); +} +} // namespace doris + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index efeda65a00fae8..aa6187917d6f07 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -111,6 +111,7 @@ import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.Daemon; @@ -460,6 +461,11 @@ private static class SingletonHolder { } private Catalog() { + this(false); + } + + // if isCheckpointCatalog is true, it means that we should not collect thread pool metric + private Catalog(boolean isCheckpointCatalog) { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); this.load = new Load(); @@ -489,7 +495,7 @@ private Catalog() { this.masterIp = ""; this.systemInfo = new SystemInfoService(); - this.heartbeatMgr = new HeartbeatMgr(systemInfo); + this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog); 
this.tabletInvertedIndex = new TabletInvertedIndex(); this.colocateTableIndex = new ColocateTableIndex(); this.recycleBin = new CatalogRecycleBin(); @@ -503,7 +509,7 @@ private Catalog() { this.isDefaultClusterCreated = false; - this.pullLoadJobMgr = new PullLoadJobMgr(); + this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog); this.brokerMgr = new BrokerMgr(); this.resourceMgr = new ResourceMgr(); @@ -522,7 +528,7 @@ private Catalog() { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size); + this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -555,7 +561,7 @@ public static Catalog getCurrentCatalog() { // only checkpoint thread it self will goes here. // so no need to care about the thread safe. if (CHECKPOINT == null) { - CHECKPOINT = new Catalog(); + CHECKPOINT = new Catalog(true); } return CHECKPOINT; } else { @@ -1206,6 +1212,8 @@ private void transferToMaster() { String msg = "master finished to replay journal, can write now."; Util.stdoutWithTime(msg); LOG.info(msg); + // for master, there are some new thread pools need to register metric + ThreadPoolManager.registerAllThreadPoolMetric(); } /* @@ -1245,6 +1253,7 @@ private void startMasterOnlyDaemonThreads() { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); // New load scheduler + loadTaskScheduler.start(); loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); diff --git a/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java index a65797dfa9b35a..3c72c9f77b7868 100644 --- a/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java +++ b/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java @@ -469,8 +469,11 @@ private void balanceGroup() { } List allBackendIds = infoService.getClusterBackendIds(db.getClusterName(), true); + List allBackendIdsAvailable = allBackendIds.stream() + .filter(infoService::checkBackendAvailable) + .collect(Collectors.toList()); List> balancedBackendsPerBucketSeq = Lists.newArrayList(); - if (balance(groupId, allBackendIds, colocateIndex, infoService, balancedBackendsPerBucketSeq)) { + if (balance(groupId, allBackendIdsAvailable, colocateIndex, infoService, balancedBackendsPerBucketSeq)) { colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq); ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq); Catalog.getCurrentCatalog().getEditLog().logColocateBackendsPerBucketSeq(info); diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index c177cf7b1822a6..ca923a9e713648 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -39,7 +39,7 @@ /** * ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource. 
* thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer. - * it provide three functions to construct thread pool now. + * it provide four functions to construct thread pool now. * * 1. newDaemonCacheThreadPool * Wrapper over newCachedThreadPool with additional maxNumThread limit. @@ -47,6 +47,8 @@ * Wrapper over newCachedThreadPool with additional blocking queue capacity limit. * 3. newDaemonThreadPool * Wrapper over ThreadPoolExecutor, user can use it to construct thread pool more flexibly. + * 4. newDaemonScheduledThreadPool + * Wrapper over ScheduledThreadPoolExecutor, but without delay task num limit and thread num limit now(NOTICE). * * All thread pool constructed by ThreadPoolManager will be added to the nameToThreadPoolMap, * so the thread pool name in fe must be unique. @@ -66,6 +68,7 @@ public static void registerAllThreadPoolMetric() { for (Map.Entry entry : nameToThreadPoolMap.entrySet()) { registerThreadPoolMetric(entry.getKey(), entry.getValue()); } + nameToThreadPoolMap.clear(); } public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) { @@ -92,13 +95,14 @@ public Integer getValue() { } } - public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName) { - return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicy(poolName), poolName); + public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName, boolean needRegisterMetric) { + return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), + new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } - public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName) { + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize), - new BlockedPolicy(poolName, 60), poolName); + new BlockedPolicy(poolName, 60), poolName, needRegisterMetric); } public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, @@ -107,17 +111,25 @@ public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler, - String poolName) { + String poolName, + boolean needRegisterMetric) { ThreadFactory threadFactory = namedThreadFactory(poolName); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - nameToThreadPoolMap.put(poolName, threadPool); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, threadPool); + } return threadPool; } - public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) { + // Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor, + // so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor + // Please use this api only for scheduling short task at fix rate. 
+ public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName, boolean needRegisterMetric) { ThreadFactory threadFactory = namedThreadFactory(poolName); - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory); - nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + } return scheduledThreadPoolExecutor; } diff --git a/fe/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/src/main/java/org/apache/doris/common/ThriftServer.java index 9caab7bb72c36f..c6892caec3ab39 100644 --- a/fe/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/src/main/java/org/apache/doris/common/ThriftServer.java @@ -100,7 +100,7 @@ private void createThreadedServer() throws TTransportException { TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); + ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true); args.executorService(threadPoolExecutor); server = new TThreadedSelectorServer(args); } @@ -114,7 +114,7 @@ private void createThreadPoolServer() throws TTransportException { TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); + ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true); serverArgs.executorService(threadPoolExecutor); server = new TThreadPoolServer(serverArgs); } diff --git a/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java index f96e4b7d0f99a5..3a7cae77256b5c 100644 --- a/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java +++ b/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java @@ -41,7 +41,7 @@ public class ClusterStatePublisher { private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class); private static ClusterStatePublisher INSTANCE; - private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher"); + private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true); private SystemInfoService clusterInfoService; diff --git a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java deleted file mode 100644 index 658068db5c3503..00000000000000 --- a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more 
contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.publish; - -import org.apache.doris.common.Config; -import org.apache.doris.common.ThreadPoolManager; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -// Fixed time scheduled publisher. -// You can register your routine publish here. -public class FixedTimePublisher { - private static FixedTimePublisher INSTANCE; - - private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher"); - private ClusterStatePublisher publisher; - - public FixedTimePublisher(ClusterStatePublisher publisher) { - this.publisher = publisher; - } - - public static FixedTimePublisher getInstance() { - if (INSTANCE == null) { - INSTANCE = new FixedTimePublisher(ClusterStatePublisher.getInstance()); - } - return INSTANCE; - } - - public void register(Callback callback, long intervalMs) { - scheduler.scheduleAtFixedRate(new Worker(callback), 0, intervalMs, TimeUnit.MILLISECONDS); - } - - private class Worker implements Runnable { - private Callback callback; - - public Worker(Callback callback) { - this.callback = callback; - } - - @Override - public void run() { - ClusterStateUpdate.Builder builder = ClusterStateUpdate.builder(); - builder.addUpdate(callback.getTopicUpdate()); - ClusterStateUpdate state = builder.build(); - Listener listener = Listeners.nullToNoOpListener(callback.getListener()); - - publisher.publish(state, listener, Config.meta_publish_timeout_ms); - } - } - - public static interface Callback { - public TopicUpdate getTopicUpdate(); - - public Listener getListener(); - } -} diff --git a/fe/src/main/java/org/apache/doris/load/ExportChecker.java b/fe/src/main/java/org/apache/doris/load/ExportChecker.java index 414cfbd774d63e..eccd6091d5c670 100644 --- a/fe/src/main/java/org/apache/doris/load/ExportChecker.java +++ b/fe/src/main/java/org/apache/doris/load/ExportChecker.java @@ -53,10 +53,10 @@ public static void init(long intervalMs) { checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs)); int poolSize = Config.export_running_job_num_limit == 0 ? 
5 : Config.export_running_job_num_limit; - MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor(poolSize); + MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true); executors.put(JobState.PENDING, pendingTaskExecutor); - MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor(poolSize); + MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true); executors.put(JobState.EXPORTING, exportingTaskExecutor); } @@ -64,6 +64,9 @@ public static void startAll() { for (ExportChecker exportChecker : checkers.values()) { exportChecker.start(); } + for (MasterTaskExecutor masterTaskExecutor : executors.values()) { + masterTaskExecutor.start(); + } } @Override diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/src/main/java/org/apache/doris/load/LoadChecker.java index d6e0cb7d862f7a..20620b5f0d4629 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java @@ -92,14 +92,14 @@ public static void init(long intervalMs) { Map pendingPriorityMap = Maps.newHashMap(); pendingPriorityMap.put(TPriority.NORMAL, - new MasterTaskExecutor(Config.load_pending_thread_num_normal_priority)); + new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority, true)); pendingPriorityMap.put(TPriority.HIGH, - new MasterTaskExecutor(Config.load_pending_thread_num_high_priority)); + new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority, true)); executors.put(JobState.PENDING, pendingPriorityMap); Map etlPriorityMap = Maps.newHashMap(); - etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor(Config.load_etl_thread_num_normal_priority)); - etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor(Config.load_etl_thread_num_high_priority)); + etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority, true)); + etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority, true)); executors.put(JobState.ETL, etlPriorityMap); } @@ -110,6 +110,11 @@ public static void startAll() { for (LoadChecker loadChecker : checkers.values()) { loadChecker.start(); } + for (Map map : executors.values()) { + for (MasterTaskExecutor masterTaskExecutor : map.values()) { + masterTaskExecutor.start(); + } + } } @Override diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 8d4139948268c8..4e5e4e112cfb14 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -86,7 +86,7 @@ public final class MetricRepo { public static GaugeMetricImpl GAUGE_QUERY_ERR_RATE; public static GaugeMetricImpl GAUGE_MAX_TABLET_COMPACTION_SCORE; - private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool"); + private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true); private static MetricCalculator metricCalculator = new MetricCalculator(); public static synchronized void init() { diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java 
index 1cd099bb672e7f..ed1d599978668d 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java @@ -70,7 +70,7 @@ public boolean start() { } // start accept thread - listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener"); + listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true); running = true; listenerFuture = listener.submit(new Listener()); diff --git a/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java index c0ed55f7513b9a..964abaa045f9a4 100644 --- a/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java @@ -46,7 +46,7 @@ public class NMysqlServer extends MysqlServer { private AcceptingChannel server; // default task service. - private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool"); + private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool", true); public NMysqlServer(int port, ConnectScheduler connectScheduler) { this.port = port; diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 6f4ec55c21782f..5d431c476ba443 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -48,14 +48,14 @@ public class ConnectScheduler { private AtomicInteger nextConnectionId; private Map connectionMap = Maps.newHashMap(); private Map connByUser = Maps.newHashMap(); - private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool"); + private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true); // Use a thread to check whether connection is timeout. Because // 1. If use a scheduler, the task maybe a huge number when query is messy. // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. // 2. Use a thread to poll maybe lose some accurate, but is enough to us. 
- private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1, - "Connect-Scheduler-Check-Timer"); + private ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, + "Connect-Scheduler-Check-Timer", true); public ConnectScheduler(int maxConnections) { this.maxConnections = maxConnections; diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 6f391b419e2665..7948c6371254cd 100644 --- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -70,11 +70,11 @@ public class HeartbeatMgr extends MasterDaemon { private static volatile AtomicReference masterInfo = new AtomicReference<>(); - public HeartbeatMgr(SystemInfoService nodeMgr) { + public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, - Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool"); + Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); this.heartbeatFlags = new HeartbeatFlags(); } diff --git a/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java index 4ebd8d44244e79..a206bb309a3fee 100644 --- a/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java @@ -24,7 +24,7 @@ public class AgentTaskExecutor { - private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool"); + private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool", true); public AgentTaskExecutor() { diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 90aad6b5c949e5..9b26556a3d10e9 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -27,21 +27,28 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + public class MasterTaskExecutor { private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class); - private ScheduledExecutorService executor; + private ThreadPoolExecutor executor; private Map> runningTasks; + public ScheduledThreadPoolExecutor scheduledThreadPool; - public MasterTaskExecutor(int threadNum) { - executor = ThreadPoolManager.newScheduledThreadPool(threadNum, "Master-Task-Executor-Pool"); + public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric) { + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool", needRegisterMetric); runningTasks = Maps.newHashMap(); - executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } - + + public void 
start() { + scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); + } + /** * submit task to task executor * @param task @@ -61,6 +68,7 @@ public boolean submit(MasterTask task) { } public void close() { + scheduledThreadPool.shutdown(); executor.shutdown(); runningTasks.clear(); } @@ -70,7 +78,7 @@ public int getTaskNum() { return runningTasks.size(); } } - + private class TaskChecker implements Runnable { @Override public void run() { diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java index ed8c3240377c32..50a80681ce3e4e 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java @@ -46,8 +46,8 @@ public class PullLoadJobMgr { private int concurrency = 10; - public PullLoadJobMgr() { - executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr"); + public PullLoadJobMgr(boolean needRegisterMetric) { + executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", needRegisterMetric); } /** diff --git a/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java b/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java index 2ae2ea99e6a070..a31b15cc394355 100755 --- a/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java +++ b/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java @@ -29,9 +29,9 @@ public class ThreadPoolManagerTest { @Test public void testNormal() throws InterruptedException { - ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool"); + ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool", true); ThreadPoolExecutor testFixedThreaddPool = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, - "test_fixed_thread_pool"); + "test_fixed_thread_pool", true); ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", testCachedPool); ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 2b51cdda83be42..7228a2eafc99e2 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -165,7 +165,7 @@ public void functionTest(@Mocked Catalog catalog, RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); routineLoadTaskScheduler.setInterval(5000); - ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler"); + ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler", false); executorService.submit(routineLoadScheduler); executorService.submit(routineLoadTaskScheduler); diff --git a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java index 8bc68158100bcf..352e300b0941c9 100644 --- a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java @@ -33,7 +33,8 @@ public class MasterTaskExecutorTest { @Before public void setUp() { - executor = new 
MasterTaskExecutor(THREAD_NUM); + executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM, false); + executor.start(); } @After diff --git a/webroot/be/home.mustache b/webroot/be/home.mustache new file mode 100644 index 00000000000000..22995d2ef09910 --- /dev/null +++ b/webroot/be/home.mustache @@ -0,0 +1,28 @@ +{{! +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +}} + +

+<h2>Version Info</h2>
+<pre>{{version}}</pre>
+
+<h2>Hardware Info</h2>
+
+<h3>CPU Info</h3>
+<pre>{{cpuinfo}}</pre>
+
+<h3>Memory Info</h3>
+<pre>{{meminfo}}</pre>
+
+<h3>Disk Info</h3>
+<pre>{{diskinfo}}</pre>
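
The template above only names the fields it renders; the values come from an EasyJson context that a registered handler fills before WebPageHandler hands it to the mustache renderer. A minimal sketch of such a pairing, using the register_template_page and EasyJson APIs introduced in this patch — the "/home" path, the home_handler and add_home_page names, and the version/CPU/memory/disk helpers are illustrative assumptions, not part of this change:

    #include "http/web_page_handler.h"
    #include "util/cpu_info.h"    // assumed header for CpuInfo
    #include "util/debug_util.h"
    #include "util/disk_info.h"
    #include "util/easy_json.h"
    #include "util/mem_info.h"

    namespace doris {

    // Hypothetical handler feeding a template like webroot/be/home.mustache.
    // The keys set here must match the {{version}}, {{cpuinfo}}, {{meminfo}} and
    // {{diskinfo}} tags in the template.
    static void home_handler(const WebPageHandler::ArgumentMap& args, EasyJson* output) {
        (*output)["version"] = get_version_string(false); // assumed helper
        (*output)["cpuinfo"] = CpuInfo::debug_string();   // assumed helper
        (*output)["meminfo"] = MemInfo::debug_string();   // assumed helper
        (*output)["diskinfo"] = DiskInfo::debug_string(); // assumed helper
    }

    // Called during webserver setup, analogous to start_thread_instrumentation() above.
    void add_home_page(WebPageHandler* web_page_handler) {
        web_page_handler->register_template_page("/home", "Home", home_handler,
                                                 true /* is_on_nav_bar */);
    }

    } // namespace doris

The same key-to-tag matching is how threadz.mustache below consumes the document built by ThreadMgr::start_instrumentation.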
    diff --git a/webroot/be/threadz.mustache b/webroot/be/threadz.mustache new file mode 100644 index 00000000000000..77f575ffae4f2d --- /dev/null +++ b/webroot/be/threadz.mustache @@ -0,0 +1,68 @@ +{{! +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +}} + +{{#requested_thread_group}} +

+<h2>Thread Group: {{group_name}}</h2>
+{{#requested_all}}
+<h3>All Threads :</h3>
+{{/requested_all}}
+{{#found}}
+<table>
+  <thead>
+    <tr>
+      <th>Thread name</th>
+      <th>Cumulative User CPU (s)</th>
+      <th>Cumulative Kernel CPU (s)</th>
+      <th>Cumulative IO-wait (s)</th>
+    </tr>
+  </thead>
+  <tbody>
+    {{#threads}}
+    <tr>
+      <td>{{thread_name}}</td>
+      <td>{{user_sec}}</td>
+      <td>{{kernel_sec}}</td>
+      <td>{{iowait_sec}}</td>
+    </tr>
+    {{/threads}}
+  </tbody>
+</table>
+{{/found}}
+{{^found}}Thread group {{group_name}} not found{{/found}}
+{{/requested_thread_group}}
+
+{{^requested_thread_group}}
+<h2>Thread Groups</h2>
+<h4>{{total_threads_running}} thread(s) running</h4>
+<h3><a href="/threadz?group=all">All Threads</a></h3>
+<table>
+  <thead>
+    <tr>
+      <th>Group</th>
+      <th>Threads running</th>
+    </tr>
+  </thead>
+  <tbody>
+    {{#groups}}
+    <tr>
+      <td><a href="/threadz?group={{encoded_group_name}}">{{group_name}}</a></td>
+      <td>{{threads_running}}</td>
+    </tr>
+    {{/groups}}
+  </tbody>
+</table>
+{{/requested_thread_group}}
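
To make the wiring between ThreadMgr::start_instrumentation() and this template concrete, the following sketch builds the same context document the handler produces for the group-listing branch, using only the EasyJson calls exercised in easy_json-test.cpp; the group name and the counts are sample values, not output captured from a running BE:

    #include <string>

    #include "util/easy_json.h"

    namespace doris {

    // Illustrative only: mirrors the shape built in ThreadMgr::start_instrumentation()
    // when no "group" argument is given. threadz.mustache reads total_threads_running
    // and iterates {{#groups}} over encoded_group_name/group_name/threads_running.
    EasyJson make_sample_threadz_context() {
        EasyJson ej;
        ej.SetObject();
        ej["total_threads_running"] = 42;   // sample count
        EasyJson groups = ej.Set("groups", EasyJson::kArray);
        EasyJson group = groups.PushBack(EasyJson::kObject);
        std::string name = "thrift-server"; // sample category name
        group["encoded_group_name"] = name; // the real handler passes this through url_encode()
        group["group_name"] = name;
        group["threads_running"] = 8;       // sample count
        return ej;
    }

    } // namespace doris

When a specific category is requested — the handler looks it up with FindOrNull(args, "group"), so for example /threadz?group=all — the {{#requested_thread_group}} section is rendered instead, with one table row per thread summarized by summarize_thread_descriptor().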