Skip to content

Commit

Permalink
Unfinished changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kssenii committed Nov 3, 2023
1 parent 77507b8 commit 21830b5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 32 deletions.
90 changes: 59 additions & 31 deletions src/Interpreters/Cache/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ CacheGuard::Lock FileCache::lockCache() const
return cache_guard.lock();
}

FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
std::pair<FileSegments, bool> FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
{
/// Given range = [left, right] and non-overlapping ordered set of file segments,
/// find list [segment1, ..., segmentN] of segments which intersect with given range.
Expand All @@ -156,13 +156,14 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), range.left, range.size(), FileSegment::State::DETACHED);
return { file_segment };
return { {file_segment}, false };
}

if (locked_key.empty())
return {};

FileSegments result;
bool is_limit_reached = false;
auto add_to_result = [&](const FileSegmentMetadata & file_segment_metadata)
{
if (file_segments_limit && result.size() == file_segments_limit)
Expand Down Expand Up @@ -202,7 +203,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
return {};

if (!add_to_result(file_segment_metadata))
return result;
return { result, true };
}
else /// segment_it <-- segmment{k}
{
Expand All @@ -219,7 +220,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
/// ^
/// range.left
if (!add_to_result(prev_file_segment_metadata))
return result;
return { result, true };
}
}

Expand All @@ -236,20 +237,43 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
break;

if (!add_to_result(file_segment_metadata))
return result;
return { result, true };

++segment_it;
}
}

return result;
return { result, false };
}

std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size)
{
assert(size > 0);
std::vector<FileSegment::Range> ranges;

size_t current_pos = offset;
size_t end_pos_non_included = offset + size;
size_t remaining_size = size;

FileSegments file_segments;
while (current_pos < end_pos_non_included)
{
auto current_file_segment_size = std::min(remaining_size, max_file_segment_size);
ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1);

remaining_size -= current_file_segment_size;
current_pos += current_file_segment_size;
}

return ranges;
}

FileSegments FileCache::splitRangeIntoFileSegments(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & settings)
{
assert(size > 0);
Expand All @@ -261,7 +285,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
size_t remaining_size = size;

FileSegments file_segments;
while (current_pos < end_pos_non_included)
while (current_pos < end_pos_non_included && file_segments.size() < file_segments_limit)
{
current_file_segment_size = std::min(remaining_size, max_file_segment_size);
remaining_size -= current_file_segment_size;
Expand All @@ -273,7 +297,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
current_pos += current_file_segment_size;
}

assert(file_segments.empty() || offset + size - 1 == file_segments.back()->range().right);
assert(file_segments.empty() || file_segments_limit > 0 || offset + size - 1 == file_segments.back()->range().right);
return file_segments;
}

Expand Down Expand Up @@ -338,6 +362,13 @@ void FileCache::fillHolesWithEmptyFileSegments(
}
else
{
auto ranges = splitRange(current_pos, hole_size);
for (const auto & range : ranges)
{
auto file_segment_metadata_it = addFileSegment(
locked_key, range.left, range.size(), FileSegment::State::EMPTY, settings, nullptr);
file_segments.push_back(file_segment_metadata_it->second->file_segment);
}
auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(it, std::move(split));
Expand Down Expand Up @@ -386,7 +417,7 @@ FileSegmentsHolderPtr FileCache::set(
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
FileSegment::Range range(offset, offset + size - 1);

auto file_segments = getImpl(*locked_key, range, /* file_segments_limit */0);
auto [file_segments, _] = getImpl(*locked_key, range, /* file_segments_limit */0);
if (!file_segments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache");

Expand All @@ -400,7 +431,7 @@ FileSegmentsHolderPtr FileCache::set(
else
{
file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, settings);
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings);
}

return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
Expand All @@ -419,7 +450,7 @@ FileCache::getOrSet(

assertInitialized();

const auto end_offset = offset + size - 1;
auto end_offset = offset + size - 1;
const auto aligned_offset = roundDownToMultiple(offset, boundary_alignment);
const auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= offset);
Expand All @@ -428,41 +459,38 @@ FileCache::getOrSet(

/// Get all segments which intersect with the given range.
FileSegment::Range range(offset, end_offset);
auto file_segments = getImpl(*locked_key, range, file_segments_limit);
auto [file_segments, limit_reached] = getImpl(*locked_key, range, file_segments_limit);

if (!file_segments.empty() && limit_reached)
end_offset = file_segments.back()->range().right;

if (aligned_offset < offset && (file_segments.empty() || offset < file_segments.front()->range().left))
{
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? offset - 1 : file_segments.front()->range().left - 1);
auto prefix_file_segments = getImpl(*locked_key, prefix_range, /* file_segments_limit */0);
auto [prefix_file_segments, _] = getImpl(*locked_key, prefix_range, /* file_segments_limit */0);

if (prefix_file_segments.empty())
{
range.left = aligned_offset;
}
else
{
size_t last_right_offset = prefix_file_segments.back()->range().right;

while (!prefix_file_segments.empty() && prefix_file_segments.front()->range().right < offset)
prefix_file_segments.pop_front();

if (prefix_file_segments.empty())
{
range.left = last_right_offset + 1;
}
else
{
file_segments.splice(file_segments.begin(), prefix_file_segments);
range.left = file_segments.front()->range().left;
}
/// [_________________________][_______________]
/// ^ ^ ^
/// aligned_offset offset end_offset
/// ______] [_____] [___] [__________]
/// ^
/// prifx_file_segments.back().right
chassert(prefix_file_segments.back()->range().right < offset);
chassert(prefix_file_segments.back()->range().right >= aligned_offset);

range.left = prefix_file_segments.back()->range().right + 1;
}
}

if (end_offset < aligned_end_offset && (file_segments.empty() || file_segments.back()->range().right < end_offset))
{
auto suffix_range = FileSegment::Range(end_offset, aligned_end_offset);
/// Get only 1 file segment.
auto suffix_file_segments = getImpl(*locked_key, suffix_range, /* file_segments_limit */1);
auto [suffix_file_segments, _] = getImpl(*locked_key, suffix_range, /* file_segments_limit */1);

if (suffix_file_segments.empty())
range.right = aligned_end_offset;
Expand All @@ -472,7 +500,7 @@ FileCache::getOrSet(

if (file_segments.empty())
{
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, settings);
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings);
}
else
{
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/Cache/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,17 @@ class FileCache : private boost::noncopyable
void loadMetadataImpl();
void loadMetadataForKeys(const std::filesystem::path & keys_dir);

FileSegments getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const;
/// bool - if `file_segments_limit` reached or not.
std::pair<FileSegments, bool> getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const;

std::vector<FileSegment::Range> splitRange(size_t offset, size_t size);

FileSegments splitRangeIntoFileSegments(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & create_settings);

void fillHolesWithEmptyFileSegments(
Expand Down

0 comments on commit 21830b5

Please sign in to comment.