Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check reader is null or not before using it #2029

Merged
merged 2 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
return false;
}
auto reader = nebula::RowReader::getTagPropReader(schemaMan_, val, spaceId, tagId);
if (reader == nullptr) {
VLOG(3) << "Remove the bad format vertex";
return false;
}
return checkDataTtlValid(schema.get(), reader.get());
} else if (NebulaKeyUtils::isEdge(key)) {
auto edgeType = NebulaKeyUtils::getEdgeType(key);
Expand All @@ -113,6 +117,10 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
}
auto reader = nebula::RowReader::getEdgePropReader(schemaMan_, val,
spaceId, std::abs(edgeType));
if (reader == nullptr) {
VLOG(3) << "Remove the bad format edge!";
return false;
}
return checkDataTtlValid(schema.get(), reader.get());
}
return true;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/admin/RebuildEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ void RebuildEdgeIndexProcessor::process(const cpp2::RebuildIndexRequest& req) {
std::move(val),
space,
edgeType);
if (reader == nullptr) {
iter->next();
continue;
}
auto values = collectIndexValues(reader.get(), item->get_fields());
auto indexKey = NebulaKeyUtils::edgeIndexKey(part, indexID, source,
ranking, destination, values);
Expand Down
4 changes: 4 additions & 0 deletions src/storage/admin/RebuildTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ void RebuildTagIndexProcessor::process(const cpp2::RebuildIndexRequest& req) {
std::move(val),
space,
tagID);
if (reader == nullptr) {
iter->next();
continue;
}
auto values = collectIndexValues(reader.get(), item->get_fields());

auto indexKey = NebulaKeyUtils::vertexIndexKey(part, indexID, vertex, values);
Expand Down
6 changes: 6 additions & 0 deletions src/storage/index/IndexExecutor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ kvstore::ResultCode IndexExecutor<RESP>::getVertexRow(PartitionID partId,
if (result.ok()) {
auto v = std::move(result).value();
auto reader = RowReader::getTagPropReader(schemaMan_, v, spaceId_, tagOrEdge_);
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
auto row = getRowFromReader(reader.get());
data->set_props(std::move(row));
VLOG(3) << "Hit cache for vId " << vId << ", tagId " << tagOrEdge_;
Expand All @@ -204,6 +207,9 @@ kvstore::ResultCode IndexExecutor<RESP>::getVertexRow(PartitionID partId,
iter->val(),
spaceId_,
tagOrEdge_);
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
auto row = getRowFromReader(reader.get());
data->set_props(std::move(row));
if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) {
Expand Down
8 changes: 8 additions & 0 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
val,
spaceId_,
edgeType);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
auto oi = indexKey(partId, reader.get(), e.first, index);
if (!oi.empty()) {
batchHolder->remove(std::move(oi));
Expand All @@ -115,6 +119,10 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
e.second,
spaceId_,
edgeType);
if (nReader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
}
auto ni = indexKey(partId, nReader.get(), e.first, index);
batchHolder->put(std::move(ni), "");
Expand Down
8 changes: 8 additions & 0 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI
val,
spaceId_,
tagId);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
auto oi = indexKey(partId, vId, reader.get(), index);
if (!oi.empty()) {
batchHolder->remove(std::move(oi));
Expand All @@ -132,6 +136,10 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI
v.second,
spaceId_,
tagId);
if (nReader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
}
auto ni = indexKey(partId, vId, nReader.get(), index);
batchHolder->put(std::move(ni), "");
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
iter->val(),
spaceId,
type);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row!";
return folly::none;
}
}
auto values = collectIndexValues(reader.get(),
index->get_fields());
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
iter->val(),
spaceId,
tagId);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row";
return folly::none;
}
}
const auto& cols = index->get_fields();
auto values = collectIndexValues(reader.get(), cols);
Expand Down
11 changes: 10 additions & 1 deletion src/storage/query/QueryBaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectVertexProps(
if (result.ok()) {
auto v = std::move(result).value();
auto reader = RowReader::getTagPropReader(this->schemaMan_, v, spaceId_, tagId);
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}

// Check if ttl data expired
auto retTtlOpt = getTagTTLInfo(tagId);
Expand Down Expand Up @@ -445,7 +448,9 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectVertexProps(
// stored along with the properties
if (iter && iter->valid()) {
auto reader = RowReader::getTagPropReader(this->schemaMan_, iter->val(), spaceId_, tagId);

if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
// Check if ttl data expired
auto retTtlOpt = getTagTTLInfo(tagId);
if (retTtlOpt.hasValue()) {
Expand Down Expand Up @@ -525,6 +530,10 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectEdgeProps(
val,
spaceId_,
std::abs(edgeType));
if (reader == nullptr) {
LOG(WARNING) << "Skip the bad format row!";
continue;
}
// Check if ttl data expired
if (retTTL.has_value() && checkDataExpiredForTTL(schema.get(),
reader.get(),
Expand Down
3 changes: 3 additions & 0 deletions src/storage/query/QueryEdgePropsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ kvstore::ResultCode QueryEdgePropsProcessor::collectEdgesProps(
iter->val(),
spaceId_,
std::abs(edgeKey.edge_type));
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}

// Check if ttl data expired
if (retTTLOpt.has_value()) {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/query/QueryVertexPropsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ kvstore::ResultCode QueryVertexPropsProcessor::collectVertexProps(
continue;
}
auto reader = RowReader::getTagPropReader(this->schemaMan_, val, spaceId_, tagId);
if (reader == nullptr) {
VLOG(3) << "Skip the bad format row!";
continue;
}
// Check if ttl data expired
auto retTTL = getTagTTLInfo(tagId, schema.get());
if (retTTL.has_value() && checkDataExpiredForTTL(schema.get(),
Expand Down
4 changes: 4 additions & 0 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) {
} else if (!ctxIter->second.empty()) {
// only return specified columns
auto reader = RowReader::getEdgePropReader(schemaMan_, value, spaceId_, edgeType);
if (reader == nullptr) {
LOG(WARNING) << "Skip the bad format row";
continue;
}
RowWriter writer;
PropsCollector collector(&writer);
auto& props = ctxIter->second;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) {
} else if (!ctxIter->second.empty()) {
// only return specified columns
auto reader = RowReader::getTagPropReader(schemaMan_, value, spaceId_, tagId);
if (reader == nullptr) {
continue;
}
RowWriter writer;
PropsCollector collector(&writer);
auto& props = ctxIter->second;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/UpdateEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ TEST(UpdateEdgeTest, CorruptDataTest) {
PartitionID partId = 0;
VertexID srcId = 10;
VertexID dstId = 11;
// src = 1, edge_type = 101, ranking = 0, dst = 10001
// src = 10, edge_type = 101, ranking = 0, dst = 11
storage::cpp2::EdgeKey edgeKey;
edgeKey.set_src(srcId);
edgeKey.set_edge_type(101);
Expand Down