From 653294dfb3349b0d573c5dbdeab8fefe74451fdc Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Mon, 21 Nov 2022 16:43:18 +0800 Subject: [PATCH] start elasticsearch container in CI setup listener in CI adjust use libcurl remove some useless curl impl http client with libcurl refactor es client abandon multi field in single fulltext index compile remove useless curl fix compile fix unittest add elasticsearch container service in CI fix CI fix CI --- .github/workflows/pull_request.yml | 12 + src/common/expression/test/CMakeLists.txt | 5 +- src/common/http/test/CMakeLists.txt | 1 - src/common/http/test/HttpClientTest.cpp | 2 +- src/common/plugin/fulltext/CMakeLists.txt | 10 +- src/common/plugin/fulltext/FTGraphAdapter.h | 58 ---- src/common/plugin/fulltext/FTStorageAdapter.h | 32 -- .../fulltext/elasticsearch/ESAdapter.cpp | 245 +++++++++++++ .../plugin/fulltext/elasticsearch/ESAdapter.h | 77 ++++ .../fulltext/elasticsearch/ESClient.cpp | 118 +++++++ .../plugin/fulltext/elasticsearch/ESClient.h | 50 +++ .../fulltext/elasticsearch/ESGraphAdapter.cpp | 328 ------------------ .../fulltext/elasticsearch/ESGraphAdapter.h | 116 ------- .../elasticsearch/ESStorageAdapter.cpp | 220 ------------ .../fulltext/elasticsearch/ESStorageAdapter.h | 51 --- .../plugin/fulltext/test/CMakeLists.txt | 33 +- .../fulltext/test/FulltextPluginTest.cpp | 2 - src/common/utils/test/CMakeLists.txt | 2 +- src/daemons/CMakeLists.txt | 6 +- src/graph/context/test/CMakeLists.txt | 2 +- src/graph/executor/admin/SpaceExecutor.cpp | 20 +- .../executor/maintain/FTIndexExecutor.cpp | 12 +- src/graph/executor/test/CMakeLists.txt | 2 +- src/graph/optimizer/test/CMakeLists.txt | 2 +- src/graph/util/FTIndexUtils.cpp | 171 +++------ src/graph/util/FTIndexUtils.h | 44 +-- src/graph/util/test/CMakeLists.txt | 5 +- src/graph/validator/AdminValidator.h | 1 - src/graph/validator/LookupValidator.cpp | 14 +- src/graph/validator/LookupValidator.h | 2 - 
src/graph/validator/MaintainValidator.cpp | 16 +- src/graph/validator/test/CMakeLists.txt | 2 +- src/graph/visitor/test/CMakeLists.txt | 2 +- src/kvstore/Listener.cpp | 2 - src/kvstore/LogEncoder.h | 2 +- .../plugins/elasticsearch/ESListener.cpp | 213 +++++------- .../plugins/elasticsearch/ESListener.h | 76 +--- src/kvstore/test/CMakeLists.txt | 15 +- src/kvstore/test/NebulaListenerTest.cpp | 63 +++- src/meta/CMakeLists.txt | 3 +- src/meta/http/test/CMakeLists.txt | 1 + src/meta/test/CMakeLists.txt | 18 + src/parser/MaintainSentences.cpp | 4 +- src/parser/MaintainSentences.h | 15 +- src/parser/parser.yy | 4 +- src/parser/test/CMakeLists.txt | 11 +- src/parser/test/fuzzing/CMakeLists.txt | 2 +- src/storage/test/CMakeLists.txt | 42 ++- src/tools/CMakeLists.txt | 3 +- src/tools/simple-kv-verify/CMakeLists.txt | 1 + src/tools/storage-perf/CMakeLists.txt | 1 + 51 files changed, 884 insertions(+), 1255 deletions(-) delete mode 100644 src/common/plugin/fulltext/FTGraphAdapter.h delete mode 100644 src/common/plugin/fulltext/FTStorageAdapter.h create mode 100644 src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp create mode 100644 src/common/plugin/fulltext/elasticsearch/ESAdapter.h create mode 100644 src/common/plugin/fulltext/elasticsearch/ESClient.cpp create mode 100644 src/common/plugin/fulltext/elasticsearch/ESClient.h delete mode 100644 src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp delete mode 100644 src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h delete mode 100644 src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp delete mode 100644 src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 97cf436339e..3ff7c9d5239 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -254,6 +254,18 @@ jobs: volumes: - /tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}:/tmp/ccache/nebula/${{ matrix.os }}-${{ 
matrix.compiler }} options: --cap-add=SYS_PTRACE + services: + elasticsearch: + image: elasticsearch:7.17.7 + ports: + - 9200:9200 + env: + discovery.type: single-node + options: >- + --health-cmd "curl elasticsearch:9200" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: webiny/action-post-run@2.0.1 with: diff --git a/src/common/expression/test/CMakeLists.txt b/src/common/expression/test/CMakeLists.txt index 15baabdec6e..b2884aecd67 100644 --- a/src/common/expression/test/CMakeLists.txt +++ b/src/common/expression/test/CMakeLists.txt @@ -33,7 +33,7 @@ set(expression_test_common_libs $ $ $ - $ + $ $ $ $ @@ -47,6 +47,7 @@ set(expression_test_common_libs $ $ $ + $ ) @@ -95,6 +96,7 @@ nebula_add_test( gtest ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + curl ) nebula_add_executable( @@ -110,6 +112,7 @@ nebula_add_executable( boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + curl ) nebula_add_executable( diff --git a/src/common/http/test/CMakeLists.txt b/src/common/http/test/CMakeLists.txt index 06142e7da26..764b1883a08 100644 --- a/src/common/http/test/CMakeLists.txt +++ b/src/common/http/test/CMakeLists.txt @@ -23,5 +23,4 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} gtest curl - ) diff --git a/src/common/http/test/HttpClientTest.cpp b/src/common/http/test/HttpClientTest.cpp index b671de7ec9f..2d1603de4e1 100644 --- a/src/common/http/test/HttpClientTest.cpp +++ b/src/common/http/test/HttpClientTest.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. +/* Copyright (c) 2022 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. */ diff --git a/src/common/plugin/fulltext/CMakeLists.txt b/src/common/plugin/fulltext/CMakeLists.txt index 1f26a1f0c2d..813a7fee234 100644 --- a/src/common/plugin/fulltext/CMakeLists.txt +++ b/src/common/plugin/fulltext/CMakeLists.txt @@ -3,13 +3,9 @@ # This source code is licensed under Apache 2.0 License. 
nebula_add_library( - ft_es_graph_adapter_obj OBJECT - elasticsearch/ESGraphAdapter.cpp -) - -nebula_add_library( - ft_es_storage_adapter_obj OBJECT - elasticsearch/ESStorageAdapter.cpp + es_adapter_obj OBJECT + elasticsearch/ESAdapter.cpp + elasticsearch/ESClient.cpp ) nebula_add_subdirectory(test) diff --git a/src/common/plugin/fulltext/FTGraphAdapter.h b/src/common/plugin/fulltext/FTGraphAdapter.h deleted file mode 100644 index cf89ad82d08..00000000000 --- a/src/common/plugin/fulltext/FTGraphAdapter.h +++ /dev/null @@ -1,58 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ -#define COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ - -#include "common/base/Base.h" -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTUtils.h" - -namespace nebula { -namespace plugin { - -class FTGraphAdapter { - public: - FTGraphAdapter() = default; - - virtual ~FTGraphAdapter() = default; - - virtual StatusOr prefix(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr wildcard(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr regexp(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr fuzzy(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const = 0; - - virtual StatusOr createIndex(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const = 0; - - virtual StatusOr dropIndex(const HttpClient& client, const std::string& index) const = 0; - - // Clear the fulltext index data and keep the index schema. 
- virtual StatusOr clearIndex(const HttpClient& client, const std::string& index) const = 0; - - virtual StatusOr indexExists(const HttpClient& client, const std::string& index) const = 0; -}; -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ diff --git a/src/common/plugin/fulltext/FTStorageAdapter.h b/src/common/plugin/fulltext/FTStorageAdapter.h deleted file mode 100644 index 2c046988f8f..00000000000 --- a/src/common/plugin/fulltext/FTStorageAdapter.h +++ /dev/null @@ -1,32 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ -#define COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ - -#include "common/base/Base.h" -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTUtils.h" - -namespace nebula { -namespace plugin { - -class FTStorageAdapter { - public: - virtual ~FTStorageAdapter() = default; - - virtual StatusOr put(const HttpClient& client, const DocItem& item) const = 0; - - virtual StatusOr bulk(const HttpClient& client, - const std::vector& items) const = 0; - - protected: - FTStorageAdapter() = default; -}; - -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ diff --git a/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp new file mode 100644 index 00000000000..7dd0f4bfe69 --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp @@ -0,0 +1,245 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" + +#include + +#include "fmt/printf.h" +#include "openssl/sha.h" +namespace nebula::plugin { + +using namespace fmt::literals; // NOLINT + +void ESBulk::put(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text) { + folly::dynamic action = folly::dynamic::object(); + folly::dynamic metadata = folly::dynamic::object(); + folly::dynamic body = folly::dynamic::object(); + auto docId = ESAdapter::genDocID(vid, src, dst, rank); + metadata["_id"] = docId; + metadata["_type"] = "_doc"; + metadata["_index"] = indexName; + action["index"] = std::move(metadata); + body["vid"] = vid; + body["src"] = src; + body["dst"] = dst; + body["rank"] = rank; + body["content"] = text; + documents_[docId] = {std::move(action), std::move(body)}; +} + +void ESBulk::delete_(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank) { + folly::dynamic action = folly::dynamic::object(); + folly::dynamic metadata = folly::dynamic::object(); + auto docId = ESAdapter::genDocID(vid, src, dst, rank); + metadata["_id"] = docId; + metadata["_type"] = "_doc"; + metadata["_index"] = indexName; + action["delete"] = std::move(metadata); + documents_[docId] = {std::move(action)}; +} + +bool ESBulk::empty() { + return documents_.empty(); +} + +ESAdapter::ESAdapter(std::vector&& clients) : clients_(clients) {} + +void ESAdapter::setClients(std::vector&& clients) { + clients_ = std::move(clients); +} + +Status ESAdapter::createIndex(const std::string& name) { + folly::dynamic mappings = folly::parseJson(R"( + { + "mappings":{ + "properties":{ + "vid": { + "type": "keyword" + }, + "src": { + "type": "keyword" + }, + "dst": { + "type": "keyword" + }, + "rank": { + "type": "long" + }, + "text": { + "type": "text" + } + } + } + } + )"); + auto result = randomClient().createIndex(name, 
mappings); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["acknowledged"].isBool() && resp["acknowledged"].getBool()) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +Status ESAdapter::dropIndex(const std::string& name) { + auto result = randomClient().dropIndex(name); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["acknowledged"].isBool() && resp["acknowledged"].getBool()) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +Status ESAdapter::clearIndex(const std::string& name) { + auto result = randomClient().clearIndex(name); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["failures"].isArray() && resp["failures"].size() == 0) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +StatusOr ESAdapter::isIndexExist(const std::string& name) { + auto result = randomClient().getIndex(name); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + + if (resp[name].isObject()) { + return true; + } + auto error = resp["error"]; + if (error.isObject()) { + if (error["type"].isString() && error["type"].getString() == "index_not_found_exception") { + return false; + } + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +Status ESAdapter::bulk(const ESBulk& bulk) { + std::vector jsonArray; + for (auto& [docId, objs] : bulk.documents_) { + for (auto& obj : objs) { + jsonArray.push_back(obj); + } + } + auto result = 
randomClient().bulk(jsonArray); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["errors"].isBool() && !resp["errors"].getBool()) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +StatusOr ESAdapter::prefix(const std::string& index, const std::string& pattern) { + folly::dynamic query = folly::dynamic::object("query", folly::dynamic::object("prefix", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::wildcard(const std::string& index, const std::string& pattern) { + folly::dynamic query = + folly::dynamic::object("query", folly::dynamic::object("wildcard", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::regexp(const std::string& index, const std::string& pattern) { + folly::dynamic query = folly::dynamic::object("query", folly::dynamic::object("regexp", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::fuzzy(const std::string& index, const std::string& pattern) { + folly::dynamic query = folly::dynamic::object("query", folly::dynamic::object("fuzzy", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::query(const std::string& index, const folly::dynamic& query) { + auto result = randomClient().search(index, query); + if (!result.ok()) { + return std::move(result).status(); + } + auto resp = std::move(result).value(); + auto hits = resp["hits"]; + if (hits.isObject()) { + ESQueryResult res; + for (auto& hit : hits) { + auto source = hit["_source"]; + ESQueryResult::Item item; + item.text = source["text"].getString(); + item.src = source["src"].getString(); + item.dst = source["dst"].getString(); + item.rank = source["rank"].getInt(); + item.vid = source["vid"].getString(); + res.items.emplace_back(std::move(item)); + } + return res; + } + auto error = resp["error"]; + 
if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +std::string ESAdapter::genDocID(const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank) { + std::string str; + unsigned char mdStr[33] = {0}; + if (!vid.empty()) { + str = vid; + } else { + str = src + dst + std::to_string(rank); + } + SHA256(reinterpret_cast(str.data()), str.size(), mdStr); + return folly::hexDump(mdStr, 32); +} + +ESClient& ESAdapter::randomClient() { + static thread_local std::default_random_engine engine; + static thread_local std::uniform_int_distribution d(0, clients_.size() - 1); + return clients_[d(engine)]; +} +} // namespace nebula::plugin diff --git a/src/common/plugin/fulltext/elasticsearch/ESAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESAdapter.h new file mode 100644 index 00000000000..05f172bf10e --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESAdapter.h @@ -0,0 +1,77 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#ifndef COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_ESADAPTER_H_ +#define COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_ESADAPTER_H_ + +#include "common/base/StatusOr.h" +#include "common/plugin/fulltext/elasticsearch/ESClient.h" +#include "folly/container/F14Map.h" +#include "folly/dynamic.h" +namespace nebula::plugin { + +struct ESQueryResult { + struct Item { + std::string text; + std::string vid; // for vertex + std::string src, dst; // for edge + int64_t rank; // for edge + }; + std::vector items; +}; + +class ESBulk { + public: + void put(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text); + void delete_(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank); + + bool empty(); + + private: + folly::F14FastMap> documents_; + friend class ESAdapter; +}; + +class ESAdapter { + public: + explicit ESAdapter(std::vector&& clients); + ESAdapter() = default; + void setClients(std::vector&& clients); + Status createIndex(const std::string& name); + Status dropIndex(const std::string& name); + Status clearIndex(const std::string& name); + StatusOr isIndexExist(const std::string& name); + + Status bulk(const ESBulk& bulk); + StatusOr prefix(const std::string& index, const std::string& pattern); + StatusOr fuzzy(const std::string& index, const std::string& pattern); + StatusOr regexp(const std::string& index, const std::string& pattern); + StatusOr wildcard(const std::string& index, const std::string& pattern); + StatusOr query(const std::string& index, const folly::dynamic& query); + + private: + static std::string genDocID(const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank); + + ESClient& randomClient(); + std::vector clients_; + + friend class ESBulk; +}; + +} // namespace nebula::plugin + +#endif diff --git a/src/common/plugin/fulltext/elasticsearch/ESClient.cpp 
b/src/common/plugin/fulltext/elasticsearch/ESClient.cpp new file mode 100644 index 00000000000..a4721e3c514 --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESClient.cpp @@ -0,0 +1,118 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "common/plugin/fulltext/elasticsearch/ESClient.h" + +#include "common/http/HttpClient.h" +namespace nebula::plugin { + +ESClient::ESClient(const std::string& protocol, + const std::string& address, + const std::string& user, + const std::string& password) + : protocol_(protocol), address_(address), user_(user), password_(password) { + // TODO(hs.zhang): enable protocol + // TODO(hs.zhang): enable user&password +} + +StatusOr ESClient::createIndex(const std::string& name, + const folly::dynamic& body) { + std::string url = fmt::format("http://{}/{}", address_, name); + auto resp = HttpClient::put(url, {"Content-Type: application/json"}, folly::toJson(body)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + auto ret = folly::parseJson(resp.body); + return ret; +} + +StatusOr ESClient::dropIndex(const std::string& name) { + std::string url = fmt::format("http://{}/{}", address_, name); + auto resp = HttpClient::delete_(url, {"Content-Type: application/json"}); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::clearIndex(const std::string& name) { + std::string url = fmt::format("http://{}/{}/_delete_by_query?refresh", address_, name); + std::string body = R"( + { + "query":{ + "match_all":{} + } + } + )"; + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, body); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return 
folly::parseJson(resp.body); +} + +StatusOr ESClient::getIndex(const std::string& name) { + std::string url = fmt::format("http://{}/{}", address_, name); + auto resp = HttpClient::get(url, {"Content-Type: application/json"}); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::deleteByQuery(const std::string& index, + const folly::dynamic& query) { + std::string url = fmt::format("http://{}/{}/_delete_by_query", address_, index); + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(query)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::updateByQuery(const std::string& index, + const folly::dynamic& query) { + std::string url = fmt::format("http://{}/{}/_update_by_query", address_, index); + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(query)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +// StatusOr ESClient::put(const std::string& index, const folly::dynamic& document) +// { +// std::string url = fmt::format("http://{}/{}/_doc", address_, index); +// auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(document)); +// if (resp.curlCode != 0) { +// return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); +// } +// return folly::parseJson(resp.body); +// } + +StatusOr ESClient::search(const std::string& index, const folly::dynamic& query) { + std::string url = fmt::format("http://{}/{}/_search", address_, index); + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(query)); + if (resp.curlCode != 0) { + return 
Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::bulk(const std::vector& bulk) { + std::string url = fmt::format("http://{}/_bulk", address_); + std::string body; + for (auto& obj : bulk) { + body += folly::toJson(obj); + body += "\n"; + } + auto resp = HttpClient::post(url, {"Content-Type: application/x-ndjson"}, body); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +} // namespace nebula::plugin diff --git a/src/common/plugin/fulltext/elasticsearch/ESClient.h b/src/common/plugin/fulltext/elasticsearch/ESClient.h new file mode 100644 index 00000000000..f861398b78a --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESClient.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_CLIENT_ +#define COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_CLIENT_ +#include + +#include +#include + +#include "common/base/StatusOr.h" +namespace nebula::plugin { + +class ESClient { + public: + ESClient(const std::string& protocol, + const std::string& address, + const std::string& user, + const std::string& password); + + StatusOr createIndex(const std::string& name, const folly::dynamic& object); + StatusOr dropIndex(const std::string& name); + + StatusOr clearIndex(const std::string& name); + + StatusOr getIndex(const std::string& name); + + StatusOr deleteByQuery(const std::string& index, const folly::dynamic& query); + StatusOr updateByQuery(const std::string& index, const folly::dynamic& query); + StatusOr search(const std::string& index, const folly::dynamic& query); + // StatusOr put(const std::string& index, const folly::dynamic& object); + StatusOr bulk(const std::vector& bulk); + + private: + std::string protocol_; + 
std::string address_; + std::string user_; + std::string password_; + + StatusOr sendHttpRequest(const std::string& url, + const std::string& method, + const std::vector& header, + const std::string& body); +}; + +} // namespace nebula::plugin + +#endif diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp deleted file mode 100644 index d3395cf6c84..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp +++ /dev/null @@ -1,328 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" - -#include "common/process/ProcessUtils.h" - -namespace nebula { -namespace plugin { - -std::unique_ptr ESGraphAdapter::kAdapter = - std::unique_ptr(new ESGraphAdapter()); - -StatusOr ESGraphAdapter::prefix(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kPrefix); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::wildcard(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = - header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kWildcard); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::regexp(const HttpClient& client, - const DocItem& item, - const LimitItem& 
limit, - std::vector& rows) const { - std::string cmd = header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kRegexp); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::fuzzy(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const { - std::string cmd = - header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kFuzzy, fuzziness, op); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -std::string ESGraphAdapter::header() const noexcept { - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON; - return os.str(); -} - -std::string ESGraphAdapter::header(const HttpClient& client, - const DocItem& item, - const LimitItem& limit) const noexcept { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XGET http://127.0.0.1:9200/my_temp_index_3/_search?timeout=10ms - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON << XGET; - os << client.toString() << item.index << "/_search?timeout=" << limit.timeout_ << "ms" - << "\""; - return os.str(); -} - -folly::dynamic ESGraphAdapter::columnBody(const std::string& col) const noexcept { - // "term": {"column_id": "col1"} - folly::dynamic itemColumn = folly::dynamic::object("column_id", DocIDTraits::column(col)); - return folly::dynamic::object("term", itemColumn); -} - -std::string ESGraphAdapter::body(const DocItem& item, - int32_t maxRows, - FT_SEARCH_OP type, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept { - 
folly::dynamic obj; - switch (type) { - case FT_SEARCH_OP::kPrefix: { - obj = prefixBody(item.val); - break; - } - case FT_SEARCH_OP::kWildcard: { - obj = wildcardBody(item.val); - break; - } - case FT_SEARCH_OP::kRegexp: { - obj = regexpBody(item.val); - break; - } - case FT_SEARCH_OP::kFuzzy: { - obj = fuzzyBody(item.val, fuzziness, op); - } - } - auto itemArray = folly::dynamic::array(columnBody(item.column), obj); - folly::dynamic itemMust = folly::dynamic::object("must", itemArray); - folly::dynamic itemBool = folly::dynamic::object("bool", itemMust); - folly::dynamic itemQuery = - folly::dynamic::object("query", itemBool)("_source", "value")("size", maxRows)("from", 0); - std::stringstream os; - os << " -d'" << DocIDTraits::normalizedJson(folly::toJson(itemQuery)) << "'"; - return os.str(); -} - -folly::dynamic ESGraphAdapter::prefixBody(const std::string& prefix) const noexcept { - // {"prefix": {"value": "a"}} - folly::dynamic itemValue = folly::dynamic::object("value", prefix); - return folly::dynamic::object("prefix", itemValue); -} - -folly::dynamic ESGraphAdapter::wildcardBody(const std::string& wildcard) const noexcept { - // {"wildcard": {"value": "*a"}} - folly::dynamic itemValue = folly::dynamic::object("value", wildcard); - return folly::dynamic::object("wildcard", itemValue); -} - -folly::dynamic ESGraphAdapter::regexpBody(const std::string& regexp) const noexcept { - // {"regexp": {"value": "c+"}} - folly::dynamic itemValue = folly::dynamic::object("value", regexp); - return folly::dynamic::object("regexp", itemValue); -} - -folly::dynamic ESGraphAdapter::fuzzyBody(const std::string& regexp, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept { - // {"match": {"value": {"query":"ccc aaa","fuzziness": "AUTO","operator": - // "and"}}} - folly::dynamic items = - folly::dynamic::object("query", regexp)("fuzziness", fuzziness)("operator", op); - folly::dynamic value = folly::dynamic::object("value", items); - return 
folly::dynamic::object("match", value); -} - -StatusOr ESGraphAdapter::createIndex(const HttpClient& client, - const std::string& index, - const std::string&) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT "http://127.0.0.1:9200/index_exist" - std::string cmd = createIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http PUT Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return statusCheck(ret.value()); -} - -std::string ESGraphAdapter::createIndexCmd(const HttpClient& client, - const std::string& index, - const std::string&) const noexcept { - std::stringstream os; - os << header() << XPUT << client.toString() << index << "\""; - return os.str(); -} - -StatusOr ESGraphAdapter::dropIndex(const HttpClient& client, const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XDELETE "http://127.0.0.1:9200/index_exist" - std::string cmd = dropIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http DELETE Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return statusCheck(ret.value()); -} - -std::string ESGraphAdapter::dropIndexCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XDELETE << client.toString() << index << "\""; - return os.str(); -} - -StatusOr ESGraphAdapter::clearIndex(const HttpClient& client, - const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPOST "http://127.0.0.1:9200/${index}/_delete_by_query?refresh&slices=5" - // -d '{"query": {"match_all":{}}}' - std::string cmd = clearIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - 
LOG(ERROR) << "Http POST Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return clearCheck(ret.value()); -} - -std::string ESGraphAdapter::clearIndexCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XPOST << client.toString() << index << "/_delete_by_query?refresh&slices=5\"" - << " -d '{\"query\": {\"match_all\":{}}}'"; - return os.str(); -} - -StatusOr ESGraphAdapter::indexExists(const HttpClient& client, - const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XGET "http://127.0.0.1:9200/_cat/indices/index_exist?format=json" - std::string cmd = indexExistsCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("Http GET Failed: : %s", cmd.c_str()); - } - return indexCheck(ret.value()); -} - -std::string ESGraphAdapter::indexExistsCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XGET << client.toString() << "_cat/indices/" << index << "?format=json" - << "\""; - return os.str(); -} - -bool ESGraphAdapter::result(const std::string& ret, std::vector& rows) const { - try { - auto root = folly::parseJson(ret); - auto rootHits = root.find("hits"); - if (rootHits != root.items().end()) { - auto subHits = rootHits->second.find("hits"); - if (subHits != rootHits->second.items().end()) { - for (auto& item : subHits->second) { - auto s = item.find("_source"); - if (s != item.items().end()) { - auto v = s->second.find("value"); - if (v != s->second.items().end()) { - rows.emplace_back(v->second.getString()); - } else { - continue; - } - } else { - continue; - } - } - } - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - LOG(ERROR) << "error reason : " << ret; - return false; 
-} - -bool ESGraphAdapter::statusCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (root.isArray()) { - return false; - } - auto result = root.find("acknowledged"); - if (result != root.items().end() && result->second.isBool() && result->second.getBool()) { - return true; - } - } catch (const std::exception& ex) { - LOG(ERROR) << "result error : " << ex.what(); - } - - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::indexCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (!root.isArray()) { - return false; - } - for (auto& entry : root) { - auto exists = entry.find("index"); - if (exists != entry.items().end()) { - return true; - } - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::clearCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (root.isArray()) { - return false; - } - auto result = root.find("failures"); - if (result != root.items().end() && result->second.isArray() && result->second.size() == 0) { - return true; - } - } catch (const std::exception& ex) { - LOG(ERROR) << "result error : " << ex.what(); - } - - LOG(ERROR) << "error reason : " << ret; - return false; -} - -} // namespace plugin -} // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h deleted file mode 100644 index beac919d59e..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h +++ /dev/null @@ -1,116 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef NEBULA_PLUGIN_ESGRAPHADAPTER_H -#define NEBULA_PLUGIN_ESGRAPHADAPTER_H - -#include - -#include "common/plugin/fulltext/FTGraphAdapter.h" - -namespace nebula { -namespace plugin { -class ESGraphAdapter final : public FTGraphAdapter { - FRIEND_TEST(FulltextPluginTest, ESIndexCheckTest); - FRIEND_TEST(FulltextPluginTest, ESResultTest); - FRIEND_TEST(FulltextPluginTest, ESPrefixTest); - FRIEND_TEST(FulltextPluginTest, ESWildcardTest); - FRIEND_TEST(FulltextPluginTest, ESRegexpTest); - FRIEND_TEST(FulltextPluginTest, ESFuzzyTest); - FRIEND_TEST(FulltextPluginTest, ESCreateIndexTest); - FRIEND_TEST(FulltextPluginTest, ESDropIndexTest); - FRIEND_TEST(FulltextPluginTest, ESClearIndexTest); - - public: - static std::unique_ptr kAdapter; - - StatusOr prefix(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr wildcard(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr regexp(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr fuzzy(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const override; - - StatusOr createIndex(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const override; - - StatusOr dropIndex(const HttpClient& client, const std::string& index) const override; - - // Clear the fulltext index data on es and keep the index schema. 
- // client: es client - // index: fulltext index name - StatusOr clearIndex(const HttpClient& client, const std::string& index) const override; - - StatusOr indexExists(const HttpClient& client, const std::string& index) const override; - - private: - ESGraphAdapter() {} - - std::string header() const noexcept; - - std::string header(const HttpClient& client, - const DocItem& item, - const LimitItem& limit) const noexcept; - - folly::dynamic columnBody(const std::string& col) const noexcept; - - std::string body(const DocItem& item, - int32_t maxRows, - FT_SEARCH_OP type, - const folly::dynamic& fuzziness = nullptr, - const std::string& op = "") const noexcept; - - folly::dynamic prefixBody(const std::string& prefix) const noexcept; - - folly::dynamic wildcardBody(const std::string& wildcard) const noexcept; - - folly::dynamic regexpBody(const std::string& regexp) const noexcept; - - folly::dynamic fuzzyBody(const std::string& regexp, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept; - - bool result(const std::string& ret, std::vector& rows) const; - - bool statusCheck(const std::string& ret) const; - - bool indexCheck(const std::string& ret) const; - - // check the result - bool clearCheck(const std::string& ret) const; - - std::string createIndexCmd(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const noexcept; - - std::string dropIndexCmd(const HttpClient& client, const std::string& index) const noexcept; - - // Encapsulates the clearIndex command. 
- // client: es client - // index: fulltext index name - std::string clearIndexCmd(const HttpClient& client, const std::string& index) const noexcept; - - std::string indexExistsCmd(const HttpClient& client, const std::string& index) const noexcept; -}; -} // namespace plugin -} // namespace nebula - -#endif // NEBULA_PLUGIN_ESGRAPHADAPTER_H diff --git a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp deleted file mode 100644 index a84e16b9fc0..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp +++ /dev/null @@ -1,220 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" - -#include "common/plugin/fulltext/FTUtils.h" -#include "common/process/ProcessUtils.h" - -namespace nebula { -namespace plugin { -std::unique_ptr ESStorageAdapter::kAdapter = - std::unique_ptr(new ESStorageAdapter()); - -bool ESStorageAdapter::checkPut(const std::string& ret, const std::string& cmd) const { - // For example : - // HostAddr localHost_{"127.0.0.1", 9200}; - // DocItem item("index1", "col1", 1, 2, "aaaa"); - // - // Command should be : - // /usr/bin/curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT - // "http://127.0.0.1:9200/index1/_doc/0000000001_0000000002_8c43de7b01bca674276c43e09b3ec5ba_aaaa" - // // NOLINT - // -d'{"value":"aaaa","schema_id":2,"column_id":"8c43de7b01bca674276c43e09b3ec5ba"}' - // - // If successful, the result is returned: - // { - // "_primary_term": 1, - // "_shards": { - // "failed": 0, - // "total": 2, - // "successful": 1 - // }, - // "_id": - // "0000000001_0000000002_8c43de7b01bca674276c43e09b3ec5ba_aaaa", - // "result": "created", - // "_seq_no": 0, - // "_type": "_doc", - // "_index": "index1", - // "_version": 1 - // } - try { - auto root = folly::parseJson(ret); - auto result = 
root.find("result"); - if (result != root.items().end() && - (result->second.getString() == "created" || result->second.getString() == "updated")) { - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - VLOG(3) << "Command : " << cmd << "failed : " << ret; - return false; -} - -bool ESStorageAdapter::checkBulk(const std::string& ret) const { - // For example : - // HostAddr localHost_{"127.0.0.1", 9200}; - // DocItem item("bulk_index", "col1", 1, 2, "row_1") - // ("bulk_index", "col1", 1, 2, "row_2") - // - // Command should be : - // curl -H "Content-Type: application/x-ndjson" -XPOST - // localhost:9200/_bulk -d ' { "index" : { "_index" : "bulk_index", "_id" : - // "1" } } { "schema_id" : 1 , "column_id" : "col1", "value" : "row_1"} { - // "index" : { "_index" : "bulk_index", "_id" : "2" } } { "schema_id" : 1 , - // "column_id" : "col1", "value" : "row_2"} - // ' - // - // If successful, the result is returned: - // { - // "took": 18, - // "errors": false, - // "items": [{ - // "index": { - // "_index": "bulk_index", - // "_type": "_doc", - // "_id": "1", - // "_version": 4, - // "result": "updated", - // "_shards": { - // "total": 2, - // "successful": 1, - // "failed": 0 - // }, - // "_seq_no": 4, - // "_primary_term": 1, - // "status": 200 - // } - // }, { - // "index": { - // "_index": "bulk_index", - // "_type": "_doc", - // "_id": "2", - // "_version": 2, - // "result": "updated", - // "_shards": { - // "total": 2, - // "successful": 1, - // "failed": 0 - // }, - // "_seq_no": 5, - // "_primary_term": 1, - // "status": 200 - // } - // }] - // } - try { - auto root = folly::parseJson(ret); - auto result = root.find("errors"); - if (result != root.items().end() && result->second.isBool() && !result->second.getBool()) { - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - VLOG(3) << "Bulk insert failed"; - VLOG(3) << ret; - return false; -} - -StatusOr 
ESStorageAdapter::put(const HttpClient& client, const DocItem& item) const { - auto command = putCmd(client, item); - auto ret = nebula::ProcessUtils::runCommand(command.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http PUT Failed: " << command; - return Status::Error("command failed : %s", command.c_str()); - } - return checkPut(ret.value(), command); -} - -StatusOr ESStorageAdapter::bulk(const HttpClient& client, - const std::vector& items) const { - auto command = bulkCmd(client, items); - if (command.empty()) { - return true; - } - auto ret = nebula::ProcessUtils::runCommand(command.c_str()); - if (!ret.ok() || ret.value().empty()) { - VLOG(3) << "Http PUT Failed"; - VLOG(3) << command; - return Status::Error("bulk command failed"); - } - return checkBulk(ret.value()); -} - -std::string ESStorageAdapter::putHeader(const HttpClient& client, - const DocItem& item) const noexcept { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT "http://127.0.0.1:9200/my_temp_index_3/_doc/part1|tag4|col4|hello" - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON << XPUT; - os << client.toString() << item.index << "/_doc/" << DocIDTraits::docId(item) << "\""; - return os.str(); -} - -std::string ESStorageAdapter::putBody(const DocItem& item) const noexcept { - // -d'{"column_id" : "col4", "value" : "hello"}' - folly::dynamic d = folly::dynamic::object("column_id", DocIDTraits::column(item.column))( - "value", DocIDTraits::val(item.val)); - std::stringstream os; - os << " -d'" << DocIDTraits::normalizedJson(folly::toJson(d)) << "'"; - return os.str(); -} - -std::string ESStorageAdapter::putCmd(const HttpClient& client, const DocItem& item) const noexcept { - std::stringstream os; - os << putHeader(client, item) << putBody(item); - return os.str(); -} - -std::string ESStorageAdapter::bulkHeader(const HttpClient& client) const noexcept { - // curl -H "Content-Type: application/x-ndjson; charset=utf-8" - // 
http://127.0.0.1:9200/_bulk - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_NDJSON << XPOST; - os << client.toString() << "_bulk\""; - return os.str(); -} - -std::string ESStorageAdapter::bulkBody(const std::vector& items) const noexcept { - // -d ' - // { "index" : { "_index" : "bulk_index", "_id" : "1" } } - // { "column_id" : "col1", "value" : "row_1"} - // { "index" : { "_index" : "bulk_index", "_id" : "2" } } - // { "column_id" : "col1", "value" : "row_2"} - // ' - if (items.empty()) { - return ""; - } - std::stringstream os; - os << " -d '" - << "\n"; - for (const auto& item : items) { - folly::dynamic meta = - folly::dynamic::object("_id", DocIDTraits::docId(item))("_index", item.index); - folly::dynamic data = folly::dynamic::object("value", DocIDTraits::val(item.val))( - "column_id", DocIDTraits::column(item.column)); - os << folly::toJson(folly::dynamic::object("index", meta)) << "\n"; - os << DocIDTraits::normalizedJson(folly::toJson(data)) << "\n"; - } - os << "'"; - return os.str(); -} - -std::string ESStorageAdapter::bulkCmd(const HttpClient& client, - const std::vector& items) const noexcept { - auto body = bulkBody(items); - if (body.empty()) { - return ""; - } - std::stringstream os; - os << bulkHeader(client) << body; - return os.str(); -} - -} // namespace plugin -} // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h deleted file mode 100644 index 323018724c6..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef NEBULA_PLUGIN_ESSTORAGEADAPTER_H -#define NEBULA_PLUGIN_ESSTORAGEADAPTER_H - -#include - -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTStorageAdapter.h" - -namespace nebula { -namespace plugin { - -class ESStorageAdapter final : public FTStorageAdapter { - FRIEND_TEST(FulltextPluginTest, ESPutTest); - FRIEND_TEST(FulltextPluginTest, ESBulkTest); - - public: - static std::unique_ptr kAdapter; - - StatusOr put(const HttpClient& client, const DocItem& item) const override; - - StatusOr bulk(const HttpClient& client, const std::vector& items) const override; - - private: - ESStorageAdapter() {} - - std::string putHeader(const HttpClient& client, const DocItem& item) const noexcept; - - std::string putBody(const DocItem& item) const noexcept; - - std::string putCmd(const HttpClient& client, const DocItem& item) const noexcept; - - std::string bulkHeader(const HttpClient& client) const noexcept; - - std::string bulkBody(const std::vector& items) const noexcept; - - std::string bulkCmd(const HttpClient& client, const std::vector& items) const noexcept; - - bool checkPut(const std::string& ret, const std::string& cmd) const; - - bool checkBulk(const std::string& ret) const; -}; - -} // namespace plugin -} // namespace nebula - -#endif // NEBULA_PLUGIN_ESSTORAGEADAPTER_H diff --git a/src/common/plugin/fulltext/test/CMakeLists.txt b/src/common/plugin/fulltext/test/CMakeLists.txt index 1446e8a34b3..26488314679 100644 --- a/src/common/plugin/fulltext/test/CMakeLists.txt +++ b/src/common/plugin/fulltext/test/CMakeLists.txt @@ -2,20 +2,19 @@ # # This source code is licensed under Apache 2.0 License. 
-nebula_add_test( - NAME - plugin_fulltext_test - SOURCES - FulltextPluginTest.cpp - OBJECTS - $ - $ - $ - $ - $ - $ - LIBRARIES - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - gtest -) +# nebula_add_test( +# NAME +# plugin_fulltext_test +# SOURCES +# FulltextPluginTest.cpp +# OBJECTS +# $ +# $ +# $ +# $ +# $ +# LIBRARIES +# ${THRIFT_LIBRARIES} +# ${PROXYGEN_LIBRARIES} +# gtest +# ) diff --git a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp index c17c5291f3d..10671dce966 100644 --- a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp +++ b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp @@ -8,8 +8,6 @@ #include "common/base/Base.h" #include "common/network/NetworkUtils.h" #include "common/plugin/fulltext/FTUtils.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" namespace nebula { namespace plugin { diff --git a/src/common/utils/test/CMakeLists.txt b/src/common/utils/test/CMakeLists.txt index 940999b720f..afbdc3e9de4 100644 --- a/src/common/utils/test/CMakeLists.txt +++ b/src/common/utils/test/CMakeLists.txt @@ -100,7 +100,7 @@ nebula_add_test( $ $ $ - $ + # $ LIBRARIES ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 1b2665e2941..6affb99edd3 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -53,7 +53,7 @@ set(storage_meta_deps $ $ $ - $ + $ $ $ ) @@ -146,7 +146,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ ${common_deps} @@ -233,7 +233,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ $ diff --git a/src/graph/context/test/CMakeLists.txt b/src/graph/context/test/CMakeLists.txt index b83013e5101..d01223a6696 100644 --- a/src/graph/context/test/CMakeLists.txt +++ b/src/graph/context/test/CMakeLists.txt @@ -27,7 +27,7 @@ SET(CONTEXT_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/executor/admin/SpaceExecutor.cpp 
b/src/graph/executor/admin/SpaceExecutor.cpp index da761521210..9aa7c4abf50 100644 --- a/src/graph/executor/admin/SpaceExecutor.cpp +++ b/src/graph/executor/admin/SpaceExecutor.cpp @@ -134,15 +134,16 @@ folly::Future DropSpaceExecutor::execute() { qctx()->rctx()->session()->setSpace(std::move(spaceInfo)); } if (!ftIndexes.empty()) { - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { - LOG(WARNING) << "Get text search clients failed: " << tsRet.status(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { + LOG(WARNING) << "Get text search clients failed: " << esAdapterRet.status(); return Status::OK(); } + auto esAdapter = std::move(esAdapterRet).value(); for (const auto &ftindex : ftIndexes) { - auto ftRet = FTIndexUtils::dropTSIndex(tsRet.value(), ftindex); + auto ftRet = esAdapter.dropIndex(ftindex); if (!ftRet.ok()) { - LOG(WARNING) << "Drop fulltext index `" << ftindex << "' failed: " << ftRet.status(); + LOG(ERROR) << "Drop fulltext index `" << ftindex << "' failed: " << ftRet; } } } @@ -189,15 +190,16 @@ folly::Future ClearSpaceExecutor::execute() { return resp.status(); } if (!ftIndexes.empty()) { - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { LOG(WARNING) << "Get text search clients failed"; return Status::OK(); } + auto esAdapter = std::move(esAdapterRet).value(); for (const auto &ftindex : ftIndexes) { - auto ftRet = FTIndexUtils::clearTSIndex(tsRet.value(), ftindex); + auto ftRet = esAdapter.clearIndex(ftindex); if (!ftRet.ok()) { - LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet.status(); + LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet; } } } diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index 
4b32d3939c6..92d0018ace9 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -41,14 +41,14 @@ folly::Future DropFTIndexExecutor::execute() { << "' failed: " << resp.status(); return resp.status(); } - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { - LOG(WARNING) << "Get text search clients failed: " << tsRet.status(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { + LOG(WARNING) << "Get text search clients failed: " << esAdapterRet.status(); } - auto ftRet = FTIndexUtils::dropTSIndex(std::move(tsRet).value(), inode->getName()); + auto esAdapter = std::move(esAdapterRet).value(); + auto ftRet = esAdapter.dropIndex(inode->getName()); if (!ftRet.ok()) { - LOG(WARNING) << "Drop fulltext index '" << inode->getName() - << "' failed: " << ftRet.status(); + LOG(WARNING) << "Drop fulltext index '" << inode->getName() << "' failed: " << ftRet; } return Status::OK(); }); diff --git a/src/graph/executor/test/CMakeLists.txt b/src/graph/executor/test/CMakeLists.txt index af16fa5f3df..cbb5aee6994 100644 --- a/src/graph/executor/test/CMakeLists.txt +++ b/src/graph/executor/test/CMakeLists.txt @@ -34,7 +34,7 @@ SET(EXEC_QUERY_TEST_OBJS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/optimizer/test/CMakeLists.txt b/src/graph/optimizer/test/CMakeLists.txt index 04bd1330ba3..03c56547921 100644 --- a/src/graph/optimizer/test/CMakeLists.txt +++ b/src/graph/optimizer/test/CMakeLists.txt @@ -29,7 +29,7 @@ set(OPTIMIZER_TEST_LIB $ $ $ - $ + $ $ $ $ diff --git a/src/graph/util/FTIndexUtils.cpp b/src/graph/util/FTIndexUtils.cpp index 685c60ede1a..4f2c32c5945 100644 --- a/src/graph/util/FTIndexUtils.cpp +++ b/src/graph/util/FTIndexUtils.cpp @@ -25,8 +25,7 @@ bool FTIndexUtils::needTextSearch(const Expression* expr) { } } -StatusOr> FTIndexUtils::getTSClients( - meta::MetaClient* client) { +StatusOr<::nebula::plugin::ESAdapter> 
FTIndexUtils::getESAdapter(meta::MetaClient* client) { auto tcs = client->getServiceClientsFromCache(meta::cpp2::ExternalServiceType::ELASTICSEARCH); if (!tcs.ok()) { return tcs.status(); @@ -34,76 +33,30 @@ StatusOr> FTIndexUtils::getTSClients( if (tcs.value().empty()) { return Status::SemanticError("No text search client found"); } - std::vector tsClients; + std::vector<::nebula::plugin::ESClient> clients; for (const auto& c : tcs.value()) { - nebula::plugin::HttpClient hc; - hc.host = c.host; - if (c.user_ref().has_value() && c.pwd_ref().has_value()) { - hc.user = *c.user_ref(); - hc.password = *c.pwd_ref(); - } - hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; - tsClients.emplace_back(std::move(hc)); - } - return tsClients; -} - -StatusOr FTIndexUtils::checkTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->indexExists(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); - } - return Status::Error("fulltext index get failed : %s", index.c_str()); -} - -StatusOr FTIndexUtils::dropTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->dropIndex(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); - } - return Status::Error("drop fulltext index failed : %s", index.c_str()); -} - -StatusOr FTIndexUtils::clearTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->clearIndex(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); + std::string protocol = 
c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; + std::string address = c.host.toString(); + std::string user = c.user_ref().has_value() ? *c.user_ref() : ""; + std::string password = c.pwd_ref().has_value() ? *c.pwd_ref() : ""; + clients.emplace_back(protocol, address, user, password); } - return Status::Error("clear fulltext index failed : %s", index.c_str()); + return ::nebula::plugin::ESAdapter(std::move(clients)); } -StatusOr FTIndexUtils::rewriteTSFilter( - ObjectPool* pool, - bool isEdge, - Expression* expr, - const std::string& index, - const std::vector& tsClients) { - auto vRet = textSearch(expr, index, tsClients); +StatusOr FTIndexUtils::rewriteTSFilter(ObjectPool* pool, + bool isEdge, + Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter) { + auto vRet = textSearch(expr, index, esAdapter); if (!vRet.ok()) { - return Status::SemanticError("Text search error."); + return vRet.status(); } - if (vRet.value().empty()) { + auto result = std::move(vRet).value(); + if (result.items.empty()) { return nullptr; } - auto tsArg = static_cast(expr)->arg(); Expression* propExpr; if (isEdge) { @@ -112,8 +65,8 @@ StatusOr FTIndexUtils::rewriteTSFilter( propExpr = TagPropertyExpression::make(pool, tsArg->from(), tsArg->prop()); } std::vector rels; - for (const auto& row : vRet.value()) { - auto constExpr = ConstantExpression::make(pool, Value(row)); + for (auto& item : result.items) { + auto constExpr = ConstantExpression::make(pool, Value(item.text)); rels.emplace_back(RelationalExpression::makeEQ(pool, propExpr, constExpr)); } if (rels.size() == 1) { @@ -122,67 +75,43 @@ StatusOr FTIndexUtils::rewriteTSFilter( return ExpressionUtils::pushOrs(pool, rels); } -StatusOr> FTIndexUtils::textSearch( - Expression* expr, - const std::string& index, - const std::vector& tsClients) { +StatusOr FTIndexUtils::textSearch( + Expression* expr, const std::string& index, ::nebula::plugin::ESAdapter& esAdapter) { auto tsExpr = 
static_cast(expr); - - nebula::plugin::DocItem doc(index, tsExpr->arg()->prop(), tsExpr->arg()->val()); - nebula::plugin::LimitItem limit(tsExpr->arg()->timeout(), tsExpr->arg()->limit()); - std::vector result; - // TODO (sky) : External index load balancing - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - StatusOr ret = Status::Error(); - switch (tsExpr->kind()) { - case Expression::Kind::kTSFuzzy: { - folly::dynamic fuzz = folly::dynamic::object(); - if (tsExpr->arg()->fuzziness() < 0) { - fuzz = "AUTO"; - } else { - fuzz = tsExpr->arg()->fuzziness(); - } - std::string op = tsExpr->arg()->op().empty() ? "or" : tsExpr->arg()->op(); - ret = nebula::plugin::ESGraphAdapter::kAdapter->fuzzy( - randomFTClient(tsClients), doc, limit, fuzz, op, result); - break; - } - case Expression::Kind::kTSPrefix: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->prefix( - randomFTClient(tsClients), doc, limit, result); - break; - } - case Expression::Kind::kTSRegexp: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->regexp( - randomFTClient(tsClients), doc, limit, result); - break; - } - case Expression::Kind::kTSWildcard: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->wildcard( - randomFTClient(tsClients), doc, limit, result); - break; - } - default: - return Status::SemanticError("text search expression error"); + std::string pattern = tsExpr->arg()->val(); + std::function()> execFunc; + switch (tsExpr->kind()) { + case Expression::Kind::kTSFuzzy: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.fuzzy(index, pattern); }; + break; } - if (!ret.ok()) { - continue; + case Expression::Kind::kTSPrefix: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.prefix(index, pattern); }; + break; } - if (ret.value()) { - return result; + case Expression::Kind::kTSRegexp: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.regexp(index, pattern); }; + break; + } + case Expression::Kind::kTSWildcard: { + 
execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.wildcard(index, pattern); }; + break; + } + default: { + return Status::SemanticError("text search expression error"); } - return Status::SemanticError( - "External index error. " - "please check the status of fulltext cluster"); } - return Status::SemanticError("scan external index failed"); -} -const nebula::plugin::HttpClient& FTIndexUtils::randomFTClient( - const std::vector& tsClients) { - auto i = folly::Random::rand32(tsClients.size() - 1); - return tsClients[i]; + auto retryCnt = FLAGS_ft_request_retry_times > 0 ? FLAGS_ft_request_retry_times : 1; + StatusOr result; + while (retryCnt-- > 0) { + result = execFunc(); + if (!result.ok()) { + continue; + } + break; + } + return result; } } // namespace graph diff --git a/src/graph/util/FTIndexUtils.h b/src/graph/util/FTIndexUtils.h index 1231f76ddcc..14339fb80f3 100644 --- a/src/graph/util/FTIndexUtils.h +++ b/src/graph/util/FTIndexUtils.h @@ -7,7 +7,7 @@ #include "clients/meta/MetaClient.h" #include "common/base/StatusOr.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "graph/util/SchemaUtil.h" #include "parser/MaintainSentences.h" @@ -19,37 +19,21 @@ class FTIndexUtils final { FTIndexUtils() = delete; // Checks if the filter expression is full-text search related static bool needTextSearch(const Expression* expr); - // Checks meta client and returns the full-text search client if there is one - static StatusOr> getTSClients(meta::MetaClient* client); - // Checks if the full-text index exists - static StatusOr checkTSIndex(const std::vector& tsClients, - const std::string& index); - // Drops the full-text index - static StatusOr dropTSIndex(const std::vector& tsClients, - const std::string& index); - - // Clears the full-text index data, but keeps the index schema - static StatusOr clearTSIndex(const std::vector& tsClients, - const std::string& index); - - // 
Converts TextSearchExpression into a relational expression that could be pushed down - static StatusOr rewriteTSFilter( - ObjectPool* pool, - bool isEdge, - Expression* expr, - const std::string& index, - const std::vector& tsClients); + + static StatusOr<::nebula::plugin::ESAdapter> getESAdapter(meta::MetaClient* client); + + // Converts TextSearchExpression into a relational expression that could be pushed down + static StatusOr rewriteTSFilter(ObjectPool* pool, + bool isEdge, + Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter); // Performs full-text search using elastic search adapter - // Search type is defined by the expression kind - static StatusOr> textSearch( - Expression* expr, - const std::string& index, - const std::vector& tsClients); - - // Picks a random full-text search client from the given list - static const nebula::plugin::HttpClient& randomFTClient( - const std::vector& tsClients); + // Search type is defined by the expression kind + static StatusOr textSearch(Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter); }; } // namespace graph diff --git a/src/graph/util/test/CMakeLists.txt b/src/graph/util/test/CMakeLists.txt index 5de9c601e43..0abef7eaeda 100644 --- a/src/graph/util/test/CMakeLists.txt +++ b/src/graph/util/test/CMakeLists.txt @@ -52,7 +52,6 @@ nebula_add_test( $ $ $ - $ $ $ $ @@ -72,11 +71,13 @@ nebula_add_test( $ $ $ + $ + $ LIBRARIES gtest gtest_main ${Boost_Thread_LIBRARY} ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} - # curl + curl ) diff --git a/src/graph/validator/AdminValidator.h b/src/graph/validator/AdminValidator.h index b0a2a87ec10..e9f10b234d5 100644 --- a/src/graph/validator/AdminValidator.h +++ b/src/graph/validator/AdminValidator.h @@ -7,7 +7,6 @@ #define GRAPH_VALIDATOR_ADMINVALIDATOR_H_ #include "clients/meta/MetaClient.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" #include "graph/validator/Validator.h" #include
"parser/AdminSentences.h" #include "parser/MaintainSentences.h" diff --git a/src/graph/validator/LookupValidator.cpp b/src/graph/validator/LookupValidator.cpp index 2ea3be001ce..ea88bb0c2e3 100644 --- a/src/graph/validator/LookupValidator.cpp +++ b/src/graph/validator/LookupValidator.cpp @@ -505,11 +505,7 @@ StatusOr LookupValidator::checkTSExpr(Expression* expr) { auto tsi = metaClient->getFTIndexBySpaceSchemaFromCache(spaceId(), schemaId()); NG_RETURN_IF_ERROR(tsi); auto tsName = tsi.value().first; - auto ret = FTIndexUtils::checkTSIndex(tsClients_, tsName); - NG_RETURN_IF_ERROR(ret); - if (!ret.value()) { - return Status::SemanticError("text search index not found : %s", tsName.c_str()); - } + auto ftFields = tsi.value().second.get_fields(); auto tsExpr = static_cast(expr); auto prop = tsExpr->arg()->prop(); @@ -606,13 +602,13 @@ Status LookupValidator::getSchemaProvider(shared_ptr // Generate text search filter, check validity and rewrite StatusOr LookupValidator::genTsFilter(Expression* filter) { - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); - tsClients_ = std::move(tsRet).value(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx_->getMetaClient()); + NG_RETURN_IF_ERROR(esAdapterRet); + auto esAdapter = std::move(esAdapterRet).value(); auto tsIndex = checkTSExpr(filter); NG_RETURN_IF_ERROR(tsIndex); return FTIndexUtils::rewriteTSFilter( - qctx_->objPool(), lookupCtx_->isEdge, filter, tsIndex.value(), tsClients_); + qctx_->objPool(), lookupCtx_->isEdge, filter, tsIndex.value(), esAdapter); } } // namespace graph diff --git a/src/graph/validator/LookupValidator.h b/src/graph/validator/LookupValidator.h index 9889d99971e..79ed4ce4e42 100644 --- a/src/graph/validator/LookupValidator.h +++ b/src/graph/validator/LookupValidator.h @@ -5,7 +5,6 @@ #ifndef _VALIDATOR_LOOKUP_VALIDATOR_H_ #define _VALIDATOR_LOOKUP_VALIDATOR_H_ -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" #include 
"graph/validator/Validator.h" namespace nebula { @@ -56,7 +55,6 @@ class LookupValidator final : public Validator { void extractExprProps(); std::unique_ptr lookupCtx_; - std::vector tsClients_; ExpressionProps exprProps_; std::vector idxReturnCols_; std::vector schemaIds_; diff --git a/src/graph/validator/MaintainValidator.cpp b/src/graph/validator/MaintainValidator.cpp index b347fceda5b..02c93c173fe 100644 --- a/src/graph/validator/MaintainValidator.cpp +++ b/src/graph/validator/MaintainValidator.cpp @@ -10,6 +10,7 @@ #include "common/base/Status.h" #include "common/charset/Charset.h" #include "common/expression/ConstantExpression.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "graph/planner/plan/Admin.h" #include "graph/planner/plan/Maintain.h" #include "graph/planner/plan/Query.h" @@ -601,11 +602,12 @@ Status CreateFTIndexValidator::validateImpl() { if (containUpper) { return Status::SyntaxError("Fulltext index names cannot contain uppercase letters"); } - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); - auto tsIndex = FTIndexUtils::checkTSIndex(std::move(tsRet).value(), name); - NG_RETURN_IF_ERROR(tsIndex); - if (tsIndex.value()) { + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx_->getMetaClient()); + NG_RETURN_IF_ERROR(esAdapterRet); + auto esAdapter = std::move(esAdapterRet).value(); + auto existResult = esAdapter.isIndexExist(name); + NG_RETURN_IF_ERROR(existResult); + if (existResult.value()) { return Status::Error("text search index exist : %s", name.c_str()); } auto space = vctx_->whichSpace(); @@ -621,7 +623,7 @@ Status CreateFTIndexValidator::validateImpl() { } index_.space_id_ref() = space.id; index_.depend_schema_ref() = std::move(id); - index_.fields_ref() = sentence->fields(); + index_.fields_ref()->push_back(sentence->field()); return Status::OK(); } @@ -634,8 +636,6 @@ Status CreateFTIndexValidator::toPlan() { } Status DropFTIndexValidator::validateImpl() { - auto tsRet 
= FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); return Status::OK(); } diff --git a/src/graph/validator/test/CMakeLists.txt b/src/graph/validator/test/CMakeLists.txt index 616fa901998..d1401408bc4 100644 --- a/src/graph/validator/test/CMakeLists.txt +++ b/src/graph/validator/test/CMakeLists.txt @@ -55,7 +55,7 @@ set(VALIDATOR_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/visitor/test/CMakeLists.txt b/src/graph/visitor/test/CMakeLists.txt index e7fa08ce8d9..caf85a5cbe1 100644 --- a/src/graph/visitor/test/CMakeLists.txt +++ b/src/graph/visitor/test/CMakeLists.txt @@ -70,7 +70,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 2bccddd26dd..0bac41055df 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -155,8 +155,6 @@ void Listener::doApply() { }); } - - void Listener::resetListener() { std::lock_guard g(raftLock_); reset(); diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h index 4e8b024472b..88ef21f70bc 100644 --- a/src/kvstore/LogEncoder.h +++ b/src/kvstore/LogEncoder.h @@ -200,7 +200,7 @@ class BatchHolder : public boost::noncopyable, public nebula::cpp::NonMovable { * * @return const std::vector>& */ - const std::vector>& getBatch() { + const std::vector>& getBatch() const { return batch_; } diff --git a/src/kvstore/plugins/elasticsearch/ESListener.cpp b/src/kvstore/plugins/elasticsearch/ESListener.cpp index 05121ea196b..4ea02a4e78b 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.cpp +++ b/src/kvstore/plugins/elasticsearch/ESListener.cpp @@ -5,7 +5,7 @@ #include "kvstore/plugins/elasticsearch/ESListener.h" -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "common/utils/NebulaKeyUtils.h" DECLARE_uint32(ft_request_retry_times); @@ -25,17 +25,18 @@ void ESListener::init() { if (!cRet.ok() || cRet.value().empty()) { LOG(FATAL) << 
"elasticsearch clients error"; } + std::vector esClients; for (const auto& c : cRet.value()) { - nebula::plugin::HttpClient hc; - hc.host = c.host; + auto host = c.host; + std::string user, password; if (c.user_ref().has_value()) { - hc.user = *c.user_ref(); - hc.password = *c.pwd_ref(); + user = *c.user_ref(); + password = *c.pwd_ref(); } - hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; - esClients_.emplace_back(std::move(hc)); + std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; + esClients.emplace_back(protocol, host.toString(), user, password); } - + esAdapter_.setClients(std::move(esClients)); auto sRet = schemaMan_->toGraphSpaceName(spaceId_); if (!sRet.ok()) { LOG(FATAL) << "space name error"; @@ -43,30 +44,93 @@ void ESListener::init() { spaceName_ = std::make_unique(sRet.value()); } -bool ESListener::apply(const std::vector& data) { - std::vector docItems; - for (const auto& kv : data) { - if (!nebula::NebulaKeyUtils::isTag(vIdLen_, kv.first) && - !nebula::NebulaKeyUtils::isEdge(vIdLen_, kv.first)) { - continue; +bool ESListener::apply(const BatchHolder& batch) { + nebula::plugin::ESBulk bulk; + auto callback = [&bulk](BatchLogType type, + const std::string& index, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text) { + if (type == BatchLogType::OP_BATCH_PUT) { + bulk.put(index, vid, src, dst, rank, text); + } else if (type == BatchLogType::OP_BATCH_REMOVE) { + bulk.delete_(index, vid, src, dst, rank); + } else { + LOG(FATAL) << "Unexpect"; } - if (!appendDocItem(docItems, kv)) { + }; + for (const auto& log : batch.getBatch()) { + pickTagAndEdgeData(std::get<0>(log), std::get<1>(log), std::get<2>(log), callback); + } + if (!bulk.empty()) { + auto status = esAdapter_.bulk(bulk); + if (!status.ok()) { + LOG(ERROR) << status; return false; } - if (docItems.size() >= static_cast(FLAGS_ft_bulk_batch_size)) { - auto suc = 
writeData(docItems); - if (!suc) { - return suc; - } - docItems.clear(); - } - } - if (!docItems.empty()) { - return writeData(docItems); } return true; } +void ESListener::pickTagAndEdgeData(BatchLogType type, + const std::string& key, + const std::string& value, + const PickFunc& callback) { + if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) { + auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key); + auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId); + if (!ftIndexRes.ok()) { + return; + } + auto ftIndex = std::move(ftIndexRes).value(); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); + if (reader == nullptr) { + LOG(ERROR) << "get tag reader failed, tagID " << tagId; + return; + } + if (ftIndex.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + auto field = ftIndex.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + std::string indexName = ftIndex.first; + std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); + std::string text = std::move(v).getStr(); + callback(type, indexName, vid, "", "", 0, text); + } else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) { + auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key); + auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, edgeType); + if (!ftIndexRes.ok()) { + return; + } + auto ftIndex = std::move(ftIndexRes).value(); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); + if (reader == nullptr) { + LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; + return; + } + if (ftIndex.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + auto field = ftIndex.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() != Value::Type::STRING) { + 
LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + std::string indexName = ftIndex.first; + std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); + std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); + int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key); + std::string text = std::move(v).getStr(); + callback(type, indexName, "", src, dst, rank, text); + } +} + bool ESListener::persist(LogID lastId, TermID lastTerm, LogID lastApplyLogId) { if (!writeAppliedId(lastId, lastTerm, lastApplyLogId)) { LOG(FATAL) << "last apply ids write failed"; @@ -146,105 +210,6 @@ std::string ESListener::encodeAppliedId(LogID lastId, return val; } -bool ESListener::appendDocItem(std::vector& items, const KV& kv) const { - auto isEdge = NebulaKeyUtils::isEdge(vIdLen_, kv.first); - return isEdge ? appendEdgeDocItem(items, kv) : appendTagDocItem(items, kv); -} - -bool ESListener::appendEdgeDocItem(std::vector& items, const KV& kv) const { - auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, kv.first); - auto ftIndex = schemaMan_->getFTIndex(spaceId_, edgeType); - if (!ftIndex.ok()) { - VLOG(3) << "get text search index failed"; - return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false; - } - auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, kv.second); - if (reader == nullptr) { - VLOG(3) << "get edge reader failed, schema ID " << edgeType; - return false; - } - return appendDocs(items, reader.get(), std::move(ftIndex).value()); -} - -bool ESListener::appendTagDocItem(std::vector& items, const KV& kv) const { - auto tagId = NebulaKeyUtils::getTagId(vIdLen_, kv.first); - auto ftIndex = schemaMan_->getFTIndex(spaceId_, tagId); - if (!ftIndex.ok()) { - VLOG(3) << "get text search index failed"; - return (ftIndex.status() == nebula::Status::IndexNotFound()) ? 
true : false; - } - auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, kv.second); - if (reader == nullptr) { - VLOG(3) << "get tag reader failed, tagID " << tagId; - return false; - } - return appendDocs(items, reader.get(), std::move(ftIndex).value()); -} - -bool ESListener::appendDocs(std::vector& items, - RowReader* reader, - const std::pair& fti) const { - for (const auto& field : fti.second.get_fields()) { - auto v = reader->getValueByName(field); - if (v.type() != Value::Type::STRING) { - continue; - } - items.emplace_back(DocItem(fti.first, field, partId_, std::move(v).getStr())); - } - return true; -} - -bool ESListener::writeData(const std::vector& items) const { - bool isNeedWriteOneByOne = false; - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto index = folly::Random::rand32(esClients_.size() - 1); - auto suc = nebula::plugin::ESStorageAdapter::kAdapter->bulk(esClients_[index], items); - if (!suc.ok()) { - VLOG(3) << "bulk failed. retry : " << retryCnt; - continue; - } - if (!suc.value()) { - isNeedWriteOneByOne = true; - break; - } - return true; - } - if (isNeedWriteOneByOne) { - return writeDatum(items); - } - LOG(WARNING) << idStr_ << "Failed to bulk into es."; - return false; -} - -bool ESListener::writeDatum(const std::vector& items) const { - bool done = false; - for (const auto& item : items) { - done = false; - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto index = folly::Random::rand32(esClients_.size() - 1); - auto suc = nebula::plugin::ESStorageAdapter::kAdapter->put(esClients_[index], item); - if (!suc.ok()) { - VLOG(3) << "put failed. 
retry : " << retryCnt; - continue; - } - if (!suc.value()) { - // TODO (sky) : Record failed data - break; - } - done = true; - break; - } - if (!done) { - // means CURL fails, and no need to take the next step - LOG(INFO) << idStr_ << "Failed to put into es."; - return false; - } - } - return true; -} - void ESListener::processLogs() { std::unique_ptr iter; { diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index 19c9fd8c73f..f9e2780cd76 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -7,14 +7,11 @@ #define KVSTORE_PLUGINS_ES_LISTENER_H_ #include "codec/RowReaderWrapper.h" -#include "common/plugin/fulltext/FTStorageAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "kvstore/Listener.h" - namespace nebula { namespace kvstore { -using nebula::plugin::DocItem; - class ESListener : public Listener { public: /** @@ -56,7 +53,8 @@ class ESListener : public Listener { * @param data Key/value to apply * @return True if succeed. False if failed. 
*/ - bool apply(const std::vector& data); + bool apply(const BatchHolder& batch); + /** * @brief Persist commitLogId commitLogTerm and lastApplyLogId @@ -106,66 +104,22 @@ class ESListener : public Listener { */ std::string encodeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId) const noexcept; - /** - * @brief Convert key value to DocItem - * - * @param items DocItems to send - * @param kv Key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Convert edge key value to DocItem - * - * @param items DocItems to send - * @param kv Edge key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendEdgeDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Convert tag key value to DocItem - * - * @param items DocItems to send - * @param kv Edge key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendTagDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Add the fulltext index field to DocItem - * - * @param items DocItems to send - * @param reader Key/value's reader - * @param fti Fulltext index schema - * @return Whether append DocItem succeed - */ - bool appendDocs(std::vector& items, - RowReader* reader, - const std::pair& fti) const; - - /** - * @brief Bulk DocItem to es - * - * @param items DocItems to send - * @return Whether send succeed - */ - bool writeData(const std::vector& items) const; - - /** - * @brief Put DocItem to es - * - * @param items DocItems to send - * @return Whether send succeed - */ - bool writeDatum(const std::vector& items) const; - private: meta::SchemaManager* schemaMan_{nullptr}; + using PickFunc = std::function; + void pickTagAndEdgeData(BatchLogType type, + const std::string& key, + const std::string& value, + const PickFunc& func); std::unique_ptr lastApplyLogFile_{nullptr}; std::unique_ptr spaceName_{nullptr}; - 
std::vector esClients_; + ::nebula::plugin::ESAdapter esAdapter_; int32_t vIdLen_; }; diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt index 32ab49bbbf7..50761adadda 100644 --- a/src/kvstore/test/CMakeLists.txt +++ b/src/kvstore/test/CMakeLists.txt @@ -36,7 +36,7 @@ set(KVSTORE_TEST_LIBS $ $ $ - $ + $ $ $ $ @@ -45,6 +45,7 @@ set(KVSTORE_TEST_LIBS $ $ $ + $ ) nebula_add_test( @@ -60,6 +61,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -75,6 +77,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -90,6 +93,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -105,6 +109,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -120,6 +125,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -135,6 +141,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -150,6 +157,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -165,6 +173,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -180,6 +189,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -196,6 +206,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) nebula_add_executable( @@ -213,6 +224,7 @@ nebula_add_executable( follybenchmark gtest boost_regex + curl ) nebula_add_test( @@ -228,4 +240,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp index 816c2093e80..5d91fba3cf1 100644 --- a/src/kvstore/test/NebulaListenerTest.cpp +++ b/src/kvstore/test/NebulaListenerTest.cpp @@ -6,6 +6,8 @@ #include #include +#include + #include "common/base/Base.h" #include "common/fs/FileUtils.h" #include "common/fs/TempDir.h" @@ -44,7 +46,11 @@ class DummyListener : public 
Listener { : Listener(spaceId, partId, localAddr, walPath, ioPool, workers, handlers) {} std::vector data() { - return data_; + std::vector ret; + for (auto& [key, value] : data_) { + ret.emplace_back(key, value); + } + return ret; } std::tuple commitSnapshot(const std::vector& rows, @@ -186,10 +192,30 @@ class DummyListener : public Listener { protected: void init() override {} - bool apply(const std::vector& kvs) { - for (const auto& kv : kvs) { - data_.emplace_back(kv); + bool apply(const BatchHolder& batch) override { + for (auto& log : batch.getBatch()) { + switch (std::get<0>(log)) { + case BatchLogType::OP_BATCH_PUT: { + data_[std::get<1>(log)] = std::get<2>(log); + break; + } + case BatchLogType::OP_BATCH_REMOVE: { + data_.erase(std::get<1>(log)); + break; + } + case BatchLogType::OP_BATCH_REMOVE_RANGE: { + auto iter = data_.lower_bound(std::get<1>(log)); + while (iter != data_.end()) { + if (iter->first < std::get<2>(log)) { + iter = data_.erase(iter); + } else { + break; + } + } + } + } } + return true; } @@ -214,7 +240,7 @@ class DummyListener : public Listener { } private: - std::vector data_; + std::map data_; std::pair committedSnapshot_{0, 0}; int32_t snapshotBatchCount_{0}; }; @@ -450,9 +476,14 @@ TEST_P(ListenerBasicTest, SimpleTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(100, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } @@ -520,9 +551,14 @@ TEST_P(ListenerBasicTest, TransLeaderTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); 
CHECK_EQ(200, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } @@ -554,9 +590,14 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(100, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 0c31c9404c2..a325edfc38a 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -163,7 +163,7 @@ set(meta_test_deps $ $ $ - $ + $ $ $ $ @@ -172,6 +172,7 @@ set(meta_test_deps $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/src/meta/http/test/CMakeLists.txt b/src/meta/http/test/CMakeLists.txt index c4d059fb41b..6f54cc20ba8 100644 --- a/src/meta/http/test/CMakeLists.txt +++ b/src/meta/http/test/CMakeLists.txt @@ -17,4 +17,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 52146ed1915..f963bb4f1bf 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -15,6 +15,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} 
wangle gtest + curl ) nebula_add_test( @@ -30,6 +31,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -45,6 +47,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -60,6 +63,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -69,12 +73,14 @@ nebula_add_test( MetaClientTest.cpp OBJECTS ${meta_test_deps} + $ LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -90,6 +96,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) @@ -107,6 +114,7 @@ nebula_add_test( wangle gtest gmock + curl ) @@ -145,6 +153,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) @@ -161,6 +170,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -176,6 +186,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -192,6 +203,7 @@ nebula_add_test( wangle gtest gmock + curl ) nebula_add_test( @@ -208,6 +220,7 @@ nebula_add_test( wangle gtest gmock + curl ) nebula_add_test( @@ -223,6 +236,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -238,6 +252,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -253,6 +268,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -268,6 +284,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -283,4 +300,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/parser/MaintainSentences.cpp b/src/parser/MaintainSentences.cpp index d882e5b7873..308c3736914 100644 --- a/src/parser/MaintainSentences.cpp +++ b/src/parser/MaintainSentences.cpp @@ -495,9 +495,7 @@ std::string CreateFTIndexSentence::toString() const { buf += *schemaName_; buf += "("; std::vector fieldDefs; - for (const auto& field : fields()) { - fieldDefs.emplace_back(field); - } + 
fieldDefs.emplace_back(field()); std::string fields; folly::join(", ", fieldDefs, fields); buf += fields; diff --git a/src/parser/MaintainSentences.h b/src/parser/MaintainSentences.h index cc56f3cdc43..6a2c59c4730 100644 --- a/src/parser/MaintainSentences.h +++ b/src/parser/MaintainSentences.h @@ -1127,11 +1127,11 @@ class CreateFTIndexSentence final : public Sentence { CreateFTIndexSentence(bool isEdge, std::string *indexName, std::string *schemaName, - NameLabelList *fields) { + std::string *field) { isEdge_ = isEdge; indexName_.reset(indexName); schemaName_.reset(schemaName); - fields_.reset(fields); + field_.reset(field); kind_ = Kind::kCreateFTIndex; } @@ -1148,20 +1148,15 @@ class CreateFTIndexSentence final : public Sentence { return schemaName_.get(); } - std::vector fields() const { - std::vector result; - auto fields = fields_->labels(); - result.resize(fields.size()); - auto get = [](auto ptr) { return *ptr; }; - std::transform(fields.begin(), fields.end(), result.begin(), get); - return result; + std::string field() const { + return *field_; } private: bool isEdge_; std::unique_ptr indexName_; std::unique_ptr schemaName_; - std::unique_ptr fields_; + std::unique_ptr field_; }; class DropFTIndexSentence final : public Sentence { public: diff --git a/src/parser/parser.yy b/src/parser/parser.yy index bfd06536385..264af87946c 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -2693,10 +2693,10 @@ create_edge_index_sentence ; create_fulltext_index_sentence - : KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN { + : KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label R_PAREN { $$ = new CreateFTIndexSentence(false, $5, $7, $9); } - | KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN { + | KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label R_PAREN { $$ = new CreateFTIndexSentence(true, 
$5, $7, $9); } ; diff --git a/src/parser/test/CMakeLists.txt b/src/parser/test/CMakeLists.txt index 27762d80742..474a29b7cfc 100644 --- a/src/parser/test/CMakeLists.txt +++ b/src/parser/test/CMakeLists.txt @@ -32,7 +32,8 @@ set(PARSER_TEST_LIBS $ $ $ - $ + $ + $ $ $ $ @@ -60,7 +61,7 @@ nebula_add_test( SOURCES ParserTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_test( @@ -68,21 +69,21 @@ nebula_add_test( SOURCES ScannerTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_executable( NAME parser_bm SOURCES ParserBenchmark.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES follybenchmark boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES follybenchmark boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_test( NAME expression_parsing_test SOURCES ExpressionParsingTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) if(ENABLE_FUZZ_TEST) diff --git a/src/parser/test/fuzzing/CMakeLists.txt b/src/parser/test/fuzzing/CMakeLists.txt index 2de1ac7fd94..c2814163cdb 100644 --- a/src/parser/test/fuzzing/CMakeLists.txt +++ b/src/parser/test/fuzzing/CMakeLists.txt @@ -7,5 +7,5 @@ nebula_add_test( FUZZER ON SOURCES ParserFuzzer.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle + LIBRARIES ${THRIFT_LIBRARIES} wangle curl ) diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 39bc5dfd5cb..22318a37669 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -52,7 +52,7 @@ set(storage_test_deps $ $ $ - $ + $ $ $ $ @@ -60,6 +60,7 @@ set(storage_test_deps $ $ 
$ + $ ) if(ENABLE_STANDALONE_VERSION) @@ -82,6 +83,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -97,6 +99,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -112,6 +115,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -127,6 +131,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -142,6 +147,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -157,6 +163,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -172,6 +179,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -187,6 +195,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -202,6 +211,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -217,6 +227,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -232,6 +243,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -248,6 +260,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) @@ -265,6 +278,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -280,6 +294,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -295,6 +310,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -310,6 +326,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -327,6 +344,7 @@ nebula_add_executable( gtest follybenchmark boost_regex + curl ) nebula_add_executable( @@ -342,6 +360,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -357,6 +376,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -372,6 +392,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) 
nebula_add_executable( @@ -391,6 +412,7 @@ nebula_add_executable( follybenchmark boost_regex gtest + curl ) nebula_add_test( @@ -407,6 +429,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -446,6 +469,7 @@ nebula_add_executable( wangle boost_regex gtest + curl ) nebula_add_test( @@ -461,6 +485,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -514,6 +539,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -529,6 +555,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -544,6 +571,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) #nebula_add_executable( @@ -575,6 +603,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -590,6 +619,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -605,6 +635,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -625,6 +656,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -645,6 +677,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -665,6 +698,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -680,6 +714,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -696,6 +731,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) nebula_add_test( @@ -711,6 +747,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -725,6 +762,7 @@ nebula_add_test( ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} gtest + curl ) nebula_add_executable( @@ -740,6 +778,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -757,4 +796,5 @@ nebula_add_executable( boost_regex wangle gtest + curl ) diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 
d6c2d226143..05d3454376d 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -56,10 +56,11 @@ set(tools_test_deps $ $ $ - $ + $ $ $ $ + $ ) if(NOT ENABLE_STANDALONE_VERSION) diff --git a/src/tools/simple-kv-verify/CMakeLists.txt b/src/tools/simple-kv-verify/CMakeLists.txt index 850c4c56d09..1113afcc022 100644 --- a/src/tools/simple-kv-verify/CMakeLists.txt +++ b/src/tools/simple-kv-verify/CMakeLists.txt @@ -15,4 +15,5 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index 518aa7f7718..7be1bd82fd1 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -38,6 +38,7 @@ nebula_add_executable( ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} gtest + curl ) #install(