Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

namespace doris {

bvar::Adder<int64_t> g_cloud_commit_rowset_count("cloud_commit_rowset_count");
bvar::Adder<int64_t> g_cloud_commit_empty_rowset_count("cloud_commit_empty_rowset_count");

CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteRequest& req,
RuntimeProfile* profile, const UniqueId& load_id)
: BaseDeltaWriter(req, profile, load_id), _engine(engine) {
Expand Down Expand Up @@ -108,10 +111,12 @@ const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() {
}

Status CloudDeltaWriter::commit_rowset() {
g_cloud_commit_rowset_count << 1;
std::lock_guard<bthread::Mutex> lock(_mtx);

// Handle empty rowset (no data written)
if (!_is_init) {
g_cloud_commit_empty_rowset_count << 1;
return _commit_empty_rowset();
}

Expand Down
37 changes: 35 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1706,28 +1706,61 @@ Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version
return a.first < b.first;
});

// During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which calls
// fill_version_holes. For schema change tablets (TABLET_NOTREADY state), we selectively skip hole
// filling for versions <= alter_version to prevent:
// 1. Abnormal compaction score calculations for schema change tablets
// 2. Unexpected -235 errors during load operations
// This allows schema change to proceed normally while still permitting hole filling for versions
// beyond the alter_version threshold.
bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY;
if (is_schema_change_tablet && tablet->alter_version() <= 1) {
LOG(INFO) << "Skip version hole filling for new schema change tablet "
<< tablet->tablet_id() << " with alter_version " << tablet->alter_version();
return Status::OK();
}

int64_t last_version = -1;
for (const Version& version : existing_versions) {
VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": ["
<< version.first << ", " << version.second << "]";
// missing versions are those that are not in the existing_versions
if (version.first > last_version + 1) {
// there is a hole between versions
auto prev_non_hole_rowset = tablet->get_rowset_by_version(version);
for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
// Skip hole filling for versions <= alter_version during schema change
if (is_schema_change_tablet && ver <= tablet->alter_version()) {
continue;
}
RowsetSharedPtr hole_rowset;
RETURN_IF_ERROR(create_empty_rowset_for_hole(
tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
hole_rowsets.push_back(hole_rowset);
}
LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1
<< " to " << version.first - 1 << " for tablet " << tablet->tablet_id();
<< " to " << version.first - 1 << " for tablet " << tablet->tablet_id()
<< (is_schema_change_tablet
? (", schema change tablet skipped filling versions <= " +
std::to_string(tablet->alter_version()))
: "");
}
last_version = version.second;
}

if (last_version + 1 <= max_version) {
LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to "
<< max_version << " for tablet " << tablet->tablet_id();
<< max_version << " for tablet " << tablet->tablet_id()
<< (is_schema_change_tablet
? (", schema change tablet skipped filling versions <= " +
std::to_string(tablet->alter_version()))
: "");
// there is a hole after the last existing version
for (; last_version + 1 <= max_version; ++last_version) {
// Skip hole filling for versions <= alter_version during schema change
if (is_schema_change_tablet && last_version + 1 <= tablet->alter_version()) {
continue;
}
RowsetSharedPtr hole_rowset;
auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back());
RETURN_IF_ERROR(create_empty_rowset_for_hole(
Expand Down
13 changes: 12 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/cloud.pb.h>

#include <algorithm>
#include <chrono>
#include <memory>
#include <random>
Expand Down Expand Up @@ -459,12 +460,22 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator));
TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap()->delete_bitmap.clear();
tmp_meta->clear_rowsets();
// Keep only version [0-1] rowset, other rowsets will be added in _output_rowsets
auto& rs_metas = tmp_meta->all_mutable_rs_metas();
rs_metas.erase(std::remove_if(rs_metas.begin(), rs_metas.end(),
[](const RowsetMetaSharedPtr& rs_meta) {
return !(rs_meta->version().first == 0 &&
rs_meta->version().second == 1);
}),
rs_metas.end());

std::shared_ptr<CloudTablet> tmp_tablet =
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
// Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
tmp_tablet->set_alter_version(alter_version);
}

// step 1, process incremental rowset without delete bitmap update lock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
7140 240

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ suite("test_schema_change_mow_with_empty_rowset", "p0") {
for (int i = 0; i < 20; i++) {
sql """ insert into ${tableName} values (100, 2, 3, 4, 5, 6.6, 1.7, 8.8,
'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
sleep(20)
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite("test_schema_change_with_empty_rowset", "p0,nonConcurrent") {
def custoBeConfig = [
max_tablet_version_num : 100
]

setBeConfigTemporary(custoBeConfig) {
def tableName = "test_sc_with_empty_rowset"

def getJobState = { tbl ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}

sql """ DROP TABLE IF EXISTS ${tableName} """

sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` int(11) NULL,
`k2` tinyint(4) NULL,
`k3` smallint(6) NULL,
`k4` int(30) NULL,
`k5` largeint(40) NULL,
`k6` float NULL,
`k7` double NULL,
`k8` decimal(9, 0) NULL,
`k9` char(10) NULL,
`k10` varchar(1024) NULL,
`k11` text NULL,
`k12` date NULL,
`k13` datetime NULL
) ENGINE=OLAP
UNIQUE KEY(k1, k2, k3)
DISTRIBUTED BY HASH(`k1`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
"""

for (int i = 0; i < 100; i++) {
sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8,
'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
}


// trigger compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "cumulative")

sql """ alter table ${tableName} modify column k4 string NULL"""

for (int i = 100; i < 120; i++) {
sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8,
'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
sleep(20)
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
{
String res = getJobState(tableName)
if (res == "FINISHED" || res == "CANCELLED") {
assertEquals("FINISHED", res)
return true
}
return false
}
)

qt_sql """ select sum(k1), sum(k2) from ${tableName} """
}
}

Loading