diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 712b5ac10b7..bc0696f515f 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -26,8 +26,6 @@ #include "meta/MetaServiceHandler.h" #include "meta/MetaVersionMan.h" #include "meta/RootUserMan.h" -#include "meta/http/MetaHttpDownloadHandler.h" -#include "meta/http/MetaHttpIngestHandler.h" #include "meta/http/MetaHttpReplaceHostHandler.h" #include "meta/processors/job/JobManager.h" #include "meta/stats/MetaStats.h" @@ -152,7 +150,7 @@ int main(int argc, char* argv[]) { pool->start(FLAGS_meta_http_thread_num, "http thread pool"); auto webSvc = std::make_unique(); - status = initWebService(webSvc.get(), gKVStore.get(), helper.get(), pool.get()); + status = initWebService(webSvc.get(), gKVStore.get()); if (!status.ok()) { LOG(ERROR) << "Init web service failed: " << status; return EXIT_FAILURE; diff --git a/src/daemons/MetaDaemonInit.cpp b/src/daemons/MetaDaemonInit.cpp index 4ed4d3e3f25..03685d4502d 100644 --- a/src/daemons/MetaDaemonInit.cpp +++ b/src/daemons/MetaDaemonInit.cpp @@ -23,8 +23,6 @@ #include "meta/KVBasedClusterIdMan.h" #include "meta/MetaServiceHandler.h" #include "meta/MetaVersionMan.h" -#include "meta/http/MetaHttpDownloadHandler.h" -#include "meta/http/MetaHttpIngestHandler.h" #include "meta/http/MetaHttpReplaceHostHandler.h" #include "meta/processors/job/JobManager.h" #include "meta/stats/MetaStats.h" @@ -160,22 +158,9 @@ std::unique_ptr initKV(std::vector p return kvstore; } -nebula::Status initWebService(nebula::WebService* svc, - nebula::kvstore::KVStore* kvstore, - nebula::hdfs::HdfsCommandHelper* helper, - nebula::thread::GenericThreadPool* pool) { +nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) { LOG(INFO) << "Starting Meta HTTP Service"; auto& router = svc->router(); - router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) { - auto handler = new 
nebula::meta::MetaHttpDownloadHandler(); - handler->init(kvstore, helper, pool); - return handler; - }); - router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) { - auto handler = new nebula::meta::MetaHttpIngestHandler(); - handler->init(kvstore, pool); - return handler; - }); router.get("/replace").handler([kvstore](PathParams&&) { auto handler = new nebula::meta::MetaHttpReplaceHostHandler(); handler->init(kvstore); diff --git a/src/daemons/MetaDaemonInit.h b/src/daemons/MetaDaemonInit.h index 3d40ded967e..0a94ae4bbd4 100644 --- a/src/daemons/MetaDaemonInit.h +++ b/src/daemons/MetaDaemonInit.h @@ -17,8 +17,5 @@ nebula::ClusterID& metaClusterId(); std::unique_ptr initKV(std::vector peers, nebula::HostAddr localhost); -nebula::Status initWebService(nebula::WebService* svc, - nebula::kvstore::KVStore* kvstore, - nebula::hdfs::HdfsCommandHelper* helper, - nebula::thread::GenericThreadPool* pool); +nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore); #endif diff --git a/src/graph/executor/CMakeLists.txt b/src/graph/executor/CMakeLists.txt index 4ed8baca7c6..9134cd88dba 100644 --- a/src/graph/executor/CMakeLists.txt +++ b/src/graph/executor/CMakeLists.txt @@ -67,8 +67,6 @@ nebula_add_library( admin/PartExecutor.cpp admin/CharsetExecutor.cpp admin/ShowStatsExecutor.cpp - admin/DownloadExecutor.cpp - admin/IngestExecutor.cpp admin/ConfigExecutor.cpp admin/ZoneExecutor.cpp admin/ShowServiceClientsExecutor.cpp diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp index 2f3044f9abe..45a20a378cb 100644 --- a/src/graph/executor/Executor.cpp +++ b/src/graph/executor/Executor.cpp @@ -23,11 +23,9 @@ #include "graph/executor/admin/ConfigExecutor.h" #include "graph/executor/admin/CreateUserExecutor.h" #include "graph/executor/admin/DescribeUserExecutor.h" -#include "graph/executor/admin/DownloadExecutor.h" #include "graph/executor/admin/DropHostsExecutor.h" #include 
"graph/executor/admin/DropUserExecutor.h" #include "graph/executor/admin/GrantRoleExecutor.h" -#include "graph/executor/admin/IngestExecutor.h" #include "graph/executor/admin/KillQueryExecutor.h" #include "graph/executor/admin/ListRolesExecutor.h" #include "graph/executor/admin/ListUserRolesExecutor.h" @@ -517,12 +515,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kSignOutService: { return pool->add(new SignOutServiceExecutor(node, qctx)); } - case PlanNode::Kind::kDownload: { - return pool->add(new DownloadExecutor(node, qctx)); - } - case PlanNode::Kind::kIngest: { - return pool->add(new IngestExecutor(node, qctx)); - } case PlanNode::Kind::kShowSessions: { return pool->add(new ShowSessionsExecutor(node, qctx)); } diff --git a/src/graph/executor/admin/DownloadExecutor.cpp b/src/graph/executor/admin/DownloadExecutor.cpp deleted file mode 100644 index 94e44b1d897..00000000000 --- a/src/graph/executor/admin/DownloadExecutor.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include "graph/executor/admin/DownloadExecutor.h" - -#include "graph/context/QueryContext.h" -#include "graph/planner/plan/Admin.h" - -namespace nebula { -namespace graph { - -folly::Future DownloadExecutor::execute() { - SCOPED_TIMER(&execTime_); - auto *dNode = asNode(node()); - auto spaceId = qctx()->rctx()->session()->space().id; - return qctx() - ->getMetaClient() - ->download(dNode->getHdfsHost(), dNode->getHdfsPort(), dNode->getHdfsPath(), spaceId) - .via(runner()) - .thenValue([this](StatusOr resp) { - SCOPED_TIMER(&execTime_); - NG_RETURN_IF_ERROR(resp); - if (!resp.value()) { - return Status::Error("Download failed!"); - } - return Status::OK(); - }); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/executor/admin/DownloadExecutor.h b/src/graph/executor/admin/DownloadExecutor.h deleted file mode 100644 index c0912c9aedf..00000000000 --- a/src/graph/executor/admin/DownloadExecutor.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef GRAPH_EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ -#define GRAPH_EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ - -#include "graph/executor/Executor.h" - -namespace nebula { -namespace graph { - -class DownloadExecutor final : public Executor { - public: - DownloadExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("DownloadExecutor", node, qctx) {} - - folly::Future execute() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ diff --git a/src/graph/executor/admin/IngestExecutor.cpp b/src/graph/executor/admin/IngestExecutor.cpp deleted file mode 100644 index 9cb397e3006..00000000000 --- a/src/graph/executor/admin/IngestExecutor.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include "graph/executor/admin/IngestExecutor.h" - -#include "graph/context/QueryContext.h" -#include "graph/planner/plan/Admin.h" - -namespace nebula { -namespace graph { - -folly::Future IngestExecutor::execute() { - auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->ingest(spaceId).via(runner()).thenValue( - [this](StatusOr resp) { - SCOPED_TIMER(&execTime_); - NG_RETURN_IF_ERROR(resp); - if (!resp.value()) { - return Status::Error("Ingest failed!"); - } - return Status::OK(); - }); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/executor/admin/IngestExecutor.h b/src/graph/executor/admin/IngestExecutor.h deleted file mode 100644 index 8a84c1dbf67..00000000000 --- a/src/graph/executor/admin/IngestExecutor.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef GRAPH_EXECUTOR_ADMIN_INGESTEXECUTOR_H_ -#define GRAPH_EXECUTOR_ADMIN_INGESTEXECUTOR_H_ - -#include "graph/executor/Executor.h" - -namespace nebula { -namespace graph { - -class IngestExecutor final : public Executor { - public: - IngestExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("IngestExecutor", node, qctx) {} - - folly::Future execute() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_EXECUTOR_ADMIN_INGESTEXECUTOR_H_ diff --git a/src/graph/planner/plan/Admin.h b/src/graph/planner/plan/Admin.h index 2fd3526fc0b..84af26bdb51 100644 --- a/src/graph/planner/plan/Admin.h +++ b/src/graph/planner/plan/Admin.h @@ -537,55 +537,6 @@ class ShowListener final : public SingleDependencyNode { : SingleDependencyNode(qctx, Kind::kShowListener, input) {} }; -class Download final : public SingleDependencyNode { - public: - static Download* make(QueryContext* qctx, - PlanNode* input, - std::string hdfsHost, - int32_t hdfsPort, - std::string hdfsPath) { - return qctx->objPool()->add(new 
Download(qctx, input, hdfsHost, hdfsPort, hdfsPath)); - } - - const std::string& getHdfsHost() const { - return hdfsHost_; - } - - int32_t getHdfsPort() const { - return hdfsPort_; - } - - const std::string& getHdfsPath() const { - return hdfsPath_; - } - - private: - Download(QueryContext* qctx, - PlanNode* dep, - std::string hdfsHost, - int32_t hdfsPort, - std::string hdfsPath) - : SingleDependencyNode(qctx, Kind::kDownload, dep), - hdfsHost_(hdfsHost), - hdfsPort_(hdfsPort), - hdfsPath_(hdfsPath) {} - - private: - std::string hdfsHost_; - int32_t hdfsPort_; - std::string hdfsPath_; -}; - -class Ingest final : public SingleDependencyNode { - public: - static Ingest* make(QueryContext* qctx, PlanNode* dep) { - return qctx->objPool()->add(new Ingest(qctx, dep)); - } - - private: - Ingest(QueryContext* qctx, PlanNode* dep) : SingleDependencyNode(qctx, Kind::kIngest, dep) {} -}; - // User related Node class CreateUser final : public CreateNode { public: diff --git a/src/graph/planner/plan/PlanNode.cpp b/src/graph/planner/plan/PlanNode.cpp index 310990f759d..aec1f8ac115 100644 --- a/src/graph/planner/plan/PlanNode.cpp +++ b/src/graph/planner/plan/PlanNode.cpp @@ -277,10 +277,6 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "SignInService"; case Kind::kSignOutService: return "SignOutService"; - case Kind::kDownload: - return "Download"; - case Kind::kIngest: - return "Ingest"; case Kind::kShowSessions: return "ShowSessions"; case Kind::kUpdateSession: diff --git a/src/graph/planner/plan/PlanNode.h b/src/graph/planner/plan/PlanNode.h index 774e57eed21..34af5fab7fe 100644 --- a/src/graph/planner/plan/PlanNode.h +++ b/src/graph/planner/plan/PlanNode.h @@ -174,8 +174,6 @@ class PlanNode { kShowFTIndexes, kSignInService, kSignOutService, - kDownload, - kIngest, kShowSessions, kUpdateSession, diff --git a/src/graph/service/PermissionCheck.cpp b/src/graph/service/PermissionCheck.cpp index 2ff0475b27a..8939a249ae5 100644 --- 
a/src/graph/service/PermissionCheck.cpp +++ b/src/graph/service/PermissionCheck.cpp @@ -11,7 +11,7 @@ namespace graph { /** * Read space : kUse, kDescribeSpace * Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot, - * kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload + * kDropSnapshot, kBalance, kAdmin, kConfig * Read schema : kDescribeTag, kDescribeEdge, * kDescribeTagIndex, kDescribeEdgeIndex * Write schema : kCreateTag, kAlterTag, kCreateEdge, @@ -68,8 +68,6 @@ namespace graph { case Sentence::Kind::kShowConfigs: case Sentence::Kind::kSetConfig: case Sentence::Kind::kGetConfig: - case Sentence::Kind::kIngest: - case Sentence::Kind::kDownload: case Sentence::Kind::kSignOutService: case Sentence::Kind::kSignInService: { return PermissionManager::canWriteSpace(session); diff --git a/src/graph/validator/AdminJobValidator.h b/src/graph/validator/AdminJobValidator.h index 31a931dda77..971abfbb045 100644 --- a/src/graph/validator/AdminJobValidator.h +++ b/src/graph/validator/AdminJobValidator.h @@ -37,14 +37,12 @@ class AdminJobValidator final : public Validator { case meta::cpp2::AdminCmd::STATS: case meta::cpp2::AdminCmd::COMPACT: case meta::cpp2::AdminCmd::FLUSH: + case meta::cpp2::AdminCmd::DOWNLOAD: + case meta::cpp2::AdminCmd::INGEST: case meta::cpp2::AdminCmd::DATA_BALANCE: case meta::cpp2::AdminCmd::LEADER_BALANCE: case meta::cpp2::AdminCmd::ZONE_BALANCE: return true; - // TODO: download and ingest need to be refactored to use the rpc protocol. 
- // Currently they are using their own validator - case meta::cpp2::AdminCmd::DOWNLOAD: - case meta::cpp2::AdminCmd::INGEST: case meta::cpp2::AdminCmd::UNKNOWN: return false; } diff --git a/src/graph/validator/CMakeLists.txt b/src/graph/validator/CMakeLists.txt index 722f3725a36..ac7dd18981e 100644 --- a/src/graph/validator/CMakeLists.txt +++ b/src/graph/validator/CMakeLists.txt @@ -27,8 +27,6 @@ nebula_add_library( FindPathValidator.cpp LookupValidator.cpp MatchValidator.cpp - DownloadValidator.cpp - IngestValidator.cpp ) nebula_add_subdirectory(test) diff --git a/src/graph/validator/DownloadValidator.cpp b/src/graph/validator/DownloadValidator.cpp deleted file mode 100644 index 73b4797ec5a..00000000000 --- a/src/graph/validator/DownloadValidator.cpp +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "graph/validator/DownloadValidator.h" - -#include "graph/planner/plan/Admin.h" -#include "parser/MutateSentences.h" - -namespace nebula { -namespace graph { - -// Plan to download SST file from HDFS -Status DownloadValidator::toPlan() { - auto sentence = static_cast(sentence_); - if (sentence->host() == nullptr || sentence->port() == 0 || sentence->path() == nullptr) { - return Status::SemanticError( - "HDFS path illegal." - "Should be HDFS://${HDFS_HOST}:${HDFS_PORT}/${HDFS_PATH}"); - } - auto *doNode = - Download::make(qctx_, nullptr, *sentence->host(), sentence->port(), *sentence->path()); - root_ = doNode; - tail_ = root_; - return Status::OK(); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/validator/DownloadValidator.h b/src/graph/validator/DownloadValidator.h deleted file mode 100644 index 27ff4bab89a..00000000000 --- a/src/graph/validator/DownloadValidator.h +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef GRAPH_VALIDATOR_DOWNLOADVALIDATOR_H_ -#define GRAPH_VALIDATOR_DOWNLOADVALIDATOR_H_ - -#include "graph/validator/Validator.h" -#include "parser/AdminSentences.h" - -namespace nebula { -namespace graph { - -class DownloadValidator final : public Validator { - public: - DownloadValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) {} - - private: - Status validateImpl() override { - return Status::OK(); - } - - Status toPlan() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_VALIDATOR_DOWNLOADVALIDATOR_H_ diff --git a/src/graph/validator/IngestValidator.cpp b/src/graph/validator/IngestValidator.cpp deleted file mode 100644 index 6798e4271f0..00000000000 --- a/src/graph/validator/IngestValidator.cpp +++ /dev/null @@ -1,22 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "graph/validator/IngestValidator.h" - -#include "graph/planner/plan/Admin.h" - -namespace nebula { -namespace graph { - -// Plan to ingest SST file in server side -Status IngestValidator::toPlan() { - auto *doNode = Ingest::make(qctx_, nullptr); - root_ = doNode; - tail_ = root_; - return Status::OK(); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/validator/IngestValidator.h b/src/graph/validator/IngestValidator.h deleted file mode 100644 index eb560626963..00000000000 --- a/src/graph/validator/IngestValidator.h +++ /dev/null @@ -1,29 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef GRAPH_VALIDATOR_INGESTVALIDATOR_H_ -#define GRAPH_VALIDATOR_INGESTVALIDATOR_H_ - -#include "graph/validator/Validator.h" - -namespace nebula { -namespace graph { - -class IngestValidator final : public Validator { - public: - IngestValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) {} - - private: - Status validateImpl() override { - return Status::OK(); - } - - Status toPlan() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_VALIDATOR_INGESTVALIDATOR_H_ diff --git a/src/graph/validator/Validator.cpp b/src/graph/validator/Validator.cpp index d87ea35bec7..58330d75132 100644 --- a/src/graph/validator/Validator.cpp +++ b/src/graph/validator/Validator.cpp @@ -16,7 +16,6 @@ #include "graph/validator/AdminJobValidator.h" #include "graph/validator/AdminValidator.h" #include "graph/validator/AssignmentValidator.h" -#include "graph/validator/DownloadValidator.h" #include "graph/validator/ExplainValidator.h" #include "graph/validator/FetchEdgesValidator.h" #include "graph/validator/FetchVerticesValidator.h" @@ -24,7 +23,6 @@ #include "graph/validator/GetSubgraphValidator.h" #include "graph/validator/GoValidator.h" #include "graph/validator/GroupByValidator.h" -#include "graph/validator/IngestValidator.h" #include "graph/validator/LimitValidator.h" #include "graph/validator/LookupValidator.h" #include "graph/validator/MaintainValidator.h" @@ -238,10 +236,6 @@ std::unique_ptr Validator::makeValidator(Sentence* sentence, QueryCon return std::make_unique(sentence, context); case Sentence::Kind::kSignOutService: return std::make_unique(sentence, context); - case Sentence::Kind::kDownload: - return std::make_unique(sentence, context); - case Sentence::Kind::kIngest: - return std::make_unique(sentence, context); case Sentence::Kind::kCreateFTIndex: return std::make_unique(sentence, context); case Sentence::Kind::kDropFTIndex: diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 
1a8e6167e72..973540e0b51 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -45,8 +45,6 @@ nebula_add_library( processors/admin/CreateSnapshotProcessor.cpp processors/admin/DropSnapshotProcessor.cpp processors/admin/ListSnapshotsProcessor.cpp - processors/job/BalancePlan.cpp - processors/job/BalanceTask.cpp processors/admin/AdminClient.cpp processors/admin/SnapShot.cpp processors/admin/CreateBackupProcessor.cpp @@ -71,6 +69,10 @@ nebula_add_library( processors/job/SimpleConcurrentJobExecutor.cpp processors/job/CompactJobExecutor.cpp processors/job/FlushJobExecutor.cpp + processors/job/DownloadJobExecutor.cpp + processors/job/IngestJobExecutor.cpp + processors/job/BalancePlan.cpp + processors/job/BalanceTask.cpp processors/job/BalanceJobExecutor.cpp processors/job/ZoneBalanceJobExecutor.cpp processors/job/DataBalanceJobExecutor.cpp @@ -112,6 +114,7 @@ add_dependencies( ) set(meta_test_deps + $ $ $ $ diff --git a/src/meta/http/CMakeLists.txt b/src/meta/http/CMakeLists.txt index 34d2e95b4fe..c346be8d85d 100644 --- a/src/meta/http/CMakeLists.txt +++ b/src/meta/http/CMakeLists.txt @@ -2,12 +2,9 @@ # # This source code is licensed under Apache 2.0 License. - nebula_add_library( meta_http_handler OBJECT - MetaHttpIngestHandler.cpp - MetaHttpDownloadHandler.cpp MetaHttpReplaceHostHandler.cpp - ) +) nebula_add_subdirectory(test) diff --git a/src/meta/http/MetaHttpDownloadHandler.h b/src/meta/http/MetaHttpDownloadHandler.h deleted file mode 100644 index 77ee273c686..00000000000 --- a/src/meta/http/MetaHttpDownloadHandler.h +++ /dev/null @@ -1,67 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef META_HTTP_METAHTTPDOWNLOADHANDLER_H_ -#define META_HTTP_METAHTTPDOWNLOADHANDLER_H_ - -#include - -#include "common/base/Base.h" -#include "common/hdfs/HdfsHelper.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace meta { - -using nebula::HttpCode; - -/** - * @brief Download sst files from hdfs to every storaged download folder. - * It will send download http request to every storaged, letting them - * download the corressponding sst files. - * Functions such as onRequest, onBody... and requestComplete are inherited - * from RequestHandler, we will check request parameters in onRequest and - * call main logic in onEOM. - */ -class MetaHttpDownloadHandler : public proxygen::RequestHandler { - public: - MetaHttpDownloadHandler() = default; - - void init(nebula::kvstore::KVStore *kvstore, - nebula::hdfs::HdfsHelper *helper, - nebula::thread::GenericThreadPool *pool); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - private: - bool dispatchSSTFiles(const std::string &host, int32_t port, const std::string &path); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - std::string hdfsHost_; - int32_t hdfsPort_; - std::string hdfsPath_; - GraphSpaceID spaceID_; - nebula::kvstore::KVStore *kvstore_; - nebula::hdfs::HdfsHelper *helper_; - nebula::thread::GenericThreadPool *pool_; -}; - -} // namespace meta -} // namespace nebula - -#endif // META_HTTP_METAHTTPDOWNLOADHANDLER_H_ diff --git a/src/meta/http/MetaHttpIngestHandler.h b/src/meta/http/MetaHttpIngestHandler.h deleted file mode 100644 index acddf4affc8..00000000000 --- a/src/meta/http/MetaHttpIngestHandler.h +++ /dev/null 
@@ -1,61 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef META_HTTP_METAHTTPINGESTHANDLER_H -#define META_HTTP_METAHTTPINGESTHANDLER_H - -#include - -#include "common/base/Base.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace meta { - -using nebula::HttpCode; - -/** - * @brief Ingest should be called after download successfully. - * It will instruct relative storaged to ingest sst files - * from local download folder by sending http request. - * It will handle one space each time. - * Functions such as onRequest, onBody... and requestComplete are inherited - * from RequestHandler, we will check request parameters in onRequest and - * call main logic in onEOM. - * - */ -class MetaHttpIngestHandler : public proxygen::RequestHandler { - public: - MetaHttpIngestHandler() = default; - - void init(nebula::kvstore::KVStore *kvstore, nebula::thread::GenericThreadPool *pool); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - bool ingestSSTFiles(GraphSpaceID space); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - GraphSpaceID space_; - nebula::kvstore::KVStore *kvstore_; - nebula::thread::GenericThreadPool *pool_; -}; - -} // namespace meta -} // namespace nebula - -#endif // META_HTTP_METAHTTPINGESTHANDLER_H diff --git a/src/meta/http/test/CMakeLists.txt b/src/meta/http/test/CMakeLists.txt index 9f116aefe5e..c4d059fb41b 100644 --- a/src/meta/http/test/CMakeLists.txt +++ b/src/meta/http/test/CMakeLists.txt @@ -2,37 +2,6 @@ # # This source code is licensed under Apache 2.0 License. 
-nebula_add_test( - NAME meta_http_download_test - SOURCES MetaHttpDownloadHandlerTest.cpp - OBJECTS - $ - $ - ${meta_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest - ) - -nebula_add_test( - NAME meta_http_ingest_test - SOURCES MetaHttpIngestHandlerTest.cpp - OBJECTS - $ - $ - ${meta_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest - ) - - nebula_add_test( NAME meta_http_replace_test diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 13723a171ad..c5dcb06e718 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -124,6 +124,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq // Check if space not exists auto spaceName = paras.back(); + LOG(INFO) << "Space Name: " << spaceName; auto spaceRet = getSpaceId(spaceName); if (!nebula::ok(spaceRet)) { auto retCode = nebula::error(spaceRet); diff --git a/src/meta/processors/job/DownloadJobExecutor.cpp b/src/meta/processors/job/DownloadJobExecutor.cpp new file mode 100644 index 00000000000..86dc77873ab --- /dev/null +++ b/src/meta/processors/job/DownloadJobExecutor.cpp @@ -0,0 +1,126 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#include "meta/processors/job/DownloadJobExecutor.h" + +#include "common/hdfs/HdfsHelper.h" +#include "common/utils/MetaKeyUtils.h" +#include "meta/MetaServiceUtils.h" + +namespace nebula { +namespace meta { + +DownloadJobExecutor::DownloadJobExecutor(JobID jobId, + kvstore::KVStore* kvstore, + AdminClient* adminClient, + const std::vector& paras) + : SimpleConcurrentJobExecutor(jobId, kvstore, adminClient, paras) { + helper_ = std::make_unique(); +} + +bool DownloadJobExecutor::check() { + if (paras_.size() != 2) { + return false; + } + + auto spaceRet = getSpaceIdFromName(paras_[1]); + if (!nebula::ok(spaceRet)) { + LOG(ERROR) << "Can't find the space: " << paras_[1]; + return false; + } + space_ = nebula::value(spaceRet); + + auto& url = paras_[0]; + std::string hdfsPrefix = "hdfs://"; + if (url.find(hdfsPrefix) != 0) { + LOG(ERROR) << "URL should start with " << hdfsPrefix; + return false; + } + + auto u = url.substr(hdfsPrefix.size(), url.size()); + std::vector tokens; + folly::split(":", u, tokens); + if (tokens.size() == 2) { + host_ = std::make_unique(tokens[0]); + int32_t position = tokens[1].find_first_of("/"); + if (position != -1) { + try { + port_ = folly::to(tokens[1].toString().substr(0, position).c_str()); + } catch (const std::exception& ex) { + LOG(ERROR) << "URL's port parse failed: " << url; + return false; + } + path_ = + std::make_unique(tokens[1].toString().substr(position, tokens[1].size())); + } else { + LOG(ERROR) << "URL Parse Failed: " << url; + return false; + } + } else { + LOG(ERROR) << "URL Parse Failed: " << url; + return false; + } + + return true; +} + +nebula::cpp2::ErrorCode DownloadJobExecutor::prepare() { + auto errOrHost = getTargetHost(space_); + if (!nebula::ok(errOrHost)) { + LOG(ERROR) << "Can't get any host according to space"; + return nebula::error(errOrHost); + } + + LOG(INFO) << "HDFS host: " << *host_.get() << " port: " << port_ << " path: " << *path_.get(); + + auto listResult = helper_->ls(*host_.get(), 
port_, *path_.get()); + if (!listResult.ok()) { + LOG(ERROR) << "List HDFS path failed"; + return nebula::cpp2::ErrorCode::E_INVALID_JOB; + } + + taskParameters_.emplace_back(*host_.get()); + taskParameters_.emplace_back(folly::to(port_)); + taskParameters_.emplace_back(*path_.get()); + std::unique_ptr iter; + auto prefix = MetaKeyUtils::partPrefix(space_); + auto result = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Fetch Parts Failed"; + } + return result; +} + +folly::Future DownloadJobExecutor::executeInternal(HostAddr&& address, + std::vector&& parts) { + taskParameters_.resize(3); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::DOWNLOAD, + jobId_, + taskId_++, + space_, + std::move(address), + taskParameters_, + std::move(parts)) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; +} + +nebula::cpp2::ErrorCode DownloadJobExecutor::stop() { + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/job/DownloadJobExecutor.h b/src/meta/processors/job/DownloadJobExecutor.h new file mode 100644 index 00000000000..7528ae589ef --- /dev/null +++ b/src/meta/processors/job/DownloadJobExecutor.h @@ -0,0 +1,43 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#ifndef META_DOWNLOADJOBEXECUTOR_H_ +#define META_DOWNLOADJOBEXECUTOR_H_ + +#include "common/hdfs/HdfsCommandHelper.h" +#include "meta/processors/job/SimpleConcurrentJobExecutor.h" + +namespace nebula { +namespace meta { + +class DownloadJobExecutor : public SimpleConcurrentJobExecutor { + public: + DownloadJobExecutor(JobID jobId, + kvstore::KVStore* kvstore, + AdminClient* adminClient, + const std::vector& params); + + bool check() override; + + nebula::cpp2::ErrorCode prepare() override; + + nebula::cpp2::ErrorCode stop() override; + + protected: + folly::Future executeInternal(HostAddr&& address, + std::vector&& parts) override; + + private: + std::unique_ptr host_; + int32_t port_; + std::unique_ptr path_; + std::unique_ptr helper_; + std::vector taskParameters_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_DOWNLOADJOBEXECUTOR_H_ diff --git a/src/meta/processors/job/IngestJobExecutor.cpp b/src/meta/processors/job/IngestJobExecutor.cpp new file mode 100644 index 00000000000..0894fb67ba5 --- /dev/null +++ b/src/meta/processors/job/IngestJobExecutor.cpp @@ -0,0 +1,77 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#include "meta/processors/job/IngestJobExecutor.h" + +#include "common/utils/MetaKeyUtils.h" +#include "meta/MetaServiceUtils.h" + +namespace nebula { +namespace meta { + +IngestJobExecutor::IngestJobExecutor(JobID jobId, + kvstore::KVStore* kvstore, + AdminClient* adminClient, + const std::vector& paras) + : SimpleConcurrentJobExecutor(jobId, kvstore, adminClient, paras) {} + +bool IngestJobExecutor::check() { + return paras_.size() == 1; +} + +nebula::cpp2::ErrorCode IngestJobExecutor::prepare() { + std::string spaceName = paras_[0]; + auto errOrSpaceId = getSpaceIdFromName(spaceName); + if (!nebula::ok(errOrSpaceId)) { + LOG(ERROR) << "Can't find the space: " << spaceName; + return nebula::error(errOrSpaceId); + } + + space_ = nebula::value(errOrSpaceId); + std::unique_ptr iter; + auto prefix = MetaKeyUtils::partPrefix(space_); + auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Fetch Parts Failed"; + return code; + } + + while (iter->valid()) { + for (auto& host : MetaKeyUtils::parsePartVal(iter->val())) { + if (storageHosts_.count(host.host) == 0) { + storageHosts_.insert(host.host); + } + } + iter->next(); + } + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + +folly::Future IngestJobExecutor::executeInternal(HostAddr&& address, + std::vector&& parts) { + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::INGEST, + jobId_, + taskId_++, + space_, + std::move(address), + taskParameters_, + std::move(parts)) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/job/IngestJobExecutor.h b/src/meta/processors/job/IngestJobExecutor.h new file mode 100644 index 
00000000000..ba28a493a4a --- /dev/null +++ b/src/meta/processors/job/IngestJobExecutor.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef META_INGESTJOBEXECUTOR_H_ +#define META_INGESTJOBEXECUTOR_H_ + +#include "meta/processors/job/MetaJobExecutor.h" +#include "meta/processors/job/SimpleConcurrentJobExecutor.h" + +namespace nebula { +namespace meta { + +class IngestJobExecutor : public SimpleConcurrentJobExecutor { + public: + IngestJobExecutor(JobID jobId, + kvstore::KVStore* kvstore, + AdminClient* adminClient, + const std::vector& params); + + bool check() override; + + nebula::cpp2::ErrorCode prepare() override; + + folly::Future executeInternal(HostAddr&& address, + std::vector&& parts) override; + + private: + std::set storageHosts_; + std::vector taskParameters_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_INGESTJOBEXECUTOR_H_ diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 22bf69c3a17..7948d987734 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -3,8 +3,8 @@ * This source code is licensed under Apache 2.0 License. 
*/ -#ifndef META_KVJOBDESCRIPTION_H_ -#define META_KVJOBDESCRIPTION_H_ +#ifndef META_JOBDESCRIPTION_H_ +#define META_JOBDESCRIPTION_H_ #include @@ -228,4 +228,4 @@ class JobDescription { } // namespace meta } // namespace nebula -#endif // META_KVJOBDESCRIPTION_H_ +#endif // META_JOBDESCRIPTION_H_ diff --git a/src/meta/processors/job/JobExecutor.cpp b/src/meta/processors/job/JobExecutor.cpp index e44d2847610..3ae6b40141c 100644 --- a/src/meta/processors/job/JobExecutor.cpp +++ b/src/meta/processors/job/JobExecutor.cpp @@ -14,7 +14,9 @@ #include "meta/processors/admin/AdminClient.h" #include "meta/processors/job/CompactJobExecutor.h" #include "meta/processors/job/DataBalanceJobExecutor.h" +#include "meta/processors/job/DownloadJobExecutor.h" #include "meta/processors/job/FlushJobExecutor.h" +#include "meta/processors/job/IngestJobExecutor.h" #include "meta/processors/job/LeaderBalanceJobExecutor.h" #include "meta/processors/job/RebuildEdgeJobExecutor.h" #include "meta/processors/job/RebuildFTJobExecutor.h" @@ -51,6 +53,15 @@ std::unique_ptr JobExecutorFactory::createJobExecutor(const JobDesc case cpp2::AdminCmd::COMPACT: ret.reset(new CompactJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; + case cpp2::AdminCmd::FLUSH: + ret.reset(new FlushJobExecutor(jd.getJobId(), store, client, jd.getParas())); + break; + case cpp2::AdminCmd::DOWNLOAD: + ret.reset(new DownloadJobExecutor(jd.getJobId(), store, client, jd.getParas())); + break; + case cpp2::AdminCmd::INGEST: + ret.reset(new IngestJobExecutor(jd.getJobId(), store, client, jd.getParas())); + break; case cpp2::AdminCmd::DATA_BALANCE: ret.reset(new DataBalanceJobExecutor(jd, store, client, jd.getParas())); break; @@ -60,9 +71,6 @@ std::unique_ptr JobExecutorFactory::createJobExecutor(const JobDesc case cpp2::AdminCmd::LEADER_BALANCE: ret.reset(new LeaderBalanceJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::FLUSH: - ret.reset(new FlushJobExecutor(jd.getJobId(), 
store, client, jd.getParas())); - break; case cpp2::AdminCmd::REBUILD_TAG_INDEX: ret.reset(new RebuildTagJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 7aed996f81b..10d1d6c9244 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -117,7 +117,7 @@ void JobManager::scheduleThread() { std::pair opJobId; while (status_.load(std::memory_order_acquire) == JbmgrStatus::BUSY || !try_dequeue(opJobId)) { if (status_.load(std::memory_order_acquire) == JbmgrStatus::STOPPED) { - LOG(INFO) << "[JobManager] detect shutdown called, exit"; + LOG(INFO) << "Detect shutdown called, exit"; break; } usleep(FLAGS_job_check_intervals); @@ -125,12 +125,12 @@ void JobManager::scheduleThread() { auto jobDescRet = JobDescription::loadJobDescription(opJobId.second, kvStore_); if (!nebula::ok(jobDescRet)) { - LOG(INFO) << "[JobManager] load an invalid job from queue " << opJobId.second; + LOG(INFO) << "Load an invalid job from queue " << opJobId.second; continue; // leader change or archive happened } auto jobDesc = nebula::value(jobDescRet); if (!jobDesc.setStatus(cpp2::JobStatus::RUNNING, opJobId.first == JbOp::RECOVER)) { - LOG(INFO) << "[JobManager] skip job " << opJobId.second; + LOG(INFO) << "Skip job " << opJobId.second; continue; } save(jobDesc.jobKey(), jobDesc.jobVal()); @@ -207,7 +207,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job std::lock_guard lk(muJobFinished_); auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_); if (!nebula::ok(optJobDescRet)) { - LOG(INFO) << folly::sformat("can't load job, jobId={}", jobId); + LOG(INFO) << folly::sformat("Can't load job, jobId={}", jobId); if (jobStatus != cpp2::JobStatus::STOPPED) { // there is a rare condition, that when job finished, // the job description is deleted(default more than a week) @@ -231,7 +231,7 @@ 
nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job auto it = runningJobs_.find(jobId); if (it == runningJobs_.end()) { - LOG(INFO) << folly::sformat("can't find jobExecutor, jobId={}", jobId); + LOG(INFO) << folly::sformat("Can't find jobExecutor, jobId={}", jobId); return nebula::cpp2::ErrorCode::E_UNKNOWN; } std::unique_ptr& jobExec = it->second; @@ -345,7 +345,7 @@ nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq& }); if (task == tasks.end()) { LOG(INFO) << folly::sformat( - "report an invalid or outdate task, will ignore this report, job={}, " + "Report an invalid or outdate task, will ignore this report, job={}, " "task={}", jobId, taskId); @@ -454,7 +454,7 @@ ErrorOr> JobManager::showJob auto jobDesc = optJob.toJobDesc(); if (isExpiredJob(jobDesc)) { lastExpiredJobId = jobDesc.get_id(); - LOG(INFO) << "remove expired job " << lastExpiredJobId; + LOG(INFO) << "Remove expired job " << lastExpiredJobId; expiredJobKeys.emplace_back(jobKey); continue; } @@ -665,7 +665,7 @@ ErrorOr JobManager::recoverJob( nebula::cpp2::ErrorCode JobManager::save(const std::string& k, const std::string& v) { std::vector data{std::make_pair(k, v)}; folly::Baton baton; - auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode rc = nebula::cpp2::ErrorCode::SUCCEEDED; kvStore_->asyncMultiPut( kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { rc = code; diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 16d0ccc61d4..f299d51da32 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -41,6 +41,8 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { FRIEND_TEST(JobManagerTest, RecoverJob); FRIEND_TEST(JobManagerTest, AddRebuildTagIndexJob); FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob); + FRIEND_TEST(JobManagerTest, DownloadJob); + FRIEND_TEST(JobManagerTest, 
IngestJob); FRIEND_TEST(GetStatsTest, StatsJob); FRIEND_TEST(GetStatsTest, MockSingleMachineTest); FRIEND_TEST(GetStatsTest, MockMultiMachineTest); diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 1963f176299..a2c845278d1 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -107,6 +107,40 @@ TEST_F(JobManagerTest, AddRebuildEdgeIndexJob) { ASSERT_TRUE(result); } +TEST_F(JobManagerTest, DownloadJob) { + std::unique_ptr> jobMgr = getJobManager(); + // For preventing job schedule in JobManager + jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; + jobMgr->bgThread_.join(); + + std::vector paras{"hdfs://127.0.0.1:9000/tmp", "test_space"}; + JobDescription job(11, cpp2::AdminCmd::DOWNLOAD, paras); + auto rc = jobMgr->addJob(job, adminClient_.get()); + ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); + auto result = jobMgr->runJobInternal(job, JobManager::JbOp::ADD); + ASSERT_TRUE(result); + + job.setStatus(cpp2::JobStatus::FINISHED); + jobMgr->save(job.jobKey(), job.jobVal()); +} + +TEST_F(JobManagerTest, IngestJob) { + std::unique_ptr> jobMgr = getJobManager(); + // For preventing job schedule in JobManager + jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; + jobMgr->bgThread_.join(); + + std::vector paras{"test_space"}; + JobDescription job(11, cpp2::AdminCmd::INGEST, paras); + auto rc = jobMgr->addJob(job, adminClient_.get()); + ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); + auto result = jobMgr->runJobInternal(job, JobManager::JbOp::ADD); + ASSERT_TRUE(result); + + job.setStatus(cpp2::JobStatus::FINISHED); + jobMgr->save(job.jobKey(), job.jobVal()); +} + TEST_F(JobManagerTest, StatsJob) { std::unique_ptr> jobMgr = getJobManager(); // For preventing job schedule in JobManager @@ -318,7 +352,7 @@ TEST_F(JobManagerTest, ShowJob) { jd.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd, adminClient_.get()); - int32_t iJob = jd.id_; + JobID iJob = jd.id_; int32_t task1 = 0; auto 
host1 = toHost("127.0.0.1"); @@ -372,7 +406,7 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { jd.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd, adminClient_.get()); - int32_t iJob = jd.id_; + JobID iJob = jd.id_; int32_t task1 = 0; auto host1 = toHost("127.0.0.1"); @@ -400,7 +434,7 @@ TEST_F(JobManagerTest, RecoverJob) { // set status to prevent running the job since AdminClient is a injector jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); - auto spaceName = "test_space"; + std::string spaceName = "test_space"; int32_t nJob = 3; for (auto i = 0; i != nJob; ++i) { JobDescription jd(i, cpp2::AdminCmd::FLUSH, {spaceName}); @@ -448,7 +482,7 @@ TEST(JobDescriptionTest, Ctor3) { } TEST(JobDescriptionTest, ParseKey) { - int32_t iJob = std::pow(2, 16); + JobID iJob = std::pow(2, 16); std::vector paras{"test_space"}; JobDescription jd(iJob, cpp2::AdminCmd::COMPACT, paras); auto sKey = jd.jobKey(); @@ -461,7 +495,7 @@ TEST(JobDescriptionTest, ParseKey) { } TEST(JobDescriptionTest, ParseVal) { - int32_t iJob = std::pow(2, 15); + JobID iJob = std::pow(2, 15); std::vector paras{"nba"}; JobDescription jd(iJob, cpp2::AdminCmd::FLUSH, paras); auto status = cpp2::JobStatus::FINISHED; @@ -481,8 +515,8 @@ TEST(JobDescriptionTest, ParseVal) { } TEST(TaskDescriptionTest, Ctor) { - int32_t iJob = std::pow(2, 4); - int32_t iTask = 0; + JobID iJob = std::pow(2, 4); + TaskID iTask = 0; auto dest = toHost(""); TaskDescription td(iJob, iTask, dest); auto status = cpp2::JobStatus::RUNNING; @@ -495,8 +529,8 @@ TEST(TaskDescriptionTest, Ctor) { } TEST(TaskDescriptionTest, ParseKey) { - int32_t iJob = std::pow(2, 5); - int32_t iTask = 0; + JobID iJob = std::pow(2, 5); + TaskID iTask = 0; std::string dest{"127.0.0.1"}; TaskDescription td(iJob, iTask, toHost(dest)); @@ -508,8 +542,8 @@ TEST(TaskDescriptionTest, ParseKey) { } TEST(TaskDescriptionTest, ParseVal) { - int32_t iJob = std::pow(2, 5); - int32_t iTask = 0; + JobID iJob = std::pow(2, 5); + TaskID 
iTask = 0; std::string dest{"127.0.0.1"}; TaskDescription td(iJob, iTask, toHost(dest)); @@ -528,8 +562,8 @@ TEST(TaskDescriptionTest, ParseVal) { } TEST(TaskDescriptionTest, Ctor2) { - int32_t iJob = std::pow(2, 6); - int32_t iTask = 0; + JobID iJob = std::pow(2, 6); + TaskID iTask = 0; auto dest = toHost("127.0.0.1"); TaskDescription td1(iJob, iTask, dest); diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index ec5911b88d1..f97c800ba47 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -259,15 +259,6 @@ class SpaceOptItem final { } } - std::string getGroupName() const { - if (isString()) { - return asString(); - } else { - LOG(ERROR) << "group name value illegal."; - return ""; - } - } - OptionType getOptType() const { return optType_; } diff --git a/src/parser/MutateSentences.cpp b/src/parser/MutateSentences.cpp index 077d58d1b11..09d4596b339 100644 --- a/src/parser/MutateSentences.cpp +++ b/src/parser/MutateSentences.cpp @@ -283,13 +283,4 @@ std::string DeleteEdgesSentence::toString() const { return buf; } -std::string DownloadSentence::toString() const { - return folly::stringPrintf( - "DOWNLOAD HDFS \"hdfs://%s:%d%s\"", host_.get()->c_str(), port_, path_.get()->c_str()); -} - -std::string IngestSentence::toString() const { - return "INGEST"; -} - } // namespace nebula diff --git a/src/parser/MutateSentences.h b/src/parser/MutateSentences.h index 2fcc12ed31b..42347cb0b78 100644 --- a/src/parser/MutateSentences.h +++ b/src/parser/MutateSentences.h @@ -610,74 +610,5 @@ class DeleteEdgesSentence final : public Sentence { std::unique_ptr edgeKeyRef_; }; -class DownloadSentence final : public Sentence { - public: - DownloadSentence() { - kind_ = Kind::kDownload; - } - - const std::string *host() const { - return host_.get(); - } - - int32_t port() const { - return port_; - } - - void setPort(int32_t port) { - port_ = port; - } - - const std::string *path() const { - return path_.get(); - } - - void setUrl(std::string 
&url) { - static std::string hdfsPrefix = "hdfs://"; - if (url.find(hdfsPrefix) != 0) { - LOG(ERROR) << "URL should start with " << hdfsPrefix; - return; - } - - std::string u = url.substr(hdfsPrefix.size(), url.size()); - std::vector tokens; - folly::split(":", u, tokens); - if (tokens.size() == 2) { - host_ = std::make_unique(tokens[0]); - int32_t position = tokens[1].find_first_of("/"); - if (position != -1) { - try { - port_ = folly::to(tokens[1].toString().substr(0, position).c_str()); - } catch (const std::exception &ex) { - LOG(ERROR) << "URL's port parse failed: " << url; - return; - } - path_ = - std::make_unique(tokens[1].toString().substr(position, tokens[1].size())); - } else { - LOG(ERROR) << "URL Parse Failed: " << url; - } - } else { - LOG(ERROR) << "URL Parse Failed: " << url; - } - } - - std::string toString() const override; - - private: - std::unique_ptr host_; - int32_t port_; - std::unique_ptr path_; -}; - -class IngestSentence final : public Sentence { - public: - IngestSentence() { - kind_ = Kind::kIngest; - } - - std::string toString() const override; -}; - } // namespace nebula #endif // PARSER_MUTATESENTENCES_H_ diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h index 9a10e5e2daa..18b998a4d65 100644 --- a/src/parser/Sentence.h +++ b/src/parser/Sentence.h @@ -97,8 +97,6 @@ class Sentence { kGrant, kRevoke, kChangePassword, - kDownload, - kIngest, kOrderBy, kShowConfigs, kSetConfig, diff --git a/src/parser/parser.yy b/src/parser/parser.yy index ceb55afe744..d2f7b0292c6 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -3207,10 +3207,18 @@ delete_tag_sentence download_sentence : KW_DOWNLOAD KW_HDFS STRING { - auto sentence = new DownloadSentence(); - sentence->setUrl(*$3); + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, + meta::cpp2::AdminCmd::DOWNLOAD); + sentence->addPara(*$3); + $$ = sentence; + } + ; + +ingest_sentence + : KW_INGEST { + auto sentence = new 
AdminJobSentence(meta::cpp2::AdminJobOp::ADD, + meta::cpp2::AdminCmd::INGEST); $$ = sentence; - delete $3; } ; @@ -3226,13 +3234,6 @@ delete_edge_sentence } ; -ingest_sentence - : KW_INGEST { - auto sentence = new IngestSentence(); - $$ = sentence; - } - ; - admin_job_sentence : KW_SUBMIT KW_JOB KW_COMPACT { auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, @@ -3244,6 +3245,17 @@ admin_job_sentence meta::cpp2::AdminCmd::FLUSH); $$ = sentence; } + | KW_SUBMIT KW_JOB KW_DOWNLOAD KW_HDFS STRING { + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, + meta::cpp2::AdminCmd::DOWNLOAD); + sentence->addPara(*$5); + $$ = sentence; + } + | KW_SUBMIT KW_JOB KW_INGEST { + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, + meta::cpp2::AdminCmd::INGEST); + $$ = sentence; + } | KW_SUBMIT KW_JOB KW_STATS { auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, meta::cpp2::AdminCmd::STATS); diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index e44b1045a73..d343081081f 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -3202,6 +3202,12 @@ TEST_F(ParserTest, JobTest) { }; checkTest("SUBMIT JOB COMPACT", "SUBMIT JOB COMPACT"); checkTest("SUBMIT JOB FLUSH", "SUBMIT JOB FLUSH"); + + checkTest("SUBMIT JOB DOWNLOAD HDFS \"hdfs://127.0.0.1:9090/data\"", + "SUBMIT JOB DOWNLOAD HDFS \"hdfs://127.0.0.1:9090/data\""); + + checkTest("SUBMIT JOB INGEST", "SUBMIT JOB INGEST"); + checkTest("SUBMIT JOB STATS", "SUBMIT JOB STATS"); checkTest("SUBMIT JOB BALANCE IN ZONE", "SUBMIT JOB BALANCE IN ZONE"); checkTest( diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 55874b5cc30..86d4ab2758e 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -16,6 +16,8 @@ nebula_add_library( admin/AdminTask.cpp admin/CompactTask.cpp admin/FlushTask.cpp + admin/DownloadTask.cpp + admin/IngestTask.cpp admin/RebuildIndexTask.cpp 
admin/RebuildTagIndexTask.cpp admin/RebuildEdgeIndexTask.cpp @@ -64,8 +66,6 @@ nebula_add_library( nebula_add_library( storage_http_handler OBJECT - http/StorageHttpIngestHandler.cpp - http/StorageHttpDownloadHandler.cpp http/StorageHttpAdminHandler.cpp http/StorageHttpStatsHandler.cpp http/StorageHttpPropertyHandler.cpp diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index 4ccdc2307c8..c10845921b4 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -27,8 +27,6 @@ #include "storage/StorageAdminServiceHandler.h" #include "storage/StorageFlags.h" #include "storage/http/StorageHttpAdminHandler.h" -#include "storage/http/StorageHttpDownloadHandler.h" -#include "storage/http/StorageHttpIngestHandler.h" #include "storage/http/StorageHttpPropertyHandler.h" #include "storage/http/StorageHttpStatsHandler.h" #include "storage/transaction/TransactionManager.h" @@ -104,16 +102,6 @@ bool StorageServer::initWebService() { webSvc_ = std::make_unique(); auto& router = webSvc_->router(); - router.get("/download").handler([this](web::PathParams&&) { - auto* handler = new storage::StorageHttpDownloadHandler(); - handler->init(hdfsHelper_.get(), webWorkers_.get(), kvstore_.get(), dataPaths_); - return handler; - }); - router.get("/ingest").handler([this](web::PathParams&&) { - auto handler = new nebula::storage::StorageHttpIngestHandler(); - handler->init(kvstore_.get()); - return handler; - }); router.get("/admin").handler([this](web::PathParams&&) { return new storage::StorageHttpAdminHandler(schemaMan_.get(), kvstore_.get()); }); diff --git a/src/storage/admin/AdminTask.cpp b/src/storage/admin/AdminTask.cpp index 154c1795d10..0ee0e2b0a9f 100644 --- a/src/storage/admin/AdminTask.cpp +++ b/src/storage/admin/AdminTask.cpp @@ -6,7 +6,9 @@ #include "storage/admin/AdminTask.h" #include "storage/admin/CompactTask.h" +#include "storage/admin/DownloadTask.h" #include "storage/admin/FlushTask.h" +#include "storage/admin/IngestTask.h" 
#include "storage/admin/RebuildEdgeIndexTask.h" #include "storage/admin/RebuildFTIndexTask.h" #include "storage/admin/RebuildTagIndexTask.h" @@ -37,6 +39,12 @@ std::shared_ptr AdminTaskFactory::createAdminTask(StorageEnv* env, Ta case meta::cpp2::AdminCmd::STATS: ret = std::make_shared(env, std::move(ctx)); break; + case meta::cpp2::AdminCmd::DOWNLOAD: + ret = std::make_shared(env, std::move(ctx)); + break; + case meta::cpp2::AdminCmd::INGEST: + ret = std::make_shared(env, std::move(ctx)); + break; default: break; } diff --git a/src/storage/admin/AdminTask.h b/src/storage/admin/AdminTask.h index e8437d92006..333142563fe 100644 --- a/src/storage/admin/AdminTask.h +++ b/src/storage/admin/AdminTask.h @@ -9,9 +9,6 @@ #include #include -#include "common/thrift/ThriftTypes.h" -#include "interface/gen-cpp2/meta_types.h" -#include "interface/gen-cpp2/storage_types.h" #include "kvstore/Common.h" #include "kvstore/NebulaStore.h" #include "storage/CommonUtils.h" @@ -19,6 +16,8 @@ namespace nebula { namespace storage { +using TaskFunction = std::function; + /** * @brief Subtask class for admin tasks. An admin task comprises a sequence of subtasks. * @@ -27,7 +26,7 @@ class AdminSubTask { public: AdminSubTask() = default; - explicit AdminSubTask(std::function f) : run_(f) {} + explicit AdminSubTask(TaskFunction f) : run_(f) {} /** * @brief Entry point to invoke sub tasks function. @@ -39,7 +38,7 @@ class AdminSubTask { } private: - std::function run_; + TaskFunction run_; }; enum class TaskPriority : int8_t { LO, MID, HI }; @@ -89,6 +88,12 @@ class AdminTask { virtual ~AdminTask() {} + /** + * @brief Check the argument + * + */ + virtual bool check() = 0; + /** * @brief Set the Callback object * @@ -172,7 +177,7 @@ class AdminTask { * @param rc Errorcode. 
*/ virtual void subTaskFinish(nebula::cpp2::ErrorCode rc) { - auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode suc = nebula::cpp2::ErrorCode::SUCCEEDED; rc_.compare_exchange_strong(suc, rc); } @@ -183,7 +188,7 @@ class AdminTask { virtual void cancel() { FLOG_INFO("task(%d, %d) cancelled", ctx_.jobId_, ctx_.taskId_); canceled_ = true; - auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode suc = nebula::cpp2::ErrorCode::SUCCEEDED; rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL); } diff --git a/src/storage/admin/AdminTaskProcessor.h b/src/storage/admin/AdminTaskProcessor.h index de3f3cdb7d5..76c0beda78e 100644 --- a/src/storage/admin/AdminTaskProcessor.h +++ b/src/storage/admin/AdminTaskProcessor.h @@ -7,7 +7,6 @@ #define STORAGE_ADMIN_ADMINTASKPROCESSOR_H_ #include "common/base/Base.h" -#include "common/thrift/ThriftTypes.h" #include "interface/gen-cpp2/meta_types.h" #include "kvstore/NebulaStore.h" #include "storage/BaseProcessor.h" diff --git a/src/storage/admin/CompactTask.cpp b/src/storage/admin/CompactTask.cpp index 913a6bc6924..a746d71c427 100644 --- a/src/storage/admin/CompactTask.cpp +++ b/src/storage/admin/CompactTask.cpp @@ -10,6 +10,10 @@ namespace nebula { namespace storage { +bool CompactTask::check() { + return env_->kvstore_ != nullptr; +} + ErrorOr> CompactTask::genSubTasks() { std::vector ret; if (!env_->kvstore_) { diff --git a/src/storage/admin/CompactTask.h b/src/storage/admin/CompactTask.h index e01bc2d0a8d..e9479ca2dc4 100644 --- a/src/storage/admin/CompactTask.h +++ b/src/storage/admin/CompactTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_COMPACTTASK_H_ #define STORAGE_ADMIN_COMPACTTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" #include "storage/admin/AdminTask.h" @@ -22,6 +21,8 @@ class CompactTask : public AdminTask { public: CompactTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + 
bool check() override; + /** * @brief Generate subtasks for compact. * diff --git a/src/storage/admin/DownloadTask.cpp b/src/storage/admin/DownloadTask.cpp new file mode 100644 index 00000000000..cca84d1a759 --- /dev/null +++ b/src/storage/admin/DownloadTask.cpp @@ -0,0 +1,59 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/admin/DownloadTask.h" + +#include "common/fs/FileUtils.h" + +namespace nebula { +namespace storage { + +bool DownloadTask::check() { + return env_->kvstore_ != nullptr; +} + +ErrorOr> DownloadTask::genSubTasks() { + auto space = *ctx_.parameters_.space_id_ref(); + auto parts = *ctx_.parameters_.parts_ref(); + auto paras = ctx_.parameters_.task_specific_paras_ref(); + if (!paras.has_value() || paras->size() != 3) { + LOG(ERROR) << "Download Task should be three parameters"; + return nebula::cpp2::ErrorCode::E_INVALID_PARM; + } + + hdfsHost_ = (*paras)[0]; + hdfsPort_ = folly::to((*paras)[1]); + hdfsPath_ = (*paras)[2]; + std::vector tasks; + for (const auto& part : parts) { + TaskFunction task = std::bind(&DownloadTask::subTask, this, space, part); + tasks.emplace_back(std::move(task)); + } + return tasks; +} + +nebula::cpp2::ErrorCode DownloadTask::subTask(GraphSpaceID space, PartitionID part) { + LOG(INFO) << "Space: " << space << " Part: " << part; + auto hdfsPartPath = folly::stringPrintf("%s/%d", hdfsPath_.c_str(), part); + auto partResult = env_->kvstore_->part(space, part); + if (!ok(partResult)) { + LOG(ERROR) << "Can't found space: " << space << ", part: " << part; + return nebula::cpp2::ErrorCode::E_PART_NOT_FOUND; + } + + auto localPath = folly::stringPrintf("%s/download/", value(partResult)->engine()->getDataRoot()); + if (fs::FileUtils::fileType(localPath.c_str()) == fs::FileType::NOTEXIST) { + if (!fs::FileUtils::makeDir(localPath)) { + return nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED; + } + } + + auto result = 
helper_->copyToLocal(hdfsHost_, hdfsPort_, hdfsPartPath, localPath); + return result.ok() ? nebula::cpp2::ErrorCode::SUCCEEDED + : nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/DownloadTask.h b/src/storage/admin/DownloadTask.h new file mode 100644 index 00000000000..2b3f8785931 --- /dev/null +++ b/src/storage/admin/DownloadTask.h @@ -0,0 +1,42 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef STORAGE_ADMIN_DOWNLOADTASK_H_ +#define STORAGE_ADMIN_DOWNLOADTASK_H_ + +#include "common/hdfs/HdfsCommandHelper.h" +#include "storage/admin/AdminTask.h" + +namespace nebula { +namespace storage { + +/** + * @brief Task class to handle storage download task. + * + */ +class DownloadTask : public AdminTask { + public: + DownloadTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) { + helper_ = std::make_unique(); + } + + bool check() override; + + ErrorOr> genSubTasks() override; + + private: + nebula::cpp2::ErrorCode subTask(GraphSpaceID space, PartitionID part); + + private: + std::string hdfsPath_; + std::string hdfsHost_; + int32_t hdfsPort_; + std::unique_ptr helper_; +}; + +} // namespace storage +} // namespace nebula + +#endif // STORAGE_ADMIN_DOWNLOADTASK_H_ diff --git a/src/storage/admin/FlushTask.cpp b/src/storage/admin/FlushTask.cpp index 6d9f4be9bac..6f6ff86e85e 100644 --- a/src/storage/admin/FlushTask.cpp +++ b/src/storage/admin/FlushTask.cpp @@ -10,12 +10,12 @@ namespace nebula { namespace storage { +bool FlushTask::check() { + return env_->kvstore_ != nullptr; +} + ErrorOr> FlushTask::genSubTasks() { std::vector ret; - if (!env_->kvstore_) { - return ret; - } - auto* store = dynamic_cast(env_->kvstore_); auto errOrSpace = store->space(*ctx_.parameters_.space_id_ref()); if (!ok(errOrSpace)) { diff --git a/src/storage/admin/FlushTask.h b/src/storage/admin/FlushTask.h index 
243dda1e8c4..ea83a6fecf9 100644 --- a/src/storage/admin/FlushTask.h +++ b/src/storage/admin/FlushTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_FLUSHTASK_H_ #define STORAGE_ADMIN_FLUSHTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" #include "storage/admin/AdminTask.h" @@ -21,6 +20,9 @@ namespace storage { class FlushTask : public AdminTask { public: FlushTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + bool check() override; + /** * @brief Generage subtasks for flushing. * diff --git a/src/storage/admin/IngestTask.cpp b/src/storage/admin/IngestTask.cpp new file mode 100644 index 00000000000..e8ce86e9801 --- /dev/null +++ b/src/storage/admin/IngestTask.cpp @@ -0,0 +1,50 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/admin/IngestTask.h" + +#include "common/fs/FileUtils.h" + +namespace nebula { +namespace storage { + +bool IngestTask::check() { + return env_->kvstore_ != nullptr; +} + +ErrorOr> IngestTask::genSubTasks() { + std::vector results; + auto* store = dynamic_cast(env_->kvstore_); + auto errOrSpace = store->space(*ctx_.parameters_.space_id_ref()); + if (!ok(errOrSpace)) { + LOG(ERROR) << "Space not found"; + return error(errOrSpace); + } + + auto space = nebula::value(errOrSpace); + results.emplace_back([space = space]() { + for (auto& engine : space->engines_) { + auto parts = engine->allParts(); + for (auto part : parts) { + auto path = folly::stringPrintf("%s/download/%d", engine->getDataRoot(), part); + if (!fs::FileUtils::exist(path)) { + LOG(INFO) << path << " not existed"; + continue; + } + + auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst"); + auto code = engine->ingest(std::vector(files)); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + return 
results; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/IngestTask.h b/src/storage/admin/IngestTask.h new file mode 100644 index 00000000000..e46754ac3cb --- /dev/null +++ b/src/storage/admin/IngestTask.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef STORAGE_ADMIN_INGESTTASK_H_ +#define STORAGE_ADMIN_INGESTTASK_H_ + +#include "storage/admin/AdminTask.h" + +namespace nebula { +namespace storage { + +/** + * @brief Task class to handle storage ingest task. + * + */ +class IngestTask : public AdminTask { + public: + IngestTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + bool check() override; + + ErrorOr> genSubTasks() override; +}; + +} // namespace storage +} // namespace nebula + +#endif // STORAGE_ADMIN_INGESTTASK_H_ diff --git a/src/storage/admin/RebuildEdgeIndexTask.cpp b/src/storage/admin/RebuildEdgeIndexTask.cpp index 0e14e2f1068..78fa54d0286 100644 --- a/src/storage/admin/RebuildEdgeIndexTask.cpp +++ b/src/storage/admin/RebuildEdgeIndexTask.cpp @@ -48,8 +48,8 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac LOG(INFO) << "Get space edge schema failed"; return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; } - auto schemas = schemasRet.value(); + auto schemas = schemasRet.value(); auto vidSize = vidSizeRet.value(); std::unique_ptr iter; const auto& prefix = NebulaKeyUtils::edgePrefix(part); @@ -114,8 +114,8 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac iter->next(); continue; } - auto* schema = schemaIter->second.get(); + auto* schema = schemaIter->second.get(); auto ttlProp = CommonUtils::ttlProps(schema); if (ttlProp.first && CommonUtils::checkDataExpiredForTTL( schema, reader.get(), ttlProp.second.second, ttlProp.second.first)) { diff --git a/src/storage/admin/RebuildFTIndexTask.cpp 
b/src/storage/admin/RebuildFTIndexTask.cpp index bffb59b5d2b..fc8af8989ea 100644 --- a/src/storage/admin/RebuildFTIndexTask.cpp +++ b/src/storage/admin/RebuildFTIndexTask.cpp @@ -12,6 +12,10 @@ DECLARE_uint32(raft_heartbeat_interval_secs); namespace nebula { namespace storage { +bool RebuildFTIndexTask::check() { + return env_->kvstore_ != nullptr; +} + ErrorOr> RebuildFTIndexTask::genSubTasks() { std::vector tasks; VLOG(1) << "Begin rebuild fulltext indexes, space : " << *ctx_.parameters_.space_id_ref(); @@ -47,8 +51,7 @@ ErrorOr> RebuildFTIndexTask:: VLOG(1) << folly::sformat("Processing fulltext rebuild subtask, space={}, part={}", *ctx_.parameters_.space_id_ref(), part); - std::function task = - std::bind(&RebuildFTIndexTask::taskByPart, this, listener); + TaskFunction task = std::bind(&RebuildFTIndexTask::taskByPart, this, listener); tasks.emplace_back(std::move(task)); } return tasks; diff --git a/src/storage/admin/RebuildFTIndexTask.h b/src/storage/admin/RebuildFTIndexTask.h index 528e508dfaf..72835b8fd7f 100644 --- a/src/storage/admin/RebuildFTIndexTask.h +++ b/src/storage/admin/RebuildFTIndexTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_REBUILDFTINDEXTASK_H_ #define STORAGE_ADMIN_REBUILDFTINDEXTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" #include "storage/admin/AdminTask.h" @@ -21,6 +20,9 @@ namespace storage { class RebuildFTIndexTask : public AdminTask { public: RebuildFTIndexTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + bool check() override; + /** * @brief Generate subtasks for rebuilding FT index. 
* diff --git a/src/storage/admin/RebuildIndexTask.cpp b/src/storage/admin/RebuildIndexTask.cpp index ca1988873b4..d3e883e27ba 100644 --- a/src/storage/admin/RebuildIndexTask.cpp +++ b/src/storage/admin/RebuildIndexTask.cpp @@ -14,6 +14,10 @@ namespace storage { const int32_t kReserveNum = 1024 * 4; +bool RebuildIndexTask::check() { + return env_->kvstore_ != nullptr; +} + RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) { // Rebuild index rate is limited to FLAGS_rebuild_index_part_rate_limit * SubTaskConcurrency. As @@ -26,7 +30,6 @@ RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx) } ErrorOr> RebuildIndexTask::genSubTasks() { - CHECK_NOTNULL(env_->kvstore_); space_ = *ctx_.parameters_.space_id_ref(); auto parts = *ctx_.parameters_.parts_ref(); @@ -67,8 +70,7 @@ ErrorOr> RebuildIndexTask::ge for (const auto& part : parts) { env_->rebuildIndexGuard_->insert_or_assign(std::make_tuple(space_, part), IndexState::STARTING); - std::function task = - std::bind(&RebuildIndexTask::invoke, this, space_, part, items); + TaskFunction task = std::bind(&RebuildIndexTask::invoke, this, space_, part, items); tasks.emplace_back(std::move(task)); } return tasks; diff --git a/src/storage/admin/RebuildIndexTask.h b/src/storage/admin/RebuildIndexTask.h index 772de08710a..808cc4c4bad 100644 --- a/src/storage/admin/RebuildIndexTask.h +++ b/src/storage/admin/RebuildIndexTask.h @@ -29,6 +29,8 @@ class RebuildIndexTask : public AdminTask { LOG(INFO) << "Release Rebuild Task"; } + bool check() override; + /** * @brief Generate subtasks for rebuilding index. 
* diff --git a/src/storage/admin/StatsTask.cpp b/src/storage/admin/StatsTask.cpp index acb2ae89681..8008df3173d 100644 --- a/src/storage/admin/StatsTask.cpp +++ b/src/storage/admin/StatsTask.cpp @@ -14,8 +14,11 @@ namespace nebula { namespace storage { +bool StatsTask::check() { + return env_->kvstore_ != nullptr && env_->schemaMan_ != nullptr; +} + nebula::cpp2::ErrorCode StatsTask::getSchemas(GraphSpaceID spaceId) { - CHECK_NOTNULL(env_->schemaMan_); auto tags = env_->schemaMan_->getAllVerTagSchema(spaceId); if (!tags.ok()) { return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND; @@ -61,8 +64,7 @@ ErrorOr> StatsTask::genSubTas std::vector tasks; for (const auto& part : parts) { - std::function task = - std::bind(&StatsTask::genSubTask, this, spaceId_, part, tags_, edges_); + TaskFunction task = std::bind(&StatsTask::genSubTask, this, spaceId_, part, tags_, edges_); tasks.emplace_back(std::move(task)); } return tasks; @@ -100,7 +102,6 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, auto partitionNum = partitionNumRet.value(); LOG(INFO) << "Start stats task"; - CHECK_NOTNULL(env_->kvstore_); auto tagPrefix = NebulaKeyUtils::tagPrefix(part); std::unique_ptr tagIter; auto edgePrefix = NebulaKeyUtils::edgePrefix(part); diff --git a/src/storage/admin/StatsTask.h b/src/storage/admin/StatsTask.h index 5b353b7ff1f..56f81b411bc 100644 --- a/src/storage/admin/StatsTask.h +++ b/src/storage/admin/StatsTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_STATSTASK_H_ #define STORAGE_ADMIN_STATSTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "interface/gen-cpp2/meta_types.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" @@ -28,6 +27,8 @@ class StatsTask : public AdminTask { LOG(INFO) << "Release Stats Task"; } + bool check() override; + /** * @brief Generate sub tasks for StatsTask. 
* diff --git a/src/storage/admin/StopAdminTaskProcessor.h b/src/storage/admin/StopAdminTaskProcessor.h index b8363283916..b3418e50041 100644 --- a/src/storage/admin/StopAdminTaskProcessor.h +++ b/src/storage/admin/StopAdminTaskProcessor.h @@ -7,7 +7,6 @@ #define STORAGE_ADMIN_STOPADMINTASKPROCESSOR_H_ #include "common/base/Base.h" -#include "common/thrift/ThriftTypes.h" #include "kvstore/NebulaStore.h" #include "storage/BaseProcessor.h" #include "storage/StorageFlags.h" diff --git a/src/storage/http/StorageHttpDownloadHandler.h b/src/storage/http/StorageHttpDownloadHandler.h deleted file mode 100644 index 1a4d8656d22..00000000000 --- a/src/storage/http/StorageHttpDownloadHandler.h +++ /dev/null @@ -1,69 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef STORAGE_HTTP_STORAGEHTTPDOWNLOADHANDLER_H_ -#define STORAGE_HTTP_STORAGEHTTPDOWNLOADHANDLER_H_ - -#include - -#include "common/base/Base.h" -#include "common/hdfs/HdfsHelper.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace storage { - -using nebula::HttpCode; - -/** - * @brief down load files from hdfs - * - */ -class StorageHttpDownloadHandler : public proxygen::RequestHandler { - public: - StorageHttpDownloadHandler() = default; - - void init(nebula::hdfs::HdfsHelper *helper, - nebula::thread::GenericThreadPool *pool, - nebula::kvstore::KVStore *kvstore, - std::vector paths); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - private: - bool downloadSSTFiles(const std::string &url, - int port, - const std::string &path, - const std::vector 
&parts); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - GraphSpaceID spaceID_; - std::string hdfsHost_; - int32_t hdfsPort_; - std::string hdfsPath_; - std::string partitions_; - nebula::hdfs::HdfsHelper *helper_; - nebula::thread::GenericThreadPool *pool_; - nebula::kvstore::KVStore *kvstore_; - std::vector paths_; -}; - -} // namespace storage -} // namespace nebula - -#endif // STORAGE_HTTP_STORAGEHTTPDOWNLOADHANDLER_H_ diff --git a/src/storage/http/StorageHttpIngestHandler.h b/src/storage/http/StorageHttpIngestHandler.h deleted file mode 100644 index b3beebbb846..00000000000 --- a/src/storage/http/StorageHttpIngestHandler.h +++ /dev/null @@ -1,53 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef STORAGE_HTTP_STORAGEHTTPINGESTHANDLER_H_ -#define STORAGE_HTTP_STORAGEHTTPINGESTHANDLER_H_ - -#include - -#include "common/base/Base.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace storage { - -using nebula::HttpCode; - -/** - * @brief ingest from from downloaded file. 
- * - */ -class StorageHttpIngestHandler : public proxygen::RequestHandler { - public: - StorageHttpIngestHandler() = default; - - void init(nebula::kvstore::KVStore *kvstore); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - bool ingestSSTFiles(GraphSpaceID space); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - nebula::kvstore::KVStore *kvstore_; - GraphSpaceID space_; -}; - -} // namespace storage -} // namespace nebula - -#endif // STORAGE_HTTP_STORAGEHTTPINGESTHANDLER_H_ diff --git a/src/storage/test/AdminTaskManagerTest.cpp b/src/storage/test/AdminTaskManagerTest.cpp index 6d2de138134..bfde6e3c5f0 100644 --- a/src/storage/test/AdminTaskManagerTest.cpp +++ b/src/storage/test/AdminTaskManagerTest.cpp @@ -42,12 +42,17 @@ struct HookableTask : public AdminTask { HookableTask() { fGenSubTasks = [&]() { return subTasks; }; } + ErrOrSubTasks genSubTasks() override { LOG(INFO) << "HookableTask::genSubTasks() subTasks.size()=" << subTasks.size(); return fGenSubTasks(); } - void addSubTask(std::function subTask) { + bool check() override { + return true; + } + + void addSubTask(TaskFunction subTask) { subTasks.emplace_back(subTask); } diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 5bbbf05acb3..b08f279ec9a 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -1,4 +1,5 @@ set(storage_test_deps + $ $ $ $ @@ -460,40 +461,6 @@ nebula_add_test( gtest ) -nebula_add_test( - NAME - storage_http_download_test - SOURCES - StorageHttpDownloadHandlerTest.cpp - OBJECTS - $ - $ - ${storage_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest -) - 
-nebula_add_test( - NAME - storage_http_ingest_test - SOURCES - StorageHttpIngestHandlerTest.cpp - OBJECTS - $ - $ - ${storage_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest -) - nebula_add_test( NAME storage_http_property_test @@ -769,3 +736,17 @@ nebula_add_executable( gtest ) +nebula_add_test( + NAME + download_and_ingest_test + SOURCES + DownloadAndIngestTest.cpp + OBJECTS + ${storage_test_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} + wangle + gtest +) \ No newline at end of file diff --git a/src/storage/test/DownloadAndIngestTest.cpp b/src/storage/test/DownloadAndIngestTest.cpp new file mode 100644 index 00000000000..1388e75f750 --- /dev/null +++ b/src/storage/test/DownloadAndIngestTest.cpp @@ -0,0 +1,64 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#include + +#include "common/base/Base.h" +#include "common/fs/TempDir.h" +#include "mock/MockCluster.h" +#include "storage/admin/AdminTaskManager.h" + +namespace nebula { +namespace storage { + +class DownloadAndIngestTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + LOG(INFO) << "SetUp Download And Ingest TestCase"; + rootPath_ = std::make_unique("/tmp/DownloadAndIngestTest.XXXXXX"); + cluster_ = std::make_unique(); + cluster_->initStorageKV(rootPath_->path()); + env_ = cluster_->storageEnv_.get(); + manager_ = AdminTaskManager::instance(); + manager_->init(); + } + + static void TearDownTestCase() { + LOG(INFO) << "TearDown Download And Ingest TestCase"; + manager_->shutdown(); + cluster_.reset(); + rootPath_.reset(); + } + + void SetUp() override {} + + void TearDown() override {} + + static StorageEnv* env_; + static AdminTaskManager* manager_; + + private: + static std::unique_ptr rootPath_; + static std::unique_ptr cluster_; +}; + +StorageEnv* DownloadAndIngestTest::env_{nullptr}; +AdminTaskManager* DownloadAndIngestTest::manager_{nullptr}; +std::unique_ptr DownloadAndIngestTest::rootPath_{nullptr}; +std::unique_ptr DownloadAndIngestTest::cluster_{nullptr}; + +TEST_F(DownloadAndIngestTest, NormalCondition) {} + +} // namespace storage +} // namespace nebula + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + return RUN_ALL_TESTS(); +} diff --git a/src/storage/test/StorageHttpDownloadHandlerTest.cpp b/src/storage/test/StorageHttpDownloadHandlerTest.cpp deleted file mode 100644 index ec88273affc..00000000000 --- a/src/storage/test/StorageHttpDownloadHandlerTest.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/http/HttpClient.h" -#include "mock/MockCluster.h" -#include "mock/MockData.h" -#include "storage/http/StorageHttpDownloadHandler.h" -#include "storage/test/MockHdfsHelper.h" -#include "storage/test/TestUtils.h" -#include "webservice/Router.h" -#include "webservice/WebService.h" - -DECLARE_string(meta_server_addrs); - -namespace nebula { -namespace storage { - -std::unique_ptr helper = std::make_unique(); - -class StorageHttpDownloadHandlerTestEnv : public ::testing::Environment { - public: - void SetUp() override { - FLAGS_ws_ip = "127.0.0.1"; - FLAGS_ws_http_port = 0; - - rootPath_ = std::make_unique("/tmp/StorageHttpDownloadHandler.XXXXXX"); - cluster_ = std::make_unique(); - cluster_->initStorageKV(rootPath_->path()); - - pool_ = std::make_unique(); - pool_->start(1); - - VLOG(1) << "Starting web service..."; - webSvc_ = std::make_unique(); - auto& router = webSvc_->router(); - router.get("/download").handler([this](nebula::web::PathParams&&) { - auto handler = new storage::StorageHttpDownloadHandler(); - std::vector paths{rootPath_->path()}; - handler->init(helper.get(), pool_.get(), cluster_->storageEnv_->kvstore_, paths); - return handler; - }); - auto status = webSvc_->start(); - ASSERT_TRUE(status.ok()) << status; - } - - void TearDown() override { - cluster_.reset(); - rootPath_.reset(); - webSvc_.reset(); - pool_->stop(); - VLOG(1) << "Web service stopped"; - } - - private: - std::unique_ptr cluster_; - std::unique_ptr webSvc_; - std::unique_ptr rootPath_; - std::unique_ptr pool_; -}; - -TEST(StorageHttpDownloadHandlerTest, StorageDownloadTest) { - { - auto url = "/download"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_TRUE(resp.value().empty()); - } - { - auto url = 
"/download?host=127.0.0.1&port=9000&path=/data&parts=1&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile download successfully", resp.value()); - } - { - auto url = - "/download?host=127.0.0.1&port=9000&path=/" - "data&parts=illegal-part&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile download failed", resp.value()); - } - { - helper = std::make_unique(); - auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile download failed", resp.value()); - } -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - ::testing::AddGlobalTestEnvironment(new nebula::storage::StorageHttpDownloadHandlerTestEnv()); - - return RUN_ALL_TESTS(); -} diff --git a/src/storage/test/StorageHttpIngestHandlerTest.cpp b/src/storage/test/StorageHttpIngestHandlerTest.cpp deleted file mode 100644 index 0c18c19741e..00000000000 --- a/src/storage/test/StorageHttpIngestHandlerTest.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/http/HttpClient.h" -#include "mock/MockCluster.h" -#include "mock/MockData.h" -#include "storage/http/StorageHttpIngestHandler.h" -#include "webservice/Router.h" -#include "webservice/WebService.h" - -namespace nebula { -namespace storage { - -class StorageHttpIngestHandlerTestEnv : public ::testing::Environment { - public: - void SetUp() override { - FLAGS_ws_ip = "127.0.0.1"; - FLAGS_ws_http_port = 0; - VLOG(1) << "Starting web service..."; - - rootPath_ = std::make_unique("/tmp/StorageHttpIngestHandler.XXXXXX"); - cluster_ = std::make_unique(); - cluster_->initStorageKV(rootPath_->path()); - - auto partPath = folly::stringPrintf("%s/disk1/nebula/0/download/0", rootPath_->path()); - ASSERT_TRUE(nebula::fs::FileUtils::makeDir(partPath)); - - auto options = rocksdb::Options(); - auto env = rocksdb::EnvOptions(); - rocksdb::SstFileWriter writer{env, options}; - - auto sstPath = folly::stringPrintf("%s/data.sst", partPath.c_str()); - auto status = writer.Open(sstPath); - ASSERT_EQ(rocksdb::Status::OK(), status); - - for (auto i = 0; i < 10; i++) { - status = writer.Put(folly::stringPrintf("key_%d", i), folly::stringPrintf("val_%d", i)); - ASSERT_EQ(rocksdb::Status::OK(), status); - } - status = writer.Finish(); - ASSERT_EQ(rocksdb::Status::OK(), status); - - webSvc_ = std::make_unique(); - auto& router = webSvc_->router(); - router.get("/ingest").handler([this](nebula::web::PathParams&&) { - auto handler = new storage::StorageHttpIngestHandler(); - handler->init(cluster_->storageEnv_->kvstore_); - return handler; - }); - auto webStatus = webSvc_->start(); - ASSERT_TRUE(webStatus.ok()) << webStatus; - } - - void TearDown() override { - cluster_.reset(); - rootPath_.reset(); - webSvc_.reset(); - VLOG(1) << "Web service stopped"; - } - - private: - std::unique_ptr rootPath_; - std::unique_ptr cluster_; - std::unique_ptr webSvc_; -}; - 
-TEST(StorageHttpIngestHandlerTest, StorageIngestTest) { - { - auto url = "/ingest?space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile ingest successfully", resp.value()); - } - { - auto url = "/ingest?space=0"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile ingest failed", resp.value()); - } -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - ::testing::AddGlobalTestEnvironment(new nebula::storage::StorageHttpIngestHandlerTestEnv()); - - return RUN_ALL_TESTS(); -} diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index efc6bdb4d52..43f0db21420 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -1,5 +1,62 @@ # note: standalone version don't have dependent test tools for now. 
+set(tools_test_deps + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) + if(NOT ENABLE_STANDALONE_VERSION) nebula_add_subdirectory(storage-perf) nebula_add_subdirectory(simple-kv-verify) diff --git a/src/tools/db-dump/CMakeLists.txt b/src/tools/db-dump/CMakeLists.txt index 9eb9c671658..62f9432c1fa 100644 --- a/src/tools/db-dump/CMakeLists.txt +++ b/src/tools/db-dump/CMakeLists.txt @@ -1,60 +1,3 @@ -set(tools_test_deps - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ -) - nebula_add_executable( NAME db_dump diff --git a/src/tools/db-upgrade/CMakeLists.txt b/src/tools/db-upgrade/CMakeLists.txt index 367918cb540..af0b1efbd26 100644 --- a/src/tools/db-upgrade/CMakeLists.txt +++ b/src/tools/db-upgrade/CMakeLists.txt @@ -8,59 +8,7 @@ nebula_add_executable( NebulaKeyUtilsV3.cpp DbUpgrader.cpp OBJECTS - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff --git a/src/tools/meta-dump/CMakeLists.txt b/src/tools/meta-dump/CMakeLists.txt index 7ba2307dcb0..47fbcc1be56 100644 --- a/src/tools/meta-dump/CMakeLists.txt +++ b/src/tools/meta-dump/CMakeLists.txt @@ -4,60 +4,7 @@ nebula_add_executable( SOURCES MetaDumpTool.cpp OBJECTS - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff 
--git a/src/tools/simple-kv-verify/CMakeLists.txt b/src/tools/simple-kv-verify/CMakeLists.txt index 8434a6c8104..5b5d3bdf220 100644 --- a/src/tools/simple-kv-verify/CMakeLists.txt +++ b/src/tools/simple-kv-verify/CMakeLists.txt @@ -4,58 +4,7 @@ nebula_add_executable( SOURCES SimpleKVVerifyTool.cpp OBJECTS - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index 40a34abfa48..cccddaaadb2 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -1,66 +1,10 @@ -set(perf_test_deps - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ -) - nebula_add_executable( NAME storage_perf SOURCES StoragePerfTool.cpp OBJECTS - ${perf_test_deps} + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} @@ -83,7 +27,7 @@ nebula_add_executable( SOURCES StorageIntegrityTool.cpp OBJECTS - ${perf_test_deps} + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES}