Skip to content

Commit

Permalink
Check all reader is null or not before using it (#2029)
Browse files Browse the repository at this point in the history
  • Loading branch information
dangleptr authored Mar 31, 2020
1 parent f4c8361 commit f9a2a08
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
return false;
}
auto reader = nebula::RowReader::getTagPropReader(schemaMan_, val, spaceId, tagId);
if (reader == nullptr) {
VLOG(3) << "Remove the bad format vertex";
return false;
}
return checkDataTtlValid(schema.get(), reader.get());
} else if (NebulaKeyUtils::isEdge(key)) {
auto edgeType = NebulaKeyUtils::getEdgeType(key);
Expand All @@ -113,6 +117,10 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
}
auto reader = nebula::RowReader::getEdgePropReader(schemaMan_, val,
spaceId, std::abs(edgeType));
if (reader == nullptr) {
VLOG(3) << "Remove the bad format edge!";
return false;
}
return checkDataTtlValid(schema.get(), reader.get());
}
return true;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/admin/RebuildEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ void RebuildEdgeIndexProcessor::process(const cpp2::RebuildIndexRequest& req) {
std::move(val),
space,
edgeType);
if (reader == nullptr) {
iter->next();
continue;
}
auto values = collectIndexValues(reader.get(), item->get_fields());
auto indexKey = NebulaKeyUtils::edgeIndexKey(part, indexID, source,
ranking, destination, values);
Expand Down
4 changes: 4 additions & 0 deletions src/storage/admin/RebuildTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ void RebuildTagIndexProcessor::process(const cpp2::RebuildIndexRequest& req) {
std::move(val),
space,
tagID);
if (reader == nullptr) {
iter->next();
continue;
}
auto values = collectIndexValues(reader.get(), item->get_fields());

auto indexKey = NebulaKeyUtils::vertexIndexKey(part, indexID, vertex, values);
Expand Down
6 changes: 6 additions & 0 deletions src/storage/index/IndexExecutor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ kvstore::ResultCode IndexExecutor<RESP>::getVertexRow(PartitionID partId,
if (result.ok()) {
auto v = std::move(result).value();
auto reader = RowReader::getTagPropReader(schemaMan_, v, spaceId_, tagOrEdge_);
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
auto row = getRowFromReader(reader.get());
data->set_props(std::move(row));
VLOG(3) << "Hit cache for vId " << vId << ", tagId " << tagOrEdge_;
Expand All @@ -204,6 +207,9 @@ kvstore::ResultCode IndexExecutor<RESP>::getVertexRow(PartitionID partId,
iter->val(),
spaceId_,
tagOrEdge_);
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
auto row = getRowFromReader(reader.get());
data->set_props(std::move(row));
if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) {
Expand Down
8 changes: 8 additions & 0 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
val,
spaceId_,
edgeType);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
auto oi = indexKey(partId, reader.get(), e.first, index);
if (!oi.empty()) {
batchHolder->remove(std::move(oi));
Expand All @@ -115,6 +119,10 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
e.second,
spaceId_,
edgeType);
if (nReader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
}
auto ni = indexKey(partId, nReader.get(), e.first, index);
batchHolder->put(std::move(ni), "");
Expand Down
8 changes: 8 additions & 0 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI
val,
spaceId_,
tagId);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
auto oi = indexKey(partId, vId, reader.get(), index);
if (!oi.empty()) {
batchHolder->remove(std::move(oi));
Expand All @@ -132,6 +136,10 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI
v.second,
spaceId_,
tagId);
if (nReader == nullptr) {
LOG(WARNING) << "Bad format row";
return "";
}
}
auto ni = indexKey(partId, vId, nReader.get(), index);
batchHolder->put(std::move(ni), "");
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
iter->val(),
spaceId,
type);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row!";
return folly::none;
}
}
auto values = collectIndexValues(reader.get(),
index->get_fields());
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
iter->val(),
spaceId,
tagId);
if (reader == nullptr) {
LOG(WARNING) << "Bad format row";
return folly::none;
}
}
const auto& cols = index->get_fields();
auto values = collectIndexValues(reader.get(), cols);
Expand Down
11 changes: 10 additions & 1 deletion src/storage/query/QueryBaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectVertexProps(
if (result.ok()) {
auto v = std::move(result).value();
auto reader = RowReader::getTagPropReader(this->schemaMan_, v, spaceId_, tagId);
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}

// Check if ttl data expired
auto retTtlOpt = getTagTTLInfo(tagId);
Expand Down Expand Up @@ -445,7 +448,9 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectVertexProps(
// stored along with the properties
if (iter && iter->valid()) {
auto reader = RowReader::getTagPropReader(this->schemaMan_, iter->val(), spaceId_, tagId);

if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
// Check if ttl data expired
auto retTtlOpt = getTagTTLInfo(tagId);
if (retTtlOpt.hasValue()) {
Expand Down Expand Up @@ -525,6 +530,10 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectEdgeProps(
val,
spaceId_,
std::abs(edgeType));
if (reader == nullptr) {
LOG(WARNING) << "Skip the bad format row!";
continue;
}
// Check if ttl data expired
if (retTTL.has_value() && checkDataExpiredForTTL(schema.get(),
reader.get(),
Expand Down
3 changes: 3 additions & 0 deletions src/storage/query/QueryEdgePropsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ kvstore::ResultCode QueryEdgePropsProcessor::collectEdgesProps(
iter->val(),
spaceId_,
std::abs(edgeKey.edge_type));
if (reader == nullptr) {
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}

// Check if ttl data expired
if (retTTLOpt.has_value()) {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/query/QueryVertexPropsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ kvstore::ResultCode QueryVertexPropsProcessor::collectVertexProps(
continue;
}
auto reader = RowReader::getTagPropReader(this->schemaMan_, val, spaceId_, tagId);
if (reader == nullptr) {
VLOG(3) << "Skip the bad format row!";
continue;
}
// Check if ttl data expired
auto retTTL = getTagTTLInfo(tagId, schema.get());
if (retTTL.has_value() && checkDataExpiredForTTL(schema.get(),
Expand Down
4 changes: 4 additions & 0 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) {
} else if (!ctxIter->second.empty()) {
// only return specified columns
auto reader = RowReader::getEdgePropReader(schemaMan_, value, spaceId_, edgeType);
if (reader == nullptr) {
LOG(WARNING) << "Skip the bad format row";
continue;
}
RowWriter writer;
PropsCollector collector(&writer);
auto& props = ctxIter->second;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) {
} else if (!ctxIter->second.empty()) {
// only return specified columns
auto reader = RowReader::getTagPropReader(schemaMan_, value, spaceId_, tagId);
if (reader == nullptr) {
continue;
}
RowWriter writer;
PropsCollector collector(&writer);
auto& props = ctxIter->second;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/UpdateEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ TEST(UpdateEdgeTest, CorruptDataTest) {
PartitionID partId = 0;
VertexID srcId = 10;
VertexID dstId = 11;
// src = 1, edge_type = 101, ranking = 0, dst = 10001
// src = 10, edge_type = 101, ranking = 0, dst = 11
storage::cpp2::EdgeKey edgeKey;
edgeKey.set_src(srcId);
edgeKey.set_edge_type(101);
Expand Down

0 comments on commit f9a2a08

Please sign in to comment.