Skip to content

Commit

Permalink
Migrate download and ingest into job manager (#3994)
Browse files Browse the repository at this point in the history
* download and ingest job

* fix comment

* remove some useless flag
  • Loading branch information
darionyaphet authored Mar 29, 2022
1 parent 02b8b39 commit 0649da8
Show file tree
Hide file tree
Showing 92 changed files with 756 additions and 2,336 deletions.
44 changes: 0 additions & 44 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "version/Version.h"
#include "webservice/Common.h"

DECLARE_int32(ws_meta_http_port);

DEFINE_uint32(expired_time_factor, 5, "The factor of expired time based on heart beat interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval in seconds");
DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
Expand Down Expand Up @@ -3530,48 +3528,6 @@ folly::Future<StatusOr<int64_t>> MetaClient::getWorkerId(std::string ipAddr) {
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
GraphSpaceID spaceId) {
auto url = folly::stringPrintf("http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d",
leader_.host.c_str(),
FLAGS_ws_meta_http_port,
hdfsHost.c_str(),
hdfsPort,
hdfsPath.c_str(),
spaceId);
auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile dispatch successfully") {
LOG(INFO) << "Download Successfully";
return true;
} else {
LOG(ERROR) << "Download Failed: " << result.value();
return false;
}
};
return folly::async(func);
}

folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
auto url = folly::stringPrintf("http://%s:%d/ingest-dispatch?space=%d",
leader_.host.c_str(),
FLAGS_ws_meta_http_port,
spaceId);
auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile ingest successfully") {
LOG(INFO) << "Ingest Successfully";
return true;
} else {
LOG(ERROR) << "Ingest Failed";
return false;
}
};
return folly::async(func);
}

folly::Future<StatusOr<int64_t>> MetaClient::getSegmentId(int64_t length) {
auto req = cpp2::GetSegmentIdReq();
req.length_ref() = length;
Expand Down
7 changes: 0 additions & 7 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,6 @@ class MetaClient : public BaseMetaClient {
nebula::cpp2::ErrorCode taskErrCode,
cpp2::StatsItem* statisticItem);

folly::Future<StatusOr<bool>> download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
GraphSpaceID spaceId);

folly::Future<StatusOr<bool>> ingest(GraphSpaceID spaceId);

folly::Future<StatusOr<int64_t>> getWorkerId(std::string ipAddr);

folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) override;
Expand Down
4 changes: 1 addition & 3 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -152,7 +150,7 @@ int main(int argc, char* argv[]) {
pool->start(FLAGS_meta_http_thread_num, "http thread pool");

auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return EXIT_FAILURE;
Expand Down
19 changes: 2 additions & 17 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "meta/KVBasedClusterIdMan.h"
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand All @@ -45,7 +43,7 @@ DEFINE_int32(meta_num_io_threads, 16, "Number of IO threads");
DEFINE_int32(meta_num_worker_threads, 32, "Number of workers");
DEFINE_string(meta_data_path, "", "Root data path");
DECLARE_string(meta_server_addrs); // use define from grap flags.
DECLARE_int32(ws_meta_http_port);
DEFINE_int32(ws_meta_http_port, 11000, "Port to listen on Meta with HTTP protocol");
#endif

using nebula::web::PathParams;
Expand Down Expand Up @@ -160,22 +158,9 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return kvstore;
}

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool) {
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) {
LOG(INFO) << "Starting Meta HTTP Service";
auto& router = svc->router();
router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore, helper, pool);
return handler;
});
router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore, pool);
return handler;
});
router.get("/replace").handler([kvstore](PathParams&&) {
auto handler = new nebula::meta::MetaHttpReplaceHostHandler();
handler->init(kvstore);
Expand Down
5 changes: 1 addition & 4 deletions src/daemons/MetaDaemonInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,5 @@ nebula::ClusterID& metaClusterId();
std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> peers,
nebula::HostAddr localhost);

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool);
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore);
#endif
7 changes: 1 addition & 6 deletions src/daemons/StandAloneDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -209,11 +207,8 @@ int main(int argc, char *argv[]) {
return;
}
LOG(INFO) << "Start http service";
auto helper = std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto pool = std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_meta_http_thread_num, "http thread pool");
auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gMetaKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gMetaKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return;
Expand Down
2 changes: 0 additions & 2 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ nebula_add_library(
admin/PartExecutor.cpp
admin/CharsetExecutor.cpp
admin/ShowStatsExecutor.cpp
admin/DownloadExecutor.cpp
admin/IngestExecutor.cpp
admin/ConfigExecutor.cpp
admin/ZoneExecutor.cpp
admin/ShowServiceClientsExecutor.cpp
Expand Down
8 changes: 0 additions & 8 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
#include "graph/executor/admin/ConfigExecutor.h"
#include "graph/executor/admin/CreateUserExecutor.h"
#include "graph/executor/admin/DescribeUserExecutor.h"
#include "graph/executor/admin/DownloadExecutor.h"
#include "graph/executor/admin/DropHostsExecutor.h"
#include "graph/executor/admin/DropUserExecutor.h"
#include "graph/executor/admin/GrantRoleExecutor.h"
#include "graph/executor/admin/IngestExecutor.h"
#include "graph/executor/admin/KillQueryExecutor.h"
#include "graph/executor/admin/ListRolesExecutor.h"
#include "graph/executor/admin/ListUserRolesExecutor.h"
Expand Down Expand Up @@ -518,12 +516,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kSignOutService: {
return pool->add(new SignOutServiceExecutor(node, qctx));
}
case PlanNode::Kind::kDownload: {
return pool->add(new DownloadExecutor(node, qctx));
}
case PlanNode::Kind::kIngest: {
return pool->add(new IngestExecutor(node, qctx));
}
case PlanNode::Kind::kShowSessions: {
return pool->add(new ShowSessionsExecutor(node, qctx));
}
Expand Down
33 changes: 0 additions & 33 deletions src/graph/executor/admin/DownloadExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/DownloadExecutor.h

This file was deleted.

28 changes: 0 additions & 28 deletions src/graph/executor/admin/IngestExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/IngestExecutor.h

This file was deleted.

49 changes: 0 additions & 49 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,55 +537,6 @@ class ShowListener final : public SingleDependencyNode {
: SingleDependencyNode(qctx, Kind::kShowListener, input) {}
};

class Download final : public SingleDependencyNode {
public:
static Download* make(QueryContext* qctx,
PlanNode* input,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath) {
return qctx->objPool()->add(new Download(qctx, input, hdfsHost, hdfsPort, hdfsPath));
}

const std::string& getHdfsHost() const {
return hdfsHost_;
}

int32_t getHdfsPort() const {
return hdfsPort_;
}

const std::string& getHdfsPath() const {
return hdfsPath_;
}

private:
Download(QueryContext* qctx,
PlanNode* dep,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath)
: SingleDependencyNode(qctx, Kind::kDownload, dep),
hdfsHost_(hdfsHost),
hdfsPort_(hdfsPort),
hdfsPath_(hdfsPath) {}

private:
std::string hdfsHost_;
int32_t hdfsPort_;
std::string hdfsPath_;
};

class Ingest final : public SingleDependencyNode {
public:
static Ingest* make(QueryContext* qctx, PlanNode* dep) {
return qctx->objPool()->add(new Ingest(qctx, dep));
}

private:
Ingest(QueryContext* qctx, PlanNode* dep) : SingleDependencyNode(qctx, Kind::kIngest, dep) {}
};

// User related Node
class CreateUser final : public CreateNode {
public:
Expand Down
4 changes: 0 additions & 4 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "SignInService";
case Kind::kSignOutService:
return "SignOutService";
case Kind::kDownload:
return "Download";
case Kind::kIngest:
return "Ingest";
case Kind::kShowSessions:
return "ShowSessions";
case Kind::kUpdateSession:
Expand Down
2 changes: 0 additions & 2 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@ class PlanNode {
kShowFTIndexes,
kSignInService,
kSignOutService,
kDownload,
kIngest,
kShowSessions,
kUpdateSession,

Expand Down
Loading

0 comments on commit 0649da8

Please sign in to comment.