Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Refine memory tracker of data sharing (#8857) #8934

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ struct Settings
\
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests, include those sharing blocks in block queue. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "Deprecated") \
M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
Expand Down
16 changes: 3 additions & 13 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <utility>

namespace DB::DM
{
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
Expand Down Expand Up @@ -59,15 +57,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0)
{
// Disable data sharing.
max_sharing_column_count = 0;
}
else if (
shared_column_data_mem_tracker != nullptr
&& std::cmp_greater_equal(shared_column_data_mem_tracker->get(), max_sharing_column_bytes_for_all))
{
// The memory used reaches the limitation by running queries, disable the data sharing for this DMFile
max_sharing_column_count = 0;
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
max_sharing_column_bytes_for_all = 0;
}

DMFileReader reader(
Expand All @@ -89,9 +79,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
rows_threshold_per_read,
read_one_pack_every_time,
tracing_id,
max_sharing_column_count,
max_sharing_column_bytes_for_all,
scan_context);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_count > 0);
return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_bytes_for_all > 0);
}
} // namespace DB::DM
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class DMFileBlockInputStreamBuilder
aio_threshold = settings.min_bytes_to_use_direct_io;
max_read_buffer_size = settings.max_read_buffer_size;
max_sharing_column_bytes_for_all = settings.dt_max_sharing_column_bytes_for_all;
max_sharing_column_count = settings.dt_max_sharing_column_count;
return *this;
}
DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_)
Expand Down Expand Up @@ -196,7 +195,6 @@ class DMFileBlockInputStreamBuilder
size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD;
bool read_one_pack_every_time = false;
size_t max_sharing_column_bytes_for_all = 0;
size_t max_sharing_column_count = 0;
String tracing_id;
};

Expand Down
19 changes: 13 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <Storages/S3/S3RandomAccessFile.h>
#include <fmt/format.h>


namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
Expand Down Expand Up @@ -196,7 +195,7 @@ DMFileReader::DMFileReader(
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
size_t max_sharing_column_count,
size_t max_sharing_column_bytes_,
const ScanContextPtr & scan_context_)
: dmfile(dmfile_)
, read_columns(read_columns_)
Expand All @@ -213,6 +212,7 @@ DMFileReader::DMFileReader(
, column_cache(column_cache_)
, scan_context(scan_context_)
, rows_threshold_per_read(rows_threshold_per_read_)
, max_sharing_column_bytes(max_sharing_column_bytes_)
, file_provider(file_provider_)
, log(Logger::get(tracing_id_))
{
Expand All @@ -238,9 +238,9 @@ DMFileReader::DMFileReader(
const auto data_type = dmfile->getColumnStat(cd.id).type;
data_type->enumerateStreams(callback, {});
}
if (max_sharing_column_count > 0)
if (max_sharing_column_bytes > 0)
{
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, max_sharing_column_count, log);
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, log);
for (const auto & cd : read_columns)
{
last_read_from_cache[cd.id] = false;
Expand Down Expand Up @@ -702,14 +702,21 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
size_t skip_packs)
{
bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr
&& shared_column_data_mem_tracker->get() >= static_cast<Int64>(max_sharing_column_bytes);
if (reach_sharing_column_memory_limit)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
}
bool enable_sharing_column = has_concurrent_reader && !reach_sharing_column_memory_limit;
if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column))
{
// If there are concurrent read requests, this data is likely to be shared.
// So the allocation and deallocation of this data may not be in the same MemoryTracker.
// This can lead to inaccurate memory statistics of MemoryTracker.
// To solve this problem, we use a independent global memory tracker to trace the shared column data in ColumnSharingCacheMap.
auto mem_tracker_guard
= has_concurrent_reader ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
= enable_sharing_column ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
auto data_type = dmfile->getColumnStat(column_define.id).type;
auto col = data_type->createColumn();
readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]);
Expand All @@ -721,7 +728,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
last_read_from_cache[column_define.id] = true;
}

if (has_concurrent_reader && col_data_cache != nullptr)
if (enable_sharing_column && col_data_cache != nullptr)
{
DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column);
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class DMFileReader
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
size_t max_sharing_column_count,
size_t max_sharing_column_bytes_,
const ScanContextPtr & scan_context_);

Block getHeader() const { return toEmptyBlock(read_columns); }
Expand Down Expand Up @@ -178,6 +178,7 @@ class DMFileReader
const ScanContextPtr scan_context;

const size_t rows_threshold_per_read;
const size_t max_sharing_column_bytes;

size_t next_pack_id = 0;
size_t next_row_offset = 0;
Expand Down
17 changes: 3 additions & 14 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,9 @@ class ColumnSharingCache
ColumnPtr col_data;
};

void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data, size_t max_sharing_column_count)
void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data)
{
std::lock_guard lock(mtx);
if (packs.size() >= max_sharing_column_count)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_reach_count_limit).Increment();
return;
}
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment();
auto & value = packs[start_pack_id];
if (value.pack_count < pack_count)
Expand Down Expand Up @@ -126,14 +121,9 @@ class ColumnSharingCache
class ColumnSharingCacheMap
{
public:
ColumnSharingCacheMap(
const String & dmfile_name_,
const ColumnDefines & cds,
size_t max_sharing_column_count_,
LoggerPtr & log_)
ColumnSharingCacheMap(const String & dmfile_name_, const ColumnDefines & cds, LoggerPtr & log_)
: dmfile_name(dmfile_name_)
, stats(static_cast<int>(ColumnCacheStatus::_TOTAL_COUNT))
, max_sharing_column_count(max_sharing_column_count_)
, log(log_)
{
for (const auto & cd : cds)
Expand Down Expand Up @@ -162,7 +152,7 @@ class ColumnSharingCacheMap
{
return;
}
itr->second.add(start_pack_id, pack_count, col_data, max_sharing_column_count);
itr->second.add(start_pack_id, pack_count, col_data);
}

bool get(int64_t col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type)
Expand Down Expand Up @@ -217,7 +207,6 @@ class ColumnSharingCacheMap
std::string dmfile_name;
std::unordered_map<int64_t, ColumnSharingCache> cols;
std::vector<std::atomic<int64_t>> stats;
size_t max_sharing_column_count;
LoggerPtr log;
};

Expand Down
60 changes: 6 additions & 54 deletions dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ColumnSharingCache cache;

auto col = createColumn(8);
cache.add(1, 8, col, std::numeric_limits<UInt64>::max());
cache.add(1, 8, col);

ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
Expand All @@ -75,15 +75,15 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ASSERT_EQ(col4, nullptr);

auto col5 = createColumn(7);
cache.add(1, 7, col5, std::numeric_limits<UInt64>::max());
cache.add(1, 7, col5);
ColumnPtr col6;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col6, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col6->size(), 8 * TEST_PACK_ROWS);
compareColumn(col6, col, col6->size());

auto col7 = createColumn(9);
cache.add(1, 9, col7, std::numeric_limits<UInt64>::max());
cache.add(1, 9, col7);
ColumnPtr col8;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
Expand All @@ -96,13 +96,13 @@ TEST(ColumnSharingCacheTest, Del)
ColumnSharingCache cache;

auto col1 = createColumn(8);
cache.add(1, 8, col1, std::numeric_limits<UInt64>::max());
cache.add(1, 8, col1);

auto col2 = createColumn(8);
cache.add(9, 8, col2, std::numeric_limits<UInt64>::max());
cache.add(9, 8, col2);

auto col3 = createColumn(8);
cache.add(17, 8, col3, std::numeric_limits<UInt64>::max());
cache.add(17, 8, col3);

cache.del(10);

Expand All @@ -119,52 +119,4 @@ TEST(ColumnSharingCacheTest, Del)
compareColumn(col5, col2, col5->size());
}

TEST(ColumnSharingCacheTest, AddAndGetWithLimitation)
{
ColumnSharingCache cache;

auto col = createColumn(8);
// Limit to 0, add should fail.
cache.add(1, 8, col, 0);
ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_MISS);
ASSERT_EQ(col1, nullptr);

// Limit to 1, add should succ.
cache.add(1, 8, col, 1);
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col1->size(), 8 * TEST_PACK_ROWS);
compareColumn(col1, col, col1->size());
ColumnPtr col2;
st = cache.get(1, 7, 7 * TEST_PACK_ROWS, col2, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
ASSERT_EQ(col2->size(), 7 * TEST_PACK_ROWS);
compareColumn(col2, col, col2->size());
ColumnPtr col3;
st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col3, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_PART);
ASSERT_EQ(col3, nullptr);
ColumnPtr col4;
st = cache.get(2, 8, 8 * TEST_PACK_ROWS, col4, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_MISS);
ASSERT_EQ(col4, nullptr);

auto col7 = createColumn(9);
// Limit to 1, add should fail.
cache.add(1, 9, col7, 1);
ColumnPtr col8;
st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_PART);
ASSERT_EQ(col8, nullptr);

// Limit to 2, add should succ.
cache.add(1, 9, col7, 2);
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
ASSERT_EQ(col8->size(), 8 * TEST_PACK_ROWS);
compareColumn(col8, col7, col8->size());
}

} // namespace DB::DM::tests