diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 97cf436339e..3ff7c9d5239 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -254,6 +254,18 @@ jobs: volumes: - /tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}:/tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }} options: --cap-add=SYS_PTRACE + services: + elasticsearch: + image: elasticsearch:7.17.7 + ports: + - 9200:9200 + env: + discovery.type: single-node + options: >- + --health-cmd "curl elasticsearch:9200" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: webiny/action-post-run@2.0.1 with: diff --git a/src/common/expression/test/CMakeLists.txt b/src/common/expression/test/CMakeLists.txt index 15baabdec6e..b2884aecd67 100644 --- a/src/common/expression/test/CMakeLists.txt +++ b/src/common/expression/test/CMakeLists.txt @@ -33,7 +33,7 @@ set(expression_test_common_libs $ $ $ - $ + $ $ $ $ @@ -47,6 +47,7 @@ set(expression_test_common_libs $ $ $ + $ ) @@ -95,6 +96,7 @@ nebula_add_test( gtest ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + curl ) nebula_add_executable( @@ -110,6 +112,7 @@ nebula_add_executable( boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + curl ) nebula_add_executable( diff --git a/src/common/http/test/CMakeLists.txt b/src/common/http/test/CMakeLists.txt index 06142e7da26..764b1883a08 100644 --- a/src/common/http/test/CMakeLists.txt +++ b/src/common/http/test/CMakeLists.txt @@ -23,5 +23,4 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} gtest curl - ) diff --git a/src/common/http/test/HttpClientTest.cpp b/src/common/http/test/HttpClientTest.cpp index b671de7ec9f..2d1603de4e1 100644 --- a/src/common/http/test/HttpClientTest.cpp +++ b/src/common/http/test/HttpClientTest.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. +/* Copyright (c) 2022 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. */ diff --git a/src/common/plugin/fulltext/CMakeLists.txt b/src/common/plugin/fulltext/CMakeLists.txt index 1f26a1f0c2d..813a7fee234 100644 --- a/src/common/plugin/fulltext/CMakeLists.txt +++ b/src/common/plugin/fulltext/CMakeLists.txt @@ -3,13 +3,9 @@ # This source code is licensed under Apache 2.0 License. nebula_add_library( - ft_es_graph_adapter_obj OBJECT - elasticsearch/ESGraphAdapter.cpp -) - -nebula_add_library( - ft_es_storage_adapter_obj OBJECT - elasticsearch/ESStorageAdapter.cpp + es_adapter_obj OBJECT + elasticsearch/ESAdapter.cpp + elasticsearch/ESClient.cpp ) nebula_add_subdirectory(test) diff --git a/src/common/plugin/fulltext/FTGraphAdapter.h b/src/common/plugin/fulltext/FTGraphAdapter.h deleted file mode 100644 index cf89ad82d08..00000000000 --- a/src/common/plugin/fulltext/FTGraphAdapter.h +++ /dev/null @@ -1,58 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ -#define COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ - -#include "common/base/Base.h" -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTUtils.h" - -namespace nebula { -namespace plugin { - -class FTGraphAdapter { - public: - FTGraphAdapter() = default; - - virtual ~FTGraphAdapter() = default; - - virtual StatusOr prefix(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr wildcard(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr regexp(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr fuzzy(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const = 0; - - virtual StatusOr createIndex(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const = 0; - - virtual StatusOr dropIndex(const HttpClient& client, const std::string& index) const = 0; - - // Clear the fulltext index data and keep the index schema. - virtual StatusOr clearIndex(const HttpClient& client, const std::string& index) const = 0; - - virtual StatusOr indexExists(const HttpClient& client, const std::string& index) const = 0; -}; -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ diff --git a/src/common/plugin/fulltext/FTStorageAdapter.h b/src/common/plugin/fulltext/FTStorageAdapter.h deleted file mode 100644 index 2c046988f8f..00000000000 --- a/src/common/plugin/fulltext/FTStorageAdapter.h +++ /dev/null @@ -1,32 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ -#define COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ - -#include "common/base/Base.h" -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTUtils.h" - -namespace nebula { -namespace plugin { - -class FTStorageAdapter { - public: - virtual ~FTStorageAdapter() = default; - - virtual StatusOr put(const HttpClient& client, const DocItem& item) const = 0; - - virtual StatusOr bulk(const HttpClient& client, - const std::vector& items) const = 0; - - protected: - FTStorageAdapter() = default; -}; - -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ diff --git a/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp new file mode 100644 index 00000000000..7dd0f4bfe69 --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp @@ -0,0 +1,245 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" + +#include + +#include "fmt/printf.h" +#include "openssl/sha.h" +namespace nebula::plugin { + +using namespace fmt::literals; // NOLINT + +void ESBulk::put(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text) { + folly::dynamic action = folly::dynamic::object(); + folly::dynamic metadata = folly::dynamic::object(); + folly::dynamic body = folly::dynamic::object(); + auto docId = ESAdapter::genDocID(vid, src, dst, rank); + metadata["_id"] = docId; + metadata["_type"] = "_doc"; + metadata["_index"] = indexName; + action["index"] = std::move(metadata); + body["vid"] = vid; + body["src"] = src; + body["dst"] = dst; + body["rank"] = rank; + body["content"] = text; + documents_[docId] = {std::move(action), std::move(body)}; +} + +void ESBulk::delete_(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank) { + folly::dynamic action = folly::dynamic::object(); + folly::dynamic metadata = folly::dynamic::object(); + auto docId = ESAdapter::genDocID(vid, src, dst, rank); + metadata["_id"] = docId; + metadata["_type"] = "_doc"; + metadata["_index"] = indexName; + action["delete"] = std::move(metadata); + documents_[docId] = {std::move(action)}; +} + +bool ESBulk::empty() { + return documents_.empty(); +} + +ESAdapter::ESAdapter(std::vector&& clients) : clients_(clients) {} + +void ESAdapter::setClients(std::vector&& clients) { + clients_ = std::move(clients); +} + +Status ESAdapter::createIndex(const std::string& name) { + folly::dynamic mappings = folly::parseJson(R"( + { + "mappings":{ + "properties":{ + "vid": { + "type": "keyword" + }, + "src": { + "type": "keyword" + }, + "dst": { + "type": "keyword" + }, + "rank": { + "type": "long" + }, + "text": { + "type": "text" + } + } + } + } + )"); + auto result = randomClient().createIndex(name, mappings); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["acknowledged"].isBool() && resp["acknowledged"].getBool()) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +Status ESAdapter::dropIndex(const std::string& name) { + auto result = randomClient().dropIndex(name); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["acknowledged"].isBool() && resp["acknowledged"].getBool()) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +Status ESAdapter::clearIndex(const std::string& name) { + auto result = randomClient().clearIndex(name); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["failures"].isArray() && resp["failures"].size() == 0) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +StatusOr ESAdapter::isIndexExist(const std::string& name) { + auto result = randomClient().getIndex(name); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + + if (resp[name].isObject()) { + return true; + } + auto error = resp["error"]; + if (error.isObject()) { + if (error["type"].isString() && error["type"].getString() == "index_not_found_exception") { + return false; + } + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +Status ESAdapter::bulk(const ESBulk& bulk) { + std::vector jsonArray; + for (auto& [docId, objs] : bulk.documents_) { + for (auto& obj : objs) { + jsonArray.push_back(obj); + } + } + auto result = randomClient().bulk(jsonArray); + if (!result.ok()) { + return result.status(); + } + auto resp = std::move(result).value(); + if (resp["errors"].isBool() && !resp["errors"].getBool()) { + return Status::OK(); + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +StatusOr ESAdapter::prefix(const std::string& index, const std::string& pattern) { + folly::dynamic query = folly::dynamic::object("query", folly::dynamic::object("prefix", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::wildcard(const std::string& index, const std::string& pattern) { + folly::dynamic query = + folly::dynamic::object("query", folly::dynamic::object("wildcard", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::regexp(const std::string& index, const std::string& pattern) { + folly::dynamic query = folly::dynamic::object("query", folly::dynamic::object("regexp", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::fuzzy(const std::string& index, const std::string& pattern) { + folly::dynamic query = folly::dynamic::object("query", folly::dynamic::object("fuzzy", pattern)); + return ESAdapter::query(index, query); +} + +StatusOr ESAdapter::query(const std::string& index, const folly::dynamic& query) { + auto result = randomClient().search(index, query); + if (!result.ok()) { + return std::move(result).status(); + } + auto resp = std::move(result).value(); + auto hits = resp["hits"]; + if (hits.isObject()) { + ESQueryResult res; + for (auto& hit : hits) { + auto source = hit["_source"]; + ESQueryResult::Item item; + item.text = source["text"].getString(); + item.src = source["src"].getString(); + item.dst = source["dst"].getString(); + item.rank = source["rank"].getInt(); + item.vid = source["vid"].getString(); + res.items.emplace_back(std::move(item)); + } + return res; + } + auto error = resp["error"]; + if (error.isObject()) { + return Status::Error(folly::toJson(error)); + } + return Status::Error(folly::toJson(resp)); +} + +std::string ESAdapter::genDocID(const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank) { + std::string str; + unsigned char mdStr[33] = {0}; + if (!vid.empty()) { + str = vid; + } else { + str = src + dst + std::to_string(rank); + } + SHA256(reinterpret_cast(str.data()), str.size(), mdStr); + return folly::hexDump(mdStr, 32); +} + +ESClient& ESAdapter::randomClient() { + static thread_local std::default_random_engine engine; + static thread_local std::uniform_int_distribution d(0, clients_.size() - 1); + return clients_[d(engine)]; +} +} // namespace nebula::plugin diff --git a/src/common/plugin/fulltext/elasticsearch/ESAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESAdapter.h new file mode 100644 index 00000000000..05f172bf10e --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESAdapter.h @@ -0,0 +1,77 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_ESADAPTER_H_ +#define COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_ESADAPTER_H_ + +#include "common/base/StatusOr.h" +#include "common/plugin/fulltext/elasticsearch/ESClient.h" +#include "folly/container/F14Map.h" +#include "folly/dynamic.h" +namespace nebula::plugin { + +struct ESQueryResult { + struct Item { + std::string text; + std::string vid; // for vertex + std::string src, dst; // for edge + int64_t rank; // for edge + }; + std::vector items; +}; + +class ESBulk { + public: + void put(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text); + void delete_(const std::string& indexName, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank); + + bool empty(); + + private: + folly::F14FastMap> documents_; + friend class ESAdapter; +}; + +class ESAdapter { + public: + explicit ESAdapter(std::vector&& clients); + ESAdapter() = default; + void setClients(std::vector&& clients); + Status createIndex(const std::string& name); + Status dropIndex(const std::string& name); + Status clearIndex(const std::string& name); + StatusOr isIndexExist(const std::string& name); + + Status bulk(const ESBulk& bulk); + StatusOr prefix(const std::string& index, const std::string& pattern); + StatusOr fuzzy(const std::string& index, const std::string& pattern); + StatusOr regexp(const std::string& index, const std::string& pattern); + StatusOr wildcard(const std::string& index, const std::string& pattern); + StatusOr query(const std::string& index, const folly::dynamic& query); + + private: + static std::string genDocID(const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank); + + ESClient& randomClient(); + std::vector clients_; + + friend class ESBulk; +}; + +} // namespace nebula::plugin + +#endif diff --git a/src/common/plugin/fulltext/elasticsearch/ESClient.cpp b/src/common/plugin/fulltext/elasticsearch/ESClient.cpp new file mode 100644 index 00000000000..a4721e3c514 --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESClient.cpp @@ -0,0 +1,118 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "common/plugin/fulltext/elasticsearch/ESClient.h" + +#include "common/http/HttpClient.h" +namespace nebula::plugin { + +ESClient::ESClient(const std::string& protocol, + const std::string& address, + const std::string& user, + const std::string& password) + : protocol_(protocol), address_(address), user_(user), password_(password) { + // TODO(hs.zhang): enable protocol + // TODO(hs.zhang): enable user&password +} + +StatusOr ESClient::createIndex(const std::string& name, + const folly::dynamic& body) { + std::string url = fmt::format("http://{}/{}", address_, name); + auto resp = HttpClient::put(url, {"Content-Type: application/json"}, folly::toJson(body)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + auto ret = folly::parseJson(resp.body); + return ret; +} + +StatusOr ESClient::dropIndex(const std::string& name) { + std::string url = fmt::format("http://{}/{}", address_, name); + auto resp = HttpClient::delete_(url, {"Content-Type: application/json"}); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::clearIndex(const std::string& name) { + std::string url = fmt::format("http://{}/{}/_delete_by_query?refresh", address_, name); + std::string body = R"( + { + "query":{ + "match_all":{} + } + } + )"; + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, body); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::getIndex(const std::string& name) { + std::string url = fmt::format("http://{}/{}", address_, name); + auto resp = HttpClient::get(url, {"Content-Type: application/json"}); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::deleteByQuery(const std::string& index, + const folly::dynamic& query) { + std::string url = fmt::format("http://{}/{}/_delete_by_query", address_, index); + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(query)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::updateByQuery(const std::string& index, + const folly::dynamic& query) { + std::string url = fmt::format("http://{}/{}/_update_by_query", address_, index); + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(query)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +// StatusOr ESClient::put(const std::string& index, const folly::dynamic& document) +// { +// std::string url = fmt::format("http://{}/{}/_doc", address_, index); +// auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(document)); +// if (resp.curlCode != 0) { +// return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); +// } +// return folly::parseJson(resp.body); +// } + +StatusOr ESClient::search(const std::string& index, const folly::dynamic& query) { + std::string url = fmt::format("http://{}/{}/_search", address_, index); + auto resp = HttpClient::post(url, {"Content-Type: application/json"}, folly::toJson(query)); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +StatusOr ESClient::bulk(const std::vector& bulk) { + std::string url = fmt::format("http://{}/_bulk", address_); + std::string body; + for (auto& obj : bulk) { + body += folly::toJson(obj); + body += "\n"; + } + auto resp = HttpClient::post(url, {"Content-Type: application/x-ndjson"}, body); + if (resp.curlCode != 0) { + return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); + } + return folly::parseJson(resp.body); +} + +} // namespace nebula::plugin diff --git a/src/common/plugin/fulltext/elasticsearch/ESClient.h b/src/common/plugin/fulltext/elasticsearch/ESClient.h new file mode 100644 index 00000000000..f861398b78a --- /dev/null +++ b/src/common/plugin/fulltext/elasticsearch/ESClient.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_CLIENT_ +#define COMMON_PLUGIN_FULLTEXT_ELASTICSEARCH_CLIENT_ +#include + +#include +#include + +#include "common/base/StatusOr.h" +namespace nebula::plugin { + +class ESClient { + public: + ESClient(const std::string& protocol, + const std::string& address, + const std::string& user, + const std::string& password); + + StatusOr createIndex(const std::string& name, const folly::dynamic& object); + StatusOr dropIndex(const std::string& name); + + StatusOr clearIndex(const std::string& name); + + StatusOr getIndex(const std::string& name); + + StatusOr deleteByQuery(const std::string& index, const folly::dynamic& query); + StatusOr updateByQuery(const std::string& index, const folly::dynamic& query); + StatusOr search(const std::string& index, const folly::dynamic& query); + // StatusOr put(const std::string& index, const folly::dynamic& object); + StatusOr bulk(const std::vector& bulk); + + private: + std::string protocol_; + std::string address_; + std::string user_; + std::string password_; + + StatusOr sendHttpRequest(const std::string& url, + const std::string& method, + const std::vector& header, + const std::string& body); +}; + +} // namespace nebula::plugin + +#endif diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp deleted file mode 100644 index d3395cf6c84..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp +++ /dev/null @@ -1,328 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" - -#include "common/process/ProcessUtils.h" - -namespace nebula { -namespace plugin { - -std::unique_ptr ESGraphAdapter::kAdapter = - std::unique_ptr(new ESGraphAdapter()); - -StatusOr ESGraphAdapter::prefix(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kPrefix); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::wildcard(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = - header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kWildcard); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::regexp(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kRegexp); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::fuzzy(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const { - std::string cmd = - header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kFuzzy, fuzziness, op); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -std::string ESGraphAdapter::header() const noexcept { - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON; - return os.str(); -} - -std::string ESGraphAdapter::header(const HttpClient& client, - const DocItem& item, - const LimitItem& limit) const noexcept { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XGET http://127.0.0.1:9200/my_temp_index_3/_search?timeout=10ms - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON << XGET; - os << client.toString() << item.index << "/_search?timeout=" << limit.timeout_ << "ms" - << "\""; - return os.str(); -} - -folly::dynamic ESGraphAdapter::columnBody(const std::string& col) const noexcept { - // "term": {"column_id": "col1"} - folly::dynamic itemColumn = folly::dynamic::object("column_id", DocIDTraits::column(col)); - return folly::dynamic::object("term", itemColumn); -} - -std::string ESGraphAdapter::body(const DocItem& item, - int32_t maxRows, - FT_SEARCH_OP type, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept { - folly::dynamic obj; - switch (type) { - case FT_SEARCH_OP::kPrefix: { - obj = prefixBody(item.val); - break; - } - case FT_SEARCH_OP::kWildcard: { - obj = wildcardBody(item.val); - break; - } - case FT_SEARCH_OP::kRegexp: { - obj = regexpBody(item.val); - break; - } - case FT_SEARCH_OP::kFuzzy: { - obj = fuzzyBody(item.val, fuzziness, op); - } - } - auto itemArray = folly::dynamic::array(columnBody(item.column), obj); - folly::dynamic itemMust = folly::dynamic::object("must", itemArray); - folly::dynamic itemBool = folly::dynamic::object("bool", itemMust); - folly::dynamic itemQuery = - folly::dynamic::object("query", itemBool)("_source", "value")("size", maxRows)("from", 0); - std::stringstream os; - os << " -d'" << DocIDTraits::normalizedJson(folly::toJson(itemQuery)) << "'"; - return os.str(); -} - -folly::dynamic ESGraphAdapter::prefixBody(const std::string& prefix) const noexcept { - // {"prefix": {"value": "a"}} - folly::dynamic itemValue = folly::dynamic::object("value", prefix); - return folly::dynamic::object("prefix", itemValue); -} - -folly::dynamic ESGraphAdapter::wildcardBody(const std::string& wildcard) const noexcept { - // {"wildcard": {"value": "*a"}} - folly::dynamic itemValue = folly::dynamic::object("value", wildcard); - return folly::dynamic::object("wildcard", itemValue); -} - -folly::dynamic ESGraphAdapter::regexpBody(const std::string& regexp) const noexcept { - // {"regexp": {"value": "c+"}} - folly::dynamic itemValue = folly::dynamic::object("value", regexp); - return folly::dynamic::object("regexp", itemValue); -} - -folly::dynamic ESGraphAdapter::fuzzyBody(const std::string& regexp, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept { - // {"match": {"value": {"query":"ccc aaa","fuzziness": "AUTO","operator": - // "and"}}} - folly::dynamic items = - folly::dynamic::object("query", regexp)("fuzziness", fuzziness)("operator", op); - folly::dynamic value = folly::dynamic::object("value", items); - return folly::dynamic::object("match", value); -} - -StatusOr ESGraphAdapter::createIndex(const HttpClient& client, - const std::string& index, - const std::string&) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT "http://127.0.0.1:9200/index_exist" - std::string cmd = createIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http PUT Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return statusCheck(ret.value()); -} - -std::string ESGraphAdapter::createIndexCmd(const HttpClient& client, - const std::string& index, - const std::string&) const noexcept { - std::stringstream os; - os << header() << XPUT << client.toString() << index << "\""; - return os.str(); -} - -StatusOr ESGraphAdapter::dropIndex(const HttpClient& client, const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XDELETE "http://127.0.0.1:9200/index_exist" - std::string cmd = dropIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http DELETE Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return statusCheck(ret.value()); -} - -std::string ESGraphAdapter::dropIndexCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XDELETE << client.toString() << index << "\""; - return os.str(); -} - -StatusOr ESGraphAdapter::clearIndex(const HttpClient& client, - const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPOST "http://127.0.0.1:9200/${index}/_delete_by_query?refresh&slices=5" - // -d '{"query": {"match_all":{}}}' - std::string cmd = clearIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http POST Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return clearCheck(ret.value()); -} - -std::string ESGraphAdapter::clearIndexCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XPOST << client.toString() << index << "/_delete_by_query?refresh&slices=5\"" - << " -d '{\"query\": {\"match_all\":{}}}'"; - return os.str(); -} - -StatusOr ESGraphAdapter::indexExists(const HttpClient& client, - const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XGET "http://127.0.0.1:9200/_cat/indices/index_exist?format=json" - std::string cmd = indexExistsCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("Http GET Failed: : %s", cmd.c_str()); - } - return indexCheck(ret.value()); -} - -std::string ESGraphAdapter::indexExistsCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XGET << client.toString() << "_cat/indices/" << index << "?format=json" - << "\""; - return os.str(); -} - -bool ESGraphAdapter::result(const std::string& ret, std::vector& rows) const { - try { - auto root = folly::parseJson(ret); - auto rootHits = root.find("hits"); - if (rootHits != root.items().end()) { - auto subHits = rootHits->second.find("hits"); - if (subHits != rootHits->second.items().end()) { - for (auto& item : subHits->second) { - auto s = item.find("_source"); - if (s != item.items().end()) { - auto v = s->second.find("value"); - if (v != s->second.items().end()) { - rows.emplace_back(v->second.getString()); - } else { - continue; - } - } else { - continue; - } - } - } - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::statusCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (root.isArray()) { - return false; - } - auto result = root.find("acknowledged"); - if (result != root.items().end() && result->second.isBool() && result->second.getBool()) { - return true; - } - } catch (const std::exception& ex) { - LOG(ERROR) << "result error : " << ex.what(); - } - - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::indexCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (!root.isArray()) { - return false; - } - for (auto& entry : root) { - auto exists = entry.find("index"); - if (exists != entry.items().end()) { - return true; - } - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::clearCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (root.isArray()) { - return false; - } - auto result = root.find("failures"); - if (result != root.items().end() && result->second.isArray() && result->second.size() == 0) { - return true; - } - } catch (const std::exception& ex) { - LOG(ERROR) << "result error : " << ex.what(); - } - - LOG(ERROR) << "error reason : " << ret; - return false; -} - -} // namespace plugin -} // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h deleted file mode 100644 index beac919d59e..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h +++ /dev/null @@ -1,116 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef NEBULA_PLUGIN_ESGRAPHADAPTER_H -#define NEBULA_PLUGIN_ESGRAPHADAPTER_H - -#include - -#include "common/plugin/fulltext/FTGraphAdapter.h" - -namespace nebula { -namespace plugin { -class ESGraphAdapter final : public FTGraphAdapter { - FRIEND_TEST(FulltextPluginTest, ESIndexCheckTest); - FRIEND_TEST(FulltextPluginTest, ESResultTest); - FRIEND_TEST(FulltextPluginTest, ESPrefixTest); - FRIEND_TEST(FulltextPluginTest, ESWildcardTest); - FRIEND_TEST(FulltextPluginTest, ESRegexpTest); - FRIEND_TEST(FulltextPluginTest, ESFuzzyTest); - FRIEND_TEST(FulltextPluginTest, ESCreateIndexTest); - FRIEND_TEST(FulltextPluginTest, ESDropIndexTest); - FRIEND_TEST(FulltextPluginTest, ESClearIndexTest); - - public: - static std::unique_ptr kAdapter; - - StatusOr prefix(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr wildcard(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr regexp(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr fuzzy(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const override; - - StatusOr createIndex(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const override; - - StatusOr dropIndex(const HttpClient& client, const std::string& index) const override; - - // Clear the fulltext index data on es and keep the index schema. - // client: es client - // index: fulltext index name - StatusOr clearIndex(const HttpClient& client, const std::string& index) const override; - - StatusOr indexExists(const HttpClient& client, const std::string& index) const override; - - private: - ESGraphAdapter() {} - - std::string header() const noexcept; - - std::string header(const HttpClient& client, - const DocItem& item, - const LimitItem& limit) const noexcept; - - folly::dynamic columnBody(const std::string& col) const noexcept; - - std::string body(const DocItem& item, - int32_t maxRows, - FT_SEARCH_OP type, - const folly::dynamic& fuzziness = nullptr, - const std::string& op = "") const noexcept; - - folly::dynamic prefixBody(const std::string& prefix) const noexcept; - - folly::dynamic wildcardBody(const std::string& wildcard) const noexcept; - - folly::dynamic regexpBody(const std::string& regexp) const noexcept; - - folly::dynamic fuzzyBody(const std::string& regexp, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept; - - bool result(const std::string& ret, std::vector& rows) const; - - bool statusCheck(const std::string& ret) const; - - bool indexCheck(const std::string& ret) const; - - // check the result - bool clearCheck(const std::string& ret) const; - - std::string createIndexCmd(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const noexcept; - - std::string dropIndexCmd(const HttpClient& client, const std::string& index) const noexcept; - - // Encapsulates the clearIndex command. - // client: es client - // index: fulltext index name - std::string clearIndexCmd(const HttpClient& client, const std::string& index) const noexcept; - - std::string indexExistsCmd(const HttpClient& client, const std::string& index) const noexcept; -}; -} // namespace plugin -} // namespace nebula - -#endif // NEBULA_PLUGIN_ESGRAPHADAPTER_H diff --git a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp deleted file mode 100644 index a84e16b9fc0..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp +++ /dev/null @@ -1,220 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" - -#include "common/plugin/fulltext/FTUtils.h" -#include "common/process/ProcessUtils.h" - -namespace nebula { -namespace plugin { -std::unique_ptr ESStorageAdapter::kAdapter = - std::unique_ptr(new ESStorageAdapter()); - -bool ESStorageAdapter::checkPut(const std::string& ret, const std::string& cmd) const { - // For example : - // HostAddr localHost_{"127.0.0.1", 9200}; - // DocItem item("index1", "col1", 1, 2, "aaaa"); - // - // Command should be : - // /usr/bin/curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT - // "http://127.0.0.1:9200/index1/_doc/0000000001_0000000002_8c43de7b01bca674276c43e09b3ec5ba_aaaa" - // // NOLINT - // -d'{"value":"aaaa","schema_id":2,"column_id":"8c43de7b01bca674276c43e09b3ec5ba"}' - // - // If successful, the result is returned: - // { - // "_primary_term": 1, - // "_shards": { - // "failed": 0, - // "total": 2, - // "successful": 1 - // }, - // "_id": - // "0000000001_0000000002_8c43de7b01bca674276c43e09b3ec5ba_aaaa", - // "result": "created", - // "_seq_no": 0, - // "_type": "_doc", - // "_index": "index1", - // "_version": 1 - // } - try { - auto root = folly::parseJson(ret); - auto result = root.find("result"); - if (result != root.items().end() && - (result->second.getString() == "created" || result->second.getString() == "updated")) { - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - VLOG(3) << "Command : " << cmd << "failed : " << ret; - return false; -} - -bool ESStorageAdapter::checkBulk(const std::string& ret) const { - // For example : - // HostAddr localHost_{"127.0.0.1", 9200}; - // DocItem item("bulk_index", "col1", 1, 2, "row_1") - // ("bulk_index", "col1", 1, 2, "row_2") - // - // Command should be : - // curl -H "Content-Type: application/x-ndjson" -XPOST - // localhost:9200/_bulk -d ' { "index" : { "_index" : "bulk_index", "_id" : - // "1" } } { "schema_id" : 1 , "column_id" : "col1", "value" : "row_1"} { - // "index" : { "_index" : "bulk_index", "_id" : "2" } } { "schema_id" : 1 , - // "column_id" : "col1", "value" : "row_2"} - // ' - // - // If successful, the result is returned: - // { - // "took": 18, - // "errors": false, - // "items": [{ - // "index": { - // "_index": "bulk_index", - // "_type": "_doc", - // "_id": "1", - // "_version": 4, - // "result": "updated", - // "_shards": { - // "total": 2, - // "successful": 1, - // "failed": 0 - // }, - // "_seq_no": 4, - // "_primary_term": 1, - // "status": 200 - // } - // }, { - // "index": { - // "_index": "bulk_index", - // "_type": "_doc", - // "_id": "2", - // "_version": 2, - // "result": "updated", - // "_shards": { - // "total": 2, - // "successful": 1, - // "failed": 0 - // }, - // "_seq_no": 5, - // "_primary_term": 1, - // "status": 200 - // } - // }] - // } - try { - auto root = folly::parseJson(ret); - auto result = root.find("errors"); - if (result != root.items().end() && result->second.isBool() && !result->second.getBool()) { - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - VLOG(3) << "Bulk insert failed"; - VLOG(3) << ret; - return false; -} - -StatusOr ESStorageAdapter::put(const HttpClient& client, const DocItem& item) const { - auto command = putCmd(client, item); - auto ret = nebula::ProcessUtils::runCommand(command.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http PUT Failed: " << command; - return Status::Error("command failed : %s", command.c_str()); - } - return checkPut(ret.value(), command); -} - -StatusOr ESStorageAdapter::bulk(const HttpClient& client, - const std::vector& items) const { - auto command = bulkCmd(client, items); - if (command.empty()) { - return true; - } - auto ret = nebula::ProcessUtils::runCommand(command.c_str()); - if (!ret.ok() || ret.value().empty()) { - VLOG(3) << "Http PUT Failed"; - VLOG(3) << command; - return Status::Error("bulk command failed"); - } - return checkBulk(ret.value()); -} - -std::string ESStorageAdapter::putHeader(const HttpClient& client, - const DocItem& item) const noexcept { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT "http://127.0.0.1:9200/my_temp_index_3/_doc/part1|tag4|col4|hello" - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON << XPUT; - os << client.toString() << item.index << "/_doc/" << DocIDTraits::docId(item) << "\""; - return os.str(); -} - -std::string ESStorageAdapter::putBody(const DocItem& item) const noexcept { - // -d'{"column_id" : "col4", "value" : "hello"}' - folly::dynamic d = folly::dynamic::object("column_id", DocIDTraits::column(item.column))( - "value", DocIDTraits::val(item.val)); - std::stringstream os; - os << " -d'" << DocIDTraits::normalizedJson(folly::toJson(d)) << "'"; - return os.str(); -} - -std::string ESStorageAdapter::putCmd(const HttpClient& client, const DocItem& item) const noexcept { - std::stringstream os; - os << putHeader(client, item) << putBody(item); - return os.str(); -} - -std::string ESStorageAdapter::bulkHeader(const HttpClient& client) const noexcept { - // curl -H "Content-Type: application/x-ndjson; charset=utf-8" - // http://127.0.0.1:9200/_bulk - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_NDJSON << XPOST; - os << client.toString() << "_bulk\""; - return os.str(); -} - -std::string ESStorageAdapter::bulkBody(const std::vector& items) const noexcept { - // -d ' - // { "index" : { "_index" : "bulk_index", "_id" : "1" } } - // { "column_id" : "col1", "value" : "row_1"} - // { "index" : { "_index" : "bulk_index", "_id" : "2" } } - // { "column_id" : "col1", "value" : "row_2"} - // ' - if (items.empty()) { - return ""; - } - std::stringstream os; - os << " -d '" - << "\n"; - for (const auto& item : items) { - folly::dynamic meta = - folly::dynamic::object("_id", DocIDTraits::docId(item))("_index", item.index); - folly::dynamic data = folly::dynamic::object("value", DocIDTraits::val(item.val))( - "column_id", DocIDTraits::column(item.column)); - os << folly::toJson(folly::dynamic::object("index", meta)) << "\n"; - os << DocIDTraits::normalizedJson(folly::toJson(data)) << "\n"; - } - os << "'"; - return os.str(); -} - -std::string ESStorageAdapter::bulkCmd(const HttpClient& client, - const std::vector& items) const noexcept { - auto body = bulkBody(items); - if (body.empty()) { - return ""; - } - std::stringstream os; - os << bulkHeader(client) << body; - return os.str(); -} - -} // namespace plugin -} // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h deleted file mode 100644 index 323018724c6..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef NEBULA_PLUGIN_ESSTORAGEADAPTER_H -#define NEBULA_PLUGIN_ESSTORAGEADAPTER_H - -#include - -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTStorageAdapter.h" - -namespace nebula { -namespace plugin { - -class ESStorageAdapter final : public FTStorageAdapter { - FRIEND_TEST(FulltextPluginTest, ESPutTest); - FRIEND_TEST(FulltextPluginTest, ESBulkTest); - - public: - static std::unique_ptr kAdapter; - - StatusOr put(const HttpClient& client, const DocItem& item) const override; - - StatusOr bulk(const HttpClient& client, const std::vector& items) const override; - - private: - ESStorageAdapter() {} - - std::string putHeader(const HttpClient& client, const DocItem& item) const noexcept; - - std::string putBody(const DocItem& item) const noexcept; - - std::string putCmd(const HttpClient& client, const DocItem& item) const noexcept; - - std::string bulkHeader(const HttpClient& client) const noexcept; - - std::string bulkBody(const std::vector& items) const noexcept; - - std::string bulkCmd(const HttpClient& client, const std::vector& items) const noexcept; - - bool checkPut(const std::string& ret, const std::string& cmd) const; - - bool checkBulk(const std::string& ret) const; -}; - -} // namespace plugin -} // namespace nebula - -#endif // NEBULA_PLUGIN_ESSTORAGEADAPTER_H diff --git a/src/common/plugin/fulltext/test/CMakeLists.txt b/src/common/plugin/fulltext/test/CMakeLists.txt index 1446e8a34b3..26488314679 100644 --- a/src/common/plugin/fulltext/test/CMakeLists.txt +++ b/src/common/plugin/fulltext/test/CMakeLists.txt @@ -2,20 +2,19 @@ # # This source code is licensed under Apache 2.0 License. -nebula_add_test( - NAME - plugin_fulltext_test - SOURCES - FulltextPluginTest.cpp - OBJECTS - $ - $ - $ - $ - $ - $ - LIBRARIES - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - gtest -) +# nebula_add_test( +# NAME +# plugin_fulltext_test +# SOURCES +# FulltextPluginTest.cpp +# OBJECTS +# $ +# $ +# $ +# $ +# $ +# LIBRARIES +# ${THRIFT_LIBRARIES} +# ${PROXYGEN_LIBRARIES} +# gtest +# ) diff --git a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp index c17c5291f3d..10671dce966 100644 --- a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp +++ b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp @@ -8,8 +8,6 @@ #include "common/base/Base.h" #include "common/network/NetworkUtils.h" #include "common/plugin/fulltext/FTUtils.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" namespace nebula { namespace plugin { diff --git a/src/common/utils/test/CMakeLists.txt b/src/common/utils/test/CMakeLists.txt index 940999b720f..afbdc3e9de4 100644 --- a/src/common/utils/test/CMakeLists.txt +++ b/src/common/utils/test/CMakeLists.txt @@ -100,7 +100,7 @@ nebula_add_test( $ $ $ - $ + # $ LIBRARIES ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 1b2665e2941..6affb99edd3 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -53,7 +53,7 @@ set(storage_meta_deps $ $ $ - $ + $ $ $ ) @@ -146,7 +146,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ ${common_deps} @@ -233,7 +233,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ $ diff --git a/src/graph/context/test/CMakeLists.txt b/src/graph/context/test/CMakeLists.txt index b83013e5101..d01223a6696 100644 --- a/src/graph/context/test/CMakeLists.txt +++ b/src/graph/context/test/CMakeLists.txt @@ -27,7 +27,7 @@ SET(CONTEXT_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/executor/admin/SpaceExecutor.cpp b/src/graph/executor/admin/SpaceExecutor.cpp index da761521210..9aa7c4abf50 100644 --- a/src/graph/executor/admin/SpaceExecutor.cpp +++ b/src/graph/executor/admin/SpaceExecutor.cpp @@ -134,15 +134,16 @@ folly::Future DropSpaceExecutor::execute() { qctx()->rctx()->session()->setSpace(std::move(spaceInfo)); } if (!ftIndexes.empty()) { - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { - LOG(WARNING) << "Get text search clients failed: " << tsRet.status(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { + LOG(WARNING) << "Get text search clients failed: " << esAdapterRet.status(); return Status::OK(); } + auto esAdapter = std::move(esAdapterRet).value(); for (const auto &ftindex : ftIndexes) { - auto ftRet = FTIndexUtils::dropTSIndex(tsRet.value(), ftindex); + auto ftRet = esAdapter.dropIndex(ftindex); if (!ftRet.ok()) { - LOG(WARNING) << "Drop fulltext index `" << ftindex << "' failed: " << ftRet.status(); + LOG(ERROR) << "Drop fulltext index `" << ftindex << "' failed: " << ftRet; } } } @@ -189,15 +190,16 @@ folly::Future ClearSpaceExecutor::execute() { return resp.status(); } if (!ftIndexes.empty()) { - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { LOG(WARNING) << "Get text search clients failed"; return Status::OK(); } + auto esAdapter = std::move(esAdapterRet).value(); for (const auto &ftindex : ftIndexes) { - auto ftRet = FTIndexUtils::clearTSIndex(tsRet.value(), ftindex); + auto ftRet = esAdapter.clearIndex(ftindex); if (!ftRet.ok()) { - LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet.status(); + LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet; } } } diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index 4b32d3939c6..92d0018ace9 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -41,14 +41,14 @@ folly::Future DropFTIndexExecutor::execute() { << "' failed: " << resp.status(); return resp.status(); } - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { - LOG(WARNING) << "Get text search clients failed: " << tsRet.status(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { + LOG(WARNING) << "Get text search clients failed: " << esAdapterRet.status(); } - auto ftRet = FTIndexUtils::dropTSIndex(std::move(tsRet).value(), inode->getName()); + auto esAdapter = std::move(esAdapterRet).value(); + auto ftRet = esAdapter.dropIndex(inode->getName()); if (!ftRet.ok()) { - LOG(WARNING) << "Drop fulltext index '" << inode->getName() - << "' failed: " << ftRet.status(); + LOG(WARNING) << "Drop fulltext index '" << inode->getName() << "' failed: " << ftRet; } return Status::OK(); }); diff --git a/src/graph/executor/test/CMakeLists.txt b/src/graph/executor/test/CMakeLists.txt index af16fa5f3df..cbb5aee6994 100644 --- a/src/graph/executor/test/CMakeLists.txt +++ b/src/graph/executor/test/CMakeLists.txt @@ -34,7 +34,7 @@ SET(EXEC_QUERY_TEST_OBJS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/optimizer/test/CMakeLists.txt b/src/graph/optimizer/test/CMakeLists.txt index 04bd1330ba3..03c56547921 100644 --- a/src/graph/optimizer/test/CMakeLists.txt +++ b/src/graph/optimizer/test/CMakeLists.txt @@ -29,7 +29,7 @@ set(OPTIMIZER_TEST_LIB $ $ $ - $ + $ $ $ $ diff --git a/src/graph/util/FTIndexUtils.cpp b/src/graph/util/FTIndexUtils.cpp index 685c60ede1a..4f2c32c5945 100644 --- a/src/graph/util/FTIndexUtils.cpp +++ b/src/graph/util/FTIndexUtils.cpp @@ -25,8 +25,7 @@ bool FTIndexUtils::needTextSearch(const Expression* expr) { } } -StatusOr> FTIndexUtils::getTSClients( - meta::MetaClient* client) { +StatusOr<::nebula::plugin::ESAdapter> FTIndexUtils::getESAdapter(meta::MetaClient* client) { auto tcs = client->getServiceClientsFromCache(meta::cpp2::ExternalServiceType::ELASTICSEARCH); if (!tcs.ok()) { return tcs.status(); @@ -34,76 +33,30 @@ StatusOr> FTIndexUtils::getTSClients( if (tcs.value().empty()) { return Status::SemanticError("No text search client found"); } - std::vector tsClients; + std::vector<::nebula::plugin::ESClient> clients; for (const auto& c : tcs.value()) { - nebula::plugin::HttpClient hc; - hc.host = c.host; - if (c.user_ref().has_value() && c.pwd_ref().has_value()) { - hc.user = *c.user_ref(); - hc.password = *c.pwd_ref(); - } - hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; - tsClients.emplace_back(std::move(hc)); - } - return tsClients; -} - -StatusOr FTIndexUtils::checkTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->indexExists(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); - } - return Status::Error("fulltext index get failed : %s", index.c_str()); -} - -StatusOr FTIndexUtils::dropTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->dropIndex(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); - } - return Status::Error("drop fulltext index failed : %s", index.c_str()); -} - -StatusOr FTIndexUtils::clearTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->clearIndex(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); + std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; + std::string address = c.host.toString(); + std::string user = c.user_ref().has_value() ? *c.user_ref() : ""; + std::string password = c.pwd_ref().has_value() ? *c.pwd_ref() : ""; + clients.emplace_back(protocol, address, user, password); } - return Status::Error("clear fulltext index failed : %s", index.c_str()); + return ::nebula::plugin::ESAdapter(std::move(clients)); } -StatusOr FTIndexUtils::rewriteTSFilter( - ObjectPool* pool, - bool isEdge, - Expression* expr, - const std::string& index, - const std::vector& tsClients) { - auto vRet = textSearch(expr, index, tsClients); +StatusOr FTIndexUtils::rewriteTSFilter(ObjectPool* pool, + bool isEdge, + Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter) { + auto vRet = textSearch(expr, index, esAdapter); if (!vRet.ok()) { - return Status::SemanticError("Text search error."); + return vRet.status(); } - if (vRet.value().empty()) { + auto result = std::move(vRet).value(); + if (result.items.empty()) { return nullptr; } - auto tsArg = static_cast(expr)->arg(); Expression* propExpr; if (isEdge) { @@ -112,8 +65,8 @@ StatusOr FTIndexUtils::rewriteTSFilter( propExpr = TagPropertyExpression::make(pool, tsArg->from(), tsArg->prop()); } std::vector rels; - for (const auto& row : vRet.value()) { - auto constExpr = ConstantExpression::make(pool, Value(row)); + for (auto& item : result.items) { + auto constExpr = ConstantExpression::make(pool, Value(item.text)); rels.emplace_back(RelationalExpression::makeEQ(pool, propExpr, constExpr)); } if (rels.size() == 1) { @@ -122,67 +75,43 @@ StatusOr FTIndexUtils::rewriteTSFilter( return ExpressionUtils::pushOrs(pool, rels); } -StatusOr> FTIndexUtils::textSearch( - Expression* expr, - const std::string& index, - const std::vector& tsClients) { +StatusOr FTIndexUtils::textSearch( + Expression* expr, const std::string& index, ::nebula::plugin::ESAdapter& esAdapter) { auto tsExpr = static_cast(expr); - - nebula::plugin::DocItem doc(index, tsExpr->arg()->prop(), tsExpr->arg()->val()); - nebula::plugin::LimitItem limit(tsExpr->arg()->timeout(), tsExpr->arg()->limit()); - std::vector result; - // TODO (sky) : External index load balancing - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - StatusOr ret = Status::Error(); - switch (tsExpr->kind()) { - case Expression::Kind::kTSFuzzy: { - folly::dynamic fuzz = folly::dynamic::object(); - if (tsExpr->arg()->fuzziness() < 0) { - fuzz = "AUTO"; - } else { - fuzz = tsExpr->arg()->fuzziness(); - } - std::string op = tsExpr->arg()->op().empty() ? "or" : tsExpr->arg()->op(); - ret = nebula::plugin::ESGraphAdapter::kAdapter->fuzzy( - randomFTClient(tsClients), doc, limit, fuzz, op, result); - break; - } - case Expression::Kind::kTSPrefix: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->prefix( - randomFTClient(tsClients), doc, limit, result); - break; - } - case Expression::Kind::kTSRegexp: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->regexp( - randomFTClient(tsClients), doc, limit, result); - break; - } - case Expression::Kind::kTSWildcard: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->wildcard( - randomFTClient(tsClients), doc, limit, result); - break; - } - default: - return Status::SemanticError("text search expression error"); + std::string pattern = tsExpr->arg()->val(); + std::function()> execFunc; + switch (tsExpr->kind()) { + case Expression::Kind::kTSFuzzy: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.fuzzy(index, pattern); }; + break; } - if (!ret.ok()) { - continue; + case Expression::Kind::kTSPrefix: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.prefix(index, pattern); }; + break; } - if (ret.value()) { - return result; + case Expression::Kind::kTSRegexp: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.regexp(index, pattern); }; + break; + } + case Expression::Kind::kTSWildcard: { + execFunc = [&index, &pattern, &esAdapter]() { return esAdapter.wildcard(index, pattern); }; + break; + } + default: { + return Status::SemanticError("text search expression error"); } - return Status::SemanticError( - "External index error. " - "please check the status of fulltext cluster"); } - return Status::SemanticError("scan external index failed"); -} -const nebula::plugin::HttpClient& FTIndexUtils::randomFTClient( - const std::vector& tsClients) { - auto i = folly::Random::rand32(tsClients.size() - 1); - return tsClients[i]; + auto retryCnt = FLAGS_ft_request_retry_times > 0 ? FLAGS_ft_request_retry_times : 1; + StatusOr result; + while (retryCnt-- > 0) { + result = execFunc(); + if (!result.ok()) { + continue; + } + break; + } + return result; } } // namespace graph diff --git a/src/graph/util/FTIndexUtils.h b/src/graph/util/FTIndexUtils.h index 1231f76ddcc..14339fb80f3 100644 --- a/src/graph/util/FTIndexUtils.h +++ b/src/graph/util/FTIndexUtils.h @@ -7,7 +7,7 @@ #include "clients/meta/MetaClient.h" #include "common/base/StatusOr.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "graph/util/SchemaUtil.h" #include "parser/MaintainSentences.h" @@ -19,37 +19,21 @@ class FTIndexUtils final { FTIndexUtils() = delete; // Checks if the filter expression is full-text search related static bool needTextSearch(const Expression* expr); - // Checks meta client and returns the full-text search client if there is one - static StatusOr> getTSClients(meta::MetaClient* client); - // Checks if the full-text index exists - static StatusOr checkTSIndex(const std::vector& tsClients, - const std::string& index); - // Drops the full-text index - static StatusOr dropTSIndex(const std::vector& tsClients, - const std::string& index); - - // Clears the full-text index data, but keeps the index schema - static StatusOr clearTSIndex(const std::vector& tsClients, - const std::string& index); - - // Converts TextSearchExpression into a relational expression that could be pushed down - static StatusOr rewriteTSFilter( - ObjectPool* pool, - bool isEdge, - Expression* expr, - const std::string& index, - const std::vector& tsClients); + + static StatusOr<::nebula::plugin::ESAdapter> getESAdapter(meta::MetaClient* client); + + // Converts TextSearchExpression into a relational expresion that could be pushed down + static StatusOr rewriteTSFilter(ObjectPool* pool, + bool isEdge, + Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter); // Performs full-text search using elastic search adapter - // Search type is defined by the expression kind - static StatusOr> textSearch( - Expression* expr, - const std::string& index, - const std::vector& tsClients); - - // Picks a random full-text search client from the given list - static const nebula::plugin::HttpClient& randomFTClient( - const std::vector& tsClients); + // Search type is defiend by the expression kind + static StatusOr textSearch(Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter); }; } // namespace graph diff --git a/src/graph/util/test/CMakeLists.txt b/src/graph/util/test/CMakeLists.txt index 5de9c601e43..0abef7eaeda 100644 --- a/src/graph/util/test/CMakeLists.txt +++ b/src/graph/util/test/CMakeLists.txt @@ -52,7 +52,6 @@ nebula_add_test( $ $ $ - $ $ $ $ @@ -72,11 +71,13 @@ nebula_add_test( $ $ $ + $ + $ LIBRARIES gtest gtest_main ${Boost_Thread_LIBRARY} ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} - # curl + curl ) diff --git a/src/graph/validator/AdminValidator.h b/src/graph/validator/AdminValidator.h index b0a2a87ec10..e9f10b234d5 100644 --- a/src/graph/validator/AdminValidator.h +++ b/src/graph/validator/AdminValidator.h @@ -7,7 +7,6 @@ #define GRAPH_VALIDATOR_ADMINVALIDATOR_H_ #include "clients/meta/MetaClient.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" #include "graph/validator/Validator.h" #include "parser/AdminSentences.h" #include "parser/MaintainSentences.h" diff --git a/src/graph/validator/LookupValidator.cpp b/src/graph/validator/LookupValidator.cpp index 2ea3be001ce..ea88bb0c2e3 100644 --- a/src/graph/validator/LookupValidator.cpp +++ b/src/graph/validator/LookupValidator.cpp @@ -505,11 +505,7 @@ StatusOr LookupValidator::checkTSExpr(Expression* expr) { auto tsi = metaClient->getFTIndexBySpaceSchemaFromCache(spaceId(), schemaId()); NG_RETURN_IF_ERROR(tsi); auto tsName = tsi.value().first; - auto ret = FTIndexUtils::checkTSIndex(tsClients_, tsName); - NG_RETURN_IF_ERROR(ret); - if (!ret.value()) { - return Status::SemanticError("text search index not found : %s", tsName.c_str()); - } + auto ftFields = tsi.value().second.get_fields(); auto tsExpr = static_cast(expr); auto prop = tsExpr->arg()->prop(); @@ -606,13 +602,13 @@ Status LookupValidator::getSchemaProvider(shared_ptr // Generate text search filter, check validity and rewrite StatusOr LookupValidator::genTsFilter(Expression* filter) { - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); - tsClients_ = std::move(tsRet).value(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx_->getMetaClient()); + NG_RETURN_IF_ERROR(esAdapterRet); + auto esAdapter = std::move(esAdapterRet).value(); auto tsIndex = checkTSExpr(filter); NG_RETURN_IF_ERROR(tsIndex); return FTIndexUtils::rewriteTSFilter( - qctx_->objPool(), lookupCtx_->isEdge, filter, tsIndex.value(), tsClients_); + qctx_->objPool(), lookupCtx_->isEdge, filter, tsIndex.value(), esAdapter); } } // namespace graph diff --git a/src/graph/validator/LookupValidator.h b/src/graph/validator/LookupValidator.h index 9889d99971e..79ed4ce4e42 100644 --- a/src/graph/validator/LookupValidator.h +++ b/src/graph/validator/LookupValidator.h @@ -5,7 +5,6 @@ #ifndef _VALIDATOR_LOOKUP_VALIDATOR_H_ #define _VALIDATOR_LOOKUP_VALIDATOR_H_ -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" #include "graph/validator/Validator.h" namespace nebula { @@ -56,7 +55,6 @@ class LookupValidator final : public Validator { void extractExprProps(); std::unique_ptr lookupCtx_; - std::vector tsClients_; ExpressionProps exprProps_; std::vector idxReturnCols_; std::vector schemaIds_; diff --git a/src/graph/validator/MaintainValidator.cpp b/src/graph/validator/MaintainValidator.cpp index b347fceda5b..02c93c173fe 100644 --- a/src/graph/validator/MaintainValidator.cpp +++ b/src/graph/validator/MaintainValidator.cpp @@ -10,6 +10,7 @@ #include "common/base/Status.h" #include "common/charset/Charset.h" #include "common/expression/ConstantExpression.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "graph/planner/plan/Admin.h" #include "graph/planner/plan/Maintain.h" #include "graph/planner/plan/Query.h" @@ -601,11 +602,12 @@ Status CreateFTIndexValidator::validateImpl() { if (containUpper) { return Status::SyntaxError("Fulltext index names cannot contain uppercase letters"); } - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); - auto tsIndex = FTIndexUtils::checkTSIndex(std::move(tsRet).value(), name); - NG_RETURN_IF_ERROR(tsIndex); - if (tsIndex.value()) { + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx_->getMetaClient()); + NG_RETURN_IF_ERROR(esAdapterRet); + auto esAdapter = std::move(esAdapterRet).value(); + auto existResult = esAdapter.isIndexExist(name); + NG_RETURN_IF_ERROR(existResult); + if (existResult.value()) { return Status::Error("text search index exist : %s", name.c_str()); } auto space = vctx_->whichSpace(); @@ -621,7 +623,7 @@ Status CreateFTIndexValidator::validateImpl() { } index_.space_id_ref() = space.id; index_.depend_schema_ref() = std::move(id); - index_.fields_ref() = sentence->fields(); + index_.fields_ref()->push_back(sentence->field()); return Status::OK(); } @@ -634,8 +636,6 @@ Status CreateFTIndexValidator::toPlan() { } Status DropFTIndexValidator::validateImpl() { - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); return Status::OK(); } diff --git a/src/graph/validator/test/CMakeLists.txt b/src/graph/validator/test/CMakeLists.txt index 616fa901998..d1401408bc4 100644 --- a/src/graph/validator/test/CMakeLists.txt +++ b/src/graph/validator/test/CMakeLists.txt @@ -55,7 +55,7 @@ set(VALIDATOR_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/visitor/test/CMakeLists.txt b/src/graph/visitor/test/CMakeLists.txt index e7fa08ce8d9..caf85a5cbe1 100644 --- a/src/graph/visitor/test/CMakeLists.txt +++ b/src/graph/visitor/test/CMakeLists.txt @@ -70,7 +70,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 2bccddd26dd..0bac41055df 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -155,8 +155,6 @@ void Listener::doApply() { }); } - - void Listener::resetListener() { std::lock_guard g(raftLock_); reset(); diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h index 4e8b024472b..88ef21f70bc 100644 --- a/src/kvstore/LogEncoder.h +++ b/src/kvstore/LogEncoder.h @@ -200,7 +200,7 @@ class BatchHolder : public boost::noncopyable, public nebula::cpp::NonMovable { * * @return const std::vector>& */ - const std::vector>& getBatch() { + const std::vector>& getBatch() const { return batch_; } diff --git a/src/kvstore/plugins/elasticsearch/ESListener.cpp b/src/kvstore/plugins/elasticsearch/ESListener.cpp index 05121ea196b..4ea02a4e78b 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.cpp +++ b/src/kvstore/plugins/elasticsearch/ESListener.cpp @@ -5,7 +5,7 @@ #include "kvstore/plugins/elasticsearch/ESListener.h" -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "common/utils/NebulaKeyUtils.h" DECLARE_uint32(ft_request_retry_times); @@ -25,17 +25,18 @@ void ESListener::init() { if (!cRet.ok() || cRet.value().empty()) { LOG(FATAL) << "elasticsearch clients error"; } + std::vector esClients; for (const auto& c : cRet.value()) { - nebula::plugin::HttpClient hc; - hc.host = c.host; + auto host = c.host; + std::string user, password; if (c.user_ref().has_value()) { - hc.user = *c.user_ref(); - hc.password = *c.pwd_ref(); + user = *c.user_ref(); + password = *c.pwd_ref(); } - hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; - esClients_.emplace_back(std::move(hc)); + std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; + esClients.emplace_back(protocol, host.toString(), user, password); } - + esAdapter_.setClients(std::move(esClients)); auto sRet = schemaMan_->toGraphSpaceName(spaceId_); if (!sRet.ok()) { LOG(FATAL) << "space name error"; @@ -43,30 +44,93 @@ void ESListener::init() { spaceName_ = std::make_unique(sRet.value()); } -bool ESListener::apply(const std::vector& data) { - std::vector docItems; - for (const auto& kv : data) { - if (!nebula::NebulaKeyUtils::isTag(vIdLen_, kv.first) && - !nebula::NebulaKeyUtils::isEdge(vIdLen_, kv.first)) { - continue; +bool ESListener::apply(const BatchHolder& batch) { + nebula::plugin::ESBulk bulk; + auto callback = [&bulk](BatchLogType type, + const std::string& index, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text) { + if (type == BatchLogType::OP_BATCH_PUT) { + bulk.put(index, vid, src, dst, rank, text); + } else if (type == BatchLogType::OP_BATCH_REMOVE) { + bulk.delete_(index, vid, src, dst, rank); + } else { + LOG(FATAL) << "Unexpect"; } - if (!appendDocItem(docItems, kv)) { + }; + for (const auto& log : batch.getBatch()) { + pickTagAndEdgeData(std::get<0>(log), std::get<1>(log), std::get<2>(log), callback); + } + if (!bulk.empty()) { + auto status = esAdapter_.bulk(bulk); + if (!status.ok()) { + LOG(ERROR) << status; return false; } - if (docItems.size() >= static_cast(FLAGS_ft_bulk_batch_size)) { - auto suc = writeData(docItems); - if (!suc) { - return suc; - } - docItems.clear(); - } - } - if (!docItems.empty()) { - return writeData(docItems); } return true; } +void ESListener::pickTagAndEdgeData(BatchLogType type, + const std::string& key, + const std::string& value, + const PickFunc& callback) { + if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) { + auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key); + auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId); + if (!ftIndexRes.ok()) { + return; + } + auto ftIndex = std::move(ftIndexRes).value(); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); + if (reader == nullptr) { + LOG(ERROR) << "get tag reader failed, tagID " << tagId; + return; + } + if (ftIndex.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + auto field = ftIndex.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + std::string indexName = ftIndex.first; + std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); + std::string text = std::move(v).getStr(); + callback(type, indexName, vid, "", "", 0, text); + } else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) { + auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key); + auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, edgeType); + if (!ftIndexRes.ok()) { + return; + } + auto ftIndex = std::move(ftIndexRes).value(); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); + if (reader == nullptr) { + LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; + return; + } + if (ftIndex.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + auto field = ftIndex.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + std::string indexName = ftIndex.first; + std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); + std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); + int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key); + std::string text = std::move(v).getStr(); + callback(type, indexName, "", src, dst, rank, text); + } +} + bool ESListener::persist(LogID lastId, TermID lastTerm, LogID lastApplyLogId) { if (!writeAppliedId(lastId, lastTerm, lastApplyLogId)) { LOG(FATAL) << "last apply ids write failed"; @@ -146,105 +210,6 @@ std::string ESListener::encodeAppliedId(LogID lastId, return val; } -bool ESListener::appendDocItem(std::vector& items, const KV& kv) const { - auto isEdge = NebulaKeyUtils::isEdge(vIdLen_, kv.first); - return isEdge ? appendEdgeDocItem(items, kv) : appendTagDocItem(items, kv); -} - -bool ESListener::appendEdgeDocItem(std::vector& items, const KV& kv) const { - auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, kv.first); - auto ftIndex = schemaMan_->getFTIndex(spaceId_, edgeType); - if (!ftIndex.ok()) { - VLOG(3) << "get text search index failed"; - return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false; - } - auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, kv.second); - if (reader == nullptr) { - VLOG(3) << "get edge reader failed, schema ID " << edgeType; - return false; - } - return appendDocs(items, reader.get(), std::move(ftIndex).value()); -} - -bool ESListener::appendTagDocItem(std::vector& items, const KV& kv) const { - auto tagId = NebulaKeyUtils::getTagId(vIdLen_, kv.first); - auto ftIndex = schemaMan_->getFTIndex(spaceId_, tagId); - if (!ftIndex.ok()) { - VLOG(3) << "get text search index failed"; - return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false; - } - auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, kv.second); - if (reader == nullptr) { - VLOG(3) << "get tag reader failed, tagID " << tagId; - return false; - } - return appendDocs(items, reader.get(), std::move(ftIndex).value()); -} - -bool ESListener::appendDocs(std::vector& items, - RowReader* reader, - const std::pair& fti) const { - for (const auto& field : fti.second.get_fields()) { - auto v = reader->getValueByName(field); - if (v.type() != Value::Type::STRING) { - continue; - } - items.emplace_back(DocItem(fti.first, field, partId_, std::move(v).getStr())); - } - return true; -} - -bool ESListener::writeData(const std::vector& items) const { - bool isNeedWriteOneByOne = false; - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto index = folly::Random::rand32(esClients_.size() - 1); - auto suc = nebula::plugin::ESStorageAdapter::kAdapter->bulk(esClients_[index], items); - if (!suc.ok()) { - VLOG(3) << "bulk failed. retry : " << retryCnt; - continue; - } - if (!suc.value()) { - isNeedWriteOneByOne = true; - break; - } - return true; - } - if (isNeedWriteOneByOne) { - return writeDatum(items); - } - LOG(WARNING) << idStr_ << "Failed to bulk into es."; - return false; -} - -bool ESListener::writeDatum(const std::vector& items) const { - bool done = false; - for (const auto& item : items) { - done = false; - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto index = folly::Random::rand32(esClients_.size() - 1); - auto suc = nebula::plugin::ESStorageAdapter::kAdapter->put(esClients_[index], item); - if (!suc.ok()) { - VLOG(3) << "put failed. retry : " << retryCnt; - continue; - } - if (!suc.value()) { - // TODO (sky) : Record failed data - break; - } - done = true; - break; - } - if (!done) { - // means CURL fails, and no need to take the next step - LOG(INFO) << idStr_ << "Failed to put into es."; - return false; - } - } - return true; -} - void ESListener::processLogs() { std::unique_ptr iter; { diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index 19c9fd8c73f..f9e2780cd76 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -7,14 +7,11 @@ #define KVSTORE_PLUGINS_ES_LISTENER_H_ #include "codec/RowReaderWrapper.h" -#include "common/plugin/fulltext/FTStorageAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "kvstore/Listener.h" - namespace nebula { namespace kvstore { -using nebula::plugin::DocItem; - class ESListener : public Listener { public: /** @@ -56,7 +53,8 @@ class ESListener : public Listener { * @param data Key/value to apply * @return True if succeed. False if failed. */ - bool apply(const std::vector& data); + bool apply(const BatchHolder& batch); + /** * @brief Persist commitLogId commitLogTerm and lastApplyLogId @@ -106,66 +104,22 @@ class ESListener : public Listener { */ std::string encodeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId) const noexcept; - /** - * @brief Convert key value to DocItem - * - * @param items DocItems to send - * @param kv Key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Convert edge key value to DocItem - * - * @param items DocItems to send - * @param kv Edge key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendEdgeDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Convert tag key value to DocItem - * - * @param items DocItems to send - * @param kv Edge key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendTagDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Add the fulltext index field to DocItem - * - * @param items DocItems to send - * @param reader Key/value's reader - * @param fti Fulltext index schema - * @return Whether append DocItem succeed - */ - bool appendDocs(std::vector& items, - RowReader* reader, - const std::pair& fti) const; - - /** - * @brief Bulk DocItem to es - * - * @param items DocItems to send - * @return Whether send succeed - */ - bool writeData(const std::vector& items) const; - - /** - * @brief Put DocItem to es - * - * @param items DocItems to send - * @return Whether send succeed - */ - bool writeDatum(const std::vector& items) const; - private: meta::SchemaManager* schemaMan_{nullptr}; + using PickFunc = std::function; + void pickTagAndEdgeData(BatchLogType type, + const std::string& key, + const std::string& value, + const PickFunc& func); std::unique_ptr lastApplyLogFile_{nullptr}; std::unique_ptr spaceName_{nullptr}; - std::vector esClients_; + ::nebula::plugin::ESAdapter esAdapter_; int32_t vIdLen_; }; diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt index 32ab49bbbf7..50761adadda 100644 --- a/src/kvstore/test/CMakeLists.txt +++ b/src/kvstore/test/CMakeLists.txt @@ -36,7 +36,7 @@ set(KVSTORE_TEST_LIBS $ $ $ - $ + $ $ $ $ @@ -45,6 +45,7 @@ set(KVSTORE_TEST_LIBS $ $ $ + $ ) nebula_add_test( @@ -60,6 +61,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -75,6 +77,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -90,6 +93,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -105,6 +109,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -120,6 +125,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -135,6 +141,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -150,6 +157,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -165,6 +173,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -180,6 +189,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -196,6 +206,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) nebula_add_executable( @@ -213,6 +224,7 @@ nebula_add_executable( follybenchmark gtest boost_regex + curl ) nebula_add_test( @@ -228,4 +240,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp index 816c2093e80..5d91fba3cf1 100644 --- a/src/kvstore/test/NebulaListenerTest.cpp +++ b/src/kvstore/test/NebulaListenerTest.cpp @@ -6,6 +6,8 @@ #include #include +#include + #include "common/base/Base.h" #include "common/fs/FileUtils.h" #include "common/fs/TempDir.h" @@ -44,7 +46,11 @@ class DummyListener : public Listener { : Listener(spaceId, partId, localAddr, walPath, ioPool, workers, handlers) {} std::vector data() { - return data_; + std::vector ret; + for (auto& [key, value] : data_) { + ret.emplace_back(key, value); + } + return ret; } std::tuple commitSnapshot(const std::vector& rows, @@ -186,10 +192,30 @@ class DummyListener : public Listener { protected: void init() override {} - bool apply(const std::vector& kvs) { - for (const auto& kv : kvs) { - data_.emplace_back(kv); + bool apply(const BatchHolder& batch) override { + for (auto& log : batch.getBatch()) { + switch (std::get<0>(log)) { + case BatchLogType::OP_BATCH_PUT: { + data_[std::get<1>(log)] = std::get<2>(log); + break; + } + case BatchLogType::OP_BATCH_REMOVE: { + data_.erase(std::get<1>(log)); + break; + } + case BatchLogType::OP_BATCH_REMOVE_RANGE: { + auto iter = data_.lower_bound(std::get<1>(log)); + while (iter != data_.end()) { + if (iter->first < std::get<2>(log)) { + iter = data_.erase(iter); + } else { + break; + } + } + } + } } + return true; } @@ -214,7 +240,7 @@ class DummyListener : public Listener { } private: - std::vector data_; + std::map data_; std::pair committedSnapshot_{0, 0}; int32_t snapshotBatchCount_{0}; }; @@ -450,9 +476,14 @@ TEST_P(ListenerBasicTest, SimpleTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(100, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } @@ -520,9 +551,14 @@ TEST_P(ListenerBasicTest, TransLeaderTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(200, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } @@ -554,9 +590,14 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(100, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 0c31c9404c2..a325edfc38a 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -163,7 +163,7 @@ set(meta_test_deps $ $ $ - $ + $ $ $ $ @@ -172,6 +172,7 @@ set(meta_test_deps $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/src/meta/http/test/CMakeLists.txt b/src/meta/http/test/CMakeLists.txt index c4d059fb41b..6f54cc20ba8 100644 --- a/src/meta/http/test/CMakeLists.txt +++ b/src/meta/http/test/CMakeLists.txt @@ -17,4 +17,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 52146ed1915..f963bb4f1bf 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -15,6 +15,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -30,6 +31,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -45,6 +47,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -60,6 +63,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -69,12 +73,14 @@ nebula_add_test( MetaClientTest.cpp OBJECTS ${meta_test_deps} + $ LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -90,6 +96,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) @@ -107,6 +114,7 @@ nebula_add_test( wangle gtest gmock + curl ) @@ -145,6 +153,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) @@ -161,6 +170,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -176,6 +186,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -192,6 +203,7 @@ nebula_add_test( wangle gtest gmock + curl ) nebula_add_test( @@ -208,6 +220,7 @@ nebula_add_test( wangle gtest gmock + curl ) nebula_add_test( @@ -223,6 +236,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -238,6 +252,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -253,6 +268,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -268,6 +284,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -283,4 +300,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/parser/MaintainSentences.cpp b/src/parser/MaintainSentences.cpp index d882e5b7873..308c3736914 100644 --- a/src/parser/MaintainSentences.cpp +++ b/src/parser/MaintainSentences.cpp @@ -495,9 +495,7 @@ std::string CreateFTIndexSentence::toString() const { buf += *schemaName_; buf += "("; std::vector fieldDefs; - for (const auto& field : fields()) { - fieldDefs.emplace_back(field); - } + fieldDefs.emplace_back(field()); std::string fields; folly::join(", ", fieldDefs, fields); buf += fields; diff --git a/src/parser/MaintainSentences.h b/src/parser/MaintainSentences.h index cc56f3cdc43..6a2c59c4730 100644 --- a/src/parser/MaintainSentences.h +++ b/src/parser/MaintainSentences.h @@ -1127,11 +1127,11 @@ class CreateFTIndexSentence final : public Sentence { CreateFTIndexSentence(bool isEdge, std::string *indexName, std::string *schemaName, - NameLabelList *fields) { + std::string *field) { isEdge_ = isEdge; indexName_.reset(indexName); schemaName_.reset(schemaName); - fields_.reset(fields); + field_.reset(field); kind_ = Kind::kCreateFTIndex; } @@ -1148,20 +1148,15 @@ class CreateFTIndexSentence final : public Sentence { return schemaName_.get(); } - std::vector fields() const { - std::vector result; - auto fields = fields_->labels(); - result.resize(fields.size()); - auto get = [](auto ptr) { return *ptr; }; - std::transform(fields.begin(), fields.end(), result.begin(), get); - return result; + std::string field() const { + return *field_; } private: bool isEdge_; std::unique_ptr indexName_; std::unique_ptr schemaName_; - std::unique_ptr fields_; + std::unique_ptr field_; }; class DropFTIndexSentence final : public Sentence { public: diff --git a/src/parser/parser.yy b/src/parser/parser.yy index bfd06536385..264af87946c 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -2693,10 +2693,10 @@ create_edge_index_sentence ; create_fulltext_index_sentence - : KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN { + : KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label R_PAREN { $$ = new CreateFTIndexSentence(false, $5, $7, $9); } - | KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN { + | KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label R_PAREN { $$ = new CreateFTIndexSentence(true, $5, $7, $9); } ; diff --git a/src/parser/test/CMakeLists.txt b/src/parser/test/CMakeLists.txt index 27762d80742..474a29b7cfc 100644 --- a/src/parser/test/CMakeLists.txt +++ b/src/parser/test/CMakeLists.txt @@ -32,7 +32,8 @@ set(PARSER_TEST_LIBS $ $ $ - $ + $ + $ $ $ $ @@ -60,7 +61,7 @@ nebula_add_test( SOURCES ParserTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_test( @@ -68,21 +69,21 @@ nebula_add_test( SOURCES ScannerTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_executable( NAME parser_bm SOURCES ParserBenchmark.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES follybenchmark boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES follybenchmark boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_test( NAME expression_parsing_test SOURCES ExpressionParsingTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) if(ENABLE_FUZZ_TEST) diff --git a/src/parser/test/fuzzing/CMakeLists.txt b/src/parser/test/fuzzing/CMakeLists.txt index 2de1ac7fd94..c2814163cdb 100644 --- a/src/parser/test/fuzzing/CMakeLists.txt +++ b/src/parser/test/fuzzing/CMakeLists.txt @@ -7,5 +7,5 @@ nebula_add_test( FUZZER ON SOURCES ParserFuzzer.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle + LIBRARIES ${THRIFT_LIBRARIES} wangle curl ) diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 39bc5dfd5cb..22318a37669 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -52,7 +52,7 @@ set(storage_test_deps $ $ $ - $ + $ $ $ $ @@ -60,6 +60,7 @@ set(storage_test_deps $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) @@ -82,6 +83,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -97,6 +99,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -112,6 +115,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -127,6 +131,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -142,6 +147,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -157,6 +163,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -172,6 +179,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -187,6 +195,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -202,6 +211,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -217,6 +227,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -232,6 +243,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -248,6 +260,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) @@ -265,6 +278,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -280,6 +294,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -295,6 +310,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -310,6 +326,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -327,6 +344,7 @@ nebula_add_executable( gtest follybenchmark boost_regex + curl ) nebula_add_executable( @@ -342,6 +360,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -357,6 +376,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -372,6 +392,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -391,6 +412,7 @@ nebula_add_executable( follybenchmark boost_regex gtest + curl ) nebula_add_test( @@ -407,6 +429,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -446,6 +469,7 @@ nebula_add_executable( wangle boost_regex gtest + curl ) nebula_add_test( @@ -461,6 +485,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -514,6 +539,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -529,6 +555,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -544,6 +571,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) #nebula_add_executable( @@ -575,6 +603,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -590,6 +619,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -605,6 +635,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -625,6 +656,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -645,6 +677,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -665,6 +698,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -680,6 +714,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -696,6 +731,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) nebula_add_test( @@ -711,6 +747,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -725,6 +762,7 @@ nebula_add_test( ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} gtest + curl ) nebula_add_executable( @@ -740,6 +778,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -757,4 +796,5 @@ nebula_add_executable( boost_regex wangle gtest + curl ) diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index d6c2d226143..05d3454376d 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -56,10 +56,11 @@ set(tools_test_deps $ $ $ - $ + $ $ $ $ + $ ) if(NOT ENABLE_STANDALONE_VERSION) diff --git a/src/tools/simple-kv-verify/CMakeLists.txt b/src/tools/simple-kv-verify/CMakeLists.txt index 850c4c56d09..1113afcc022 100644 --- a/src/tools/simple-kv-verify/CMakeLists.txt +++ b/src/tools/simple-kv-verify/CMakeLists.txt @@ -15,4 +15,5 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index 518aa7f7718..7be1bd82fd1 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -38,6 +38,7 @@ nebula_add_executable( ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} gtest + curl ) #install(