diff --git a/src/storage/CompactionFilter.h b/src/storage/CompactionFilter.h index ec83e7b3fa4..6425cbc8efb 100644 --- a/src/storage/CompactionFilter.h +++ b/src/storage/CompactionFilter.h @@ -103,6 +103,10 @@ class StorageCompactionFilter final : public kvstore::KVFilter { return false; } auto reader = nebula::RowReader::getTagPropReader(schemaMan_, val, spaceId, tagId); + if (reader == nullptr) { + VLOG(3) << "Remove the bad format vertex"; + return false; + } return checkDataTtlValid(schema.get(), reader.get()); } else if (NebulaKeyUtils::isEdge(key)) { auto edgeType = NebulaKeyUtils::getEdgeType(key); @@ -113,6 +117,10 @@ class StorageCompactionFilter final : public kvstore::KVFilter { } auto reader = nebula::RowReader::getEdgePropReader(schemaMan_, val, spaceId, std::abs(edgeType)); + if (reader == nullptr) { + VLOG(3) << "Remove the bad format edge!"; + return false; + } return checkDataTtlValid(schema.get(), reader.get()); } return true; diff --git a/src/storage/admin/RebuildEdgeIndexProcessor.cpp b/src/storage/admin/RebuildEdgeIndexProcessor.cpp index edbf194a42d..e255051c390 100644 --- a/src/storage/admin/RebuildEdgeIndexProcessor.cpp +++ b/src/storage/admin/RebuildEdgeIndexProcessor.cpp @@ -82,6 +82,10 @@ void RebuildEdgeIndexProcessor::process(const cpp2::RebuildIndexRequest& req) { std::move(val), space, edgeType); + if (reader == nullptr) { + iter->next(); + continue; + } auto values = collectIndexValues(reader.get(), item->get_fields()); auto indexKey = NebulaKeyUtils::edgeIndexKey(part, indexID, source, ranking, destination, values); diff --git a/src/storage/admin/RebuildTagIndexProcessor.cpp b/src/storage/admin/RebuildTagIndexProcessor.cpp index 31154bbfd76..b498f2bc421 100644 --- a/src/storage/admin/RebuildTagIndexProcessor.cpp +++ b/src/storage/admin/RebuildTagIndexProcessor.cpp @@ -79,6 +79,10 @@ void RebuildTagIndexProcessor::process(const cpp2::RebuildIndexRequest& req) { std::move(val), space, tagID); + if (reader == nullptr) { + iter->next(); + continue; + } auto values = collectIndexValues(reader.get(), item->get_fields()); auto indexKey = NebulaKeyUtils::vertexIndexKey(part, indexID, vertex, values); diff --git a/src/storage/index/IndexExecutor.inl b/src/storage/index/IndexExecutor.inl index a43386ffec9..08d8e2a2b20 100644 --- a/src/storage/index/IndexExecutor.inl +++ b/src/storage/index/IndexExecutor.inl @@ -184,6 +184,9 @@ kvstore::ResultCode IndexExecutor::getVertexRow(PartitionID partId, if (result.ok()) { auto v = std::move(result).value(); auto reader = RowReader::getTagPropReader(schemaMan_, v, spaceId_, tagOrEdge_); + if (reader == nullptr) { + return kvstore::ResultCode::ERR_CORRUPT_DATA; + } auto row = getRowFromReader(reader.get()); data->set_props(std::move(row)); VLOG(3) << "Hit cache for vId " << vId << ", tagId " << tagOrEdge_; @@ -204,6 +207,9 @@ kvstore::ResultCode IndexExecutor::getVertexRow(PartitionID partId, iter->val(), spaceId_, tagOrEdge_); + if (reader == nullptr) { + return kvstore::ResultCode::ERR_CORRUPT_DATA; + } auto row = getRowFromReader(reader.get()); data->set_props(std::move(row)); if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) { diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 9275b07a843..947604923ab 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -102,6 +102,10 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId, val, spaceId_, edgeType); + if (reader == nullptr) { + LOG(WARNING) << "Bad format row"; + return ""; + } auto oi = indexKey(partId, reader.get(), e.first, index); if (!oi.empty()) { batchHolder->remove(std::move(oi)); @@ -115,6 +119,10 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId, e.second, spaceId_, edgeType); + if (nReader == nullptr) { + LOG(WARNING) << "Bad format row"; + return ""; + } } auto ni = indexKey(partId, nReader.get(), e.first, index); batchHolder->put(std::move(ni), ""); diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 1758bfd6a12..afc7d5d0ddd 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -119,6 +119,10 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI val, spaceId_, tagId); + if (reader == nullptr) { + LOG(WARNING) << "Bad format row"; + return ""; + } auto oi = indexKey(partId, vId, reader.get(), index); if (!oi.empty()) { batchHolder->remove(std::move(oi)); @@ -132,6 +136,10 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI v.second, spaceId_, tagId); + if (nReader == nullptr) { + LOG(WARNING) << "Bad format row"; + return ""; + } } auto ni = indexKey(partId, vId, nReader.get(), index); batchHolder->put(std::move(ni), ""); diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index ec92f510386..9a5093462a4 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -107,6 +107,10 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId, iter->val(), spaceId, type); + if (reader == nullptr) { + LOG(WARNING) << "Bad format row!"; + return folly::none; + } } auto values = collectIndexValues(reader.get(), index->get_fields()); diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index b57a96e2517..b5d0532f764 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -128,6 +128,10 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId, iter->val(), spaceId, tagId); + if (reader == nullptr) { + LOG(WARNING) << "Bad format row"; + return folly::none; + } } const auto& cols = index->get_fields(); auto values = collectIndexValues(reader.get(), cols); diff --git a/src/storage/query/QueryBaseProcessor.inl b/src/storage/query/QueryBaseProcessor.inl index 6c262760a3f..c4ddea7e0cc 100644 --- a/src/storage/query/QueryBaseProcessor.inl +++ b/src/storage/query/QueryBaseProcessor.inl @@ -412,6 +412,9 @@ kvstore::ResultCode QueryBaseProcessor::collectVertexProps( if (result.ok()) { auto v = std::move(result).value(); auto reader = RowReader::getTagPropReader(this->schemaMan_, v, spaceId_, tagId); + if (reader == nullptr) { + return kvstore::ResultCode::ERR_CORRUPT_DATA; + } // Check if ttl data expired auto retTtlOpt = getTagTTLInfo(tagId); @@ -445,7 +448,9 @@ kvstore::ResultCode QueryBaseProcessor::collectVertexProps( // stored along with the properties if (iter && iter->valid()) { auto reader = RowReader::getTagPropReader(this->schemaMan_, iter->val(), spaceId_, tagId); - + if (reader == nullptr) { + return kvstore::ResultCode::ERR_CORRUPT_DATA; + } // Check if ttl data expired auto retTtlOpt = getTagTTLInfo(tagId); if (retTtlOpt.hasValue()) { @@ -525,6 +530,10 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( val, spaceId_, std::abs(edgeType)); + if (reader == nullptr) { + LOG(WARNING) << "Skip the bad format row!"; + continue; + } // Check if ttl data expired if (retTTL.has_value() && checkDataExpiredForTTL(schema.get(), reader.get(), diff --git a/src/storage/query/QueryEdgePropsProcessor.cpp b/src/storage/query/QueryEdgePropsProcessor.cpp index 58e41e90114..94b7ca053d2 100644 --- a/src/storage/query/QueryEdgePropsProcessor.cpp +++ b/src/storage/query/QueryEdgePropsProcessor.cpp @@ -41,6 +41,9 @@ kvstore::ResultCode QueryEdgePropsProcessor::collectEdgesProps( iter->val(), spaceId_, std::abs(edgeKey.edge_type)); + if (reader == nullptr) { + return kvstore::ResultCode::ERR_CORRUPT_DATA; + } // Check if ttl data expired if (retTTLOpt.has_value()) { diff --git a/src/storage/query/QueryVertexPropsProcessor.cpp b/src/storage/query/QueryVertexPropsProcessor.cpp index d27124229fb..c1633c4e434 100644 --- a/src/storage/query/QueryVertexPropsProcessor.cpp +++ b/src/storage/query/QueryVertexPropsProcessor.cpp @@ -136,6 +136,10 @@ kvstore::ResultCode QueryVertexPropsProcessor::collectVertexProps( continue; } auto reader = RowReader::getTagPropReader(this->schemaMan_, val, spaceId_, tagId); + if (reader == nullptr) { + VLOG(3) << "Skip the bad format row!"; + continue; + } // Check if ttl data expired auto retTTL = getTagTTLInfo(tagId, schema.get()); if (retTTL.has_value() && checkDataExpiredForTTL(schema.get(), diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 15f60128fd3..aebf630ef83 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -89,6 +89,10 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) { } else if (!ctxIter->second.empty()) { // only return specified columns auto reader = RowReader::getEdgePropReader(schemaMan_, value, spaceId_, edgeType); + if (reader == nullptr) { + LOG(WARNING) << "Skip the bad format row"; + continue; + } RowWriter writer; PropsCollector collector(&writer); auto& props = ctxIter->second; diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 598fee9a53e..8fce51b3626 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -84,6 +84,9 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) { } else if (!ctxIter->second.empty()) { // only return specified columns auto reader = RowReader::getTagPropReader(schemaMan_, value, spaceId_, tagId); + if (reader == nullptr) { + continue; + } RowWriter writer; PropsCollector collector(&writer); auto& props = ctxIter->second; diff --git a/src/storage/test/UpdateEdgeTest.cpp b/src/storage/test/UpdateEdgeTest.cpp index 9c679cb7aad..b35bfd12e0f 100644 --- a/src/storage/test/UpdateEdgeTest.cpp +++ b/src/storage/test/UpdateEdgeTest.cpp @@ -378,7 +378,7 @@ TEST(UpdateEdgeTest, CorruptDataTest) { PartitionID partId = 0; VertexID srcId = 10; VertexID dstId = 11; - // src = 1, edge_type = 101, ranking = 0, dst = 10001 + // src = 10, edge_type = 101, ranking = 0, dst = 11 storage::cpp2::EdgeKey edgeKey; edgeKey.set_src(srcId); edgeKey.set_edge_type(101);