From f31c1d858a2e4efdceda91228084b81fcab84439 Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Wed, 25 Oct 2023 18:59:48 +0800 Subject: [PATCH] [fix](merge-on-write) fix duplicate key in schema change (#25705) It should be ensured that the obtained versions are continuous when calculating the delete bitmap during publish. A NOTREADY tablet left over after a schema change failure should be dropped. When a rowset is deleted, its delete bitmap must not be removed until no read requests are still using the rowset. --- be/src/olap/full_compaction.cpp | 3 +- be/src/olap/rowset_builder.cpp | 16 +++------- be/src/olap/schema_change.cpp | 13 ++------ be/src/olap/storage_engine.cpp | 7 ++++ be/src/olap/tablet.cpp | 32 +++++++++++-------- be/src/olap/tablet.h | 2 +- be/src/olap/tablet_meta.cpp | 5 --- be/src/olap/task/engine_clone_task.cpp | 10 ++++++ .../olap/task/engine_publish_version_task.cpp | 32 ++++++++++++------- 9 files changed, 66 insertions(+), 54 deletions(-) diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 393bbd8c579fb3..246056794c9a5d 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -147,8 +147,7 @@ Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP std::vector tmp_rowsets {}; // tablet is under alter process. The delete bitmap will be calculated after conversion. 
- if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + if (_tablet->tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" << _tablet->tablet_id(); return Status::OK(); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 85b72683007d67..afe7fd385ff235 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -112,8 +112,7 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr& mow_context) std::lock_guard lck(tablet->get_header_lock()); int64_t cur_max_version = tablet->max_version_unlocked().second; // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + if (tablet->tablet_state() == TABLET_NOTREADY) { // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' if (_req.table_schema_param->is_partial_update()) { return Status::InternalError( @@ -122,7 +121,7 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr& mow_context) } _rowset_ids.clear(); } else { - _rowset_ids = tablet->all_rs_id(cur_max_version); + RETURN_IF_ERROR(tablet->all_rs_id(cur_max_version, &_rowset_ids)); } _delete_bitmap = std::make_shared(tablet->tablet_id()); mow_context = @@ -238,8 +237,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { std::lock_guard l(_lock); SCOPED_TIMER(_submit_delete_bitmap_timer); // tablet is under alter process. The delete bitmap will be calculated after conversion. 
- if (tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(tablet->tablet_id())) { + if (tablet->tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " "tablet_id: " << tablet->tablet_id() << " txn_id: " << _req.txn_id; @@ -248,11 +246,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { auto beta_rowset = reinterpret_cast(_rowset.get()); std::vector segments; RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(tablet->tablet_id())) { - return Status::OK(); - } if (segments.size() > 1) { // calculate delete bitmap between segments RETURN_IF_ERROR( @@ -293,8 +286,7 @@ Status RowsetBuilder::commit_txn() { auto tablet = static_cast(_tablet.get()); if (tablet->enable_unique_key_merge_on_write() && config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0 && - !(tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(tablet->tablet_id()))) { + tablet->tablet_state() != TABLET_NOTREADY) { auto st = tablet->check_delete_bitmap_correctness( _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids); if (!st.ok()) { diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 2e732b27ac2aa2..e331564d9d28e3 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -912,12 +912,9 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& _tablet_ids_in_converting.insert(new_tablet->tablet_id()); } res = _convert_historical_rowsets(sc_params); - if (new_tablet->keys_type() != UNIQUE_KEYS || - !new_tablet->enable_unique_key_merge_on_write() || !res) { - { - std::lock_guard wrlock(_mutex); - _tablet_ids_in_converting.erase(new_tablet->tablet_id()); - } 
+ { + std::lock_guard wrlock(_mutex); + _tablet_ids_in_converting.erase(new_tablet->tablet_id()); } if (!res) { break; @@ -977,10 +974,6 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& } // step 4 - { - std::lock_guard wrlock(_mutex); - _tablet_ids_in_converting.erase(new_tablet->tablet_id()); - } res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING); if (!res) { break; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index f17a6de84146cd..3bdc6967af19d8 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1081,6 +1081,13 @@ void StorageEngine::start_delete_unused_rowset() { VLOG_NOTICE << "start to remove rowset:" << it->second->rowset_id() << ", version:" << it->second->version().first << "-" << it->second->version().second; + auto tablet_id = it->second->rowset_meta()->tablet_id(); + auto tablet = _tablet_manager->get_tablet(tablet_id); + // delete delete_bitmap of unused rowsets + if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { + tablet->tablet_meta()->delete_bitmap().remove({it->second->rowset_id(), 0, 0}, + {it->second->rowset_id(), UINT32_MAX, 0}); + } Status status = it->second->remove(); VLOG_NOTICE << "remove rowset:" << it->second->rowset_id() << " finished. 
status:" << status; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 937a7d20533ca8..2dbcd46751eff3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3300,7 +3300,8 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) << tablet_id() << " cur max_version: " << cur_version; return Status::OK(); } - RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1); + RowsetIdUnorderedSet cur_rowset_ids; + RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids)); DeleteBitmapPtr delete_bitmap = std::make_shared(tablet_id()); RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); @@ -3349,7 +3350,7 @@ Status Tablet::commit_phase_update_delete_bitmap( { std::shared_lock meta_rlock(_meta_lock); cur_version = max_version_unlocked().second; - cur_rowset_ids = all_rs_id(cur_version); + RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids)); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add); @@ -3390,13 +3391,12 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, { std::shared_lock meta_rlock(_meta_lock); // tablet is under alter process. The delete bitmap will be calculated after conversion. 
- if (tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(tablet_id())) { + if (tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" << tablet_id(); return Status::OK(); } - cur_rowset_ids = all_rs_id(cur_version - 1); + RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids)); } auto t2 = watch.get_elapse_time_us(); @@ -3432,7 +3432,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id() << ", rowset_ids to add: " << rowset_ids_to_add.size() << ", rowset_ids to del: " << rowset_ids_to_del.size() - << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id << "," + << ", cur version: " << cur_version << ", transaction_id: " << txn_id << "," << ss.str() << " , total rows: " << total_rows; if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) { @@ -3564,18 +3564,24 @@ Status Tablet::check_rowid_conversion( return Status::OK(); } -RowsetIdUnorderedSet Tablet::all_rs_id(int64_t max_version) const { - RowsetIdUnorderedSet rowset_ids; - for (const auto& rs_it : _rs_version_map) { - if (rs_it.first.second == 1) { +Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { + // Ensure that the obtained versions of rowsets are continuous + std::vector version_path; + RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version), &version_path)); + for (auto& ver : version_path) { + if (ver.second == 1) { // [0-1] rowset is empty for each tablet, skip it continue; } - if (rs_it.first.second <= max_version) { - rowset_ids.insert(rs_it.second->rowset_id()); + auto it = _rs_version_map.find(ver); + if (it == _rs_version_map.end()) { + return Status::Error( + "fail to find Rowset for version. 
tablet={}, version={}", tablet_id(), + ver.to_string()); } + rowset_ids->emplace(it->second->rowset_id()); } - return rowset_ids; + return Status::OK(); } bool Tablet::check_all_rowset_segment() { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 4a074952ab9e4c..58a2c7f8f6c127 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -502,7 +502,7 @@ class Tablet final : public BaseTablet { RowsetSharedPtr dst_rowset, const std::map>>& location_map); - RowsetIdUnorderedSet all_rs_id(int64_t max_version) const; + Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const; void sort_block(vectorized::Block& in_block, vectorized::Block& output_block); bool check_all_rowset_segment(); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 201ffd0a2d2f9e..b5f3b4fd36d748 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -747,11 +747,6 @@ void TabletMeta::modify_rs_metas(const std::vector& to_add, ++it; } } - // delete delete_bitmap of to_delete's rowsets if not added to _stale_rs_metas. - if (same_version && _enable_unique_key_merge_on_write) { - delete_bitmap().remove({rs_to_del->rowset_id(), 0, 0}, - {rs_to_del->rowset_id(), UINT32_MAX, 0}); - } } if (!same_version) { // put to_delete rowsets in _stale_rs_metas. diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index dddec4daba8a0d..6edc746da3afa2 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -116,6 +116,16 @@ Status EngineCloneTask::_do_clone() { // Check local tablet exist or not TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id); + + // The status of a tablet is not ready, indicating that it is a residual tablet after a schema + // change failure. It should not provide normal read and write, so drop it here. 
+ if (tablet && tablet->tablet_state() == TABLET_NOTREADY) { + LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id=" + << tablet->tablet_id(); + RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet( + tablet->tablet_id(), tablet->replica_id(), false)); + tablet.reset(); + } bool is_new_tablet = tablet == nullptr; // try to incremental clone std::vector missed_versions; diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index c5a8c32b84c2e1..f9e013667bcaaf 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -152,26 +152,24 @@ Status EnginePublishVersionTask::finish() { StorageEngine::instance()->txn_manager()->update_tablet_version_txn( tablet_info.tablet_id, version.second, transaction_id); } - Version max_version; + int64_t max_version; TabletState tablet_state; { std::shared_lock rdlock(tablet->get_header_lock()); - max_version = tablet->max_version_unlocked(); + max_version = tablet->max_version_unlocked().second; tablet_state = tablet->tablet_state(); } - if (tablet_state == TabletState::TABLET_RUNNING && - version.first != max_version.second + 1) { - // If a tablet migrates out and back, the previously failed - // publish task may retry on the new tablet, so check - // whether the version exists. 
if not exist, then set - // publish failed - if (!tablet->check_version_exist(version)) { + if (version.first != max_version + 1) { + if (tablet->check_version_exist(version)) { + continue; + } + auto handle_version_not_continuous = [&]() { add_error_tablet_id(tablet_info.tablet_id); _discontinuous_version_tablets->emplace_back( partition_id, tablet_info.tablet_id, version.first); res = Status::Error( "check_version_exist failed"); - int64_t missed_version = max_version.second + 1; + int64_t missed_version = max_version + 1; int64_t missed_txn_id = StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( tablet->tablet_id(), missed_version); @@ -186,8 +184,20 @@ Status EnginePublishVersionTask::finish() { } else { LOG_EVERY_SECOND(INFO) << msg; } + }; + // The versions during the schema change period need to be also continuous + if (tablet_state == TabletState::TABLET_NOTREADY) { + Version max_continuous_version = {-1, 0}; + tablet->max_continuous_version_from_beginning(&max_continuous_version); + if (max_version > 1 && version.first > max_version && + max_continuous_version.second != max_version) { + handle_version_not_continuous(); + continue; + } + } else { + handle_version_not_continuous(); + continue; } - continue; } }