Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,15 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab
Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data,
bool sync_delete_bitmap, bool full_sync,
SyncRowsetStats* sync_stats) {
std::unique_lock lock {tablet->get_sync_meta_lock()};
return sync_tablet_rowsets_unlocked(tablet, lock, warmup_delta_data, sync_delete_bitmap,
full_sync, sync_stats);
}

Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
std::unique_lock<bthread::Mutex>& lock,
bool warmup_delta_data, bool sync_delete_bitmap,
bool full_sync, SyncRowsetStats* sync_stats) {
using namespace std::chrono;

TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);
Expand Down Expand Up @@ -554,6 +563,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
}
tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
}
DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", {
auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
if (target_tablet_id == tablet->tablet_id()) {
DBUG_BLOCK
}
});
{
const auto& stats = resp.stats();
std::unique_lock wlock(tablet->get_header_lock());
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class CloudMetaMgr {
Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false,
bool sync_delete_bitmap = true, bool full_sync = false,
SyncRowsetStats* sync_stats = nullptr);
Status sync_tablet_rowsets_unlocked(
CloudTablet* tablet, std::unique_lock<bthread::Mutex>& lock /* _sync_meta_lock */,
bool warmup_delta_data = false, bool sync_delete_bitmap = true, bool full_sync = false,
SyncRowsetStats* sync_stats = nullptr);

Status prepare_rowset(const RowsetMeta& rs_meta,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_eng
CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;

Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.block", DBUG_BLOCK);
// new tablet has to exist
_new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id));
if (_new_tablet->tablet_state() == TABLET_RUNNING) {
Expand Down Expand Up @@ -399,6 +400,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
_new_tablet->tablet_id());
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected retryable error");
});
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job",
DBUG_BLOCK);
auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
if (!st.ok()) {
if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
Expand All @@ -416,6 +419,9 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}
const auto& stats = finish_resp.stats();
{
// to prevent the converted historical rowsets be replaced by rowsets written on new tablet
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
Expand Down
9 changes: 5 additions & 4 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,16 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data,
}

// serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);
std::unique_lock lock(_sync_meta_lock);
if (query_version > 0) {
std::shared_lock rlock(_meta_lock);
if (_max_version >= query_version) {
return Status::OK();
}
}

auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data, true, false, stats);
auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, warmup_delta_data, true,
false, stats);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
Expand All @@ -181,7 +182,7 @@ Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
}

// Serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);
std::unique_lock lock(_sync_meta_lock);

{
std::shared_lock rlock(_meta_lock);
Expand Down Expand Up @@ -216,7 +217,7 @@ Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
_max_version = -1;
}

st = _engine.meta_mgr().sync_tablet_rowsets(this, false, true, false, stats);
st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, false, true, false, stats);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ class CloudTablet final : public BaseTablet {

std::mutex& get_rowset_update_lock() { return _rowset_update_lock; }

bthread::Mutex& get_sync_meta_lock() { return _sync_meta_lock; }

const auto& rowset_map() const { return _rs_version_map; }

// Merge all rowset schemas within a CloudTablet
Expand Down Expand Up @@ -237,6 +239,7 @@ class CloudTablet final : public BaseTablet {

// this mutex MUST ONLY be used when sync meta
bthread::Mutex _sync_meta_lock;
// ATTENTION: lock order should be: _sync_meta_lock -> _meta_lock

std::atomic<int64_t> _cumulative_point {-1};
std::atomic<int64_t> _approximate_num_rowsets {-1};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 10
2 2 2 20
3 3 3 30

-- !dup_key_count --

-- !sql --
1 1 1 10
100 100 100 1
2 2 2 20
3 3 3 30

Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite("test_cloud_sc_convert_data_replaced_on_new_tablet", "nonConcurrent") {
if (!isCloudMode()) {
return
}

GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()

def table1 = "test_cloud_sc_convert_data_replaced_on_new_tablet"
sql "DROP TABLE IF EXISTS ${table1} FORCE;"
sql """ CREATE TABLE IF NOT EXISTS ${table1} (
`k1` int NOT NULL,
`c1` int,
`c2` int,
`c3` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1",
"function_column.sequence_col" = "c3"
); """

sql "insert into ${table1} values(1,1,1,10);" // 2
sql "insert into ${table1} values(2,2,2,20);" // 3
sql "insert into ${table1} values(3,3,3,30);" // 4
sql "sync;"
qt_sql "select * from ${table1} order by k1;"

def backends = sql_return_maparray('show backends')
def tabletStats = sql_return_maparray("show tablets from ${table1};")
assert tabletStats.size() == 1
def tabletId = tabletStats[0].TabletId
def tabletBackendId = tabletStats[0].BackendId
def tabletBackend
for (def be : backends) {
if (be.BackendId == tabletBackendId) {
tabletBackend = be
break;
}
}
logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");

try {
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::process_alter_tablet.block")
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job")
// GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.before.modify_tablet_meta")
sql "alter table ${table1} modify column c2 varchar(100);"

Thread.sleep(1000)

tabletStats = sql_return_maparray("show tablets from ${table1};")
def newTabletId = "-1"
for (def stat : tabletStats) {
if (stat.TabletId != tabletId) {
newTabletId = stat.TabletId
break
}
}
logger.info("new_tablet_id: ${newTabletId}")

sql "insert into ${table1} values(1,99,99,1);" // 5

// let query trigger sync_rowsets to make base tablet's max version be 5
sql "select * from ${table1} order by k1;"

Thread.sleep(1000)
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::process_alter_tablet.block")
Thread.sleep(1000)

// to trigger sync_rowsets on new tablet and block before modify be's tablet meta
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", [tablet_id: newTabletId])
def t1 = Thread.start {
sql "insert into ${table1} values(100,100,100,1);"
}

Thread.sleep(1000)

// let sc modify tablet meta first
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job")
Thread.sleep(1000)
// and load's sync_rowsets() on new tablet modify be's tablet meta
GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta")

t1.join()

waitForSchemaChangeDone {
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
time 1000
}

qt_dup_key_count "select k1,count() as cnt from ${table1} group by k1 having cnt>1;"
order_qt_sql "select * from ${table1};"

} catch(Exception e) {
logger.info(e.getMessage())
throw e
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
}
}
Loading