2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,7 +375,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (_input_rowsets.size() == 1) {
DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
// MUST NOT move input rowset to stale path
cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);
} else {
cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
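For context, the fourth argument added above appears to map to the warmup_delta_data flag used in the cloud_tablet.cpp hunk later in this diff, opting the compaction output into file-cache warm-up. Below is a minimal standalone sketch of this branch with simplified stand-in types; Version and the commented-out calls are illustrations only, not the real CloudCumulativeCompaction API:

// Simplified model of the single-input vs. multi-input paths shown above.
#include <cassert>
#include <iostream>
#include <vector>

struct Version {
    int first;
    int second;
};

int main() {
    std::vector<Version> input_versions {{5, 8}}; // versions of _input_rowsets
    Version output_version {5, 8};                // version of _output_rowset

    if (input_versions.size() == 1) {
        // Mirrors the DCHECK_EQ above: the output replaces the single input in place.
        assert(output_version.first == input_versions[0].first &&
               output_version.second == input_versions[0].second);
        // add_rowsets({_output_rowset}, /*version_overlap=*/true, wrlock,
        //             /*warmup_delta_data=*/true);  // keep the output hot in the file cache
        std::cout << "single input: replace in place and warm up the output\n";
    } else {
        // delete_rowsets(_input_rowsets, wrlock);
        // add_rowsets({_output_rowset}, /*version_overlap=*/false, wrlock);
        std::cout << "multiple inputs: delete inputs, then add the output\n";
    }
    return 0;
}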
48 changes: 44 additions & 4 deletions be/src/cloud/cloud_schema_change_job.cpp
@@ -32,6 +32,7 @@
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_engine.h"
@@ -197,6 +198,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque

SchemaChangeParams sc_params;

// decide whether to cache the schema change output in the file cache
std::vector<RowsetSharedPtr> rowsets;
rowsets.resize(rs_splits.size());
std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
[](RowSetSplits& split) { return split.rs_reader->rowset(); });
sc_params.output_to_file_cache = _should_cache_sc_output(rowsets);

RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
sc_params.ref_rowset_readers.reserve(rs_splits.size());
for (RowSetSplits& split : rs_splits) {
@@ -289,6 +297,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
context.tablet_schema = _new_tablet->tablet_schema();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
context.write_file_cache = sc_params.output_to_file_cache;
context.tablet = _new_tablet;
if (!context.storage_resource) {
return Status::InternalError("vault id not found, maybe not sync, vault id {}",
sc_params.vault_id);
@@ -440,7 +450,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
_new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
stats.num_rows(), stats.data_size());
@@ -473,7 +483,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
// Set the alter version so that tmp_tablet can fill hole rowsets with versions greater than alter_version
tmp_tablet->set_alter_version(alter_version);
}
@@ -491,7 +501,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
DBUG_BLOCK);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
}
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -514,7 +524,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
{max_version + 1, new_max_version}, CaptureRowsetOps {}));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
}
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -565,4 +575,34 @@ void CloudSchemaChangeJob::clean_up_on_failure() {
}
}

bool CloudSchemaChangeJob::_should_cache_sc_output(
const std::vector<RowsetSharedPtr>& input_rowsets) {
int64_t total_size = 0;
int64_t cached_index_size = 0;
int64_t cached_data_size = 0;

for (const auto& rs : input_rowsets) {
const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta();
total_size += rs_meta->total_disk_size();
cached_index_size += rs->approximate_cache_index_size();
cached_data_size += rs->approximate_cached_data_size();
}

double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size;

LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. "
<< "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size()
<< ", total_size=" << total_size << ", cached_index_size=" << cached_index_size
<< ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate
<< ", min_hit_ratio_threshold="
<< config::file_cache_keep_schema_change_output_min_hit_ratio << ", should_cache="
<< (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio);

if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
return true;
}

return false;
}

} // namespace doris
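For reference, here is a minimal standalone sketch of the heuristic implemented by _should_cache_sc_output above. The struct and function are simplified stand-ins rather than the Doris API, the zero-total guard is sketch-only, and 0.7 is the default of file_cache_keep_schema_change_output_min_hit_ratio added in config.cpp below:

// Standalone sketch; the byte counters mimic rowset_meta()->total_disk_size(),
// approximate_cache_index_size() and approximate_cached_data_size().
#include <cstdint>
#include <iostream>
#include <vector>

struct RowsetCacheStats {
    int64_t total_disk_size = 0;   // full size of the rowset
    int64_t cached_index_size = 0; // index bytes already resident in the file cache
    int64_t cached_data_size = 0;  // segment data bytes already resident in the file cache
};

bool should_cache_sc_output(const std::vector<RowsetCacheStats>& inputs, double min_hit_ratio) {
    int64_t total = 0;
    int64_t cached = 0;
    for (const auto& rs : inputs) {
        total += rs.total_disk_size;
        cached += rs.cached_index_size + rs.cached_data_size;
    }
    if (total <= 0) {
        return false; // sketch-only guard: nothing to base the decision on
    }
    return static_cast<double>(cached) / total > min_hit_ratio;
}

int main() {
    // Mirrors the regression test below: ~65% cached input skips warm-up of the
    // schema change output, ~75% cached input keeps it, with the 0.7 threshold.
    std::vector<RowsetCacheStats> mostly_cold {{100, 0, 65}};
    std::vector<RowsetCacheStats> mostly_hot {{100, 0, 75}};
    std::cout << should_cache_sc_output(mostly_cold, 0.7) << "\n"; // 0
    std::cout << should_cache_sc_output(mostly_hot, 0.7) << "\n";  // 1
    return 0;
}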
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_schema_change_job.h
@@ -39,13 +39,14 @@ class CloudSchemaChangeJob {
void clean_up_on_failure();

private:
bool _should_cache_sc_output(const std::vector<RowsetSharedPtr>& input_rowsets);

Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
cloud::TabletJobInfoPB& job);

Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version,
int64_t initiator);

private:
CloudStorageEngine& _cloud_storage_engine;
std::shared_ptr<CloudTablet> _base_tablet;
std::shared_ptr<CloudTablet> _new_tablet;
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
@@ -413,7 +413,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_

auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
for (auto& rs : rowsets) {
if (version_overlap || warmup_delta_data) {
if (warmup_delta_data) {
#ifndef BE_TEST
bool warm_up_state_updated = false;
// Warmup rowset data in background
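The one-line change above narrows the warm-up trigger: new rowset data is loaded into the file cache only when the caller passes warmup_delta_data explicitly, no longer whenever version_overlap is set. A minimal before/after model follows, assuming only the flag semantics visible at the call sites in this PR; it is a hypothetical sketch, not the real CloudTablet code:

#include <iostream>

bool should_warm_up_before(bool version_overlap, bool warmup_delta_data) {
    return version_overlap || warmup_delta_data; // pre-change: overlap alone triggered warm-up
}

bool should_warm_up_after(bool /*version_overlap*/, bool warmup_delta_data) {
    return warmup_delta_data; // post-change: callers must opt in explicitly
}

int main() {
    // Schema-change commit in this PR: version_overlap=true, warmup_delta_data=false.
    std::cout << "before: " << should_warm_up_before(true, false)
              << ", after: " << should_warm_up_after(true, false) << "\n"; // before: 1, after: 0
    // Cumulative compaction (single-input path): both flags true, warm-up either way.
    std::cout << "before: " << should_warm_up_before(true, true)
              << ", after: " << should_warm_up_after(true, true) << "\n"; // before: 1, after: 1
    return 0;
}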
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
@@ -1142,6 +1142,10 @@ DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
DEFINE_mBool(enable_file_cache_adaptive_write, "true");
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");

// if the difference is below this threshold, we consider the cache's progressive upgrade (2.0->3.0) successful
DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");

DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1175,6 +1175,8 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
DECLARE_mBool(enable_file_cache_adaptive_write);
DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
1 change: 1 addition & 0 deletions be/src/olap/schema_change.h
@@ -277,6 +277,7 @@ struct SchemaChangeParams {
ObjectPool pool;
int32_t be_exec_version;
std::string vault_id;
bool output_to_file_cache;
};

class SchemaChangeJob {
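The new output_to_file_cache field carries the decision from process_alter_tablet down to the rowset writer; in _convert_historical_rowsets it is copied into the writer context as write_file_cache (see the hunk above). A minimal sketch of that flow is shown below, using simplified stand-in structs rather than the real Doris types; only the two field names come from the diff:

#include <iostream>

struct SchemaChangeParamsSketch {
    bool output_to_file_cache = false; // decided once per job from the input cache hit ratio
};

struct RowsetWriterContextSketch {
    bool write_file_cache = false; // consumed when the converted rowsets are written
};

int main() {
    SchemaChangeParamsSketch sc_params;
    sc_params.output_to_file_cache = true; // stands in for _should_cache_sc_output(rowsets)

    RowsetWriterContextSketch context;
    context.write_file_cache = sc_params.output_to_file_cache; // as in _convert_historical_rowsets
    std::cout << context.write_file_cache << "\n"; // 1
    return 0;
}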
@@ -0,0 +1,234 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.doris.regression.suite.ClusterOptions
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.OutputUtils

@groovy.transform.Immutable
class RowsetInfo {
int startVersion
int endVersion
String id
String originalString
}

suite("test_filecache_with_alter_table", "docker") {
def options = new ClusterOptions()
options.cloudMode = true
options.setFeNum(1)
options.setBeNum(1)

options.beConfigs.add('enable_flush_file_cache_async=false')
options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
options.beConfigs.add('enable_evict_file_cache_in_advance=false')
options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')

def baseTestTable = "test_filecache_with_alter_table"
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
def backendId_to_backendBrpcPort = [:]
def csvPathPrefix = "/tmp/temp_csv_data"
def loadBatchNum = 20

def generateCsvData = {
def rowsPerFile = 32768
def columnsPerRow = 4
def headers = 'col1,col2,col3,col4'

def dir = new File(csvPathPrefix)
if (!dir.exists()) {
dir.mkdirs()
} else {
dir.eachFile { it.delete() }
}

long currentNumber = 1L
(1..loadBatchNum).each { fileIndex ->
def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
def csvFile = new File(fileName)

csvFile.withWriter('UTF-8') { writer ->
writer.writeLine(headers)
(1..rowsPerFile).each { rowIndex ->
def row = (1..columnsPerRow).collect { currentNumber++ }
writer.writeLine(row.join(','))
}
}
}
logger.info("Successfully generated ${loadBatchNum} CSV files in ${csvPathPrefix}")
}

def getTabletStatus = { tablet ->
String tabletId = tablet.TabletId
String backendId = tablet.BackendId
def beHost = backendId_to_backendIP[backendId]
def beHttpPort = backendId_to_backendHttpPort[backendId]

String command = "curl -s -X GET http://${beHost}:${beHttpPort}/api/compaction/show?tablet_id=${tabletId}"

logger.info("Executing: ${command}")
def process = command.execute()
def exitCode = process.waitFor()
def output = process.getText()

logger.info("Get tablet status response: code=${exitCode}, out=${output}")
assertEquals(0, exitCode, "Failed to get tablet status.")

return parseJson(output.trim())
}

def waitForAlterJobToFinish = { tableName, timeoutMillis ->
def pollInterval = 1000
def timeElapsed = 0
while (timeElapsed <= timeoutMillis) {
def alterResult = sql_return_maparray """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
logger.info("Checking ALTER status for table '${tableName}': ${alterResult}")
if (alterResult && alterResult[0].State == "FINISHED") {
sleep(3000)
logger.info("ALTER job on table '${tableName}' finished. Details: ${alterResult[0]}")
return
}
sleep(pollInterval)
timeElapsed += pollInterval
}
fail("Wait for ALTER job on table '${tableName}' to finish timed out after ${timeoutMillis}ms.")
}

def runSchemaChangeCacheTest = { String testTable, double inputCacheRatio, boolean expectOutputCached ->
logger.info("==================================================================================")
logger.info("Running Test Case on Table '${testTable}': Input Cache Ratio = ${inputCacheRatio * 100}%, Expect Output Cached = ${expectOutputCached}")
logger.info("==================================================================================")

sql """ DROP TABLE IF EXISTS ${testTable} force;"""
sql """
CREATE TABLE IF NOT EXISTS ${testTable} (
col1 bigint,
col2 bigint,
col3 bigint,
col4 bigint
)
UNIQUE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true"
)
"""

(1..loadBatchNum).each { fileIndex ->
def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
streamLoad {
logger.info("Stream loading file index ${fileIndex} into table ${testTable}")
set "column_separator", ","
table testTable
file fileName
time 3000
check { res, exception, startTime, endTime ->
if (exception != null) throw exception
def json = parseJson(res)
assertEquals("success", json.Status.toLowerCase())
}
}
}
sql """ SELECT COUNT(col1) from ${testTable} """

def tablets = sql_return_maparray "show tablets from ${testTable};"
assertEquals(1, tablets.size(), "Expected to find exactly one tablet.")
def tablet = tablets[0]
def beHost = backendId_to_backendIP[tablet.BackendId]
def beHttpPort = backendId_to_backendHttpPort[tablet.BackendId]

def tabletStatus = getTabletStatus(tablet)
List<RowsetInfo> originalRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
def parts = rowsetStr.split(" ")
def versionParts = parts[0].replace('[', '').replace(']', '').split("-")
new RowsetInfo(
startVersion: versionParts[0].toInteger(),
endVersion: versionParts[1].toInteger(),
id: parts[4],
originalString: rowsetStr
)
}.findAll { it.startVersion != 0 }.sort { it.startVersion }

int numToClear = Math.round(originalRowsetInfos.size() * (1 - inputCacheRatio)).toInteger()
logger.info("Total data rowsets: ${originalRowsetInfos.size()}. Clearing cache for ${numToClear} rowsets to achieve ~${inputCacheRatio * 100}% hit ratio.")

originalRowsetInfos.take(numToClear).each { rowset ->
Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=clear&sync=true&value=${rowset.id}_0.dat", true)
}

def cachedInputRowsets = originalRowsetInfos.findAll { rowset ->
def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
}

def actualCachedRatio = cachedInputRowsets.size() / (double)originalRowsetInfos.size()
logger.info("Verification: Cached input rowsets: ${cachedInputRowsets.size()}. Actual cache ratio: ${actualCachedRatio * 100}%")
assertTrue(Math.abs(inputCacheRatio - actualCachedRatio) < 0.01, "Actual cache ratio does not match expected ratio.")

logger.info("Triggering ALTER TABLE on ${testTable}")
sql """ALTER TABLE ${testTable} MODIFY COLUMN col2 VARCHAR(255)"""
waitForAlterJobToFinish(testTable, 60000)

tablets = sql_return_maparray "show tablets from ${testTable};"
tablet = tablets[0]
tabletStatus = getTabletStatus(tablet)

def newRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
def parts = rowsetStr.split(" ")
def version_pair = parts[0].replace('[', '').replace(']', '').split('-')
new RowsetInfo(
startVersion: version_pair[0].toInteger(),
endVersion: version_pair[1].toInteger(),
id: parts[4],
originalString: rowsetStr
)
}.findAll { it.startVersion != 0 }.sort { it.startVersion }

def cachedOutputRowsets = newRowsetInfos.findAll { rowset ->
def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
}

logger.info("After ALTER, found ${cachedOutputRowsets.size()} cached output rowsets out of ${newRowsetInfos.size()}.")

if (expectOutputCached) {
assertTrue(cachedOutputRowsets.size() > 0, "Expected output rowsets to be cached, but none were found.")
} else {
assertEquals(0, cachedOutputRowsets.size(), "Expected output rowsets NOT to be cached, but some were found.")
}
logger.info("Test Case Passed: Input Ratio ${inputCacheRatio * 100}%, Output Cached Check: ${expectOutputCached}")

sql """ DROP TABLE IF EXISTS ${testTable} force;"""
}

docker(options) {
getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);

sql """ set global enable_auto_analyze = false;"""

generateCsvData()

runSchemaChangeCacheTest("${baseTestTable}_0", 0.0, false)
runSchemaChangeCacheTest("${baseTestTable}_65", 0.65, false)
runSchemaChangeCacheTest("${baseTestTable}_75", 0.75, true)
runSchemaChangeCacheTest("${baseTestTable}_100", 1.0, true)
}
}