Skip to content

Commit

Permalink
fix es delete error
Browse files Browse the repository at this point in the history
1. remove get Rowreader if op is delete
2. delete es data when value is null
  • Loading branch information
cangfengzhs committed Jan 12, 2023
1 parent c4da7bc commit 74d77fd
Showing 1 changed file with 43 additions and 51 deletions.
94 changes: 43 additions & 51 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,47 +73,36 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
const std::string& key,
const std::string& value,
const PickFunc& callback) {
bool isTag = nebula::NebulaKeyUtils::isTag(vIdLen_, key);
bool isEdge = nebula::NebulaKeyUtils::isEdge(vIdLen_, key);
if (!(isTag || isEdge)) {
return;
}
std::unordered_map<std::string, nebula::meta::cpp2::FTIndex> ftIndexes;
nebula::RowReaderWrapper reader;

std::string vid;
std::string src;
std::string dst;
int rank = 0;
if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) {
auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key);
auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId);
if (!ftIndexRes.ok()) {
LOG(ERROR) << ftIndexRes.status().message();
return;
}
auto ftIndex = std::move(ftIndexRes).value();
if (ftIndex.empty()) {
return;
}
nebula::RowReaderWrapper reader;
ftIndexes = std::move(ftIndexRes).value();
if (type == BatchLogType::OP_BATCH_PUT) {
reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value);
if (reader == nullptr) {
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
}
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
std::string text;
if (type == BatchLogType::OP_BATCH_PUT) {
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
text = std::move(v).getStr();
}
std::string indexName = index.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
vid = truncateVid(vid);
callback(type, indexName, vid, "", "", 0, text);
}
} else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) {
vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
vid = truncateVid(vid);
} else {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key);
if (edgeType < 0) {
return;
Expand All @@ -122,41 +111,44 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
if (!ftIndexRes.ok()) {
return;
}
auto ftIndex = std::move(ftIndexRes).value();
nebula::RowReaderWrapper reader;
ftIndexes = std::move(ftIndexRes).value();
if (type == BatchLogType::OP_BATCH_PUT) {
reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value);
if (reader == nullptr) {
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
}
src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
rank = NebulaKeyUtils::getRank(vIdLen_, key);

for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
src = truncateVid(src);
dst = truncateVid(dst);
}
if (ftIndexes.empty()) {
return;
}

for (auto& index : ftIndexes) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
std::string text;
std::string indexName = index.first;
if (type == BatchLogType::OP_BATCH_PUT) {
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
callback(BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0, text);
continue;
}
std::string text;
if (type == BatchLogType::OP_BATCH_PUT) {
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
text = std::move(v).getStr();
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);

src = truncateVid(src);
dst = truncateVid(dst);
callback(type, indexName, "", src, dst, rank, text);
text = std::move(v).getStr();
}
callback(type, indexName, vid, src, dst, rank, text);
}
}

Expand Down

0 comments on commit 74d77fd

Please sign in to comment.