Skip to content

Commit

Permalink
Merge branch 'master' into show_broker
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 21, 2024
2 parents 06b48c1 + ca579c1 commit 1486552
Show file tree
Hide file tree
Showing 24 changed files with 573 additions and 308 deletions.
5 changes: 1 addition & 4 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id,
pre_max_version - 1};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(
std::make_tuple(_tablet->tablet_id(), start, before_end));
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
Expand Down
61 changes: 57 additions & 4 deletions be/src/cloud/cloud_delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <thread>
#include <utility>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
Expand Down Expand Up @@ -78,8 +79,8 @@ static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
Expand All @@ -95,6 +96,50 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re
auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();
LOG(INFO) << "show_local_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count
<< ",cardinality=" << cardinality << ",size=" << size;

rapidjson::Document root;
root.SetObject();
root.AddMember("delete_bitmap_count", count, root.GetAllocator());
root.AddMember("cardinality", cardinality, root.GetAllocator());
root.AddMember("size", size, root.GetAllocator());

// to json string
rapidjson::StringBuffer strbuf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
*json_result = std::string(strbuf.GetString());

return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
if (!st.ok()) {
LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id
<< ", st=" << st.to_string();
return st;
}
auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
return st;
}
auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();
LOG(INFO) << "show_ms_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count
<< ",cardinality=" << cardinality << ",size=" << size;

rapidjson::Document root;
root.SetObject();
Expand All @@ -113,9 +158,17 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re

void CloudDeleteBitmapAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_INFO) {
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
std::string json_result;
Status st = _handle_show_local_delete_bitmap_count(req, &json_result);
if (!st.ok()) {
HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
} else {
HttpChannel::send_reply(req, HttpStatus::OK, json_result);
}
} else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS) {
std::string json_result;
Status st = _handle_show_delete_bitmap_count(req, &json_result);
Status st = _handle_show_ms_delete_bitmap_count(req, &json_result);
if (!st.ok()) {
HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
} else {
Expand Down
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_delete_bitmap_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class HttpRequest;

class ExecEnv;

enum class DeleteBitmapActionType { COUNT_INFO = 1 };
enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 };

/// This action is used for viewing the delete bitmap status
class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
Expand All @@ -45,7 +45,8 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
void handle(HttpRequest* req) override;

private:
Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result);
Status _handle_show_local_delete_bitmap_count(HttpRequest* req, std::string* json_result);
Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
CloudStorageEngine& _engine;
Expand Down
15 changes: 10 additions & 5 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab
}

Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data,
bool sync_delete_bitmap) {
bool sync_delete_bitmap, bool full_sync) {
using namespace std::chrono;

TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);
Expand All @@ -411,7 +411,11 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
idx->set_partition_id(tablet->partition_id());
{
std::shared_lock rlock(tablet->get_header_lock());
req.set_start_version(tablet->max_version_unlocked() + 1);
if (full_sync) {
req.set_start_version(0);
} else {
req.set_start_version(tablet->max_version_unlocked() + 1);
}
req.set_base_compaction_cnt(tablet->base_compaction_cnt());
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
req.set_cumulative_point(tablet->cumulative_layer_point());
Expand Down Expand Up @@ -471,7 +475,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
resp.stats(), req.idx(), &delete_bitmap);
resp.stats(), req.idx(), &delete_bitmap, full_sync);
if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
LOG_WARNING("rowset meta is expired, need to retry")
.tag("tablet", tablet->tablet_id())
Expand Down Expand Up @@ -617,12 +621,13 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64
Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx,
DeleteBitmap* delete_bitmap) {
DeleteBitmap* delete_bitmap, bool full_sync) {
if (rs_metas.empty()) {
return Status::OK();
}

if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
if (!full_sync &&
sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
return Status::OK();
} else {
LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id="
Expand Down
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CloudMetaMgr {
Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta);

Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false,
bool sync_delete_bitmap = true);
bool sync_delete_bitmap = true, bool full_sync = false);

Status prepare_rowset(const RowsetMeta& rs_meta,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
Expand Down Expand Up @@ -116,7 +116,8 @@ class CloudMetaMgr {

Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap,
bool full_sync = false);
void check_table_size_correctness(const RowsetMeta& rs_meta);
int64_t get_segment_file_size(const RowsetMeta& rs_meta);
int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta);
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/remote_file_system.h"
#include "io/io_common.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
Expand Down Expand Up @@ -345,8 +346,9 @@ bool CompactionMixin::handle_ordered_data_compaction() {
if (!config::enable_ordered_data_compaction) {
return false;
}
if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
// The remote file system does not support to link files.
if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION ||
compaction_type() == ReaderType::READER_FULL_COMPACTION) {
// The remote file system and full compaction does not support to link files.
return false;
}
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ Status FullCompaction::prepare_compact() {
std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock());
tablet()->set_is_full_compaction_running(true);

DBUG_EXECUTE_IF("FullCompaction.prepare_compact.set_cumu_point",
{ tablet()->set_cumulative_layer_point(tablet()->max_version_unlocked() + 1); })

// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Status PrimaryKeyIndexBuilder::init() {

auto opt = segment_v2::BloomFilterOptions();
opt.fpp = 0.01;
_bloom_filter_index_builder.reset(
new segment_v2::PrimaryKeyBloomFilterIndexWriterImpl(opt, type_info));
RETURN_IF_ERROR(segment_v2::PrimaryKeyBloomFilterIndexWriterImpl::create(
opt, type_info, &_bloom_filter_index_builder));
return Status::OK();
}

Expand Down
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,5 +348,22 @@ Status NGramBloomFilterIndexWriterImpl::create(const BloomFilterOptions& bf_opti
return Status::OK();
}

Status PrimaryKeyBloomFilterIndexWriterImpl::create(const BloomFilterOptions& bf_options,
const TypeInfo* typeinfo,
std::unique_ptr<BloomFilterIndexWriter>* res) {
FieldType type = typeinfo->type();
switch (type) {
case FieldType::OLAP_FIELD_TYPE_CHAR:
case FieldType::OLAP_FIELD_TYPE_VARCHAR:
case FieldType::OLAP_FIELD_TYPE_STRING:
*res = std::make_unique<PrimaryKeyBloomFilterIndexWriterImpl>(bf_options, typeinfo);
break;
default:
return Status::NotSupported("unsupported type for primary key bloom filter index:{}",
std::to_string(int(type)));
}
return Status::OK();
}

} // namespace segment_v2
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class PrimaryKeyBloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
}
};

static Status create(const BloomFilterOptions& bf_options, const TypeInfo* typeinfo,
std::unique_ptr<BloomFilterIndexWriter>* res);
// This method may allocate large memory for bf, will return error
// when memory is exhaused to prevent oom.
Status add_values(const void* values, size_t count) override;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1209,9 +1209,13 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::
}
auto start_bmk = std::get<1>(delete_bitmap_tuple);
auto end_bmk = std::get<2>(delete_bitmap_tuple);
// the key range of to be removed is [start_bmk,end_bmk),
// due to the different definitions of the right boundary,
// so use end_bmk as right boundary when removing local delete bitmap,
// use (end_bmk - 1) as right boundary when removing ms delete bitmap
remove(start_bmk, end_bmk);
to_delete.emplace_back(std::make_tuple(std::get<0>(start_bmk).to_string(), 0,
std::get<2>(end_bmk)));
std::get<2>(end_bmk) - 1));
}
_stale_delete_bitmap.erase(version_str);
}
Expand Down
13 changes: 9 additions & 4 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,16 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status",
run_status_compaction_action);
CloudDeleteBitmapAction* count_delete_bitmap_action =
_pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_INFO, _env, engine,
CloudDeleteBitmapAction* count_local_delete_bitmap_action =
_pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine,
TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count",
count_delete_bitmap_action);
_ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_local",
count_local_delete_bitmap_action);
CloudDeleteBitmapAction* count_ms_delete_bitmap_action =
_pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_MS, _env, engine,
TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_ms",
count_ms_delete_bitmap_action);
#ifdef ENABLE_INJECTION_POINT
InjectionPointAction* injection_point_action = _pool.add(new InjectionPointAction);
_ev_http_server->register_handler(HttpMethod::GET, "/api/injection_point/{op}",
Expand Down
Loading

0 comments on commit 1486552

Please sign in to comment.