diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index 102129bdd89656..a80a1025f15961 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,7 +375,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     if (_input_rowsets.size() == 1) {
         DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
         // MUST NOT move input rowset to stale path
-        cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
+        cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);
     } else {
         cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
         cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 583cc9a905b614..c2e17b91e08443 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -32,6 +32,7 @@
 #include "olap/delete_handler.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/storage_engine.h"
@@ -197,6 +198,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
 
     SchemaChangeParams sc_params;
 
+    // cache schema change output to file cache
+    std::vector<RowsetSharedPtr> rowsets;
+    rowsets.resize(rs_splits.size());
+    std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
+                   [](RowSetSplits& split) { return split.rs_reader->rowset(); });
+    sc_params.output_to_file_cache = _should_cache_sc_output(rowsets);
+
     RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
     sc_params.ref_rowset_readers.reserve(rs_splits.size());
     for (RowSetSplits& split : rs_splits) {
@@ -289,6 +297,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         context.tablet_schema = _new_tablet->tablet_schema();
         context.newest_write_timestamp = rs_reader->newest_write_timestamp();
         context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
+        context.write_file_cache = sc_params.output_to_file_cache;
+        context.tablet = _new_tablet;
         if (!context.storage_resource) {
             return Status::InternalError("vault id not found, maybe not sync, vault id {}",
                                          sc_params.vault_id);
@@ -440,7 +450,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
     std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
     std::unique_lock wlock(_new_tablet->get_header_lock());
-    _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
+    _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
     _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
     _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
                                          stats.num_rows(), stats.data_size());
@@ -473,7 +483,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
             std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
         // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
         tmp_tablet->set_alter_version(alter_version);
     }
@@ -491,7 +501,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                      DBUG_BLOCK);
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
     }
     for (auto rowset : ret.rowsets) {
         RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -514,7 +524,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                                           {max_version + 1, new_max_version}, CaptureRowsetOps {}));
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
     }
     for (auto rowset : ret.rowsets) {
         RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -565,4 +575,34 @@ void CloudSchemaChangeJob::clean_up_on_failure() {
     }
 }
 
+bool CloudSchemaChangeJob::_should_cache_sc_output(
+        const std::vector<RowsetSharedPtr>& input_rowsets) {
+    int64_t total_size = 0;
+    int64_t cached_index_size = 0;
+    int64_t cached_data_size = 0;
+
+    for (const auto& rs : input_rowsets) {
+        const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta();
+        total_size += rs_meta->total_disk_size();
+        cached_index_size += rs->approximate_cache_index_size();
+        cached_data_size += rs->approximate_cached_data_size();
+    }
+
+    double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size;
+
+    LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. "
+              << "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size()
+              << ", total_size=" << total_size << ", cached_index_size=" << cached_index_size
+              << ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate
+              << ", min_hit_ratio_threshold="
+              << config::file_cache_keep_schema_change_output_min_hit_ratio << ", should_cache="
+              << (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio);
+
+    if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
+        return true;
+    }
+
+    return false;
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
index 2f49f3eab6dac7..9a2a34f074519a 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -39,13 +39,14 @@ class CloudSchemaChangeJob {
     void clean_up_on_failure();
 
 private:
+    bool _should_cache_sc_output(const std::vector<RowsetSharedPtr>& input_rowsets);
+
     Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
                                        cloud::TabletJobInfoPB& job);
 
     Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version,
                                   int64_t initiator);
 
-private:
     CloudStorageEngine& _cloud_storage_engine;
     std::shared_ptr<CloudTablet> _base_tablet;
     std::shared_ptr<CloudTablet> _new_tablet;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 1209b010a9ff8e..acf1408004d1fb 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -413,7 +413,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
 
     auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
         for (auto& rs : rowsets) {
-            if (version_overlap || warmup_delta_data) {
+            if (warmup_delta_data) {
 #ifndef BE_TEST
                 bool warm_up_state_updated = false;
                 // Warmup rowset data in background
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 571b1879c80c2d..f9623ab1ac8b0c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1142,6 +1142,10 @@
 DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
 DEFINE_mBool(enable_file_cache_adaptive_write, "true");
 DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
+// if the difference is below this threshold, we consider the cache's progressive upgrade (2.0 -> 3.0) successful
+DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
+DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");
+
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
 DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b2e41b9f977d80..e2cf49e726609c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1175,6 +1175,8 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
 DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
 DECLARE_mBool(enable_file_cache_adaptive_write);
 DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
+DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
+DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index c29cb49a7aaece..781d4d856f797e 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -277,6 +277,7 @@ struct SchemaChangeParams {
     ObjectPool pool;
     int32_t be_exec_version;
     std::string vault_id;
+    bool output_to_file_cache;
 };
 
 class SchemaChangeJob {
diff --git a/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy b/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy
new file mode 100644
index 00000000000000..87c33562dc6f6b
--- /dev/null
+++ b/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.OutputUtils
+
+@groovy.transform.Immutable
+class RowsetInfo {
+    int startVersion
+    int endVersion
+    String id
+    String originalString
+}
+
+suite("test_filecache_with_alter_table", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+
+    options.beConfigs.add('enable_flush_file_cache_async=false')
+    options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
+    options.beConfigs.add('enable_evict_file_cache_in_advance=false')
+    options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
+
+    def baseTestTable = "test_filecache_with_alter_table"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+    def csvPathPrefix = "/tmp/temp_csv_data"
+    def loadBatchNum = 20
+
+    def generateCsvData = {
+        def rowsPerFile = 32768
+        def columnsPerRow = 4
+        def headers = 'col1,col2,col3,col4'
+
+        def dir = new File(csvPathPrefix)
+        if (!dir.exists()) {
+            dir.mkdirs()
+        } else {
+            dir.eachFile { it.delete() }
+        }
+
+        long currentNumber = 1L
+        (1..loadBatchNum).each { fileIndex ->
+            def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
+            def csvFile = new File(fileName)
+
+            csvFile.withWriter('UTF-8') { writer ->
+                writer.writeLine(headers)
+                (1..rowsPerFile).each { rowIndex ->
+                    def row = (1..columnsPerRow).collect { currentNumber++ }
+                    writer.writeLine(row.join(','))
+                }
+            }
+        }
+        logger.info("Successfully generated ${loadBatchNum} CSV files in ${csvPathPrefix}")
+    }
+
+    def getTabletStatus = { tablet ->
+        String tabletId = tablet.TabletId
+        String backendId = tablet.BackendId
+        def beHost = backendId_to_backendIP[backendId]
+        def beHttpPort = backendId_to_backendHttpPort[backendId]
+
+        String command = "curl -s -X GET http://${beHost}:${beHttpPort}/api/compaction/show?tablet_id=${tabletId}"
+
+        logger.info("Executing: ${command}")
+        def process = command.execute()
+        def exitCode = process.waitFor()
+        def output = process.getText()
+
+        logger.info("Get tablet status response: code=${exitCode}, out=${output}")
+        assertEquals(0, exitCode, "Failed to get tablet status.")
+
+        return parseJson(output.trim())
+    }
+
+    def waitForAlterJobToFinish = { tableName, timeoutMillis ->
+        def pollInterval = 1000
+        def timeElapsed = 0
+        while (timeElapsed <= timeoutMillis) {
+            def alterResult = sql_return_maparray """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
+            logger.info("Checking ALTER status for table '${tableName}': ${alterResult}")
+            if (alterResult && alterResult[0].State == "FINISHED") {
+                sleep(3000)
+                logger.info("ALTER job on table '${tableName}' finished. Details: ${alterResult[0]}")
+                return
+            }
+            sleep(pollInterval)
+            timeElapsed += pollInterval
+        }
+        fail("Wait for ALTER job on table '${tableName}' to finish timed out after ${timeoutMillis}ms.")
+    }
+
+    def runSchemaChangeCacheTest = { String testTable, double inputCacheRatio, boolean expectOutputCached ->
+        logger.info("==================================================================================")
+        logger.info("Running Test Case on Table '${testTable}': Input Cache Ratio = ${inputCacheRatio * 100}%, Expect Output Cached = ${expectOutputCached}")
+        logger.info("==================================================================================")
+
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                col1 bigint,
+                col2 bigint,
+                col3 bigint,
+                col4 bigint
+            )
+            UNIQUE KEY(col1)
+            DISTRIBUTED BY HASH(col1) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            )
+        """
+
+        (1..loadBatchNum).each { fileIndex ->
+            def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
+            streamLoad {
+                logger.info("Stream loading file index ${fileIndex} into table ${testTable}")
+                set "column_separator", ","
+                table testTable
+                file fileName
+                time 3000
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) throw exception
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                }
+            }
+        }
+        sql """ SELECT COUNT(col1) from ${testTable} """
+
+        def tablets = sql_return_maparray "show tablets from ${testTable};"
+        assertEquals(1, tablets.size(), "Expected to find exactly one tablet.")
+        def tablet = tablets[0]
+        def beHost = backendId_to_backendIP[tablet.BackendId]
+        def beHttpPort = backendId_to_backendHttpPort[tablet.BackendId]
+
+        def tabletStatus = getTabletStatus(tablet)
+        List<RowsetInfo> originalRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
+            def parts = rowsetStr.split(" ")
+            def versionParts = parts[0].replace('[', '').replace(']', '').split("-")
+            new RowsetInfo(
+                startVersion: versionParts[0].toInteger(),
+                endVersion: versionParts[1].toInteger(),
+                id: parts[4],
+                originalString: rowsetStr
+            )
+        }.findAll { it.startVersion != 0 }.sort { it.startVersion }
+
+        int numToClear = Math.round(originalRowsetInfos.size() * (1 - inputCacheRatio)).toInteger()
+        logger.info("Total data rowsets: ${originalRowsetInfos.size()}. Clearing cache for ${numToClear} rowsets to achieve ~${inputCacheRatio * 100}% hit ratio.")
+
+        originalRowsetInfos.take(numToClear).each { rowset ->
+            Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=clear&sync=true&value=${rowset.id}_0.dat", true)
+        }
+
+        def cachedInputRowsets = originalRowsetInfos.findAll { rowset ->
+            def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
+            data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
+        }
+
+        def actualCachedRatio = cachedInputRowsets.size() / (double) originalRowsetInfos.size()
+        logger.info("Verification: Cached input rowsets: ${cachedInputRowsets.size()}. Actual cache ratio: ${actualCachedRatio * 100}%")
+        assertTrue(Math.abs(inputCacheRatio - actualCachedRatio) < 0.01, "Actual cache ratio does not match expected ratio.")
+
+        logger.info("Triggering ALTER TABLE on ${testTable}")
+        sql """ALTER TABLE ${testTable} MODIFY COLUMN col2 VARCHAR(255)"""
+        waitForAlterJobToFinish(testTable, 60000)
+
+        tablets = sql_return_maparray "show tablets from ${testTable};"
+        tablet = tablets[0]
+        tabletStatus = getTabletStatus(tablet)
+
+        def newRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
+            def parts = rowsetStr.split(" ")
+            def version_pair = parts[0].replace('[', '').replace(']', '').split('-')
+            new RowsetInfo(
+                startVersion: version_pair[0].toInteger(),
+                endVersion: version_pair[1].toInteger(),
+                id: parts[4],
+                originalString: rowsetStr
+            )
+        }.findAll { it.startVersion != 0 }.sort { it.startVersion }
+
+        def cachedOutputRowsets = newRowsetInfos.findAll { rowset ->
+            def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
+            data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
+        }
+
+        logger.info("After ALTER, found ${cachedOutputRowsets.size()} cached output rowsets out of ${newRowsetInfos.size()}.")
+
+        if (expectOutputCached) {
+            assertTrue(cachedOutputRowsets.size() > 0, "Expected output rowsets to be cached, but none were found.")
+        } else {
+            assertEquals(0, cachedOutputRowsets.size(), "Expected output rowsets NOT to be cached, but some were found.")
+        }
+        logger.info("Test Case Passed: Input Ratio ${inputCacheRatio * 100}%, Output Cached Check: ${expectOutputCached}")
+
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+    }
+
+    docker(options) {
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+        sql """ set global enable_auto_analyze = false;"""
+
+        generateCsvData()
+
+        runSchemaChangeCacheTest("${baseTestTable}_0", 0.0, false)
+        runSchemaChangeCacheTest("${baseTestTable}_65", 0.65, false)
+        runSchemaChangeCacheTest("${baseTestTable}_75", 0.75, true)
+        runSchemaChangeCacheTest("${baseTestTable}_100", 1.0, true)
+    }
+}
\ No newline at end of file