Skip to content

Commit

Permalink
add new es client (#4970)
Browse files Browse the repository at this point in the history
* add new es client

* fix mem leak

* address some comment
  • Loading branch information
cangfengzhs authored Dec 2, 2022
1 parent 8fff4aa commit 9058610
Show file tree
Hide file tree
Showing 17 changed files with 1,425 additions and 32 deletions.
5 changes: 5 additions & 0 deletions src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ CurlHandle* CurlHandle::instance() {
return &handle;
}

HttpClient& HttpClient::instance() {
static HttpClient instance_;
return instance_;
}

HttpResponse HttpClient::get(const std::string& url) {
return HttpClient::get(url, {});
}
Expand Down
19 changes: 12 additions & 7 deletions src/common/http/HttpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,21 @@ struct HttpResponse {

class HttpClient {
public:
static HttpResponse get(const std::string& url);
static HttpResponse get(const std::string& url, const std::vector<std::string>& headers);
static HttpClient& instance();
virtual ~HttpClient() = default;
virtual HttpResponse get(const std::string& url);
virtual HttpResponse get(const std::string& url, const std::vector<std::string>& headers);

static HttpResponse post(const std::string& url,
virtual HttpResponse post(const std::string& url,
const std::vector<std::string>& headers,
const std::string& body);
virtual HttpResponse delete_(const std::string& url, const std::vector<std::string>& headers);
virtual HttpResponse put(const std::string& url,
const std::vector<std::string>& headers,
const std::string& body);
static HttpResponse delete_(const std::string& url, const std::vector<std::string>& headers);
static HttpResponse put(const std::string& url,
const std::vector<std::string>& headers,
const std::string& body);

protected:
HttpClient() = default;

private:
static HttpResponse sendRequest(const std::string& url,
Expand Down
8 changes: 4 additions & 4 deletions src/common/http/test/HttpClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class HTTPClientTest : public ::testing::Test {};
TEST_F(HTTPClientTest, GET) {
FakeHttpServer server(3659);
server.start();
auto resp = HttpClient::get("http://localhost:3659");
auto resp = HttpClient::instance().get("http://localhost:3659");
ASSERT_EQ(resp.curlCode, 0) << resp.curlMessage;
ASSERT_EQ(resp.body, "GET");
server.stop();
Expand All @@ -24,7 +24,7 @@ TEST_F(HTTPClientTest, GET) {
TEST_F(HTTPClientTest, POST) {
FakeHttpServer server(3660);
server.start();
auto resp = HttpClient::post("http://localhost:3660", {}, "");
auto resp = HttpClient::instance().post("http://localhost:3660", {}, "");
ASSERT_EQ(resp.curlCode, 0) << resp.curlMessage;
ASSERT_EQ(resp.body, "POST");
server.stop();
Expand All @@ -34,7 +34,7 @@ TEST_F(HTTPClientTest, POST) {
TEST_F(HTTPClientTest, DELETE) {
FakeHttpServer server(3661);
server.start();
auto resp = HttpClient::delete_("http://localhost:3661", {});
auto resp = HttpClient::instance().delete_("http://localhost:3661", {});
ASSERT_EQ(resp.curlCode, 0) << resp.curlMessage;
ASSERT_EQ(resp.body, "DELETE");
server.stop();
Expand All @@ -44,7 +44,7 @@ TEST_F(HTTPClientTest, DELETE) {
TEST_F(HTTPClientTest, PUT) {
FakeHttpServer server(3662);
server.start();
auto resp = HttpClient::put("http://localhost:3662", {}, "");
auto resp = HttpClient::instance().put("http://localhost:3662", {}, "");
ASSERT_EQ(resp.curlCode, 0) << resp.curlMessage;
ASSERT_EQ(resp.body, "PUT");
server.stop();
Expand Down
6 changes: 6 additions & 0 deletions src/common/plugin/fulltext/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,10 @@ nebula_add_library(
elasticsearch/ESStorageAdapter.cpp
)

nebula_add_library(
es_adapter_obj OBJECT
elasticsearch/ESAdapter.cpp
elasticsearch/ESClient.cpp
)

nebula_add_subdirectory(test)
280 changes: 280 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "common/plugin/fulltext/elasticsearch/ESAdapter.h"

#include <random>

#include "fmt/printf.h"
#include "openssl/sha.h"
namespace nebula::plugin {

using namespace fmt::literals; // NOLINT

ESQueryResult::Item::Item(const std::string& v, const std::string& t) : vid(v), text(t) {}
ESQueryResult::Item::Item(const std::string& s,
const std::string& d,
int64_t r,
const std::string& t)
: src(s), dst(d), rank(r), text(t) {}

void ESBulk::put(const std::string& indexName,
const std::string& vid,
const std::string& src,
const std::string& dst,
int64_t rank,
const std::string& text) {
folly::dynamic action = folly::dynamic::object();
folly::dynamic metadata = folly::dynamic::object();
folly::dynamic body = folly::dynamic::object();
auto docId = ESAdapter::genDocID(vid, src, dst, rank);
metadata["_id"] = docId;
metadata["_type"] = "_doc";
metadata["_index"] = indexName;
action["index"] = std::move(metadata);
body["vid"] = vid;
body["src"] = src;
body["dst"] = dst;
body["rank"] = rank;
body["text"] = text;
documents_[docId] = {std::move(action), std::move(body)};
}

void ESBulk::delete_(const std::string& indexName,
const std::string& vid,
const std::string& src,
const std::string& dst,
int64_t rank) {
folly::dynamic action = folly::dynamic::object();
folly::dynamic metadata = folly::dynamic::object();
auto docId = ESAdapter::genDocID(vid, src, dst, rank);
metadata["_id"] = docId;
metadata["_type"] = "_doc";
metadata["_index"] = indexName;
action["delete"] = std::move(metadata);
documents_[docId] = {std::move(action)};
}

bool ESBulk::empty() {
return documents_.empty();
}

ESAdapter::ESAdapter(std::vector<ESClient>&& clients) : clients_(clients) {}

void ESAdapter::setClients(std::vector<ESClient>&& clients) {
clients_ = std::move(clients);
}

Status ESAdapter::createIndex(const std::string& name) {
folly::dynamic mappings = folly::parseJson(R"(
{
"mappings":{
"properties":{
"vid": {
"type": "keyword"
},
"src": {
"type": "keyword"
},
"dst": {
"type": "keyword"
},
"rank": {
"type": "long"
},
"text": {
"type": "keyword"
}
}
}
}
)");
auto result = randomClient().createIndex(name, mappings);
if (!result.ok()) {
return result.status();
}
auto resp = std::move(result).value();
if (resp.count("acknowledged") && resp["acknowledged"].isBool() &&
resp["acknowledged"].getBool()) {
return Status::OK();
}
if (resp.count("error")) {
return Status::Error(folly::toJson(resp["error"]));
}
return Status::Error(folly::toJson(resp));
}

Status ESAdapter::dropIndex(const std::string& name) {
auto result = randomClient().dropIndex(name);
if (!result.ok()) {
return result.status();
}
auto resp = std::move(result).value();
if (resp["acknowledged"].isBool() && resp["acknowledged"].getBool()) {
return Status::OK();
}
auto error = resp["error"];
if (error.isObject()) {
return Status::Error(folly::toJson(error));
}
return Status::Error(folly::toJson(resp));
}

Status ESAdapter::clearIndex(const std::string& name, bool refresh) {
folly::dynamic body = folly::dynamic::object();
body["query"] = folly::dynamic::object("match_all", folly::dynamic::object());
auto result = randomClient().deleteByQuery(name, body, refresh);
if (!result.ok()) {
return result.status();
}
auto resp = std::move(result).value();
if (resp["failures"].isArray() && resp["failures"].size() == 0) {
return Status::OK();
}
auto error = resp["error"];
if (error.isObject()) {
return Status::Error(folly::toJson(error));
}
return Status::Error(folly::toJson(resp));
}

StatusOr<bool> ESAdapter::isIndexExist(const std::string& name) {
auto result = randomClient().getIndex(name);
if (!result.ok()) {
return result.status();
}
auto resp = std::move(result).value();
if (resp.count(name)) {
return true;
}
if (resp.count("status") && resp["status"].getInt() == 404) {
return false;
}
if (resp.count("error")) {
return Status::Error(folly::toJson(resp["error"]));
}
return Status::Error(folly::toJson(resp));
}

Status ESAdapter::bulk(const ESBulk& bulk, bool refresh) {
std::vector<folly::dynamic> jsonArray;
for (auto& [docId, objs] : bulk.documents_) {
for (auto& obj : objs) {
jsonArray.push_back(obj);
}
}
auto result = randomClient().bulk(jsonArray, refresh);
if (!result.ok()) {
return result.status();
}
auto resp = std::move(result).value();
if (!resp.count("error")) {
return Status::OK();
}
auto error = resp["error"];
if (error.isObject()) {
return Status::Error(folly::toJson(error));
}
return Status::Error(folly::toJson(resp));
}

StatusOr<ESQueryResult> ESAdapter::prefix(const std::string& index, const std::string& pattern) {
folly::dynamic body = folly::dynamic::object();
body["query"] = folly::dynamic::object();
body["query"]["prefix"] = folly::dynamic::object();
body["query"]["prefix"]["text"] = pattern;
return ESAdapter::query(index, body);
}

StatusOr<ESQueryResult> ESAdapter::wildcard(const std::string& index, const std::string& pattern) {
folly::dynamic body = folly::dynamic::object();
body["query"] = folly::dynamic::object();
body["query"]["wildcard"] = folly::dynamic::object("text", pattern);
return ESAdapter::query(index, body);
}

StatusOr<ESQueryResult> ESAdapter::regexp(const std::string& index, const std::string& pattern) {
folly::dynamic body = folly::dynamic::object();
body["query"] = folly::dynamic::object();
body["query"]["regexp"] = folly::dynamic::object("text", pattern);
return ESAdapter::query(index, body);
}

StatusOr<ESQueryResult> ESAdapter::fuzzy(const std::string& index,
const std::string& pattern,
const std::string& fuzziness) {
folly::dynamic body = folly::dynamic::object();
body["query"] = folly::dynamic::object();
body["query"]["fuzzy"] = folly::dynamic::object();
body["query"]["fuzzy"]["text"] = folly::dynamic::object("value", pattern)("fuzziness", fuzziness);
return ESAdapter::query(index, body);
}

// StatusOr<ESQueryResult> ESAdapter::term(const std::string& index,const std::vector<std::string>&
// words){
// folly::dynamic query = folly::dynamic::object("query",folly::dynam)
// }

StatusOr<ESQueryResult> ESAdapter::match_all(const std::string& index) {
folly::dynamic body = folly::dynamic::object();
body["query"] = folly::dynamic::object();
body["query"]["match_all"] = folly::dynamic::object();
return ESAdapter::query(index, body);
}

StatusOr<ESQueryResult> ESAdapter::query(const std::string& index, const folly::dynamic& query) {
auto result = randomClient().search(index, query);
if (!result.ok()) {
return std::move(result).status();
}
auto resp = std::move(result).value();
if (resp.count("hits")) {
auto hits = resp["hits"]["hits"];
ESQueryResult res;
for (auto& hit : hits) {
auto source = hit["_source"];
ESQueryResult::Item item;
item.text = source["text"].getString();
item.src = source["src"].getString();
item.dst = source["dst"].getString();
item.rank = source["rank"].getInt();
item.vid = source["vid"].getString();
res.items.emplace_back(std::move(item));
}
return res;
}
auto error = resp["error"];
if (error.isObject()) {
return Status::Error(folly::toJson(error));
}
return Status::Error(folly::toJson(resp));
}

std::string ESAdapter::genDocID(const std::string& vid,
const std::string& src,
const std::string& dst,
int64_t rank) {
std::string str;
unsigned char mdStr[32] = {0};
if (!vid.empty()) {
str = vid;
} else {
str = src + dst + std::to_string(rank);
}
SHA256(reinterpret_cast<unsigned char*>(str.data()), str.size(), mdStr);
char hex[64] = {0};
for (int i = 0; i < 32; i++) {
hex[i * 2] = 'A' + mdStr[i] / 16;
hex[i * 2 + 1] = 'A' + mdStr[i] % 16;
}
return std::string(hex, 64);
}

ESClient& ESAdapter::randomClient() {
static thread_local std::default_random_engine engine;
static thread_local std::uniform_int_distribution<size_t> d(0, clients_.size() - 1);
return clients_[d(engine)];
}
} // namespace nebula::plugin
Loading

0 comments on commit 9058610

Please sign in to comment.