Disable the minidump #4

Closed
wants to merge 3 commits
Changes from 2 commits
1 change: 0 additions & 1 deletion build_bin.sh
@@ -15,7 +15,6 @@ export CMAKE_BUILD_TYPE=${CUSTOM_CMAKE_BUILD_TYPE:-RelWithDebInfo}
 export CMAKE_FLAGS="-DCMAKE_INSTALL_PREFIX=../output -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DUSE_BYTEDANCE_RDKAFKA=${CUSTOM_USE_BYTEDANCE_RDKAFKA:-1} ${CMAKE_FLAGS}"
 CMAKE_FLAGS="-DCMAKE_INSTALL_PREFIX=../output ${CMAKE_FLAGS}"
 CMAKE_FLAGS="-DCMAKE_BUILD_TYPE=${CUSTOM_CMAKE_BUILD_TYPE:-RelWithDebInfo} $CMAKE_FLAGS"
-CMAKE_FLAGS="-DENABLE_BREAKPAD=ON $CMAKE_FLAGS" # enable minidump
 [[ -n "$CUSTOM_SANITIZE" ]] && CMAKE_FLAGS="-DSANITIZE=$CUSTOM_SANITIZE $CMAKE_FLAGS"
 [[ -n "$CUSTOM_MAX_LINKING_JOBS" ]] && CMAKE_FLAGS="-DPARALLEL_LINK_JOBS=${CUSTOM_MAX_LINKING_JOBS} ${CMAKE_FLAGS}"
 [[ -n "$CUSTOM_MAX_COMPILE_JOBS" ]] && CMAKE_FLAGS="-DPARALLEL_COMPILE_JOBS=${CUSTOM_MAX_COMPILE_JOBS} ${CMAKE_FLAGS}"
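Note: build_bin.sh still folds any caller-supplied CMAKE_FLAGS into its own (see the export above), so after this change minidump support can still be opted into per build without editing the script. A minimal sketch, assuming nothing else in the script overrides the flag:

    # Hypothetical opt-in after this PR: pass the flag through the environment
    # variable that build_bin.sh already prepends to its own CMAKE_FLAGS.
    CMAKE_FLAGS="-DENABLE_BREAKPAD=ON" ./build_bin.sh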
2 changes: 1 addition & 1 deletion docker/debian/base/build.sh
@@ -15,7 +15,7 @@ sed -i \
     -e "s/set (VERSION_SCM [^) ]*/set (VERSION_SCM $VERSION_SCM/g;" \
     cmake/version.cmake
 
-cmake -DCMAKE_BUILD_TYPE=${BUILD_TYPE} ${CMAKE_FLAGS} -DENABLE_BREAKPAD=ON -DCMAKE_INSTALL_PREFIX=build_install -S . -B build_docker
+cmake -DCMAKE_BUILD_TYPE=${BUILD_TYPE} ${CMAKE_FLAGS} -DCMAKE_INSTALL_PREFIX=build_install -S . -B build_docker
 NUM_JOBS=$(( ($(nproc || grep -c ^processor /proc/cpuinfo) + 1) / 2 ))
 
 ninja -C build_docker -j $NUM_JOBS install
2 changes: 1 addition & 1 deletion docker/packager/binary/build.sh
@@ -33,7 +33,7 @@ ccache_status
 ccache --zero-stats ||:
 
 # Build everything
-cmake --debug-trycompile -DCMAKE_VERBOSE_MAKEFILE=1 -LA "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" -DENABLE_CHECK_HEAVY_BUILDS=0 -DENABLE_BREAKPAD=ON "${CMAKE_FLAGS[@]}" ..
+cmake --debug-trycompile -DCMAKE_VERBOSE_MAKEFILE=1 -LA "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" -DENABLE_CHECK_HEAVY_BUILDS=0 "${CMAKE_FLAGS[@]}" ..
 
 # No quotes because I want it to expand to nothing if empty.
 # shellcheck disable=SC2086 # No quotes because I want it to expand to nothing if empty.
87 changes: 85 additions & 2 deletions src/Storages/MergeTree/MergeTreeCloudData.cpp
@@ -14,6 +14,7 @@
  */
 
 #include <Storages/MergeTree/MergeTreeCloudData.h>
+#include <Storages/MergeTree/checkDataPart.h>
 #include "Processors/QueryPipeline.h"
 
 namespace DB
@@ -260,6 +261,10 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)
 
     auto cnch_parallel_prefetching = getSettings()->cnch_parallel_prefetching ? getSettings()->cnch_parallel_prefetching : 16;
 
+    std::mutex mutex;
+    MutableDataPartsVector broken_parts_to_detach;
+    size_t suspicious_broken_parts = 0;
+
     MutableDataPartsVector partial_parts;
     // auto it = std::remove_if(parts.begin(), parts.end(), [](const auto & part) { return part->isPartial(); });
     // std::copy(it, parts.end(), std::back_inserter(partial_parts));
@@ -270,7 +275,22 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)
     std::atomic<bool> has_non_adaptive_parts = false;
     size_t pool_size = std::min(parts.size(), UInt64(cnch_parallel_prefetching));
     runOverPartsInParallel(parts, pool_size, [&](auto & part) {
-        part->loadColumnsChecksumsIndexes(false, false);
+        bool broken = loadDataPart(part);
+        if (broken) {
+            /// Ignore broken parts that can appear as a result of hard server restart.
+            LOG_ERROR(
+                log,
+                "Detaching broken part {} ({}). If it happened after update, it is likely because of backward incompatibility. You need to resolve this manually",
+                part->relative_path,
+                part->name
+            );
+            std::lock_guard loading_lock(mutex);
+            broken_parts_to_detach.push_back(part);
+            ++suspicious_broken_parts;
+
+            return;
+        }
+
         if (part->index_granularity_info.is_adaptive)
             has_adaptive_parts.store(true, std::memory_order_relaxed);
         else
@@ -279,7 +299,22 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)
 
     pool_size = std::min(partial_parts.size(), UInt64(cnch_parallel_prefetching));
     runOverPartsInParallel(partial_parts, pool_size, [&](auto & part) {
-        part->loadColumnsChecksumsIndexes(false, false);
+        bool broken = loadDataPart(part);
+        if (broken) {
+            /// Ignore broken parts that can appear as a result of hard server restart.
+            LOG_ERROR(
+                log,
+                "Detaching broken part {} ({}). If it happened after update, it is likely because of backward incompatibility. You need to resolve this manually",
+                part->relative_path,
+                part->name
+            );
+            std::lock_guard loading_lock(mutex);
+            broken_parts_to_detach.push_back(part);
+            ++suspicious_broken_parts;
+
+            return;
+        }
+
         if (part->index_granularity_info.is_adaptive)
             has_adaptive_parts.store(true, std::memory_order_relaxed);
         else
@@ -292,6 +327,54 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)
     }
 
     has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
+    if (suspicious_broken_parts > 0) {
+        LOG_WARNING(
+            log,
+            "Broken parts count on start is greater than 0: {}",
+            suspicious_broken_parts
+        );
+    }
+
+    auto deactivate_part = [&](MutableDataPartPtr part) {
+        part->remove_time.store(part->modification_time, std::memory_order_relaxed);
+        modifyPartState(part, DataPartState::Outdated);
+    };
+
+
+    for (auto & part : broken_parts_to_detach) {
+        deactivate_part(part);
+    }
+
+    parts.erase(std::remove_if(parts.begin(), parts.end(),
+        [&broken_parts_to_detach](const auto & part) {
+            // Check if this part is in the broken_parts_to_detach vector
+            return std::find(broken_parts_to_detach.begin(), broken_parts_to_detach.end(), part) != broken_parts_to_detach.end();
+        }), parts.end());
 }
 
+bool MergeTreeCloudData::loadDataPart(MutableDataPartPtr part) {
+    bool broken = false;
+    try {
+        part->loadColumnsChecksumsIndexes(false, false);
+    }
+    catch (const Exception & e)
+    {
+        /// Don't count the part as broken if there is not enough memory to load it.
+        /// In fact, there can be many similar situations.
+        /// But it is OK, because there is a safety guard against deleting too many parts.
+        if (isNotEnoughMemoryErrorCode(e.code()))
+            throw;
+
+        broken = true;
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+    catch (...)
+    {
+        broken = true;
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+
+    return broken;
+}
+
 void MergeTreeCloudData::runOverPartsInParallel(
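Note: the broken-part branch is now duplicated verbatim between the two runOverPartsInParallel calls. A sketch of one way to share it, assuming only the types and members shown in this diff (the load_and_classify name is hypothetical, and the LOG_ERROR call is elided for brevity):

    // Hypothetical refactor: one callback reused for both the full-parts and
    // partial-parts passes; both passes already write to the same atomics.
    auto load_and_classify = [&](auto & part) {
        if (loadDataPart(part)) {
            /// Same detach bookkeeping (and LOG_ERROR) as in the diff above.
            std::lock_guard loading_lock(mutex);
            broken_parts_to_detach.push_back(part);
            ++suspicious_broken_parts;
            return;
        }
        if (part->index_granularity_info.is_adaptive)
            has_adaptive_parts.store(true, std::memory_order_relaxed);
        else
            has_non_adaptive_parts.store(true, std::memory_order_relaxed);
    };

    runOverPartsInParallel(parts, pool_size, load_and_classify);
    pool_size = std::min(partial_parts.size(), UInt64(cnch_parallel_prefetching));
    runOverPartsInParallel(partial_parts, pool_size, load_and_classify);

Separately, the final erase scans broken_parts_to_detach with std::find for every part, which is quadratic in the worst case; collecting the broken part pointers into an std::unordered_set first would make that sweep linear.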
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeCloudData.h
@@ -42,6 +42,7 @@ class MergeTreeCloudData : public MergeTreeMetaBase
     /// Load prepared parts, deactivate outdated parts and construct coverage link
     /// [Preallocate Mode] if worker_topology_hash is not empty, need to check whether the given topology is matched with worker's topology
     void loadDataParts(MutableDataPartsVector & parts, UInt64 worker_topology_hash = 0);
+    bool loadDataPart(MutableDataPartPtr part);
 
     /// Remove Outdated parts of which timestamp is less than expired ts from container.
     /// DO NOT check reference count of parts.
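One small point on the new declaration: the shared pointer is taken by value, which costs an atomic refcount bump per part. Passing by const reference would avoid that; a sketch, otherwise identical:

    bool loadDataPart(const MutableDataPartPtr & part);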
8 changes: 7 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataPartCNCH.cpp
@@ -804,7 +804,13 @@ IMergeTreeDataPart::ChecksumsPtr MergeTreeDataPartCNCH::loadChecksumsFromRemote(
         return checksums;
 
     String data_rel_path = fs::path(getFullRelativePath()) / DATA_FILE;
-    auto data_footer = loadPartDataFooter();
+    MergeTreeDataPartChecksums::FileChecksums data_footer;
+    try {
+        data_footer = loadPartDataFooter();
+    }
+    catch (...) {
+        throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "The checksums file in part {} under path {} does not exist", name, data_rel_path);
+    }
     const auto & checksum_file = data_footer["checksums.txt"];
 
     if (checksum_file.file_size == 0 /* && isDeleted() */)
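One caveat on this hunk: the catch-all converts every failure (a network error or a corrupt footer, not just a missing file) into NO_FILE_IN_DATA_PART with a "does not exist" message. A sketch that keeps the original cause in the thrown message, assuming the getCurrentExceptionMessage helper from ClickHouse's Common/Exception.h is available in this fork:

    catch (...)
    {
        /// Keep NO_FILE_IN_DATA_PART as the outward error code, but carry the
        /// underlying exception text so the real cause is not lost.
        throw Exception(
            ErrorCodes::NO_FILE_IN_DATA_PART,
            "Cannot load data footer of part {} under path {}: {}",
            name, data_rel_path, getCurrentExceptionMessage(/* with_stacktrace = */ false));
    }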