Skip to content

Commit

Permalink
[fix](merge-on-write) fix duplicate key in schema change (#24782)
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 authored and xiaokang committed Sep 22, 2023
1 parent 72cee27 commit 12fc3b4
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 21 deletions.
7 changes: 0 additions & 7 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,6 @@ class Rowset : public std::enable_shared_from_this<Rowset> {

bool check_rowset_segment();

bool start_publish() {
bool expect = false;
return _is_publish_running.compare_exchange_strong(expect, true);
}
void finish_publish() { _is_publish_running.store(false); }

[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }

protected:
Expand Down Expand Up @@ -346,7 +340,6 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
// rowset state machine
RowsetStateMachine _rowset_state_machine;
std::atomic<uint64_t> _delayed_expired_timestamp = 0;
std::atomic<bool> _is_publish_running {false};
};

} // namespace doris
1 change: 0 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3331,7 +3331,6 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
std::vector<segment_v2::SegmentSharedPtr> segments;
_load_rowset_segments(rowset, &segments);

std::lock_guard<std::mutex> rwlock(_rowset_update_lock);
{
std::shared_lock meta_rlock(_meta_lock);
// tablet is under alter process. The delete bitmap will be calculated after conversion.
Expand Down
20 changes: 7 additions & 13 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <chrono> // IWYU pragma: keep
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <shared_mutex>
Expand Down Expand Up @@ -252,14 +253,12 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
}

void TabletPublishTxnTask::handle() {
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
if (!_rowset->start_publish()) {
LOG(WARNING) << "publish is running. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
std::unique_lock<std::mutex> rowset_update_lock(_tablet->get_rowset_update_lock(),
std::defer_lock);
if (_tablet->enable_unique_key_merge_on_write()) {
rowset_update_lock.lock();
}
Defer defer {[&] { _rowset->finish_publish(); }};
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (publish_status != Status::OK()) {
Expand Down Expand Up @@ -295,6 +294,7 @@ void TabletPublishTxnTask::handle() {
}

void AsyncTabletPublishTask::handle() {
std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
Expand All @@ -305,12 +305,6 @@ void AsyncTabletPublishTask::handle() {
return;
}
RowsetSharedPtr rowset = iter->second;
if (!rowset->start_publish()) {
LOG(WARNING) << "publish is running. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
return;
}
Defer defer {[&] { rowset->finish_publish(); }};
Version version(_version, _version);
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, version, &_stats);
Expand Down

0 comments on commit 12fc3b4

Please sign in to comment.