Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Splitting DMFileReaderPool to reduce lock contention (#9126) #9130

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 57 additions & 15 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@

namespace DB::DM
{
DMFileReaderPool & DMFileReaderPool::instance()
{
static DMFileReaderPool reader_pool;
return reader_pool;
}

void DMFileReaderPool::add(DMFileReader & reader)
void DMFileReaderPoolSharding::add(const String & path, DMFileReader & reader)
{
std::lock_guard lock(mtx);
readers[reader.path()].insert(&reader);
readers[path].insert(&reader);
}

void DMFileReaderPool::del(DMFileReader & reader)
void DMFileReaderPoolSharding::del(const String & path, DMFileReader & reader)
{
std::lock_guard lock(mtx);
auto itr = readers.find(reader.path());
auto itr = readers.find(path);
if (itr == readers.end())
{
return;
Expand All @@ -43,10 +38,16 @@ void DMFileReaderPool::del(DMFileReader & reader)
}
}

void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col)
void DMFileReaderPoolSharding::set(
const String & path,
DMFileReader & from_reader,
int64_t col_id,
size_t start,
size_t count,
ColumnPtr & col)
{
std::lock_guard lock(mtx);
auto itr = readers.find(from_reader.path());
auto itr = readers.find(path);
if (itr == readers.end())
{
return;
Expand All @@ -62,17 +63,58 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st
}

// Check is there any concurrent DMFileReader with `from_reader`.
bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader)
bool DMFileReaderPoolSharding::hasConcurrentReader(const String & path)
{
std::lock_guard lock(mtx);
auto itr = readers.find(from_reader.path());
auto itr = readers.find(path);
return itr != readers.end() && itr->second.size() >= 2;
}

DMFileReader * DMFileReaderPool::get(const std::string & name)
DMFileReader * DMFileReaderPoolSharding::get(const std::string & path)
{
std::lock_guard lock(mtx);
auto itr = readers.find(name);
auto itr = readers.find(path);
return itr != readers.end() && !itr->second.empty() ? *(itr->second.begin()) : nullptr;
}

DMFileReaderPool & DMFileReaderPool::instance()
{
static DMFileReaderPool reader_pool;
return reader_pool;
}

DMFileReaderPoolSharding & DMFileReaderPool::getSharding(const String & path)
{
return shardings[std::hash<String>{}(path) % shardings.size()];
}

void DMFileReaderPool::add(DMFileReader & reader)
{
auto path = reader.path();
getSharding(path).add(path, reader);
}

void DMFileReaderPool::del(DMFileReader & reader)
{
auto path = reader.path();
getSharding(path).del(path, reader);
}

void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col)
{
auto path = from_reader.path();
getSharding(path).set(path, from_reader, col_id, start, count, col);
}

// Check is there any concurrent DMFileReader with `from_reader`.
bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader)
{
auto path = from_reader.path();
return getSharding(path).hasConcurrentReader(path);
}

DMFileReader * DMFileReaderPool::get(const std::string & path)
{
return getSharding(path).get(path);
}
} // namespace DB::DM
27 changes: 24 additions & 3 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,27 @@ class ColumnSharingCacheMap

class DMFileReader;

class DMFileReaderPoolSharding
{
public:
void add(const String & path, DMFileReader & reader);
void del(const String & path, DMFileReader & reader);
void set(
const String & path,
DMFileReader & from_reader,
int64_t col_id,
size_t start,
size_t count,
ColumnPtr & col);
bool hasConcurrentReader(const String & path);
// `get` is just for test.
DMFileReader * get(const std::string & path);

private:
std::mutex mtx;
std::unordered_map<std::string, std::unordered_set<DMFileReader *>> readers;
};

// DMFileReaderPool holds all the DMFileReader objects, so we can easily find DMFileReader objects with the same DMFile ID.
// When a DMFileReader object successfully reads a column's packs, it will try to put these packs into other DMFileReader objects' cache.
class DMFileReaderPool
Expand All @@ -232,14 +253,14 @@ class DMFileReaderPool
void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);
bool hasConcurrentReader(DMFileReader & from_reader);
// `get` is just for test.
DMFileReader * get(const std::string & name);
DMFileReader * get(const std::string & path);

private:
DMFileReaderPool() = default;
DMFileReaderPoolSharding & getSharding(const String & path);

private:
std::mutex mtx;
std::unordered_map<std::string, std::unordered_set<DMFileReader *>> readers;
std::array<DMFileReaderPoolSharding, 16> shardings;
};

} // namespace DB::DM