This is an automated cherry-pick of pingcap#4500
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
lidezhu authored and ti-chi-bot committed Mar 30, 2022
1 parent 2b887f0 commit 221b6cc
Showing 10 changed files with 300 additions and 74 deletions.
63 changes: 54 additions & 9 deletions dbms/src/Debug/dbgFuncMisc.cpp
@@ -8,28 +8,73 @@

namespace DB
{
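// Parse the numeric thread id that follows "thread_id=" in a log line; returns 0 if no number is found.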
inline size_t getThreadIdForLog(const String & line)
{
auto sub_line = line.substr(line.find("thread_id="));
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
if (regex_search(sub_line, m, rx))
return std::stoi(m[1]);
else
return 0;
}

// Usage example:
// The first argument is the key you want to search for.
// For example, to search the log file for the key 'RSFilter exclude rate' and get the value following it,
// pass that key as the first argument.
// But many kinds of threads may print this keyword,
// so the second argument specifies a keyword that is only printed by one specific kind of thread.
// Here we use 'Rough set filter' to indicate that we only want to search lines printed by read threads.
// The complete command is:
// DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter')
// TODO: this is still a rather hacky way to test, but we cannot think of a better way for now.
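// For illustration only (the log line format below is hypothetical), given lines such as
//   [DEBUG] ["Rough set filter ..."] [thread_id=42]
//   [DEBUG] ["RSFilter exclude rate: 80"] [thread_id=42]
// the command above first locates the read thread via 'Rough set filter' (thread_id=42 here),
// then parses and returns the number 80 from the latest 'RSFilter exclude rate' line printed by that same thread.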
void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 1)
throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2)
throw Exception("Args not matched, should be: key, thread_hint", ErrorCodes::BAD_ARGUMENTS);

String key = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
// the candidate line must be printed by a thread which also prints a line containing `thread_hint`
String thread_hint = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[1]).value);
auto log_path = context.getConfigRef().getString("logger.log");

std::ifstream file(log_path);
std::vector<String> line_candidates;
String line;
while (std::getline(file, line))
// collect the lines containing `thread_hint` or `key`
std::vector<String> thread_hint_line_candidates;
std::vector<String> key_line_candidates;
{
if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
line_candidates.emplace_back(line);
String line;
while (std::getline(file, line))
{
if ((line.find(thread_hint) != String::npos) && (line.find("DBGInvoke") == String::npos))
thread_hint_line_candidates.emplace_back(line);
else if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
key_line_candidates.emplace_back(line);
}
}
if (line_candidates.empty())
// get target thread id
if (thread_hint_line_candidates.empty() || key_line_candidates.empty())
{
output("Invalid");
return;
}
auto & target_line = line_candidates.back();
size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back());
if (target_thread_id == 0)
{
output("Invalid");
return;
}
String target_line;
for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++)
{
if (getThreadIdForLog(*iter) == target_thread_id)
{
target_line = *iter;
break;
}
}
// try to parse the first number following the key
auto sub_line = target_line.substr(target_line.find(key));
std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))");
std::smatch m;
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
@@ -21,8 +21,21 @@ void DeltaPackFile::calculateStat(const DMContext & context)
auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = context.hash_salt;

<<<<<<< HEAD:dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());
=======
auto pack_filter = DMFilePackFilter::loadFrom(
file,
index_cache,
/*set_cache_if_miss*/ false,
{segment_range},
EMPTY_FILTER,
{},
context.db_context.getFileProvider(),
context.getReadLimiter(),
/*tracing_logger*/ nullptr);
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500)):dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
59 changes: 34 additions & 25 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1553,7 +1553,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
}

assert(segment != nullptr);
if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr)
if (segment->hasAbandoned() || segment_snap == nullptr)
continue;

const auto segment_id = segment->segmentId();
@@ -1562,43 +1562,52 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
// encountered an empty segment, try to merge it
if (segment_snap->getRows() == 0)
{
// release segment_snap before checkSegmentUpdate, otherwise this segment is still in update status.
segment_snap = {};
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
continue;
}

// Avoid rechecking this segment when gc_safe_point doesn't change, regardless of whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to check it again with the same gc_safe_point.
// On the other hand, if it should do DeltaMerge with this gc_safe_point but the DeltaMerge is interrupted by another process,
// it's still worthwhile to wait for another gc_safe_point and check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

try
{
// Check whether we should apply gc on this segment
const bool should_compact
= GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log)
|| GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log);
bool should_compact = false;
if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log))
{
should_compact = true;
}
else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
{
// Avoid rechecking this segment when gc_safe_point doesn't change, regardless of whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to check it again with the same gc_safe_point.
// On the other hand, if it should do DeltaMerge with this gc_safe_point but the DeltaMerge is interrupted by another process,
// it's still worthwhile to wait for another gc_safe_point and check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

should_compact = GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log);
}
bool finish_gc_on_segment = false;
if (should_compact)
{
if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment)
{
// Continue to check whether we need to apply more tasks on this segment
segment_snap = {};
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
gc_segments_num++;
finish_gc_on_segment = true;
79 changes: 79 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
@@ -0,0 +1,79 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>

namespace DB::DM
{
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
: file_provider(context.getFileProvider())
, read_limiter(context.getReadLimiter())
{
// init from global context
const auto & global_context = context.getGlobalContext();
setCaches(global_context.getMarkCache(), global_context.getMinMaxIndexCache());
// init from settings
setFromSettings(context.getSettingsRef());
}

DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges)
{
if (dmfile->getStatus() != DMFile::Status::READABLE)
throw Exception(fmt::format(
"DMFile [{}] is expected to be in READABLE status, but: {}",
dmfile->fileId(),
DMFile::statusString(dmfile->getStatus())),
ErrorCodes::LOGICAL_ERROR);

// if `rowkey_ranges` is empty, we unconditionally read all packs
// `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode.
// It is safe to ignore them here.
if (unlikely(rowkey_ranges.empty() && enable_clean_read))
throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR);

bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
index_cache,
/*set_cache_if_miss*/ true,
rowkey_ranges,
rs_filter,
read_packs,
file_provider,
read_limiter,
tracing_logger);

DMFileReader reader(
dmfile,
read_columns,
is_common_handle,
enable_clean_read,
max_data_version,
std::move(pack_filter),
mark_cache,
enable_column_cache,
column_cache,
aio_threshold,
max_read_buffer_size,
file_provider,
read_limiter,
rows_threshold_per_read,
read_one_pack_every_time,
tracing_logger);

return std::make_shared<DMFileBlockInputStream>(std::move(reader));
}
} // namespace DB::DM
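
A minimal usage sketch of the builder introduced above, assuming the caller has already prepared a Context `db_context`, a readable DMFilePtr `dmfile`, the ColumnDefines `columns` to read, and the RowKeyRanges `ranges` (all names here are illustrative); the returned stream is consumed with the usual IBlockInputStream loop:

    DM::DMFileBlockInputStreamBuilder builder(db_context);
    auto stream = builder.build(dmfile, columns, ranges);
    stream->readPrefix();
    while (Block block = stream->read())
    {
        // process the block read from the DMFile
    }
    stream->readSuffix();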
48 changes: 41 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
@@ -25,6 +25,7 @@ using IdSetPtr = std::shared_ptr<IdSet>;
class DMFilePackFilter
{
public:
<<<<<<< HEAD
static DMFilePackFilter loadFrom(const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
UInt64 hash_salt,
@@ -35,6 +36,20 @@ class DMFilePackFilter
const ReadLimiterPtr & read_limiter)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, rowkey_ranges, filter, read_packs, file_provider, read_limiter);
=======
static DMFilePackFilter loadFrom(
const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
const RowKeyRanges & rowkey_ranges,
const RSOperatorPtr & filter,
const IdSetPtr & read_packs,
const FileProviderPtr & file_provider,
const ReadLimiterPtr & read_limiter,
const DB::LoggerPtr & tracing_logger)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger);
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
pack_filter.init();
return pack_filter;
}
@@ -86,15 +101,23 @@ class DMFilePackFilter
private:
DMFilePackFilter(const DMFilePtr & dmfile_,
const MinMaxIndexCachePtr & index_cache_,
<<<<<<< HEAD
UInt64 hash_salt_,
=======
bool set_cache_if_miss_,
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
const RowKeyRanges & rowkey_ranges_, // filter by handle range
const RSOperatorPtr & filter_, // filter by push down where clause
const IdSetPtr & read_packs_, // filter by pack index
const FileProviderPtr & file_provider_,
const ReadLimiterPtr & read_limiter_)
: dmfile(dmfile_)
, index_cache(index_cache_)
<<<<<<< HEAD
, hash_salt(hash_salt_)
=======
, set_cache_if_miss(set_cache_if_miss_)
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
, rowkey_ranges(rowkey_ranges_)
, filter(filter_)
, read_packs(read_packs_)
@@ -200,20 +223,24 @@ class DMFilePackFilter
const DMFilePtr & dmfile,
const FileProviderPtr & file_provider,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
ColId col_id,
const ReadLimiterPtr & read_limiter)
{
auto & type = dmfile->getColumnStat(col_id).type;
const auto file_name_base = DMFile::getFileNameBase(col_id);

auto load = [&]() {
auto index_file_size = dmfile->colIndexSize(file_name_base);
if (index_file_size == 0)
return std::make_shared<MinMaxIndex>(*type);
if (!dmfile->configuration)
{
auto index_buf = ReadBufferFromFileProvider(
file_provider,
dmfile->colIndexPath(file_name_base),
dmfile->encryptionIndexPath(file_name_base),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), dmfile->colIndexSize(file_name_base)),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
read_limiter);
index_buf.seek(dmfile->colIndexOffset(file_name_base));
return MinMaxIndex::read(*type, index_buf, dmfile->colIndexSize(file_name_base));
@@ -228,21 +255,24 @@ class DMFilePackFilter
dmfile->configuration->getChecksumAlgorithm(),
dmfile->configuration->getChecksumFrameLength());
index_buf->seek(dmfile->colIndexOffset(file_name_base));
auto file_size = dmfile->colIndexSize(file_name_base);
auto header_size = dmfile->configuration->getChecksumHeaderLength();
auto frame_total_size = dmfile->configuration->getChecksumFrameLength();
auto frame_count = file_size / frame_total_size + (file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, file_size - header_size * frame_count);
auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
}
};
MinMaxIndexPtr minmax_index;
if (index_cache)
if (index_cache && set_cache_if_miss)
{
minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load);
}
else
{
minmax_index = load();
// try to load from the cache first
if (index_cache)
minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base));
if (!minmax_index)
minmax_index = load();
}
indexes.emplace(col_id, RSIndex(type, minmax_index));
}
Expand All @@ -255,13 +285,17 @@ class DMFilePackFilter
if (!dmfile->isColIndexExist(col_id))
return;

loadIndex(param.indexes, dmfile, file_provider, index_cache, col_id, read_limiter);
loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter);
}

private:
DMFilePtr dmfile;
MinMaxIndexCachePtr index_cache;
<<<<<<< HEAD
UInt64 hash_salt;
=======
bool set_cache_if_miss;
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
RowKeyRanges rowkey_ranges;
RSOperatorPtr filter;
IdSetPtr read_packs;
