Skip to content

Commit

Permalink
[fix](merge-on-write) fix duplicate key in schema change (#25705)
Browse files Browse the repository at this point in the history
It should be ensured that the obtained versions are continuous when calculate delete bitmap calculations in publish.
The remaining NOTREADY tablet in the schema change failure should be dropped.
When a rowset was deleted, the delete bitmap cannot be deleted until there are no read requests to use the rowset.
  • Loading branch information
liaoxin01 authored Oct 25, 2023
1 parent 8a8ae44 commit f31c1d8
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 54 deletions.
3 changes: 1 addition & 2 deletions be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP
std::vector<RowsetSharedPtr> tmp_rowsets {};

// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (_tablet->tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
if (_tablet->tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
<< _tablet->tablet_id();
return Status::OK();
Expand Down
16 changes: 4 additions & 12 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
std::lock_guard<std::shared_mutex> lck(tablet->get_header_lock());
int64_t cur_max_version = tablet->max_version_unlocked().second;
// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (tablet->tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
if (tablet->tablet_state() == TABLET_NOTREADY) {
// Disable 'partial_update' when the tablet is undergoing a 'schema changing process'
if (_req.table_schema_param->is_partial_update()) {
return Status::InternalError(
Expand All @@ -122,7 +121,7 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
}
_rowset_ids.clear();
} else {
_rowset_ids = tablet->all_rs_id(cur_max_version);
RETURN_IF_ERROR(tablet->all_rs_id(cur_max_version, &_rowset_ids));
}
_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
mow_context =
Expand Down Expand Up @@ -238,8 +237,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_submit_delete_bitmap_timer);
// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (tablet->tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(tablet->tablet_id())) {
if (tablet->tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< tablet->tablet_id() << " txn_id: " << _req.txn_id;
Expand All @@ -248,11 +246,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
auto beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (tablet->tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(tablet->tablet_id())) {
return Status::OK();
}
if (segments.size() > 1) {
// calculate delete bitmap between segments
RETURN_IF_ERROR(
Expand Down Expand Up @@ -293,8 +286,7 @@ Status RowsetBuilder::commit_txn() {
auto tablet = static_cast<Tablet*>(_tablet.get());
if (tablet->enable_unique_key_merge_on_write() &&
config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0 &&
!(tablet->tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(tablet->tablet_id()))) {
tablet->tablet_state() != TABLET_NOTREADY) {
auto st = tablet->check_delete_bitmap_correctness(
_delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids);
if (!st.ok()) {
Expand Down
13 changes: 3 additions & 10 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,12 +912,9 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
res = _convert_historical_rowsets(sc_params);
if (new_tablet->keys_type() != UNIQUE_KEYS ||
!new_tablet->enable_unique_key_merge_on_write() || !res) {
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
if (!res) {
break;
Expand Down Expand Up @@ -977,10 +974,6 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}

// step 4
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (!res) {
break;
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,13 @@ void StorageEngine::start_delete_unused_rowset() {
VLOG_NOTICE << "start to remove rowset:" << it->second->rowset_id()
<< ", version:" << it->second->version().first << "-"
<< it->second->version().second;
auto tablet_id = it->second->rowset_meta()->tablet_id();
auto tablet = _tablet_manager->get_tablet(tablet_id);
// delete delete_bitmap of unused rowsets
if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
tablet->tablet_meta()->delete_bitmap().remove({it->second->rowset_id(), 0, 0},
{it->second->rowset_id(), UINT32_MAX, 0});
}
Status status = it->second->remove();
VLOG_NOTICE << "remove rowset:" << it->second->rowset_id()
<< " finished. status:" << status;
Expand Down
32 changes: 19 additions & 13 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3300,7 +3300,8 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
<< tablet_id() << " cur max_version: " << cur_version;
return Status::OK();
}
RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1);
RowsetIdUnorderedSet cur_rowset_ids;
RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));

Expand Down Expand Up @@ -3349,7 +3350,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
{
std::shared_lock meta_rlock(_meta_lock);
cur_version = max_version_unlocked().second;
cur_rowset_ids = all_rs_id(cur_version);
RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids));
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add,
&rowset_ids_to_del);
specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
Expand Down Expand Up @@ -3390,13 +3391,12 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
{
std::shared_lock meta_rlock(_meta_lock);
// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(tablet_id())) {
if (tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
<< tablet_id();
return Status::OK();
}
cur_rowset_ids = all_rs_id(cur_version - 1);
RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
}
auto t2 = watch.get_elapse_time_us();

Expand Down Expand Up @@ -3432,7 +3432,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur max_version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ss.str() << " , total rows: " << total_rows;

if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) {
Expand Down Expand Up @@ -3564,18 +3564,24 @@ Status Tablet::check_rowid_conversion(
return Status::OK();
}

RowsetIdUnorderedSet Tablet::all_rs_id(int64_t max_version) const {
RowsetIdUnorderedSet rowset_ids;
for (const auto& rs_it : _rs_version_map) {
if (rs_it.first.second == 1) {
Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const {
// Ensure that the obtained versions of rowsets are continuous
std::vector<Version> version_path;
RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version), &version_path));
for (auto& ver : version_path) {
if (ver.second == 1) {
// [0-1] rowset is empty for each tablet, skip it
continue;
}
if (rs_it.first.second <= max_version) {
rowset_ids.insert(rs_it.second->rowset_id());
auto it = _rs_version_map.find(ver);
if (it == _rs_version_map.end()) {
return Status::Error<CAPTURE_ROWSET_ERROR, false>(
"fail to find Rowset for version. tablet={}, version={}", tablet_id(),
ver.to_string());
}
rowset_ids->emplace(it->second->rowset_id());
}
return rowset_ids;
return Status::OK();
}

bool Tablet::check_all_rowset_segment() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ class Tablet final : public BaseTablet {
RowsetSharedPtr dst_rowset,
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
location_map);
RowsetIdUnorderedSet all_rs_id(int64_t max_version) const;
Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const;
void sort_block(vectorized::Block& in_block, vectorized::Block& output_block);

bool check_all_rowset_segment();
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,11 +747,6 @@ void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add,
++it;
}
}
// delete delete_bitmap of to_delete's rowsets if not added to _stale_rs_metas.
if (same_version && _enable_unique_key_merge_on_write) {
delete_bitmap().remove({rs_to_del->rowset_id(), 0, 0},
{rs_to_del->rowset_id(), UINT32_MAX, 0});
}
}
if (!same_version) {
// put to_delete rowsets in _stale_rs_metas.
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ Status EngineCloneTask::_do_clone() {
// Check local tablet exist or not
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);

// The status of a tablet is not ready, indicating that it is a residual tablet after a schema
// change failure. It should not provide normal read and write, so drop it here.
if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id="
<< tablet->tablet_id();
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
tablet->tablet_id(), tablet->replica_id(), false));
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
// try to incremental clone
std::vector<Version> missed_versions;
Expand Down
32 changes: 21 additions & 11 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,24 @@ Status EnginePublishVersionTask::finish() {
StorageEngine::instance()->txn_manager()->update_tablet_version_txn(
tablet_info.tablet_id, version.second, transaction_id);
}
Version max_version;
int64_t max_version;
TabletState tablet_state;
{
std::shared_lock rdlock(tablet->get_header_lock());
max_version = tablet->max_version_unlocked();
max_version = tablet->max_version_unlocked().second;
tablet_state = tablet->tablet_state();
}
if (tablet_state == TabletState::TABLET_RUNNING &&
version.first != max_version.second + 1) {
// If a tablet migrates out and back, the previously failed
// publish task may retry on the new tablet, so check
// whether the version exists. if not exist, then set
// publish failed
if (!tablet->check_version_exist(version)) {
if (version.first != max_version + 1) {
if (tablet->check_version_exist(version)) {
continue;
}
auto handle_version_not_continuous = [&]() {
add_error_tablet_id(tablet_info.tablet_id);
_discontinuous_version_tablets->emplace_back(
partition_id, tablet_info.tablet_id, version.first);
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"check_version_exist failed");
int64_t missed_version = max_version.second + 1;
int64_t missed_version = max_version + 1;
int64_t missed_txn_id =
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
Expand All @@ -186,8 +184,20 @@ Status EnginePublishVersionTask::finish() {
} else {
LOG_EVERY_SECOND(INFO) << msg;
}
};
// The versions during the schema change period need to be also continuous
if (tablet_state == TabletState::TABLET_NOTREADY) {
Version max_continuous_version = {-1, 0};
tablet->max_continuous_version_from_beginning(&max_continuous_version);
if (max_version > 1 && version.first > max_version &&
max_continuous_version.second != max_version) {
handle_version_not_continuous();
continue;
}
} else {
handle_version_not_continuous();
continue;
}
continue;
}
}

Expand Down

0 comments on commit f31c1d8

Please sign in to comment.