Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tablet erros when close_wait return error #9619

Merged
merged 9 commits into from
May 22, 2022
22 changes: 2 additions & 20 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ Status DeltaWriter::close() {
return Status::OK();
}

Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
bool is_broken) {
Status DeltaWriter::close_wait() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";
Expand All @@ -303,15 +301,7 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
}

// return error if previous flush failed
Status s = _flush_token->wait();
if (!s.ok()) {
#ifndef BE_TEST
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(_tablet->tablet_id());
tablet_error->set_msg(s.get_error_msg());
#endif
return s;
}
RETURN_NOT_OK(_flush_token->wait());

// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
Expand All @@ -327,14 +317,6 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
return res;
}

#ifndef BE_TEST
if (!is_broken) {
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(_tablet->tablet_id());
tablet_info->set_schema_hash(_tablet->schema_hash());
}
#endif

_delta_written_success = true;

const FlushStatistic& stat = _flush_token->get_stats();
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ class DeltaWriter {
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
Status close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
bool is_broken);
Status close_wait();

// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Expand All @@ -91,6 +89,8 @@ class DeltaWriter {

int64_t tablet_id() { return _tablet->tablet_id(); }

int32_t schema_hash() { return _tablet->schema_hash(); }

int64_t save_mem_consumption_snapshot();

int64_t get_mem_consumption_snapshot() const;
Expand Down Expand Up @@ -133,4 +133,4 @@ class DeltaWriter {
int64_t _mem_consumption_snapshot = 0;
};

} // namespace doris
} // namespace doris
21 changes: 18 additions & 3 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,29 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
for (auto writer : need_wait_writers) {
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE judge it.
writer->close_wait(
tablet_vec, tablet_errors,
(_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
_close_wait(writer, tablet_vec, tablet_errors);
}
}
return Status::OK();
}

void TabletsChannel::_close_wait(DeltaWriter* writer,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
Status st = writer->close_wait();
if (st.ok()) {
if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(writer->tablet_id());
tablet_info->set_schema_hash(writer->schema_hash());
}
} else {
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(writer->tablet_id());
tablet_error->set_msg(st.get_error_msg());
}
}

Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class TabletsChannel {
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& request);

// deal with DeltaWriter close_wait(), add tablet to list for return.
void _close_wait(DeltaWriter* writer,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_error);

// id of this load channel
TabletsChannelKey _key;

Expand Down
12 changes: 6 additions & 6 deletions be/test/olap/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);

Expand All @@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);

Expand Down Expand Up @@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) {

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);

// publish version success
Expand Down Expand Up @@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) {

res = delta_writer->close();
ASSERT_TRUE(res.ok());
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
ASSERT_TRUE(res.ok());

// publish version success
Expand Down Expand Up @@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) {

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);

// publish version success
Expand Down Expand Up @@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {

res = delta_writer->close();
ASSERT_TRUE(res.ok());
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
ASSERT_TRUE(res.ok());

// publish version success
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/engine_storage_migration_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);

// publish version success
Expand Down