Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix coredump for update if encountering bad format row #2024

Merged
merged 2 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions src/dataman/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ std::unique_ptr<RowReader> RowReader::getTagPropReader(
folly::StringPiece row,
GraphSpaceID space,
TagID tag) {
CHECK_NOTNULL(schemaMan);
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getTagSchema(space, tag, ver);
Expand All @@ -130,9 +133,7 @@ std::unique_ptr<RowReader> RowReader::getTagPropReader(
row,
schema));
} else {
// Invalid data
// TODO We need a better error handler here
LOG(FATAL) << "Invalid schema version in the row data!";
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
}
}
Expand All @@ -144,16 +145,21 @@ std::unique_ptr<RowReader> RowReader::getEdgePropReader(
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
CHECK_NOTNULL(schemaMan);
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getEdgeSchema(space, edge, ver);
if (schema == nullptr) {
return nullptr;
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schemaMan->getEdgeSchema(space, edge, ver)));
schema));
} else {
// Invalid data
// TODO We need a better error handler here
LOG(FATAL) << "Invalid schema version in the row data!";
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
}
}
Expand All @@ -173,7 +179,7 @@ int32_t RowReader::getSchemaVer(folly::StringPiece row) {
const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
if (reinterpret_cast<const char*>(it) == row.end()) {
LOG(ERROR) << "Row data is empty, so there is no schema version";
return 0;
return -1;
}

// The first three bits indicate the number of bytes for the
Expand All @@ -185,7 +191,7 @@ int32_t RowReader::getSchemaVer(folly::StringPiece row) {
if (verBytes + 1 > row.size()) {
// Data is too short
LOG(ERROR) << "Row data is too short";
return 0;
return -1;
}
// Schema Version is stored in Little Endian
for (size_t i = 0; i < verBytes; i++) {
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ enum ResultCode {
ERR_TAG_NOT_FOUND = -11,
ERR_EDGE_NOT_FOUND = -12,
ERR_ATOMIC_OP_FAILED = -13,
ERR_CORRUPT_DATA = -14,
ERR_PARTIAL_RESULT = -99,
ERR_UNKNOWN = -100,
};
Expand Down
1 change: 1 addition & 0 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ enum class FilterResult {
SUCCEEDED = 0, // pass filter
E_FILTER_OUT = -1, // filter out
E_ERROR = -2, // exception when filter
E_BAD_SCHEMA = -3, // Bad schema
};


Expand Down
54 changes: 42 additions & 12 deletions src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ kvstore::ResultCode UpdateEdgeProcessor::collectVertexProps(
iter->val(),
this->spaceId_,
tagId);
if (reader == nullptr) {
LOG(WARNING) << "Can't find the schema for tagId " << tagId;
// It offen happens after updating schema but current storaged has not
// load it. To protect the data, we just return failed to graphd.
return kvstore::ResultCode::ERR_UNKNOWN;
}
const auto constSchema = reader->getSchema();
for (auto& prop : props) {
auto res = RowReader::getPropByName(reader.get(), prop.prop_.name);
Expand Down Expand Up @@ -147,6 +153,15 @@ kvstore::ResultCode UpdateEdgeProcessor::collectEdgesProps(
val_,
this->spaceId_,
std::abs(edgeKey.edge_type));
if (reader == nullptr) {
LOG(WARNING) << "Can't find related edge "
<< edgeKey.edge_type << " schema";
// TODO(heng)
// The case offen happens when updating the reverse edge but it is not exist.
// Because we don't ensure the consistency when inserting edges (Bidirect).
// So we leave the issue here. Now we just return failed to graphd.
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
const auto constSchema = reader->getSchema();
for (auto index = 0UL; index < constSchema->getNumFields(); index++) {
auto propName = std::string(constSchema->getFieldName(index));
Expand Down Expand Up @@ -314,10 +329,15 @@ std::string UpdateEdgeProcessor::updateAndWriteBack(PartitionID partId,


FilterResult UpdateEdgeProcessor::checkFilter(const PartitionID partId,
const cpp2::EdgeKey& edgeKey) {
const cpp2::EdgeKey& edgeKey) {
auto ret = collectEdgesProps(partId, edgeKey);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return FilterResult::E_ERROR;
switch (ret) {
case kvstore::ResultCode::SUCCEEDED:
break;
case kvstore::ResultCode::ERR_CORRUPT_DATA:
return FilterResult::E_BAD_SCHEMA;
default:
return FilterResult::E_ERROR;
}
for (auto& tc : this->tagContexts_) {
VLOG(3) << "partId " << partId << ", vId " << edgeKey.src
Expand Down Expand Up @@ -497,15 +517,25 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
handleLeaderChanged(this->spaceId_, partId);
break;
}
if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED
&& filterResult_ == FilterResult::E_FILTER_OUT) {
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId);
} else if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED
&& filterResult_ == FilterResult::E_ERROR) {
this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId);
if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED) {
switch (filterResult_) {
case FilterResult::E_FILTER_OUT:
// Filter out
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId);
break;
case FilterResult::E_ERROR:
this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId);
break;
case FilterResult::E_BAD_SCHEMA:
this->pushResultCode(cpp2::ErrorCode::E_EDGE_NOT_FOUND, partId);
break;
default:
this->pushResultCode(to(code), partId);
break;
}
} else {
this->pushResultCode(to(code), partId);
}
Expand Down
45 changes: 33 additions & 12 deletions src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ kvstore::ResultCode UpdateVertexProcessor::collectVertexProps(
iter->val(),
this->spaceId_,
tagId);
if (reader == nullptr) {
LOG(WARNING) << "Can't find the schema for tagId " << tagId;
// It offen happens after updating schema but current storaged has not
// load it. To protect the data, we just return failed to graphd.
return kvstore::ResultCode::ERR_CORRUPT_DATA;
}
const auto constSchema = reader->getSchema();
for (auto& prop : props) {
auto res = RowReader::getPropByName(reader.get(), prop.prop_.name);
Expand Down Expand Up @@ -157,8 +163,13 @@ FilterResult UpdateVertexProcessor::checkFilter(const PartitionID partId, const
VLOG(3) << "partId " << partId << ", vId " << vId
<< ", tagId " << tc.tagId_ << ", prop size " << tc.props_.size();
auto ret = collectVertexProps(partId, vId, tc.tagId_, tc.props_);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return FilterResult::E_ERROR;
switch (ret) {
case kvstore::ResultCode::SUCCEEDED:
break;
case kvstore::ResultCode::ERR_CORRUPT_DATA:
return FilterResult::E_BAD_SCHEMA;
default:
return FilterResult::E_ERROR;
}
}

Expand Down Expand Up @@ -425,6 +436,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
// Fallthrough
case FilterResult::E_ERROR:
// Fallthrough
case FilterResult::E_BAD_SCHEMA:
default: {
return folly::none;
}
Expand All @@ -442,16 +454,25 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
handleLeaderChanged(this->spaceId_, partId);
break;
}
if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED
&& filterResult_ == FilterResult::E_FILTER_OUT) {
// Filter out
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId);
} else if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED
&& filterResult_ == FilterResult::E_ERROR) {
this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId);
if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED) {
switch (filterResult_) {
case FilterResult::E_FILTER_OUT:
// Filter out
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId);
break;
case FilterResult::E_ERROR:
this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId);
break;
case FilterResult::E_BAD_SCHEMA:
this->pushResultCode(cpp2::ErrorCode::E_TAG_NOT_FOUND, partId);
break;
default:
this->pushResultCode(to(code), partId);
break;
}
} else {
this->pushResultCode(to(code), partId);
}
Expand Down
66 changes: 66 additions & 0 deletions src/storage/test/UpdateEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,72 @@ TEST(UpdateEdgeTest, Insertable_Test) {
EXPECT_STREQ("", boost::get<std::string>(v3).c_str());
}


TEST(UpdateEdgeTest, CorruptDataTest) {
fs::TempDir rootPath("/tmp/UpdateEdgeTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());
LOG(INFO) << "Prepare meta...";
auto schemaMan = TestUtils::mockSchemaMan();
auto indexMan = TestUtils::mockIndexMan();
LOG(INFO) << "Write an edge with empty value!";

// partId, srcId, edgeType, rank, dstId, version
auto key = NebulaKeyUtils::edgeKey(0, 10, 101, 0, 11, 0);
std::vector<kvstore::KV> data;
data.emplace_back(std::make_pair(key, ""));
folly::Baton<> baton;
kv->asyncMultiPut(0, 0, std::move(data),
[&](kvstore::ResultCode code) {
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED);
baton.post();
});
baton.wait();

LOG(INFO) << "Build UpdateEdgeRequest...";
GraphSpaceID spaceId = 0;
PartitionID partId = 0;
VertexID srcId = 10;
VertexID dstId = 11;
// src = 1, edge_type = 101, ranking = 0, dst = 10001
Copy link
Contributor

@panda-sheep panda-sheep Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update src = 10 , dst = 11 or remove the comment ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding. Let's me take care of it in next pr.

storage::cpp2::EdgeKey edgeKey;
edgeKey.set_src(srcId);
edgeKey.set_edge_type(101);
edgeKey.set_ranking(0);
edgeKey.set_dst(dstId);

cpp2::UpdateEdgeRequest req;
req.set_space_id(spaceId);
req.set_edge_key(edgeKey);
req.set_part_id(partId);
req.set_filter("");
LOG(INFO) << "Build update items...";
std::vector<cpp2::UpdateItem> items;
// string: 101.col_10 = string_col_10_2_new
cpp2::UpdateItem item;
item.set_name("101");
item.set_prop("col_10");
std::string col10new("string_col_10_2_new");
PrimaryExpression val2(col10new);
item.set_value(Expression::encode(&val2));
items.emplace_back(item);
req.set_update_items(std::move(items));
req.set_insertable(true);

LOG(INFO) << "Test UpdateEdgeRequest...";
auto* processor = UpdateEdgeProcessor::instance(kv.get(),
schemaMan.get(),
indexMan.get(),
nullptr);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();


EXPECT_EQ(1, resp.result.failed_codes.size());
EXPECT_TRUE(cpp2::ErrorCode::E_EDGE_NOT_FOUND == resp.result.failed_codes[0].code);
EXPECT_EQ(0, resp.result.failed_codes[0].part_id);
}

} // namespace storage
} // namespace nebula

Expand Down
57 changes: 57 additions & 0 deletions src/storage/test/UpdateVertexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,63 @@ TEST(UpdateVertexTest, Invalid_Filter_Test) {
== resp.result.failed_codes[0].code);
}

TEST(UpdateVertexTest, CorruptDataTest) {
fs::TempDir rootPath("/tmp/UpdateVertexTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());

LOG(INFO) << "Prepare meta...";
auto schemaMan = TestUtils::mockSchemaMan();
auto indexMan = TestUtils::mockIndexMan();
LOG(INFO) << "Write a vertex with empty value!";

// partId, srcId, tagId, version
auto key = NebulaKeyUtils::vertexKey(0, 10, 3001, 0);
std::vector<kvstore::KV> data;
data.emplace_back(std::make_pair(key, ""));
folly::Baton<> baton;
kv->asyncMultiPut(0, 0, std::move(data),
[&](kvstore::ResultCode code) {
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED);
baton.post();
});
baton.wait();

LOG(INFO) << "Build UpdateVertexRequest...";
GraphSpaceID spaceId = 0;
PartitionID partId = 0;
VertexID vertexId = 10;
cpp2::UpdateVertexRequest req;
req.set_space_id(spaceId);
req.set_vertex_id(vertexId);
req.set_part_id(partId);
req.set_filter("");
LOG(INFO) << "Build update items...";
std::vector<cpp2::UpdateItem> items;
// int: 3001.tag_3001_col_0 = 1L
cpp2::UpdateItem item;
item.set_name("3001");
item.set_prop("tag_3001_col_0");
PrimaryExpression val(1L);
item.set_value(Expression::encode(&val));
items.emplace_back(item);
req.set_update_items(std::move(items));
req.set_insertable(false);

LOG(INFO) << "Test UpdateVertexRequest...";
auto* processor = UpdateVertexProcessor::instance(kv.get(),
schemaMan.get(),
indexMan.get(),
nullptr);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

LOG(INFO) << "Check the results...";
EXPECT_EQ(1, resp.result.failed_codes.size());
EXPECT_TRUE(cpp2::ErrorCode::E_TAG_NOT_FOUND == resp.result.failed_codes[0].code);
EXPECT_EQ(0, resp.result.failed_codes[0].part_id);
}

} // namespace storage
} // namespace nebula

Expand Down