From d83fbe3c3be9ace3b90f932fa29a77caea9f2385 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 11 Apr 2024 23:24:23 +0800 Subject: [PATCH] Storages: Refine memory tracker of data sharing (#8857) (#8934) close pingcap/tiflash#8856 --- dbms/src/Interpreters/Settings.h | 4 +- .../File/DMFileBlockInputStream.cpp | 16 +---- .../DeltaMerge/File/DMFileBlockInputStream.h | 2 - .../Storages/DeltaMerge/File/DMFileReader.cpp | 19 ++++-- .../Storages/DeltaMerge/File/DMFileReader.h | 3 +- .../ReadThread/ColumnSharingCache.h | 17 +----- .../tests/gtest_column_sharing_cache.cpp | 60 ++----------------- 7 files changed, 29 insertions(+), 92 deletions(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 5550fca69e9..1d6ab38489d 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -216,8 +216,8 @@ struct Settings \ M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \ M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \ - M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \ - M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \ + M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests, include those sharing blocks in block queue. 0 means disable data sharing") \ + M(SettingUInt64, dt_max_sharing_column_count, 5, "Deprecated") \ M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \ M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \ M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \ diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index c004017694c..88340237a26 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -16,8 +16,6 @@ #include #include -#include - namespace DB::DM { DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context) @@ -59,15 +57,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0) { // Disable data sharing. - max_sharing_column_count = 0; - } - else if ( - shared_column_data_mem_tracker != nullptr - && std::cmp_greater_equal(shared_column_data_mem_tracker->get(), max_sharing_column_bytes_for_all)) - { - // The memory used reaches the limitation by running queries, disable the data sharing for this DMFile - max_sharing_column_count = 0; - GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment(); + max_sharing_column_bytes_for_all = 0; } DMFileReader reader( @@ -89,9 +79,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & rows_threshold_per_read, read_one_pack_every_time, tracing_id, - max_sharing_column_count, + max_sharing_column_bytes_for_all, scan_context); - return std::make_shared(std::move(reader), max_sharing_column_count > 0); + return std::make_shared(std::move(reader), max_sharing_column_bytes_for_all > 0); } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index 351b68f3cd9..9b2775b5f4b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -162,7 +162,6 @@ class DMFileBlockInputStreamBuilder aio_threshold = settings.min_bytes_to_use_direct_io; max_read_buffer_size = settings.max_read_buffer_size; max_sharing_column_bytes_for_all = settings.dt_max_sharing_column_bytes_for_all; - max_sharing_column_count = settings.dt_max_sharing_column_count; return *this; } DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_) @@ -196,7 +195,6 @@ class DMFileBlockInputStreamBuilder size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD; bool read_one_pack_every_time = false; size_t max_sharing_column_bytes_for_all = 0; - size_t max_sharing_column_count = 0; String tracing_id; }; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 703b1b4de87..e9837e29128 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -33,7 +33,6 @@ #include #include - namespace CurrentMetrics { extern const Metric OpenFileForRead; @@ -196,7 +195,7 @@ DMFileReader::DMFileReader( size_t rows_threshold_per_read_, bool read_one_pack_every_time_, const String & tracing_id_, - size_t max_sharing_column_count, + size_t max_sharing_column_bytes_, const ScanContextPtr & scan_context_) : dmfile(dmfile_) , read_columns(read_columns_) @@ -213,6 +212,7 @@ DMFileReader::DMFileReader( , column_cache(column_cache_) , scan_context(scan_context_) , rows_threshold_per_read(rows_threshold_per_read_) + , max_sharing_column_bytes(max_sharing_column_bytes_) , file_provider(file_provider_) , log(Logger::get(tracing_id_)) { @@ -238,9 +238,9 @@ DMFileReader::DMFileReader( const auto data_type = dmfile->getColumnStat(cd.id).type; data_type->enumerateStreams(callback, {}); } - if (max_sharing_column_count > 0) + if (max_sharing_column_bytes > 0) { - col_data_cache = std::make_unique(path(), read_columns, max_sharing_column_count, log); + col_data_cache = std::make_unique(path(), read_columns, log); for (const auto & cd : read_columns) { last_read_from_cache[cd.id] = false; @@ -702,6 +702,13 @@ void DMFileReader::readColumn(ColumnDefine & column_define, size_t skip_packs) { bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); + bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr + && shared_column_data_mem_tracker->get() >= static_cast(max_sharing_column_bytes); + if (reach_sharing_column_memory_limit) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment(); + } + bool enable_sharing_column = has_concurrent_reader && !reach_sharing_column_memory_limit; if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column)) { // If there are concurrent read requests, this data is likely to be shared. @@ -709,7 +716,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define, // This can lead to inaccurate memory statistics of MemoryTracker. // To solve this problem, we use a independent global memory tracker to trace the shared column data in ColumnSharingCacheMap. auto mem_tracker_guard - = has_concurrent_reader ? std::make_optional(true, nullptr) : std::nullopt; + = enable_sharing_column ? std::make_optional(true, nullptr) : std::nullopt; auto data_type = dmfile->getColumnStat(column_define.id).type; auto col = data_type->createColumn(); readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]); @@ -721,7 +728,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define, last_read_from_cache[column_define.id] = true; } - if (has_concurrent_reader && col_data_cache != nullptr) + if (enable_sharing_column && col_data_cache != nullptr) { DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 2a02c46445e..ceeb8c36610 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -94,7 +94,7 @@ class DMFileReader size_t rows_threshold_per_read_, bool read_one_pack_every_time_, const String & tracing_id_, - size_t max_sharing_column_count, + size_t max_sharing_column_bytes_, const ScanContextPtr & scan_context_); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -178,6 +178,7 @@ class DMFileReader const ScanContextPtr scan_context; const size_t rows_threshold_per_read; + const size_t max_sharing_column_bytes; size_t next_pack_id = 0; size_t next_row_offset = 0; diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h index e61ec5a38b0..e09a25a5876 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -46,14 +46,9 @@ class ColumnSharingCache ColumnPtr col_data; }; - void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data, size_t max_sharing_column_count) + void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data) { std::lock_guard lock(mtx); - if (packs.size() >= max_sharing_column_count) - { - GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_reach_count_limit).Increment(); - return; - } GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment(); auto & value = packs[start_pack_id]; if (value.pack_count < pack_count) @@ -126,14 +121,9 @@ class ColumnSharingCache class ColumnSharingCacheMap { public: - ColumnSharingCacheMap( - const String & dmfile_name_, - const ColumnDefines & cds, - size_t max_sharing_column_count_, - LoggerPtr & log_) + ColumnSharingCacheMap(const String & dmfile_name_, const ColumnDefines & cds, LoggerPtr & log_) : dmfile_name(dmfile_name_) , stats(static_cast(ColumnCacheStatus::_TOTAL_COUNT)) - , max_sharing_column_count(max_sharing_column_count_) , log(log_) { for (const auto & cd : cds) @@ -162,7 +152,7 @@ class ColumnSharingCacheMap { return; } - itr->second.add(start_pack_id, pack_count, col_data, max_sharing_column_count); + itr->second.add(start_pack_id, pack_count, col_data); } bool get(int64_t col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type) @@ -217,7 +207,6 @@ class ColumnSharingCacheMap std::string dmfile_name; std::unordered_map cols; std::vector> stats; - size_t max_sharing_column_count; LoggerPtr log; }; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp index f12442ba76e..ae1d5fbd533 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp @@ -50,7 +50,7 @@ TEST(ColumnSharingCacheTest, AddAndGet) ColumnSharingCache cache; auto col = createColumn(8); - cache.add(1, 8, col, std::numeric_limits::max()); + cache.add(1, 8, col); ColumnPtr col1; auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE); @@ -75,7 +75,7 @@ TEST(ColumnSharingCacheTest, AddAndGet) ASSERT_EQ(col4, nullptr); auto col5 = createColumn(7); - cache.add(1, 7, col5, std::numeric_limits::max()); + cache.add(1, 7, col5); ColumnPtr col6; st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col6, TEST_DATA_TYPE); ASSERT_EQ(st, ColumnCacheStatus::GET_HIT); @@ -83,7 +83,7 @@ TEST(ColumnSharingCacheTest, AddAndGet) compareColumn(col6, col, col6->size()); auto col7 = createColumn(9); - cache.add(1, 9, col7, std::numeric_limits::max()); + cache.add(1, 9, col7); ColumnPtr col8; st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE); ASSERT_EQ(st, ColumnCacheStatus::GET_COPY); @@ -96,13 +96,13 @@ TEST(ColumnSharingCacheTest, Del) ColumnSharingCache cache; auto col1 = createColumn(8); - cache.add(1, 8, col1, std::numeric_limits::max()); + cache.add(1, 8, col1); auto col2 = createColumn(8); - cache.add(9, 8, col2, std::numeric_limits::max()); + cache.add(9, 8, col2); auto col3 = createColumn(8); - cache.add(17, 8, col3, std::numeric_limits::max()); + cache.add(17, 8, col3); cache.del(10); @@ -119,52 +119,4 @@ TEST(ColumnSharingCacheTest, Del) compareColumn(col5, col2, col5->size()); } -TEST(ColumnSharingCacheTest, AddAndGetWithLimitation) -{ - ColumnSharingCache cache; - - auto col = createColumn(8); - // Limit to 0, add should fail. - cache.add(1, 8, col, 0); - ColumnPtr col1; - auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_MISS); - ASSERT_EQ(col1, nullptr); - - // Limit to 1, add should succ. - cache.add(1, 8, col, 1); - st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_HIT); - ASSERT_EQ(col1->size(), 8 * TEST_PACK_ROWS); - compareColumn(col1, col, col1->size()); - ColumnPtr col2; - st = cache.get(1, 7, 7 * TEST_PACK_ROWS, col2, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_COPY); - ASSERT_EQ(col2->size(), 7 * TEST_PACK_ROWS); - compareColumn(col2, col, col2->size()); - ColumnPtr col3; - st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col3, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_PART); - ASSERT_EQ(col3, nullptr); - ColumnPtr col4; - st = cache.get(2, 8, 8 * TEST_PACK_ROWS, col4, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_MISS); - ASSERT_EQ(col4, nullptr); - - auto col7 = createColumn(9); - // Limit to 1, add should fail. - cache.add(1, 9, col7, 1); - ColumnPtr col8; - st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_PART); - ASSERT_EQ(col8, nullptr); - - // Limit to 2, add should succ. - cache.add(1, 9, col7, 2); - st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE); - ASSERT_EQ(st, ColumnCacheStatus::GET_COPY); - ASSERT_EQ(col8->size(), 8 * TEST_PACK_ROWS); - compareColumn(col8, col7, col8->size()); -} - } // namespace DB::DM::tests \ No newline at end of file