Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ CONF_mInt64(tablet_scan_frequency_time_node_interval_second, "300");
CONF_mInt32(compaction_tablet_scan_frequency_factor, "0");
CONF_mInt32(compaction_tablet_compaction_score_factor, "1");

// This config can be set to limit thread number in tablet migration thread pool.
CONF_Int32(min_tablet_migration_threads, "1");
CONF_Int32(max_tablet_migration_threads, "1");

CONF_mInt32(finished_migration_tasks_size, "10000");

// Port to start debug webserver on
CONF_Int32(webserver_port, "8040");
// Number of webserver workers
Expand Down
1 change: 1 addition & 0 deletions be/src/http/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ add_library(Webserver STATIC
http_client.cpp
action/mini_load.cpp
action/health_action.cpp
action/tablet_migration_action.cpp
action/tablets_info_action.cpp
action/tablets_distribution_action.cpp
action/checksum_action.cpp
Expand Down
239 changes: 239 additions & 0 deletions be/src/http/action/tablet_migration_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/tablet_migration_action.h"

#include <string>

#include "gutil/strings/substitute.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/task/engine_storage_migration_task.h"
#include "util/json_util.h"

namespace doris {

const static std::string HEADER_JSON = "application/json";

TabletMigrationAction::TabletMigrationAction() {
_init_migration_action();
}

void TabletMigrationAction::_init_migration_action() {
int32_t max_thread_num = config::max_tablet_migration_threads;
int32_t min_thread_num = config::min_tablet_migration_threads;
ThreadPoolBuilder("MigrationTaskThreadPool")
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.build(&_migration_thread_pool);
}

void TabletMigrationAction::handle(HttpRequest* req) {
int64_t tablet_id = 0;
int32_t schema_hash = 0;
string dest_disk = "";
string goal = "";
Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
if (status.ok()) {
if (goal == "run") {
MigrationTask current_task(tablet_id, schema_hash, dest_disk);
TabletSharedPtr tablet;
DataDir* dest_store;
Status status =
_check_migrate_request(tablet_id, schema_hash, dest_disk, tablet, &dest_store);
if (status.ok()) {
do {
{
std::unique_lock<std::mutex> lock(_migration_status_mutex);
std::map<MigrationTask, std::string>::iterator it_task = _migration_tasks.find(current_task);
if (it_task != _migration_tasks.end()) {
status = Status::AlreadyExist(strings::Substitute(
"There is a migration task for this tablet already exists. "
"dest_disk is $0 .",
(it_task->first)._dest_disk));
break;
}
_migration_tasks[current_task] = "submitted";
}
auto st = _migration_thread_pool->submit_func([&, tablet_id, schema_hash,
dest_disk, current_task]() {
{
std::unique_lock<std::mutex> lock(_migration_status_mutex);
_migration_tasks[current_task] = "running";
}
Status result_status = _execute_tablet_migration(tablet, dest_store);
{
std::unique_lock<std::mutex> lock(_migration_status_mutex);
std::map<MigrationTask, std::string>::iterator it_task =
_migration_tasks.find(current_task);
if (it_task != _migration_tasks.end()) {
_migration_tasks.erase(it_task);
}
std::pair<MigrationTask, Status> finished_task =
make_pair(current_task, result_status);
if (_finished_migration_tasks.size() >=
config::finished_migration_tasks_size) {
_finished_migration_tasks.pop_front();
}
_finished_migration_tasks.push_back(finished_task);
}
});
if (!st.ok()) {
status = Status::InternalError("Migration task submission failed");
std::unique_lock<std::mutex> lock(_migration_status_mutex);
std::map<MigrationTask, std::string>::iterator it_task =
_migration_tasks.find(current_task);
if (it_task != _migration_tasks.end()) {
_migration_tasks.erase(it_task);
}
}
} while (0);
}
std::string status_result;
if (!status.ok()) {
status_result = to_json(status);
} else {
status_result =
"{\"status\": \"Success\", \"msg\": \"migration task is successfully "
"submitted.\"}";
}
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
HttpChannel::send_reply(req, HttpStatus::OK, status_result);
} else {
DCHECK(goal == "status");
MigrationTask current_task(tablet_id, schema_hash);
std::string status_result;
do {
std::unique_lock<std::mutex> lock(_migration_status_mutex);
std::map<MigrationTask, std::string>::iterator it_task = _migration_tasks.find(current_task);
if (it_task != _migration_tasks.end()) {
status_result = "{\"status\": \"Success\", \"msg\": \"migration task is " +
it_task->second + "\", \"dest_disk\": \"" +
(it_task->first)._dest_disk + "\"}";
break;
}

int i = _finished_migration_tasks.size() - 1;
for (; i >= 0; i--) {
MigrationTask finished_task = _finished_migration_tasks[i].first;
if (finished_task._tablet_id == tablet_id &&
finished_task._schema_hash == schema_hash) {
status = _finished_migration_tasks[i].second;
if (status.ok()) {
status_result =
"{\"status\": \"Success\", \"msg\": \"migration task has "
"finished successfully\", \"dest_disk\": \"" +
finished_task._dest_disk + "\"}";
}
break;
}
}
if (i < 0) {
status = Status::NotFound("Migration task not found");
}
} while (0);
if (!status.ok()) {
status_result = to_json(status);
}
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
HttpChannel::send_reply(req, HttpStatus::OK, status_result);
}
} else {
std::string status_result = to_json(status);
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
HttpChannel::send_reply(req, HttpStatus::OK, status_result);
}
}

Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& tablet_id,
int32_t& schema_hash, string& dest_disk, string& goal) {
const std::string& req_tablet_id = req->param("tablet_id");
const std::string& req_schema_hash = req->param("schema_hash");
try {
tablet_id = std::stoull(req_tablet_id);
schema_hash = std::stoul(req_schema_hash);
} catch (const std::exception& e) {
LOG(WARNING) << "invalid argument.tablet_id:" << req_tablet_id
<< ", schema_hash:" << req_schema_hash;
return Status::InternalError(strings::Substitute("Convert failed, $0", e.what()));
}
dest_disk = req->param("disk");
goal = req->param("goal");
if (goal != "run" && goal != "status") {
return Status::InternalError("invalid goal argument.");
}
return Status::OK();
}

Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, int32_t schema_hash,
string dest_disk, TabletSharedPtr& tablet,
DataDir** dest_store) {
tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema hash:" << schema_hash;
return Status::NotFound("Tablet not found");
}

// request specify the data dir
*dest_store = StorageEngine::instance()->get_store(dest_disk);
if (*dest_store == nullptr) {
LOG(WARNING) << "data dir not found: " << dest_disk;
return Status::NotFound("Disk not found");
}

if (tablet->data_dir() == *dest_store) {
LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk;
return Status::AlreadyExist("Tablet already exist in destination disk");
}

// check disk capacity
int64_t tablet_size = tablet->tablet_footprint();
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path()
<< ", tablet size: " << tablet_size;
return Status::InternalError("Insufficient disk capacity");
}

return Status::OK();
}

Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet,
DataDir* dest_store) {
int64_t tablet_id = tablet->tablet_id();
int32_t schema_hash = tablet->schema_hash();
string dest_disk = dest_store->path();
EngineStorageMigrationTask engine_task(tablet, dest_store);
OLAPStatus res = StorageEngine::instance()->execute_task(&engine_task);
Status status = Status::OK();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "tablet migrate failed. tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash << ", dest_disk=" << dest_disk
<< ", status:" << res;
status = Status::InternalError(strings::Substitute("migration task failed, res: $0", res));
} else {
LOG(INFO) << "tablet migrate success. tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash << ", dest_disk=" << dest_disk
<< ", status:" << res;
}
return status;
}

} // namespace doris
79 changes: 79 additions & 0 deletions be/src/http/action/tablet_migration_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>

#include "common/status.h"
#include "gen_cpp/Status_types.h"
#include "http/http_handler.h"
#include "olap/data_dir.h"
#include "olap/tablet.h"
#include "util/threadpool.h"

namespace doris {

// Migrate a tablet from a disk to another.
class TabletMigrationAction : public HttpHandler {
public:
TabletMigrationAction();
void handle(HttpRequest* req) override;
void _init_migration_action();
Status _execute_tablet_migration(TabletSharedPtr tablet, DataDir* dest_store);
Status _check_param(HttpRequest* req, int64_t& tablet_id, int32_t& schema_hash,
string& dest_disk, string& goal);
Status _check_migrate_request(int64_t tablet_id, int32_t schema_hash, string dest_disk,
TabletSharedPtr& tablet, DataDir** dest_store);

private:
std::unique_ptr<ThreadPool> _migration_thread_pool;

struct MigrationTask {
MigrationTask(int64_t tablet_id, int32_t schema_hash)
: _tablet_id(tablet_id), _schema_hash(schema_hash) {}

MigrationTask(int64_t tablet_id, int32_t schema_hash, std::string dest_disk)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _dest_disk(dest_disk) {}

bool operator<(const MigrationTask& right) const {
if (_tablet_id != right._tablet_id) {
return _tablet_id < right._tablet_id;
} else if (_schema_hash != right._schema_hash) {
return _schema_hash < right._schema_hash;
} else {
return false;
}
}

std::string to_string() const {
std::stringstream ss;
ss << "MigrationTask: tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
<< ", dest_disk=" << _dest_disk;
return ss.str();
}

int64_t _tablet_id;
int32_t _schema_hash;
std::string _dest_disk;
};

std::mutex _migration_status_mutex;
std::map<MigrationTask, std::string> _migration_tasks;
std::deque<std::pair<MigrationTask, Status>> _finished_migration_tasks;
};
} // namespace doris
5 changes: 5 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "http/action/snapshot_action.h"
#include "http/action/stream_load.h"
#include "http/action/tablets_distribution_action.h"
#include "http/action/tablet_migration_action.h"
#include "http/action/tablets_info_action.h"
#include "http/action/update_config_action.h"
#include "http/default_path_handlers.h"
Expand Down Expand Up @@ -93,6 +94,10 @@ Status HttpService::start() {
TabletsDistributionAction* tablets_distribution_action = new TabletsDistributionAction();
_ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution", tablets_distribution_action);

// Register tablet migration action
TabletMigrationAction* tablet_migration_action = new TabletMigrationAction();
_ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration", tablet_migration_action);

// register pprof actions
PprofActions::setup(_env, _ev_http_server.get());

Expand Down
Loading