Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rebuild index for new prop #3332

Merged
merged 4 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src/common/utils/IndexKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <thrift/lib/cpp2/protocol/Serializer.h>

#include "common/geo/GeoIndex.h"
#include "common/utils/DefaultValueContext.h"

namespace nebula {

Expand Down Expand Up @@ -170,25 +171,55 @@ Value IndexKeyUtils::parseIndexTTL(const folly::StringPiece& raw) {

// static
StatusOr<std::vector<std::string>> IndexKeyUtils::collectIndexValues(
RowReader* reader, const meta::cpp2::IndexItem* indexItem) {
RowReader* reader,
const meta::cpp2::IndexItem* indexItem,
const meta::SchemaProviderIf* latestSchema) {
if (reader == nullptr) {
return Status::Error("Invalid row reader");
}
auto& cols = indexItem->get_fields();
std::vector<Value> values;
for (const auto& col : cols) {
auto v = reader->getValueByName(col.get_name());
auto propName = col.get_name();
auto val = readValueWithLatestSche(reader, propName, latestSchema);
if (!val.ok()) {
LOG(ERROR) << "prop error by : " << propName << ". status : " << val.status();
return val.status();
}
auto v = val.value();
auto isNullable = col.nullable_ref().value_or(false);
auto ret = checkValue(v, isNullable);
if (!ret.ok()) {
LOG(ERROR) << "prop error by : " << col.get_name() << ". status : " << ret;
LOG(ERROR) << "prop error by : " << propName << ". status : " << ret;
return ret;
}
values.emplace_back(std::move(v));
}
return encodeValues(std::move(values), indexItem);
}

// static
StatusOr<Value> IndexKeyUtils::readValueWithLatestSche(RowReader* reader,
const std::string propName,
const meta::SchemaProviderIf* latestSchema) {
auto value = reader->getValueByName(propName);
if (latestSchema == nullptr || !value.isNull() || value.getNull() != NullType::UNKNOWN_PROP) {
return value;
}
auto field = latestSchema->field(propName);
if (field == nullptr) {
return Status::Error("Unknown prop");
}
if (field->hasDefault()) {
DefaultValueContext expCtx;
auto expr = field->defaultValue()->clone();
return Expression::eval(expr, expCtx);
} else if (field->nullable()) {
return NullType::__NULL__;
}
return Status::Error(folly::stringPrintf("Fail to read prop %s ", propName.c_str()));
}

// static
Status IndexKeyUtils::checkValue(const Value& v, bool isNullable) {
if (!v.isNull()) {
Expand Down
8 changes: 7 additions & 1 deletion src/common/utils/IndexKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,17 @@ class IndexKeyUtils final {
static Value parseIndexTTL(const folly::StringPiece& raw);

static StatusOr<std::vector<std::string>> collectIndexValues(
RowReader* reader, const meta::cpp2::IndexItem* indexItem);
RowReader* reader,
const meta::cpp2::IndexItem* indexItem,
const meta::SchemaProviderIf* latestSchema = nullptr);

private:
IndexKeyUtils() = delete;

static StatusOr<Value> readValueWithLatestSche(RowReader* reader,
const std::string propName,
const meta::SchemaProviderIf* latestSchema);

static Status checkValue(const Value& v, bool isNullable);
};

Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/RebuildEdgeIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac

for (const auto& item : items) {
if (item->get_schema_id().get_edge_type() == edgeType) {
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get());
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get(), schema);
if (!valuesRet.ok()) {
LOG(WARNING) << "Collect index value failed";
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/RebuildTagIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space

for (const auto& item : items) {
if (item->get_schema_id().get_tag_id() == tagID) {
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get());
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get(), schema);
if (!valuesRet.ok()) {
LOG(WARNING) << "Collect index value failed";
continue;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/exec/UpdateNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class UpdateTagNode : public UpdateNode<VertexID> {
const VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), schema_);
if (!values.ok()) {
return {};
}
Expand Down Expand Up @@ -752,7 +752,7 @@ class UpdateEdgeNode : public UpdateNode<cpp2::EdgeKey> {
RowReader* reader,
const cpp2::EdgeKey& edgeKey,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), schema_);
if (!values.ok()) {
return {};
}
Expand Down
13 changes: 7 additions & 6 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
* step 1 , Delete old version index if exists.
*/
if (oReader != nullptr) {
auto ois = indexKeys(partId, oReader.get(), key, index);
auto ois = indexKeys(partId, oReader.get(), key, index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or not.
auto indexState = env_->getIndexState(spaceId_, partId);
Expand All @@ -276,7 +276,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
* step 2 , Insert new edge index
*/
if (nReader != nullptr) {
auto niks = indexKeys(partId, nReader.get(), key, index);
auto niks = indexKeys(partId, nReader.get(), key, index, schema.get());
if (!niks.empty()) {
auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
Expand Down Expand Up @@ -384,7 +384,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> AddEdgesProcessor::addEdges(
}

if (!val.empty()) {
auto ois = indexKeys(partId, oReader.get(), e.first, index);
auto ois = indexKeys(partId, oReader.get(), e.first, index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or not.
auto indexState = env_->getIndexState(spaceId_, partId);
Expand Down Expand Up @@ -416,7 +416,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> AddEdgesProcessor::addEdges(
}
}

auto niks = indexKeys(partId, nReader.get(), e.first, index);
auto niks = indexKeys(partId, nReader.get(), e.first, index, schema.get());
if (!niks.empty()) {
auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
Expand Down Expand Up @@ -473,8 +473,9 @@ std::vector<std::string> AddEdgesProcessor::indexKeys(
PartitionID partId,
RowReader* reader,
const folly::StringPiece& rawKey,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::vector<std::string> indexKeys(PartitionID partId,
RowReader* reader,
const folly::StringPiece& rawKey,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

private:
GraphSpaceID spaceId_;
Expand Down
9 changes: 5 additions & 4 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
* step 1 , Delete old version index if exists.
*/
if (oReader != nullptr) {
auto ois = indexKeys(partId, vid, oReader.get(), index);
auto ois = indexKeys(partId, vid, oReader.get(), index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or
// not.
Expand All @@ -256,7 +256,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
* step 2 , Insert new vertex index
*/
if (nReader != nullptr) {
auto niks = indexKeys(partId, vid, nReader.get(), index);
auto niks = indexKeys(partId, vid, nReader.get(), index, schema.get());
if (!niks.empty()) {
auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
Expand Down Expand Up @@ -334,8 +334,9 @@ std::vector<std::string> AddVerticesProcessor::indexKeys(
PartitionID partId,
const VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/AddVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::vector<std::string> indexKeys(PartitionID partId,
const VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

private:
GraphSpaceID spaceId_;
Expand Down
4 changes: 3 additions & 1 deletion src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
auto key = NebulaKeyUtils::edgeKey(spaceVidLen_, partId, srcId, type, rank, dstId);
std::string val;
auto ret = env_->kvstore_->get(spaceId_, partId, key, &val);
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(type));

if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
/**
Expand All @@ -162,7 +163,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
}
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), index.get());
auto valuesRet =
IndexKeyUtils::collectIndexValues(reader.get(), index.get(), schema.get());
if (!valuesRet.ok()) {
continue;
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteVerticesProcessor::deleteVer
}
target.emplace_back(std::move(l));
}
auto schema = env_->schemaMan_->getTagSchema(spaceId_, tagId);
RowReaderWrapper reader;
for (auto& index : indexes_) {
if (index->get_schema_id().get_tag_id() == tagId) {
Expand All @@ -147,7 +148,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteVerticesProcessor::deleteVer
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
}
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), index.get());
auto valuesRet =
IndexKeyUtils::collectIndexValues(reader.get(), index.get(), schema.get());
if (!valuesRet.ok()) {
continue;
}
Expand Down
14 changes: 8 additions & 6 deletions src/tools/db-upgrade/DbUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ void UpgraderSpace::encodeVertexValue(PartitionID partId,
return;
}
for (auto& index : it->second) {
auto newIndexKeys = indexVertexKeys(partId, strVid, nReader.get(), index);
auto newIndexKeys = indexVertexKeys(partId, strVid, nReader.get(), index, schema);
for (auto& newIndexKey : newIndexKeys) {
data.emplace_back(std::move(newIndexKey), "");
}
Expand Down Expand Up @@ -997,8 +997,9 @@ std::vector<std::string> UpgraderSpace::indexVertexKeys(
PartitionID partId,
VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down Expand Up @@ -1039,7 +1040,7 @@ void UpgraderSpace::encodeEdgeValue(PartitionID partId,
return;
}
for (auto& index : it->second) {
auto newIndexKeys = indexEdgeKeys(partId, nReader.get(), svId, rank, dstId, index);
auto newIndexKeys = indexEdgeKeys(partId, nReader.get(), svId, rank, dstId, index, schema);
for (auto& newIndexKey : newIndexKeys) {
data.emplace_back(std::move(newIndexKey), "");
}
Expand All @@ -1053,8 +1054,9 @@ std::vector<std::string> UpgraderSpace::indexEdgeKeys(
VertexID& svId,
EdgeRanking rank,
VertexID& dstId,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down
6 changes: 4 additions & 2 deletions src/tools/db-upgrade/DbUpgrader.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class UpgraderSpace {
std::vector<std::string> indexVertexKeys(PartitionID partId,
VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

void encodeEdgeValue(PartitionID partId,
RowReader* reader,
Expand All @@ -104,7 +105,8 @@ class UpgraderSpace {
VertexID& svId,
EdgeRanking rank,
VertexID& dstId,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

WriteResult convertValue(const meta::NebulaSchemaProvider* newSchema,
const meta::SchemaProviderIf* oldSchema,
Expand Down
Loading