Skip to content

Commit

Permalink
Storages: Refine memory tracker of data sharing (#8857) (#8934)
Browse files Browse the repository at this point in the history
close #8856
  • Loading branch information
ti-chi-bot committed Apr 11, 2024
1 parent e73e61e commit d83fbe3
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 92 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ struct Settings
\
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests, include those sharing blocks in block queue. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "Deprecated") \
M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
Expand Down
16 changes: 3 additions & 13 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <utility>

namespace DB::DM
{
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
Expand Down Expand Up @@ -59,15 +57,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0)
{
// Disable data sharing.
max_sharing_column_count = 0;
}
else if (
shared_column_data_mem_tracker != nullptr
&& std::cmp_greater_equal(shared_column_data_mem_tracker->get(), max_sharing_column_bytes_for_all))
{
// The memory used reaches the limitation by running queries, disable the data sharing for this DMFile
max_sharing_column_count = 0;
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
max_sharing_column_bytes_for_all = 0;
}

DMFileReader reader(
Expand All @@ -89,9 +79,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
rows_threshold_per_read,
read_one_pack_every_time,
tracing_id,
max_sharing_column_count,
max_sharing_column_bytes_for_all,
scan_context);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_count > 0);
return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_bytes_for_all > 0);
}
} // namespace DB::DM
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class DMFileBlockInputStreamBuilder
aio_threshold = settings.min_bytes_to_use_direct_io;
max_read_buffer_size = settings.max_read_buffer_size;
max_sharing_column_bytes_for_all = settings.dt_max_sharing_column_bytes_for_all;
max_sharing_column_count = settings.dt_max_sharing_column_count;
return *this;
}
DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_)
Expand Down Expand Up @@ -196,7 +195,6 @@ class DMFileBlockInputStreamBuilder
size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD;
bool read_one_pack_every_time = false;
size_t max_sharing_column_bytes_for_all = 0;
size_t max_sharing_column_count = 0;
String tracing_id;
};

Expand Down
19 changes: 13 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <Storages/S3/S3RandomAccessFile.h>
#include <fmt/format.h>


namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
Expand Down Expand Up @@ -196,7 +195,7 @@ DMFileReader::DMFileReader(
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
size_t max_sharing_column_count,
size_t max_sharing_column_bytes_,
const ScanContextPtr & scan_context_)
: dmfile(dmfile_)
, read_columns(read_columns_)
Expand All @@ -213,6 +212,7 @@ DMFileReader::DMFileReader(
, column_cache(column_cache_)
, scan_context(scan_context_)
, rows_threshold_per_read(rows_threshold_per_read_)
, max_sharing_column_bytes(max_sharing_column_bytes_)
, file_provider(file_provider_)
, log(Logger::get(tracing_id_))
{
Expand All @@ -238,9 +238,9 @@ DMFileReader::DMFileReader(
const auto data_type = dmfile->getColumnStat(cd.id).type;
data_type->enumerateStreams(callback, {});
}
if (max_sharing_column_count > 0)
if (max_sharing_column_bytes > 0)
{
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, max_sharing_column_count, log);
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, log);
for (const auto & cd : read_columns)
{
last_read_from_cache[cd.id] = false;
Expand Down Expand Up @@ -702,14 +702,21 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
size_t skip_packs)
{
bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr
&& shared_column_data_mem_tracker->get() >= static_cast<Int64>(max_sharing_column_bytes);
if (reach_sharing_column_memory_limit)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
}
bool enable_sharing_column = has_concurrent_reader && !reach_sharing_column_memory_limit;
if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column))
{
// If there are concurrent read requests, this data is likely to be shared.
// So the allocation and deallocation of this data may not be in the same MemoryTracker.
// This can lead to inaccurate memory statistics of MemoryTracker.
// To solve this problem, we use a independent global memory tracker to trace the shared column data in ColumnSharingCacheMap.
auto mem_tracker_guard
= has_concurrent_reader ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
= enable_sharing_column ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
auto data_type = dmfile->getColumnStat(column_define.id).type;
auto col = data_type->createColumn();
readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]);
Expand All @@ -721,7 +728,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
last_read_from_cache[column_define.id] = true;
}

if (has_concurrent_reader && col_data_cache != nullptr)
if (enable_sharing_column && col_data_cache != nullptr)
{
DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column);
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class DMFileReader
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
size_t max_sharing_column_count,
size_t max_sharing_column_bytes_,
const ScanContextPtr & scan_context_);

Block getHeader() const { return toEmptyBlock(read_columns); }
Expand Down Expand Up @@ -178,6 +178,7 @@ class DMFileReader
const ScanContextPtr scan_context;

const size_t rows_threshold_per_read;
const size_t max_sharing_column_bytes;

size_t next_pack_id = 0;
size_t next_row_offset = 0;
Expand Down
17 changes: 3 additions & 14 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,9 @@ class ColumnSharingCache
ColumnPtr col_data;
};

void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data, size_t max_sharing_column_count)
void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data)
{
std::lock_guard lock(mtx);
if (packs.size() >= max_sharing_column_count)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_reach_count_limit).Increment();
return;
}
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment();
auto & value = packs[start_pack_id];
if (value.pack_count < pack_count)
Expand Down Expand Up @@ -126,14 +121,9 @@ class ColumnSharingCache
class ColumnSharingCacheMap
{
public:
ColumnSharingCacheMap(
const String & dmfile_name_,
const ColumnDefines & cds,
size_t max_sharing_column_count_,
LoggerPtr & log_)
ColumnSharingCacheMap(const String & dmfile_name_, const ColumnDefines & cds, LoggerPtr & log_)
: dmfile_name(dmfile_name_)
, stats(static_cast<int>(ColumnCacheStatus::_TOTAL_COUNT))
, max_sharing_column_count(max_sharing_column_count_)
, log(log_)
{
for (const auto & cd : cds)
Expand Down Expand Up @@ -162,7 +152,7 @@ class ColumnSharingCacheMap
{
return;
}
itr->second.add(start_pack_id, pack_count, col_data, max_sharing_column_count);
itr->second.add(start_pack_id, pack_count, col_data);
}

bool get(int64_t col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type)
Expand Down Expand Up @@ -217,7 +207,6 @@ class ColumnSharingCacheMap
std::string dmfile_name;
std::unordered_map<int64_t, ColumnSharingCache> cols;
std::vector<std::atomic<int64_t>> stats;
size_t max_sharing_column_count;
LoggerPtr log;
};

Expand Down
60 changes: 6 additions & 54 deletions dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ColumnSharingCache cache;

auto col = createColumn(8);
cache.add(1, 8, col, std::numeric_limits<UInt64>::max());
cache.add(1, 8, col);

ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
Expand All @@ -75,15 +75,15 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ASSERT_EQ(col4, nullptr);

auto col5 = createColumn(7);
cache.add(1, 7, col5, std::numeric_limits<UInt64>::max());
cache.add(1, 7, col5);
ColumnPtr col6;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col6, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col6->size(), 8 * TEST_PACK_ROWS);
compareColumn(col6, col, col6->size());

auto col7 = createColumn(9);
cache.add(1, 9, col7, std::numeric_limits<UInt64>::max());
cache.add(1, 9, col7);
ColumnPtr col8;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
Expand All @@ -96,13 +96,13 @@ TEST(ColumnSharingCacheTest, Del)
ColumnSharingCache cache;

auto col1 = createColumn(8);
cache.add(1, 8, col1, std::numeric_limits<UInt64>::max());
cache.add(1, 8, col1);

auto col2 = createColumn(8);
cache.add(9, 8, col2, std::numeric_limits<UInt64>::max());
cache.add(9, 8, col2);

auto col3 = createColumn(8);
cache.add(17, 8, col3, std::numeric_limits<UInt64>::max());
cache.add(17, 8, col3);

cache.del(10);

Expand All @@ -119,52 +119,4 @@ TEST(ColumnSharingCacheTest, Del)
compareColumn(col5, col2, col5->size());
}

TEST(ColumnSharingCacheTest, AddAndGetWithLimitation)
{
ColumnSharingCache cache;

auto col = createColumn(8);
// Limit to 0, add should fail.
cache.add(1, 8, col, 0);
ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_MISS);
ASSERT_EQ(col1, nullptr);

// Limit to 1, add should succ.
cache.add(1, 8, col, 1);
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col1->size(), 8 * TEST_PACK_ROWS);
compareColumn(col1, col, col1->size());
ColumnPtr col2;
st = cache.get(1, 7, 7 * TEST_PACK_ROWS, col2, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
ASSERT_EQ(col2->size(), 7 * TEST_PACK_ROWS);
compareColumn(col2, col, col2->size());
ColumnPtr col3;
st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col3, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_PART);
ASSERT_EQ(col3, nullptr);
ColumnPtr col4;
st = cache.get(2, 8, 8 * TEST_PACK_ROWS, col4, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_MISS);
ASSERT_EQ(col4, nullptr);

auto col7 = createColumn(9);
// Limit to 1, add should fail.
cache.add(1, 9, col7, 1);
ColumnPtr col8;
st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_PART);
ASSERT_EQ(col8, nullptr);

// Limit to 2, add should succ.
cache.add(1, 9, col7, 2);
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
ASSERT_EQ(col8->size(), 8 * TEST_PACK_ROWS);
compareColumn(col8, col7, col8->size());
}

} // namespace DB::DM::tests

0 comments on commit d83fbe3

Please sign in to comment.