Skip to content

Commit

Permalink
Merge branch 'master' into nereids_alter_database_set_quota
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Jan 14, 2025
2 parents 8dec13f + 8e2cf8a commit 0a866fa
Show file tree
Hide file tree
Showing 400 changed files with 4,863 additions and 1,306 deletions.
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Doris
Copyright 2018-2024 The Apache Software Foundation
Copyright 2018-2025 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
43 changes: 39 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1659,11 +1659,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r
.tag("tablet_id", drop_tablet_req.tablet_id);
return;
});
// 1. erase lru from tablet mgr
// TODO(dx) clean tablet file cache
// get tablet's info(such as cachekey, tablet id, rsid)
MonotonicStopWatch watch;
watch.start();
auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
std::ostringstream rowset_ids_stream;
bool found = false;
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
continue;
}
found = true;
auto clean_rowsets = tablet->get_snapshot_rowset(true);
// Get first 10 rowset IDs as comma-separated string, just for log
int count = 0;
for (const auto& rowset : clean_rowsets) {
if (count >= 10) break;
if (count > 0) {
rowset_ids_stream << ",";
}
rowset_ids_stream << rowset->rowset_id().to_string();
count++;
}

CloudTablet::recycle_cached_data(std::move(clean_rowsets));
break;
}

if (!found) {
LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
<< ", cost " << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
// 2. gen clean file cache task
LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
<< " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
<< static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

Expand Down
63 changes: 28 additions & 35 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ static std::string debug_info(const Request& req) {
} else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
req.tablet_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else {
static_assert(!sizeof(Request));
}
Expand Down Expand Up @@ -373,7 +375,11 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
if (op_name == "get delete bitmap") {
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
} else {
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
}
cntl.set_max_retry(kBrpcRetryTimes);
res->Clear();
(stub.get()->*method)(&cntl, &req, res, nullptr);
Expand Down Expand Up @@ -714,41 +720,12 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_

VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();

int retry_times = 0;
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
auto start = std::chrono::high_resolution_clock::now();
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
brpc::Controller cntl;
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res.Clear();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) [[unlikely]] {
LOG_INFO("failed to get delete bitmap")
.tag("reason", cntl.ErrorText())
.tag("tablet_id", tablet->tablet_id())
.tag("partition_id", tablet->partition_id())
.tag("tried", retry_times);
proxy->set_unhealthy();
} else {
break;
}

if (++retry_times > config::delete_bitmap_rpc_retry_times) {
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap, tablet={} err={}",
tablet->tablet_id(), cntl.ErrorText());
}
break;
}
}
auto st = retry_rpc("get delete bitmap", req, &res, &MetaService_Stub::get_delete_bitmap);
auto end = std::chrono::high_resolution_clock::now();
if (st.code() == ErrorCode::THRIFT_RPC_ERROR) {
return st;
}

if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
Expand Down Expand Up @@ -1070,7 +1047,12 @@ Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJ
req.mutable_job()->CopyFrom(job);
req.set_action(FinishTabletJobRequest::COMMIT);
req.set_cloud_unique_id(config::cloud_unique_id);
return retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job);
auto st = retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job);
if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"txn conflict when commit tablet job {}", job.ShortDebugString());
}
return st;
}

Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
Expand Down Expand Up @@ -1205,6 +1187,17 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in
<< "ms : " << res.status().msg();
bthread_usleep(duration_ms * 1000);
} while (++retry_times <= 100);
if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
"initiator {}",
tablet.table_id(), lock_id, initiator);
} else if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
"initiator {}",
tablet.table_id(), lock_id, initiator);
}
return st;
}

Expand Down
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,13 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}

cloud::FinishTabletJobResponse finish_resp;
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", {
std::srand(static_cast<unsigned int>(std::time(nullptr)));
int random_value = std::rand() % 100;
if (random_value < 50) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test txn conflict");
}
});
auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
if (!st.ok()) {
if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,

uint64_t CloudTablet::delete_expired_stale_rowsets() {
std::vector<RowsetSharedPtr> expired_rowsets;
// ATTN: trick. stale_rowsets holds an extra copy of each rowset shared_ptr taken from _stale_rs_version_map, temporarily increasing its reference count so that the reference-count check in recycle_cached_data (which compares against 2) still works.
std::vector<RowsetSharedPtr> stale_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
std::vector<std::string> version_to_delete;
Expand All @@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
stale_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
Expand Down Expand Up @@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
if (rs.use_count() >= 1) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ class CloudTablet final : public BaseTablet {

void build_tablet_report_info(TTabletInfo* tablet_info);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;
Expand Down
10 changes: 5 additions & 5 deletions be/src/gutil/hash/city.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ uint64 HashLen16(uint64 u, uint64 v) {
const uint64 kMul = 0xc6a4a7935bd1e995ULL;
uint64 a = (u ^ v) * kMul;
a ^= (a >> 47);
uint64 b = (v ^ a) * kMul;
uint64 b = (u ^ a) * kMul;
b ^= (b >> 47);
b *= kMul;
return b;
Expand Down Expand Up @@ -199,11 +199,11 @@ uint64 CityHash64(const char* s, size_t len) {
HashLen16(v.second, w.second) + x);
}

uint64 CityHash64WithSeed(const char* s, size_t len, uint64 seed) {
return CityHash64WithSeeds(s, len, k2, seed);
}

// Two-seed variant: computes CityHash64 over the `len` bytes at `s`, subtracts
// `seed0` from that hash, and mixes the result with `seed1` through HashLen16.
uint64 CityHash64WithSeeds(const char* s, size_t len, uint64 seed0, uint64 seed1) {
return HashLen16(CityHash64(s, len) - seed0, seed1);
}

// Single-seed variant: forwards to CityHash64WithSeeds, passing the constant
// `k2` as seed0 and the caller's `seed` as seed1.
uint64 CityHash64WithSeed(const char* s, size_t len, uint64 seed) {
return CityHash64WithSeeds(s, len, k2, seed);
}
} // namespace util_hash
9 changes: 4 additions & 5 deletions be/src/gutil/hash/city.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

#include "gutil/integral_types.h"

// WARNING
// The implementation of cityhash in this file is somewhat different from Google's original version.
// For the same input, there will be different output results.
// Therefore, we should avoid using this special cityhash whenever possible.
namespace util_hash {

uint64 HashLen16(uint64 u, uint64 v);
Expand All @@ -35,9 +39,4 @@ uint64 CityHash64(const char* buf, size_t len);
// Hash function for a byte array. For convenience, a 64-bit seed is also
// hashed into the result. The mapping may change from time to time.
uint64 CityHash64WithSeed(const char* buf, size_t len, uint64 seed);

// Hash function for a byte array. For convenience, two seeds are also
// hashed into the result. The mapping may change from time to time.
uint64 CityHash64WithSeeds(const char* buf, size_t len, uint64 seed0, uint64 seed1);

} // namespace util_hash
9 changes: 9 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ LocalFileSystem::~LocalFileSystem() = default;

Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
VLOG_DEBUG << "create file: " << file.native()
<< ", sync_data: " << (opts ? opts->sync_file_data : true);
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileSystem::create_file_impl",
Status::IOError("inject io error"));
int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
Expand Down Expand Up @@ -108,6 +110,8 @@ Status LocalFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader,
}

Status LocalFileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) {
VLOG_DEBUG << "create directory: " << dir.native()
<< ", failed_if_exists: " << failed_if_exists;
bool exists = true;
RETURN_IF_ERROR(exists_impl(dir, &exists));
if (exists && failed_if_exists) {
Expand All @@ -124,6 +128,7 @@ Status LocalFileSystem::create_directory_impl(const Path& dir, bool failed_if_ex
}

Status LocalFileSystem::delete_file_impl(const Path& file) {
VLOG_DEBUG << "delete file: " << file.native();
bool exists = true;
RETURN_IF_ERROR(exists_impl(file, &exists));
if (!exists) {
Expand All @@ -141,6 +146,7 @@ Status LocalFileSystem::delete_file_impl(const Path& file) {
}

Status LocalFileSystem::delete_directory_impl(const Path& dir) {
VLOG_DEBUG << "delete directory: " << dir.native();
bool exists = true;
RETURN_IF_ERROR(exists_impl(dir, &exists));
if (!exists) {
Expand Down Expand Up @@ -249,6 +255,7 @@ Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector<F
}

Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
VLOG_DEBUG << "rename file: " << orig_name.native() << " to " << new_name.native();
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileSystem::rename",
Status::IOError("inject io error"));
std::error_code ec;
Expand All @@ -265,6 +272,7 @@ Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
}

Status LocalFileSystem::link_file_impl(const Path& src, const Path& dest) {
VLOG_DEBUG << "link file: " << src.native() << " to " << dest.native();
if (::link(src.c_str(), dest.c_str()) != 0) {
return localfs_error(errno, fmt::format("failed to create hard link from {} to {}",
src.native(), dest.native()));
Expand Down Expand Up @@ -364,6 +372,7 @@ Status LocalFileSystem::copy_path(const Path& src, const Path& dest) {
}

Status LocalFileSystem::copy_path_impl(const Path& src, const Path& dest) {
VLOG_DEBUG << "copy from " << src.native() << " to " << dest.native();
std::error_code ec;
std::filesystem::copy(src, dest, std::filesystem::copy_options::recursive, ec);
if (ec) {
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset/segment_v2/ngram_bloom_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
namespace doris {
namespace segment_v2 {

static constexpr uint64_t SEED_GEN = 217728422;

NGramBloomFilter::NGramBloomFilter(size_t size)
: _size(size),
words((size + sizeof(UnderType) - 1) / sizeof(UnderType)),
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/ngram_bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "olap/rowset/segment_v2/bloom_filter.h"

namespace doris {
static constexpr uint64_t SEED_GEN = 217728422;

namespace segment_v2 {
enum HashStrategyPB : int;

Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ class MultiBlockMerger {
tablet_schema->column(i).get_aggregate_function(
vectorized::AGG_LOAD_SUFFIX,
tablet_schema->column(i).get_be_exec_version());
if (!function) {
return Status::InternalError(
"could not find aggregate function on column {}, aggregation={}",
tablet_schema->column(i).name(),
tablet_schema->column(i).aggregation());
}
agg_functions.push_back(function);
// create aggregate data
auto* place = new char[function->size_of_data()];
Expand Down
36 changes: 34 additions & 2 deletions be/src/pipeline/common/distinct_agg_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,45 @@
#include "vec/common/hash_table/ph_hash_map.h"
#include "vec/common/hash_table/ph_hash_set.h"
#include "vec/common/hash_table/string_hash_map.h"
#include "vec/core/types.h"

namespace doris {

// Type trait that selects the hash-set type for a key type T (see DistinctData).
// Primary template: a PHHashSet of T hashed with HashCRC32<T>.
template <typename T>
struct DistinctHashSetType {
using HashSet = PHHashSet<T, HashCRC32<T>>;
};

// Specialization for vectorized::UInt8 keys: uses SmallFixedSizeHashSet instead
// of PHHashSet.
// NOTE(review): presumably because an 8-bit key has a small, fixed value
// domain — confirm against SmallFixedSizeHashSet.
template <>
struct DistinctHashSetType<vectorized::UInt8> {
using HashSet = SmallFixedSizeHashSet<vectorized::UInt8>;
};

// Specialization for vectorized::Int8 keys: same choice as the UInt8 case.
template <>
struct DistinctHashSetType<vectorized::Int8> {
using HashSet = SmallFixedSizeHashSet<vectorized::Int8>;
};

// Type trait that selects the hash-set type for a key type T in the second
// phase (see DistinctDataPhase2).
// Primary template: a PHHashSet of T hashed with HashMixWrapper<T>, i.e. a
// different hash functor than DistinctHashSetType uses.
template <typename T>
struct DistinctPhase2HashSetType {
using HashSet = PHHashSet<T, HashMixWrapper<T>>;
};

// Specialization for vectorized::UInt8 keys: uses SmallFixedSizeHashSet, the
// same set type as the DistinctHashSetType specialization.
template <>
struct DistinctPhase2HashSetType<vectorized::UInt8> {
using HashSet = SmallFixedSizeHashSet<vectorized::UInt8>;
};

// Specialization for vectorized::Int8 keys: same choice as the UInt8 case.
template <>
struct DistinctPhase2HashSetType<vectorized::Int8> {
using HashSet = SmallFixedSizeHashSet<vectorized::Int8>;
};

template <typename T>
using DistinctData = PHHashSet<T, HashCRC32<T>>;
using DistinctData = typename DistinctHashSetType<T>::HashSet;

template <typename T>
using DistinctDataPhase2 = PHHashSet<T, HashMixWrapper<T>>;
using DistinctDataPhase2 = typename DistinctPhase2HashSetType<T>::HashSet;

using DistinctDataWithStringKey = PHHashSet<StringRef>;

Expand Down
Loading

0 comments on commit 0a866fa

Please sign in to comment.