Skip to content

Commit

Permalink
Merge branch 'release-5.1' into cherry-pick-3581-to-release-5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Jan 26, 2022
2 parents 1204c76 + 714fc7d commit a086ba1
Show file tree
Hide file tree
Showing 18 changed files with 476 additions and 235 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashException.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ namespace DB
E(Internal, "TiFlash DDL internal error.", \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(StaleSchema, "Schema is stale and need to reload all schema.", \
"This error will be recover by reload all schema automatically.", \
"");) \
C(Coprocessor, \
E(BadRequest, "Bad TiDB coprocessor request.", \
Expand Down
37 changes: 22 additions & 15 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class DTEntryIterator
{
using LeafPtr = DTLeaf<M, F, S> *;

LeafPtr leaf;
LeafPtr leaf = nullptr;
size_t pos;
Int64 delta;

Expand Down Expand Up @@ -553,8 +553,8 @@ class DTEntriesCopy : Allocator

const size_t entry_count;
const Int64 delta;
UInt64 * const sids;
DTMutation * const mutations;
UInt64 * const sids = nullptr;
DTMutation * const mutations = nullptr;

public:
DTEntriesCopy(LeafPtr left_leaf, size_t entry_count_, Int64 delta_)
Expand All @@ -578,8 +578,10 @@ class DTEntriesCopy : Allocator

~DTEntriesCopy()
{
this->free(sids, sizeof(UInt64) * entry_count);
this->free(mutations, sizeof(DTMutation) * entry_count);
if (sids)
this->free(sids, sizeof(UInt64) * entry_count);
if (mutations)
this->free(mutations, sizeof(DTMutation) * entry_count);
}

class Iterator
Expand Down Expand Up @@ -728,18 +730,19 @@ class DeltaTree
static_assert(std::is_standard_layout_v<Intern>);

private:
NodePtr root;
LeafPtr left_leaf, right_leaf;
NodePtr root = nullptr;
LeafPtr left_leaf = nullptr;
LeafPtr right_leaf = nullptr;
size_t height = 1;

size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;

Allocator * allocator;
Allocator * allocator = nullptr;
size_t bytes = 0;

Logger * log;
Logger * log = nullptr;

public:
// For test cases only.
Expand Down Expand Up @@ -889,12 +892,16 @@ class DeltaTree

~DeltaTree()
{
if (isLeaf(root))
freeTree<Leaf>((LeafPtr)root);
else
freeTree<Intern>((InternPtr)root);
if (root)
{
if (isLeaf(root))
freeTree<Leaf>((LeafPtr)root);
else
freeTree<Intern>((InternPtr)root);
}

delete allocator;
if (allocator)
delete allocator;

LOG_TRACE(log, "free");
}
Expand Down Expand Up @@ -1480,4 +1487,4 @@ typename DT_CLASS::InternPtr DT_CLASS::afterNodeUpdated(T * node)
#undef DT_CLASS

} // namespace DM
} // namespace DB
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ PageFile::recover(const String & parent_path, const FileProviderPtr & file_provi
LOG_INFO(log, "Broken page without data file, ignored: " + pf.dataPath());
return {{}, Type::Invalid};
}

return {pf, Type::Formal};
}
else if (ss[0] == folder_prefix_checkpoint)
Expand All @@ -870,7 +871,6 @@ PageFile::recover(const String & parent_path, const FileProviderPtr & file_provi
LOG_INFO(log, "Broken page without meta file, ignored: " + pf.metaPath());
return {{}, Type::Invalid};
}
pf.type = Type::Checkpoint;

return {pf, Type::Checkpoint};
}
Expand Down
61 changes: 40 additions & 21 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,20 +181,22 @@ PageFileSet PageStorage::listAllPageFiles(const FileProviderPtr & file_provi
if (!option.ignore_checkpoint)
page_files.insert(page_file);
}
else
else if (page_file_type == PageFile::Type::Temp)
{
// For Temp and Invalid
if (option.remove_tmp_files)
{
if (page_file_type == PageFile::Type::Temp)
{
page_file.deleteEncryptionInfo();
}
// Remove temp and invalid file.
page_file.deleteEncryptionInfo();
// Remove temp files.
if (Poco::File file(directory + "/" + name); file.exists())
file.remove(true);
}
}
else
{
// Remove invalid files.
if (Poco::File file(directory + "/" + name); option.remove_invalid_files && file.exists())
file.remove(true);
}
}
}

Expand Down Expand Up @@ -250,8 +252,9 @@ void PageStorage::restore()
#ifdef PAGE_STORAGE_UTIL_DEBUGGGING
opt.remove_tmp_files = false;
#endif
opt.ignore_legacy = false;
opt.ignore_checkpoint = false;
opt.ignore_legacy = false;
opt.ignore_checkpoint = false;
opt.remove_invalid_files = true;
PageFileSet page_files = PageStorage::listAllPageFiles(file_provider, delegator, page_file_log, opt);

/// Restore current version from both formal and legacy page files
Expand Down Expand Up @@ -350,7 +353,9 @@ void PageStorage::restore()
for (auto & pf : page_files_to_remove)
LOG_TRACE(log, storage_name << pf.toString());
#else
archivePageFiles(page_files_to_remove);
// When restoring `PageStorage`, the `PageFile`s in `page_files_to_remove` are not counted in the total size,
// so there is no need to remove their size here again.
archivePageFiles(page_files_to_remove, false);
#endif
removePageFilesIf(page_files, [&page_files_to_remove](const PageFile & pf) -> bool { return page_files_to_remove.count(pf) > 0; });
}
Expand All @@ -363,8 +368,12 @@ void PageStorage::restore()
// Only insert location of PageFile when it storing delta data
for (auto & page_file : page_files)
{
// Checkpoint files are always stored on `delegator`'s default path, so there is no need to insert their location here
size_t idx_in_delta_paths = delegator->addPageFileUsedSize(
page_file.fileIdLevel(), page_file.getDiskSize(), page_file.parentPath(), /*need_insert_location*/ true);
page_file.fileIdLevel(),
page_file.getDiskSize(),
page_file.parentPath(),
/*need_insert_location*/ page_file.getType() != PageFile::Type::Checkpoint);
// Try best to reuse writable page files
if (page_file.reusableForWrite() && isPageFileSizeFitsWritable(page_file, config))
{
Expand Down Expand Up @@ -828,12 +837,17 @@ void PageStorage::drop()

ListPageFilesOption opt;
opt.ignore_checkpoint = false;
opt.ignore_legacy = false;
opt.remove_tmp_files = false;
auto page_files = PageStorage::listAllPageFiles(file_provider, delegator, page_file_log, opt);
opt.ignore_legacy = false;
opt.remove_tmp_files = false;
opt.remove_invalid_files = false;
auto page_files = PageStorage::listAllPageFiles(file_provider, delegator, page_file_log, opt);

for (const auto & page_file : page_files)
delegator->removePageFile(page_file.fileIdLevel(), page_file.getDiskSize(), false);
{
// All checkpoint files are stored on `delegator`'s default path, and their locations are not recorded as they are for other types of PageFile,
// so we need to set `remove_from_default_path` to true to distinguish this situation.
delegator->removePageFile(page_file.fileIdLevel(), page_file.getDiskSize(), /*meta_left*/ false, /*remove_from_default_path*/ page_file.getType() == PageFile::Type::Checkpoint);
}

/// FIXME: Note that these drop directories actions are not atomic, may leave some broken files on disk.

Expand Down Expand Up @@ -962,7 +976,8 @@ bool PageStorage::gc(bool not_skip)
}
ListPageFilesOption opt;
opt.remove_tmp_files = true;
auto page_files = PageStorage::listAllPageFiles(file_provider, delegator, page_file_log, opt);
opt.remove_invalid_files = false;
auto page_files = PageStorage::listAllPageFiles(file_provider, delegator, page_file_log, opt);
if (unlikely(page_files.empty()))
{
// In case the directory are removed by accident
Expand Down Expand Up @@ -1127,7 +1142,7 @@ bool PageStorage::gc(bool not_skip)
PageFileSet page_files_to_archive;
std::tie(page_files, page_files_to_archive, gc_context.num_bytes_written_in_compact_legacy)
= compactor.tryCompact(std::move(page_files), writing_files_snapshot);
archivePageFiles(page_files_to_archive);
archivePageFiles(page_files_to_archive, true);
gc_context.num_files_archive_in_compact_legacy = page_files_to_archive.size();
}

Expand Down Expand Up @@ -1173,7 +1188,7 @@ bool PageStorage::gc(bool not_skip)
return gc_context.compact_result.do_compaction;
}

void PageStorage::archivePageFiles(const PageFileSet & page_files)
void PageStorage::archivePageFiles(const PageFileSet & page_files, bool remove_size)
{
const Poco::Path archive_path(delegator->defaultPath(), PageStorage::ARCHIVE_SUBDIR);
Poco::File archive_dir(archive_path);
Expand All @@ -1193,11 +1208,13 @@ void PageStorage::archivePageFiles(const PageFileSet & page_files)
if (Poco::File file(path); file.exists())
{
// To ensure the atomicity of deletion, move to the `archive` dir first and then remove the PageFile dir.
auto file_size = page_file.getDiskSize();
auto file_size = remove_size ? page_file.getDiskSize() : 0;
file.moveTo(dest);
file.remove(true);
page_file.deleteEncryptionInfo();
delegator->removePageFile(page_file.fileIdLevel(), file_size, false);
// All checkpoint files are stored on `delegator`'s default path, and their locations are not recorded as they are for other types of PageFile,
// so we need to set `remove_from_default_path` to true to distinguish this situation.
delegator->removePageFile(page_file.fileIdLevel(), file_size, /*meta_left*/ false, /*remove_from_default_path*/ page_file.getType() == PageFile::Type::Checkpoint);
}
}
LOG_INFO(log, storage_name << " archive " + DB::toString(page_files.size()) + " files to " + archive_path.toString());
Expand Down Expand Up @@ -1266,7 +1283,9 @@ PageStorage::gcRemoveObsoleteData(PageFileSet & page_file
// Don't touch the <file_id, level> that are used for the sorting then you could
// work around by using a const_cast
size_t bytes_removed = const_cast<PageFile &>(page_file).setLegacy();
delegator->removePageFile(page_id_and_lvl, bytes_removed, true);
// All checkpoint files are stored on `delegator`'s default path, and their locations are not recorded as they are for other types of PageFile,
// so we need to set `remove_from_default_path` to true to distinguish this situation.
delegator->removePageFile(page_id_and_lvl, bytes_removed, /*meta_left*/ true, /*remove_from_default_path*/ page_file.getType() == PageFile::Type::Checkpoint);
num_data_removed += 1;
}
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class PageStorage : private boost::noncopyable
bool remove_tmp_files = false;
bool ignore_legacy = false;
bool ignore_checkpoint = false;
bool remove_invalid_files = false;
};

using VersionedPageEntries = PageEntriesVersionSetWithDelta;
Expand Down Expand Up @@ -202,7 +203,7 @@ class PageStorage : private boost::noncopyable

static constexpr const char * ARCHIVE_SUBDIR = "archive";

void archivePageFiles(const PageFileSet & page_files_to_archive);
void archivePageFiles(const PageFileSet & page_files_to_archive, bool remove_size);

std::tuple<size_t, size_t> //
gcRemoveObsoleteData(PageFileSet & page_files,
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/Page/gc/LegacyCompactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ LegacyCompactor::tryCompact( //
if (!info.empty())
{
bytes_written = writeToCheckpoint(storage_path, checkpoint_id, std::move(wb), file_provider, page_file_log);
// Don't need to insert location since Checkpoint PageFile won't be read except using listAllPageFiles in `PageStorage::restore`
// 1. Don't need to insert location since Checkpoint PageFile won't be read except using listAllPageFiles in `PageStorage::restore`
// 2. Also, `checkpoint_id` is the same as the largest page file compacted,
// so inserting the checkpoint file's location here would overwrite the old page file's location and may cause errors when deployed in a multi-disk environment
// 3. And we always store the checkpoint file on `delegator`'s default path, so we can just remove it from the default path when removing it
delegator->addPageFileUsedSize(checkpoint_id, bytes_written, storage_path, /*need_insert_location=*/false);
}

Expand Down
33 changes: 19 additions & 14 deletions dbms/src/Storages/PathCapacityMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ extern const Metric StoreSizeUsed;

namespace DB
{
/// Look up the quota at position `idx` in `quotas`.
/// Returns 0 when `idx` is not a valid index of the vector,
/// so callers may pass an index for which no quota was supplied.
inline size_t safeGetQuota(const std::vector<size_t> & quotas, size_t idx)
{
    if (idx >= quotas.size())
        return 0;
    return quotas[idx];
}

inline size_t safeGetQuota(const std::vector<size_t> & quotas, size_t idx) { return idx < quotas.size() ? quotas[idx] : 0; }

PathCapacityMetrics::PathCapacityMetrics( //
const size_t capacity_quota_, // will be ignored if `main_capacity_quota` is not empty
const Strings & main_paths_, const std::vector<size_t> main_capacity_quota_, //
const Strings & latest_paths_, const std::vector<size_t> latest_capacity_quota_)
: capacity_quota(capacity_quota_), log(&Poco::Logger::get("PathCapacityMetrics"))
PathCapacityMetrics::PathCapacityMetrics(
const size_t capacity_quota_, // will be ignored if `main_capacity_quota` is not empty
const Strings & main_paths_,
const std::vector<size_t> main_capacity_quota_,
const Strings & latest_paths_,
const std::vector<size_t> latest_capacity_quota_)
: capacity_quota(capacity_quota_)
, log(&Poco::Logger::get("PathCapacityMetrics"))
{
if (main_capacity_quota_.empty())
if (!main_capacity_quota_.empty())
{
// The `capacity_quota_` is left for backward compatibility.
// If `main_capacity_quota_` is not empty, use the capacity for each path instead of global capacity.
Expand Down Expand Up @@ -130,10 +135,10 @@ FsStats PathCapacityMetrics::getFsStats() const
// Default threshold "schedule.low-space-ratio" in PD is 0.8, log warning message if avail ratio is low.
if (avail_rate <= 0.2)
LOG_WARNING(log,
"Available space is only " << DB::toString(avail_rate * 100.0, 2)
<< "% of capacity size. Avail size: " << formatReadableSizeWithBinarySuffix(total_stat.avail_size)
<< ", used size: " << formatReadableSizeWithBinarySuffix(total_stat.used_size)
<< ", capacity size: " << formatReadableSizeWithBinarySuffix(total_stat.capacity_size));
"Available space is only " << DB::toString(avail_rate * 100.0, 2)
<< "% of capacity size. Avail size: " << formatReadableSizeWithBinarySuffix(total_stat.avail_size)
<< ", used size: " << formatReadableSizeWithBinarySuffix(total_stat.used_size)
<< ", capacity size: " << formatReadableSizeWithBinarySuffix(total_stat.capacity_size));
total_stat.ok = 1;

CurrentMetrics::set(CurrentMetrics::StoreSizeCapacity, total_stat.capacity_size);
Expand Down Expand Up @@ -219,8 +224,8 @@ FsStats PathCapacityMetrics::CapacityInfo::getStats(Poco::Logger * log) const
avail = capacity - res.used_size;
else if (log)
LOG_WARNING(log,
"No available space for path: " << path << ", capacity: " << formatReadableSizeWithBinarySuffix(capacity) //
<< ", used: " << formatReadableSizeWithBinarySuffix(used_bytes));
"No available space for path: " << path << ", capacity: " << formatReadableSizeWithBinarySuffix(capacity) //
<< ", used: " << formatReadableSizeWithBinarySuffix(used_bytes));

const uint64_t disk_free_bytes = vfs.f_bavail * vfs.f_frsize;
if (avail > disk_free_bytes)
Expand Down
Loading

0 comments on commit a086ba1

Please sign in to comment.