diff --git a/be/src/common/config.h b/be/src/common/config.h index 66e52a3be02a3c..9a28c6bca60551 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -305,6 +305,12 @@ CONF_mInt64(tablet_scan_frequency_time_node_interval_second, "300"); CONF_mInt32(compaction_tablet_scan_frequency_factor, "0"); CONF_mInt32(compaction_tablet_compaction_score_factor, "1"); +// This config can be set to limit thread number in tablet migration thread pool. +CONF_Int32(min_tablet_migration_threads, "1"); +CONF_Int32(max_tablet_migration_threads, "1"); + +CONF_mInt32(finished_migration_tasks_size, "10000"); + // Port to start debug webserver on CONF_Int32(webserver_port, "8040"); // Number of webserver workers diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 176955bf88016a..4d7caaceb1fe28 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -37,6 +37,7 @@ add_library(Webserver STATIC http_client.cpp action/mini_load.cpp action/health_action.cpp + action/tablet_migration_action.cpp action/tablets_info_action.cpp action/tablets_distribution_action.cpp action/checksum_action.cpp diff --git a/be/src/http/action/tablet_migration_action.cpp b/be/src/http/action/tablet_migration_action.cpp new file mode 100644 index 00000000000000..ed7ea2197607a7 --- /dev/null +++ b/be/src/http/action/tablet_migration_action.cpp @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/tablet_migration_action.h" + +#include + +#include "gutil/strings/substitute.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "olap/task/engine_storage_migration_task.h" +#include "util/json_util.h" + +namespace doris { + +const static std::string HEADER_JSON = "application/json"; + +TabletMigrationAction::TabletMigrationAction() { + _init_migration_action(); +} + +void TabletMigrationAction::_init_migration_action() { + int32_t max_thread_num = config::max_tablet_migration_threads; + int32_t min_thread_num = config::min_tablet_migration_threads; + ThreadPoolBuilder("MigrationTaskThreadPool") + .set_min_threads(min_thread_num) + .set_max_threads(max_thread_num) + .build(&_migration_thread_pool); +} + +void TabletMigrationAction::handle(HttpRequest* req) { + int64_t tablet_id = 0; + int32_t schema_hash = 0; + string dest_disk = ""; + string goal = ""; + Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal); + if (status.ok()) { + if (goal == "run") { + MigrationTask current_task(tablet_id, schema_hash, dest_disk); + TabletSharedPtr tablet; + DataDir* dest_store; + Status status = + _check_migrate_request(tablet_id, schema_hash, dest_disk, tablet, &dest_store); + if (status.ok()) { + do { + { + std::unique_lock lock(_migration_status_mutex); + std::map::iterator it_task = _migration_tasks.find(current_task); + if (it_task != _migration_tasks.end()) 
{ + status = Status::AlreadyExist(strings::Substitute( + "There is a migration task for this tablet already exists. " + "dest_disk is $0 .", + (it_task->first)._dest_disk)); + break; + } + _migration_tasks[current_task] = "submitted"; + } + auto st = _migration_thread_pool->submit_func([&, tablet_id, schema_hash, + dest_disk, current_task]() { + { + std::unique_lock lock(_migration_status_mutex); + _migration_tasks[current_task] = "running"; + } + Status result_status = _execute_tablet_migration(tablet, dest_store); + { + std::unique_lock lock(_migration_status_mutex); + std::map::iterator it_task = + _migration_tasks.find(current_task); + if (it_task != _migration_tasks.end()) { + _migration_tasks.erase(it_task); + } + std::pair finished_task = + make_pair(current_task, result_status); + if (_finished_migration_tasks.size() >= + config::finished_migration_tasks_size) { + _finished_migration_tasks.pop_front(); + } + _finished_migration_tasks.push_back(finished_task); + } + }); + if (!st.ok()) { + status = Status::InternalError("Migration task submission failed"); + std::unique_lock lock(_migration_status_mutex); + std::map::iterator it_task = + _migration_tasks.find(current_task); + if (it_task != _migration_tasks.end()) { + _migration_tasks.erase(it_task); + } + } + } while (0); + } + std::string status_result; + if (!status.ok()) { + status_result = to_json(status); + } else { + status_result = + "{\"status\": \"Success\", \"msg\": \"migration task is successfully " + "submitted.\"}"; + } + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + HttpChannel::send_reply(req, HttpStatus::OK, status_result); + } else { + DCHECK(goal == "status"); + MigrationTask current_task(tablet_id, schema_hash); + std::string status_result; + do { + std::unique_lock lock(_migration_status_mutex); + std::map::iterator it_task = _migration_tasks.find(current_task); + if (it_task != _migration_tasks.end()) { + status_result = "{\"status\": \"Success\", \"msg\": 
\"migration task is " + + it_task->second + "\", \"dest_disk\": \"" + + (it_task->first)._dest_disk + "\"}"; + break; + } + + int i = _finished_migration_tasks.size() - 1; + for (; i >= 0; i--) { + MigrationTask finished_task = _finished_migration_tasks[i].first; + if (finished_task._tablet_id == tablet_id && + finished_task._schema_hash == schema_hash) { + status = _finished_migration_tasks[i].second; + if (status.ok()) { + status_result = + "{\"status\": \"Success\", \"msg\": \"migration task has " + "finished successfully\", \"dest_disk\": \"" + + finished_task._dest_disk + "\"}"; + } + break; + } + } + if (i < 0) { + status = Status::NotFound("Migration task not found"); + } + } while (0); + if (!status.ok()) { + status_result = to_json(status); + } + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + HttpChannel::send_reply(req, HttpStatus::OK, status_result); + } + } else { + std::string status_result = to_json(status); + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + HttpChannel::send_reply(req, HttpStatus::OK, status_result); + } +} + +Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& tablet_id, + int32_t& schema_hash, string& dest_disk, string& goal) { + const std::string& req_tablet_id = req->param("tablet_id"); + const std::string& req_schema_hash = req->param("schema_hash"); + try { + tablet_id = std::stoull(req_tablet_id); + schema_hash = std::stoul(req_schema_hash); + } catch (const std::exception& e) { + LOG(WARNING) << "invalid argument.tablet_id:" << req_tablet_id + << ", schema_hash:" << req_schema_hash; + return Status::InternalError(strings::Substitute("Convert failed, $0", e.what())); + } + dest_disk = req->param("disk"); + goal = req->param("goal"); + if (goal != "run" && goal != "status") { + return Status::InternalError("invalid goal argument."); + } + return Status::OK(); +} + +Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, int32_t schema_hash, + 
string dest_disk, TabletSharedPtr& tablet, + DataDir** dest_store) { + tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash); + if (tablet == nullptr) { + LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema hash:" << schema_hash; + return Status::NotFound("Tablet not found"); + } + + // request specify the data dir + *dest_store = StorageEngine::instance()->get_store(dest_disk); + if (*dest_store == nullptr) { + LOG(WARNING) << "data dir not found: " << dest_disk; + return Status::NotFound("Disk not found"); + } + + if (tablet->data_dir() == *dest_store) { + LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk; + return Status::AlreadyExist("Tablet already exist in destination disk"); + } + + // check disk capacity + int64_t tablet_size = tablet->tablet_footprint(); + if ((*dest_store)->reach_capacity_limit(tablet_size)) { + LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path() + << ", tablet size: " << tablet_size; + return Status::InternalError("Insufficient disk capacity"); + } + + return Status::OK(); +} + +Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet, + DataDir* dest_store) { + int64_t tablet_id = tablet->tablet_id(); + int32_t schema_hash = tablet->schema_hash(); + string dest_disk = dest_store->path(); + EngineStorageMigrationTask engine_task(tablet, dest_store); + OLAPStatus res = StorageEngine::instance()->execute_task(&engine_task); + Status status = Status::OK(); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "tablet migrate failed. tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash << ", dest_disk=" << dest_disk + << ", status:" << res; + status = Status::InternalError(strings::Substitute("migration task failed, res: $0", res)); + } else { + LOG(INFO) << "tablet migrate success. 
tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash << ", dest_disk=" << dest_disk + << ", status:" << res; + } + return status; +} + +} // namespace doris diff --git a/be/src/http/action/tablet_migration_action.h b/be/src/http/action/tablet_migration_action.h new file mode 100644 index 00000000000000..4e5a92b877fa23 --- /dev/null +++ b/be/src/http/action/tablet_migration_action.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "gen_cpp/Status_types.h" +#include "http/http_handler.h" +#include "olap/data_dir.h" +#include "olap/tablet.h" +#include "util/threadpool.h" + +namespace doris { + +// Migrate a tablet from a disk to another. 
+class TabletMigrationAction : public HttpHandler { +public: + TabletMigrationAction(); + void handle(HttpRequest* req) override; + void _init_migration_action(); + Status _execute_tablet_migration(TabletSharedPtr tablet, DataDir* dest_store); + Status _check_param(HttpRequest* req, int64_t& tablet_id, int32_t& schema_hash, + string& dest_disk, string& goal); + Status _check_migrate_request(int64_t tablet_id, int32_t schema_hash, string dest_disk, + TabletSharedPtr& tablet, DataDir** dest_store); + +private: + std::unique_ptr _migration_thread_pool; + + struct MigrationTask { + MigrationTask(int64_t tablet_id, int32_t schema_hash) + : _tablet_id(tablet_id), _schema_hash(schema_hash) {} + + MigrationTask(int64_t tablet_id, int32_t schema_hash, std::string dest_disk) + : _tablet_id(tablet_id), _schema_hash(schema_hash), _dest_disk(dest_disk) {} + + bool operator<(const MigrationTask& right) const { + if (_tablet_id != right._tablet_id) { + return _tablet_id < right._tablet_id; + } else if (_schema_hash != right._schema_hash) { + return _schema_hash < right._schema_hash; + } else { + return false; + } + } + + std::string to_string() const { + std::stringstream ss; + ss << "MigrationTask: tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash + << ", dest_disk=" << _dest_disk; + return ss.str(); + } + + int64_t _tablet_id; + int32_t _schema_hash; + std::string _dest_disk; + }; + + std::mutex _migration_status_mutex; + std::map _migration_tasks; + std::deque> _finished_migration_tasks; +}; +} // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index a2e430246f4488..2685c1e9c3e377 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -29,6 +29,7 @@ #include "http/action/snapshot_action.h" #include "http/action/stream_load.h" #include "http/action/tablets_distribution_action.h" +#include "http/action/tablet_migration_action.h" #include "http/action/tablets_info_action.h" #include 
"http/action/update_config_action.h" #include "http/default_path_handlers.h" @@ -93,6 +94,10 @@ Status HttpService::start() { TabletsDistributionAction* tablets_distribution_action = new TabletsDistributionAction(); _ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution", tablets_distribution_action); + // Register tablet migration action + TabletMigrationAction* tablet_migration_action = new TabletMigrationAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration", tablet_migration_action); + // register pprof actions PprofActions::setup(_env, _ev_http_server.get()); diff --git a/docs/en/administrator-guide/http-actions/tablet-migration-action.md b/docs/en/administrator-guide/http-actions/tablet-migration-action.md new file mode 100644 index 00000000000000..7ecc552552eee6 --- /dev/null +++ b/docs/en/administrator-guide/http-actions/tablet-migration-action.md @@ -0,0 +1,89 @@ +--- +{ + "title": "MIGRATE SINGLE TABLET TO A PARTICULAR DISK", + "language": "en" +} +--- + + + +# MIGRATE SINGLE TABLET TO A PARTICULAR DISK + +Migrate a single tablet to a particular disk. + +Submit the migration task: + +``` +curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=run&tablet_id=xxx&schema_hash=xxx&disk=xxx" +``` + +The response is the submission result of the migration task: + +``` + { + status: "Success", + msg: "migration task is successfully submitted." 
+ } +``` + +or + +``` + { + status: "Fail", + msg: "Migration task submission failed" + } +``` + +Show the status of a migration task: + +``` +curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=status&tablet_id=xxx&schema_hash=xxx" +``` + +The response is the execution status of the migration task: + +``` + { + status: "Success", + msg: "migration task is running", + dest_disk: "xxxxxx" + } +``` + +or + +``` + { + status: "Success", + msg: "migration task has finished successfully", + dest_disk: "xxxxxx" + } +``` + +or + +``` + { + status: "Success", + msg: "migration task failed.", + dest_disk: "xxxxxx" + } +``` \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/http-actions/tablet-migration-action.md b/docs/zh-CN/administrator-guide/http-actions/tablet-migration-action.md new file mode 100644 index 00000000000000..c2855b6f9c95d0 --- /dev/null +++ b/docs/zh-CN/administrator-guide/http-actions/tablet-migration-action.md @@ -0,0 +1,87 @@ +--- +{ + "title": "MIGRATE SINGLE TABLET TO A PARTICULAR DISK", + "language": "zh-CN" +} +--- + + + +# MIGRATE SINGLE TABLET TO A PARTICULAR DISK + +在BE节点上迁移单个tablet到指定磁盘 + +提交迁移任务: + +``` +curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=run&tablet_id=xxx&schema_hash=xxx&disk=xxx" +``` + +返回值就是tablet迁移任务的提交结果: + +``` + { + status: "Success", + msg: "migration task is successfully submitted." + } +``` +或 +``` + { + status: "Fail", + msg: "Migration task submission failed" + } +``` + +查询迁移任务状态: + +``` +curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=status&tablet_id=xxx&schema_hash=xxx" +``` + +返回值就是tablet迁移任务执行状态: + +``` + { + status: "Success", + msg: "migration task is running", + dest_disk: "xxxxxx" + } +``` + +或 + +``` + { + status: "Success", + msg: "migration task has finished successfully", + dest_disk: "xxxxxx" + } +``` + +或 + +``` + { + status: "Success", + msg: "migration task failed.", + dest_disk: "xxxxxx" + } +``` \ No newline at end of file