Skip to content

Commit

Permalink
Storage: Refactor DMFileReader (#8927)
Browse files Browse the repository at this point in the history
close #8904
  • Loading branch information
Lloyd-Pottiger committed Apr 26, 2024
1 parent 3770103 commit 98e4a7c
Show file tree
Hide file tree
Showing 6 changed files with 512 additions and 366 deletions.
69 changes: 56 additions & 13 deletions dbms/src/Storages/DeltaMerge/File/ColumnCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,18 @@

#include <Storages/DeltaMerge/File/ColumnCache.h>

namespace DB
namespace DB::DM
{
namespace DM
{
RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_count, ColId column_id)

RangeWithStrategys ColumnCache::getReadStrategy(size_t start_pack_idx, size_t pack_count, ColId column_id)
{
PackRange target_range{pack_id, pack_id + pack_count};
PackRange target_range{start_pack_idx, start_pack_idx + pack_count};

RangeWithStrategys range_and_strategys;

Strategy strategy = Strategy::Unknown;
range_and_strategys.reserve(pack_count);
auto strategy = Strategy::Unknown;
size_t range_start = 0;
for (size_t cursor = target_range.first; cursor < target_range.second; cursor++)
for (size_t cursor = target_range.first; cursor < target_range.second; ++cursor)
{
if (isPackInCache(cursor, column_id))
{
Expand All @@ -36,7 +35,7 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun
}
else if (strategy == Strategy::Disk)
{
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, cursor}, Strategy::Disk));
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Disk);
}
range_start = cursor;
strategy = Strategy::Memory;
Expand All @@ -45,7 +44,7 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun
{
if (strategy == Strategy::Memory)
{
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, cursor}, Strategy::Memory));
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Memory);
}
else if (strategy == Strategy::Disk)
{
Expand All @@ -55,8 +54,53 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun
strategy = Strategy::Disk;
}
}
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, target_range.second}, strategy));
range_and_strategys.emplace_back(PackRange{range_start, target_range.second}, strategy);
range_and_strategys.shrink_to_fit();
return range_and_strategys;
}

RangeWithStrategys ColumnCache::getReadStrategy(
size_t start_pack_idx,
size_t pack_count,
const std::vector<size_t> & clean_read_pack_idx)
{
PackRange target_range{start_pack_idx, start_pack_idx + pack_count};

RangeWithStrategys range_and_strategys;
range_and_strategys.reserve(pack_count);
auto strategy = Strategy::Unknown;
size_t range_start = 0;
for (size_t cursor = target_range.first; cursor < target_range.second; ++cursor)
{
if (std::find(clean_read_pack_idx.cbegin(), clean_read_pack_idx.cend(), cursor) != clean_read_pack_idx.cend())
{
if (strategy == Strategy::Memory)
{
continue;
}
else if (strategy == Strategy::Disk)
{
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Disk);
}
range_start = cursor;
strategy = Strategy::Memory;
}
else
{
if (strategy == Strategy::Memory)
{
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Memory);
}
else if (strategy == Strategy::Disk)
{
continue;
}
range_start = cursor;
strategy = Strategy::Disk;
}
}
range_and_strategys.emplace_back(PackRange{range_start, target_range.second}, strategy);
range_and_strategys.shrink_to_fit();
return range_and_strategys;
}

Expand Down Expand Up @@ -125,5 +169,4 @@ bool ColumnCache::isPackInCache(PackId pack_id, ColId column_id)
return false;
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
17 changes: 9 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/ColumnCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@

#pragma once

#include <Core/Block.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/KVStore/Types.h>
#include <common/logger_useful.h>

#include <cstddef>
#include <memory>

namespace DB
{
namespace DM
namespace DB::DM
{

using ColId = DB::ColumnID;
using PackId = size_t;
using PackRange = std::pair<PackId, PackId>;
Expand All @@ -46,7 +43,11 @@ class ColumnCache

using RangeWithStrategy = std::pair<PackRange, ColumnCache::Strategy>;
using RangeWithStrategys = std::vector<RangeWithStrategy>;
RangeWithStrategys getReadStrategy(size_t pack_id, size_t pack_count, ColId column_id);
RangeWithStrategys getReadStrategy(size_t start_pack_idx, size_t pack_count, ColId column_id);
static RangeWithStrategys getReadStrategy(
size_t start_pack_idx,
size_t pack_count,
const std::vector<size_t> & clean_read_pack_idx);

void tryPutColumn(size_t pack_id, ColId column_id, const ColumnPtr & column, size_t rows_offset, size_t rows_count);

Expand Down Expand Up @@ -74,5 +75,5 @@ using ColumnCachePtrs = std::vector<ColumnCachePtr>;
using RangeWithStrategy = ColumnCache::RangeWithStrategy;
using RangeWithStrategys = ColumnCache::RangeWithStrategys;
using ColumnCacheElement = ColumnCache::ColumnCacheElement;
} // namespace DM
} // namespace DB

} // namespace DB::DM
Loading

0 comments on commit 98e4a7c

Please sign in to comment.