Skip to content

Commit

Permalink
fix empty segment cannot merge after gc and avoid write index data fo…
Browse files Browse the repository at this point in the history
…r empty dmfile (pingcap#4500)
  • Loading branch information
lidezhu committed Mar 30, 2022
1 parent 2b887f0 commit 271d054
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 78 deletions.
63 changes: 54 additions & 9 deletions dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,73 @@

namespace DB
{
inline size_t getThreadIdForLog(const String & line)
{
auto sub_line = line.substr(line.find("thread_id="));
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
if (regex_search(sub_line, m, rx))
return std::stoi(m[1]);
else
return 0;
}

// Usage example:
// The first argument is the key you want to search.
// For example, we want to search the key 'RSFilter exclude rate' in log file, and get the value following it.
// So we can use it as the first argument.
// But many kind of thread can print this keyword,
// so we can use the second argument to specify a keyword that may just be printed by a specific kind of thread.
// Here we use 'Rough set filter' to specify we just want to search read thread.
// And the complete command is the following:
// DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter')
// TODO: this is still a too hack way to do test, but cannot think a better way now.
void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 1)
throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2)
throw Exception("Args not matched, should be: key, thread_hint", ErrorCodes::BAD_ARGUMENTS);

String key = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
// the candidate line must be printed by a thread which also print a line contains `thread_hint`
String thread_hint = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[1]).value);
auto log_path = context.getConfigRef().getString("logger.log");

std::ifstream file(log_path);
std::vector<String> line_candidates;
String line;
while (std::getline(file, line))
// get the lines containing `thread_hint` and `key`
std::vector<String> thread_hint_line_candidates;
std::vector<String> key_line_candidates;
{
if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
line_candidates.emplace_back(line);
String line;
while (std::getline(file, line))
{
if ((line.find(thread_hint) != String::npos) && (line.find("DBGInvoke") == String::npos))
thread_hint_line_candidates.emplace_back(line);
else if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
key_line_candidates.emplace_back(line);
}
}
if (line_candidates.empty())
// get target thread id
if (thread_hint_line_candidates.empty() || key_line_candidates.empty())
{
output("Invalid");
return;
}
auto & target_line = line_candidates.back();
size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back());
if (target_thread_id == 0)
{
output("Invalid");
return;
}
String target_line;
for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++)
{
if (getThreadIdForLog(*iter) == target_thread_id)
{
target_line = *iter;
break;
}
}
// try parse the first number following the key
auto sub_line = target_line.substr(target_line.find(key));
std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))");
std::smatch m;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void DeltaPackFile::calculateStat(const DMContext & context)
auto hash_salt = context.hash_salt;

auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, /*set_cache_if_miss*/ false, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down
59 changes: 34 additions & 25 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
}

assert(segment != nullptr);
if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr)
if (segment->hasAbandoned() || segment_snap == nullptr)
continue;

const auto segment_id = segment->segmentId();
Expand All @@ -1562,43 +1562,52 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
// meet empty segment, try merge it
if (segment_snap->getRows() == 0)
{
// release segment_snap before checkSegmentUpdate, otherwise this segment is still in update status.
segment_snap = {};
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
continue;
}

// Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process,
// it's still worth to wait another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

try
{
// Check whether we should apply gc on this segment
const bool should_compact
= GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log)
|| GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log);
bool should_compact = false;
if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log))
{
should_compact = true;
}
else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
{
// Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process,
// it's still worth to wait another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

should_compact = GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log);
}
bool finish_gc_on_segment = false;
if (should_compact)
{
if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment)
{
// Continue to check whether we need to apply more tasks on this segment
segment_snap = {};
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
gc_segments_num++;
finish_gc_on_segment = true;
Expand Down
27 changes: 19 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ class DMFilePackFilter
static DMFilePackFilter loadFrom(const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
UInt64 hash_salt,
bool set_cache_if_miss,
const RowKeyRanges & rowkey_ranges,
const RSOperatorPtr & filter,
const IdSetPtr & read_packs,
const FileProviderPtr & file_provider,
const ReadLimiterPtr & read_limiter)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, rowkey_ranges, filter, read_packs, file_provider, read_limiter);
auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter);
pack_filter.init();
return pack_filter;
}
Expand Down Expand Up @@ -87,6 +88,7 @@ class DMFilePackFilter
DMFilePackFilter(const DMFilePtr & dmfile_,
const MinMaxIndexCachePtr & index_cache_,
UInt64 hash_salt_,
bool set_cache_if_miss_,
const RowKeyRanges & rowkey_ranges_, // filter by handle range
const RSOperatorPtr & filter_, // filter by push down where clause
const IdSetPtr & read_packs_, // filter by pack index
Expand All @@ -95,6 +97,7 @@ class DMFilePackFilter
: dmfile(dmfile_)
, index_cache(index_cache_)
, hash_salt(hash_salt_)
, set_cache_if_miss(set_cache_if_miss_)
, rowkey_ranges(rowkey_ranges_)
, filter(filter_)
, read_packs(read_packs_)
Expand Down Expand Up @@ -200,20 +203,24 @@ class DMFilePackFilter
const DMFilePtr & dmfile,
const FileProviderPtr & file_provider,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
ColId col_id,
const ReadLimiterPtr & read_limiter)
{
auto & type = dmfile->getColumnStat(col_id).type;
const auto file_name_base = DMFile::getFileNameBase(col_id);

auto load = [&]() {
auto index_file_size = dmfile->colIndexSize(file_name_base);
if (index_file_size == 0)
return std::make_shared<MinMaxIndex>(*type);
if (!dmfile->configuration)
{
auto index_buf = ReadBufferFromFileProvider(
file_provider,
dmfile->colIndexPath(file_name_base),
dmfile->encryptionIndexPath(file_name_base),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), dmfile->colIndexSize(file_name_base)),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
read_limiter);
index_buf.seek(dmfile->colIndexOffset(file_name_base));
return MinMaxIndex::read(*type, index_buf, dmfile->colIndexSize(file_name_base));
Expand All @@ -228,21 +235,24 @@ class DMFilePackFilter
dmfile->configuration->getChecksumAlgorithm(),
dmfile->configuration->getChecksumFrameLength());
index_buf->seek(dmfile->colIndexOffset(file_name_base));
auto file_size = dmfile->colIndexSize(file_name_base);
auto header_size = dmfile->configuration->getChecksumHeaderLength();
auto frame_total_size = dmfile->configuration->getChecksumFrameLength();
auto frame_count = file_size / frame_total_size + (file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, file_size - header_size * frame_count);
auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
}
};
MinMaxIndexPtr minmax_index;
if (index_cache)
if (index_cache && set_cache_if_miss)
{
minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load);
}
else
{
minmax_index = load();
// try load from the cache first
if (index_cache)
minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base));
if (!minmax_index)
minmax_index = load();
}
indexes.emplace(col_id, RSIndex(type, minmax_index));
}
Expand All @@ -255,13 +265,14 @@ class DMFilePackFilter
if (!dmfile->isColIndexExist(col_id))
return;

loadIndex(param.indexes, dmfile, file_provider, index_cache, col_id, read_limiter);
loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter);
}

private:
DMFilePtr dmfile;
MinMaxIndexCachePtr index_cache;
UInt64 hash_salt;
bool set_cache_if_miss;
RowKeyRanges rowkey_ranges;
RSOperatorPtr filter;
IdSetPtr read_packs;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ DMFileReader::DMFileReader(
, read_columns(read_columns_)
, enable_clean_read(enable_clean_read_)
, max_read_version(max_read_version_)
, pack_filter(dmfile_, index_cache_, hash_salt_, rowkey_ranges_, filter_, read_packs_, file_provider_, read_limiter)
, pack_filter(dmfile_, index_cache_, hash_salt_, /*set_cache_if_miss*/ true, rowkey_ranges_, filter_, read_packs_, file_provider_, read_limiter)
, handle_res(pack_filter.getHandleRes())
, use_packs(pack_filter.getUsePacks())
, skip_packs_by_column(read_columns.size(), 0)
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,25 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)

void DMFileWriter::write(const Block & block, const BlockProperty & block_property)
{
is_empty_file = false;
DMFile::PackStat stat;
stat.rows = block.rows();
stat.not_clean = block_property.not_clean_rows;
stat.bytes = block.bytes(); // This is bytes of pack data in memory.

auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column;

const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : (const ColumnVector<UInt8> *)del_mark_column.get();
const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : static_cast<const ColumnVector<UInt8> *>(del_mark_column.get());

for (auto & cd : write_columns)
{
auto & col = getByColumnId(block, cd.id).column;
const auto & col = getByColumnId(block, cd.id).column;
writeColumn(cd.id, *cd.type, *col, del_mark);

if (cd.id == VERSION_COLUMN_ID)
stat.first_version = col->get64(0);
else if (cd.id == TAG_COLUMN_ID)
stat.first_tag = (UInt8)(col->get64(0));
stat.first_tag = static_cast<UInt8>(col->get64(0));
}

if (!options.flags.isSingleFile())
Expand Down Expand Up @@ -345,7 +346,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
dmfile->encryptionIndexPath(stream_name),
false,
write_limiter);
stream->minmaxes->write(*type, buf);
if (!is_empty_file)
stream->minmaxes->write(*type, buf);
buf.sync();
bytes_written += buf.getMaterializedBytes();
}
Expand All @@ -358,7 +360,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
write_limiter,
dmfile->configuration->getChecksumAlgorithm(),
dmfile->configuration->getChecksumFrameLength());
stream->minmaxes->write(*type, *buf);
if (!is_empty_file)
stream->minmaxes->write(*type, *buf);
buf->sync();
bytes_written += buf->getMaterializedBytes();
#ifndef NDEBUG
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class DMFileWriter

FileProviderPtr file_provider;
WriteLimiterPtr write_limiter;

// use to avoid write index data for empty file
bool is_empty_file = true;
};

} // namespace DM
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
auto pack_filter = DMFilePackFilter::loadFrom(file,
index_cache,
hash_salt,
/*set_cache_if_miss*/ true,
{range},
EMPTY_FILTER,
{},
Expand Down Expand Up @@ -228,6 +229,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
auto pack_filter = DMFilePackFilter::loadFrom(file,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
context.hash_salt,
/*set_cache_if_miss*/ false,
{rowkey_range},
EMPTY_FILTER,
{},
Expand Down Expand Up @@ -346,16 +348,15 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
size_t match_packs = 0;
size_t total_match_rows = 0;
size_t total_match_bytes = 0;
// Usually, this method will be called for some "cold" key ranges. Loading the index
// into cache may pollute the cache and make the hot index cache invalid. Set the
// index cache to nullptr so that the cache won't be polluted.
// TODO: We can use the cache if the index happens to exist in the cache, but
// don't refill the cache if the index does not exist.
// Usually, this method will be called for some "cold" key ranges.
// Loading the index into cache may pollute the cache and make the hot index cache invalid.
// So don't refill the cache if the index does not exist.
for (auto & f : stable->files)
{
auto filter = DMFilePackFilter::loadFrom(f, //
nullptr,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
context.hash_salt,
/*set_cache_if_miss*/ false,
{range},
RSOperatorPtr{},
IdSetPtr{},
Expand Down
Loading

0 comments on commit 271d054

Please sign in to comment.