Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow multi fulltext index on a tag/edge #5038

Merged
merged 6 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3456,8 +3456,8 @@ StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> MetaClient::getFTIndexB
return indexes;
}

StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexBySpaceSchemaFromCache(
GraphSpaceID spaceId, int32_t schemaId) {
StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexFromCache(
GraphSpaceID spaceId, int32_t schemaId, const std::string& field) {
if (!ready_) {
return Status::Error("Not ready!");
}
Expand All @@ -3467,13 +3467,34 @@ StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexBySpaceSch
auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? it.second.get_depend_schema().get_edge_type()
: it.second.get_depend_schema().get_tag_id();
if (it.second.get_space_id() == spaceId && id == schemaId) {
// There will only be one field. However, in order to minimize changes, the IDL was not modified
auto f = it.second.fields()->front();
if (it.second.get_space_id() == spaceId && id == schemaId && f == field) {
return std::make_pair(it.first, it.second);
}
}
return Status::IndexNotFound();
}

StatusOr<std::map<std::string, cpp2::FTIndex>> MetaClient::getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
const auto& metadata = *metadata_.load();
std::map<std::string, cpp2::FTIndex> ret;
for (auto& it : metadata.fulltextIndexMap_) {
auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? it.second.get_depend_schema().get_edge_type()
: it.second.get_depend_schema().get_tag_id();
if (it.second.get_space_id() == spaceId && id == schemaId) {
ret[it.first] = it.second;
}
}
return ret;
}

StatusOr<cpp2::FTIndex> MetaClient::getFTIndexByNameFromCache(GraphSpaceID spaceId,
const std::string& name) {
if (!ready_) {
Expand Down
8 changes: 6 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,12 @@ class MetaClient : public BaseMetaClient {
StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> getFTIndexBySpaceFromCache(
GraphSpaceID spaceId);

StatusOr<std::pair<std::string, cpp2::FTIndex>> getFTIndexBySpaceSchemaFromCache(
GraphSpaceID spaceId, int32_t schemaId);
StatusOr<std::pair<std::string, cpp2::FTIndex>> getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId,
const std::string& field);

StatusOr<std::map<std::string, cpp2::FTIndex>> getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId);

StatusOr<cpp2::FTIndex> getFTIndexByNameFromCache(GraphSpaceID spaceId, const std::string& name);

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SchemaManager {
StatusOr<std::pair<bool, int32_t>> getSchemaIDByName(GraphSpaceID space,
folly::StringPiece schemaName);

virtual StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
virtual StatusOr<std::map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) = 0;

protected:
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ ServerBasedSchemaManager::getServiceClients(meta::cpp2::ExternalServiceType type
return std::move(ret).value();
}

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> ServerBasedSchemaManager::getFTIndex(
StatusOr<std::map<std::string, nebula::meta::cpp2::FTIndex>> ServerBasedSchemaManager::getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) {
auto ret = metaClient_->getFTIndexBySpaceSchemaFromCache(spaceId, schemaId);
auto ret = metaClient_->getFTIndexFromCache(spaceId, schemaId);
if (!ret.ok()) {
return ret.status();
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/ServerBasedSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ServerBasedSchemaManager : public SchemaManager {
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type) override;

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
StatusOr<std::map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) override;

void init(MetaClient *client);
Expand Down
13 changes: 3 additions & 10 deletions src/graph/validator/LookupValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,19 +501,12 @@ StatusOr<Expression*> LookupValidator::checkConstExpr(Expression* expr,

// Check does test search contains properties search in test search expression
StatusOr<std::string> LookupValidator::checkTSExpr(Expression* expr) {
auto tsExpr = static_cast<TextSearchExpression*>(expr);
auto prop = tsExpr->arg()->prop();
auto metaClient = qctx_->getMetaClient();
auto tsi = metaClient->getFTIndexBySpaceSchemaFromCache(spaceId(), schemaId());
auto tsi = metaClient->getFTIndexFromCache(spaceId(), schemaId(), prop);
NG_RETURN_IF_ERROR(tsi);
auto tsName = tsi.value().first;

auto ftFields = tsi.value().second.get_fields();
auto tsExpr = static_cast<TextSearchExpression*>(expr);
auto prop = tsExpr->arg()->prop();

auto iter = std::find(ftFields.begin(), ftFields.end(), prop);
if (iter == ftFields.end()) {
return Status::SemanticError("Column %s not found in %s", prop.c_str(), tsName.c_str());
}
return tsName;
}

Expand Down
4 changes: 2 additions & 2 deletions src/graph/validator/test/MockSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ class MockSchemaManager final : public nebula::meta::SchemaManager {
LOG(FATAL) << "Unimplemented.";
}

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
StatusOr<std::map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
LOG(FATAL) << "Unimplemented";
return Status::Error("Unimplemented");
}
Expand Down
52 changes: 28 additions & 24 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,20 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
if (ftIndex.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = ftIndex.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
}
std::string indexName = ftIndex.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
} else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key);
auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, edgeType);
Expand All @@ -114,20 +116,22 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
if (ftIndex.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = ftIndex.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
callback(type, indexName, "", src, dst, rank, text);
}
std::string indexName = ftIndex.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
callback(type, indexName, "", src, dst, rank, text);
}
}

Expand Down
20 changes: 12 additions & 8 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::IndexItem>> BaseProcessor<RES
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> BaseProcessor<RESP>::getFTIndex(
GraphSpaceID spaceId, int32_t tagOrEdge) {
ErrorOr<nebula::cpp2::ErrorCode, std::map<std::string, cpp2::FTIndex>>
BaseProcessor<RESP>::getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge) {
const auto& indexPrefix = MetaKeyUtils::fulltextIndexPrefix();
auto iterRet = doPrefix(indexPrefix);
if (!nebula::ok(iterRet)) {
Expand All @@ -422,18 +422,18 @@ ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> BaseProcessor<RESP>::getFTIndex(
return retCode;
}
auto indexIter = nebula::value(iterRet).get();

std::map<std::string, cpp2::FTIndex> ret;
cangfengzhs marked this conversation as resolved.
Show resolved Hide resolved
while (indexIter->valid()) {
auto index = MetaKeyUtils::parsefulltextIndex(indexIter->val());
auto id = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? index.get_depend_schema().get_edge_type()
: index.get_depend_schema().get_tag_id();
if (spaceId == index.get_space_id() && tagOrEdge == id) {
return index;
ret[indexIter->key().toString()] = index;
}
indexIter->next();
}
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
return ret;
}

template <typename RESP>
Expand Down Expand Up @@ -464,14 +464,18 @@ nebula::cpp2::ErrorCode BaseProcessor<RESP>::indexCheck(

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::ftIndexCheck(
const std::vector<std::string>& cols, const std::vector<cpp2::AlterSchemaItem>& alterItems) {
const std::map<std::string, cpp2::FTIndex>& ftIndices,
const std::vector<cpp2::AlterSchemaItem>& alterItems) {
std::set<std::string> cols;
for (auto& [indexName, index] : ftIndices) {
cols.insert(index.fields_ref()->front());
}
for (const auto& item : alterItems) {
if (*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::CHANGE ||
*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::DROP) {
const auto& itemCols = item.get_schema().get_columns();
for (const auto& iCol : itemCols) {
auto it =
std::find_if(cols.begin(), cols.end(), [&](const auto& c) { return c == iCol.name; });
auto it = cols.find(iCol.name);
if (it != cols.end()) {
LOG(INFO) << "fulltext index conflict";
return nebula::cpp2::ErrorCode::E_CONFLICT;
Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ class BaseProcessor {
* @param alterItems
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode ftIndexCheck(const std::vector<std::string>& cols,
nebula::cpp2::ErrorCode ftIndexCheck(const std::map<std::string, cpp2::FTIndex>& ftIndices,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

/**
Expand All @@ -374,8 +374,8 @@ class BaseProcessor {
* @param tagOrEdge
* @return ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
*/
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> getFTIndex(GraphSpaceID spaceId,
int32_t tagOrEdge);
ErrorOr<nebula::cpp2::ErrorCode, std::map<std::string, cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t tagOrEdge);

/**
* @brief Check if index on given fields alredy exist.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
auto ftIdxRet = getFTIndex(spaceId, edgeType);
if (nebula::ok(ftIdxRet)) {
auto fti = std::move(nebula::value(ftIdxRet));
auto ftStatus = ftIndexCheck(fti.get_fields(), edgeItems);
auto ftStatus = ftIndexCheck(fti, edgeItems);
if (ftStatus != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ftStatus);
onFinished();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
auto ftIdxRet = getFTIndex(spaceId, tagId);
if (nebula::ok(ftIdxRet)) {
auto fti = std::move(nebula::value(ftIdxRet));
auto ftStatus = ftIndexCheck(fti.get_fields(), tagItems);
auto ftStatus = ftIndexCheck(fti, tagItems);
if (ftStatus != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ftStatus);
onFinished();
Expand Down
16 changes: 8 additions & 8 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {

auto ftIdxRet = getFTIndex(spaceId, edgeType);
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}

if (nebula::error(ftIdxRet) != nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND) {
if (!nebula::value(ftIdxRet).empty()) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
} else {
handleErrorCode(nebula::error(ftIdxRet));
onFinished();
return;
Expand Down
16 changes: 8 additions & 8 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {

auto ftIdxRet = getFTIndex(spaceId, tagId);
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop tag error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}

if (nebula::error(ftIdxRet) != nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND) {
if (!nebula::value(ftIdxRet).empty()) {
LOG(INFO) << "Drop tag error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
} else {
handleErrorCode(nebula::error(ftIdxRet));
onFinished();
return;
Expand Down
4 changes: 2 additions & 2 deletions src/mock/AdHocSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class AdHocSchemaManager final : public nebula::meta::SchemaManager {

void addServiceClient(const nebula::meta::cpp2::ServiceClient& client);

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
StatusOr<std::map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
LOG(FATAL) << "Unimplemented";
return Status::Error("Unimplemented");
}
Expand Down