Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque

std::unique_lock<std::mutex> schema_change_lock(_base_tablet->get_schema_change_lock(),
std::try_to_lock);
_new_tablet->set_alter_failed(false);
Defer defer([this] {
// if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
if (_new_tablet->tablet_state() != TABLET_RUNNING) {
_new_tablet->set_alter_failed(true);
}
});
if (!schema_change_lock.owns_lock()) {
LOG(WARNING) << "Failed to obtain schema change lock. base_tablet="
<< request.base_tablet_id;
Expand Down Expand Up @@ -134,7 +141,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
&rs_splits, false));
}
Defer defer {[&]() {
Defer defer2 {[&]() {
_new_tablet->set_alter_version(-1);
_base_tablet->set_alter_version(-1);
}};
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ Status CloudTablet::capture_consistent_rowsets_unlocked(
return _capture_consistent_rowsets_unlocked(version_path, rowsets);
}

std::string CloudTablet::tablet_path() const {
return "";
}

Status CloudTablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class CloudTablet final : public BaseTablet {
return _approximate_data_size.load(std::memory_order_relaxed);
}

std::string tablet_path() const override;

// clang-format off
int64_t fetch_add_approximate_num_rowsets (int64_t x) { return _approximate_num_rowsets .fetch_add(x, std::memory_order_relaxed); }
int64_t fetch_add_approximate_num_segments(int64_t x) { return _approximate_num_segments.fetch_add(x, std::memory_order_relaxed); }
Expand Down
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,13 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
<< max_base_rowset_delete_bitmap_score_tablet_id << ",tablets=[" << ss.str() << "]";
}

std::vector<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_all_tablet() {
std::vector<std::shared_ptr<CloudTablet>> tablets;
tablets.reserve(_tablet_map->size());
_tablet_map->traverse([&tablets](auto& t) { tablets.push_back(t); });
return tablets;
}

void CloudTabletMgr::put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet) {
_tablet_map->put(tablet);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class CloudTabletMgr {
// **ATTN: JUST FOR UT**
void put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet);

std::vector<std::shared_ptr<CloudTablet>> get_all_tablet();

private:
CloudStorageEngine& _engine;

Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "exec/schema_scanner/schema_table_privileges_scanner.h"
#include "exec/schema_scanner/schema_table_properties_scanner.h"
#include "exec/schema_scanner/schema_tables_scanner.h"
#include "exec/schema_scanner/schema_tablets_scanner.h"
#include "exec/schema_scanner/schema_user_privileges_scanner.h"
#include "exec/schema_scanner/schema_user_scanner.h"
#include "exec/schema_scanner/schema_variables_scanner.h"
Expand Down Expand Up @@ -247,6 +248,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaCatalogMetaCacheStatsScanner::create_unique();
case TSchemaTableType::SCH_ROUTINE_LOAD_JOBS:
return SchemaRoutineLoadJobScanner::create_unique();
case TSchemaTableType::SCH_BACKEND_TABLETS:
return SchemaTabletsScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
43 changes: 34 additions & 9 deletions be/src/exec/schema_scanner/schema_scanner_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,54 @@ namespace doris {
void SchemaScannerHelper::insert_string_value(int col_index, std::string str_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(), str_val.size());
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_datetime_value(int col_index, const std::vector<void*>& datas,
vectorized::Block* block) {
void SchemaScannerHelper::insert_datetime_value(int col_index, int64_t timestamp,
const std::string& ctz, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
auto data = datas[0];
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);

std::vector<void*> datas(1);
VecDateTimeValue src[1];
src[0].from_unixtime(timestamp, ctz);
datas[0] = src;
auto* data = datas[0];
assert_cast<vectorized::ColumnDateTime*>(col_ptr)->insert_data(reinterpret_cast<char*>(data),
0);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_bool_value(int col_index, bool bool_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnBool*>(col_ptr)->insert_value(bool_val);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_int32_value(int col_index, int32_t int_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnInt32*>(col_ptr)->insert_value(int_val);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_int64_value(int col_index, int64_t int_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(int_val);
Expand All @@ -62,7 +87,7 @@ void SchemaScannerHelper::insert_int64_value(int col_index, int64_t int_val,
void SchemaScannerHelper::insert_double_value(int col_index, double double_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(double_val);
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/schema_scanner/schema_scanner_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ class Block;
class SchemaScannerHelper {
public:
static void insert_string_value(int col_index, std::string str_val, vectorized::Block* block);
static void insert_datetime_value(int col_index, const std::vector<void*>& datas,
static void insert_datetime_value(int col_index, int64_t timestamp, const std::string& ctz,
vectorized::Block* block);

static void insert_bool_value(int col_index, bool bool_val, vectorized::Block* block);

static void insert_int32_value(int col_index, int32_t int_val, vectorized::Block* block);
static void insert_int64_value(int col_index, int64_t int_val, vectorized::Block* block);
static void insert_double_value(int col_index, double double_val, vectorized::Block* block);
};
Expand Down
227 changes: 227 additions & 0 deletions be/src/exec/schema_scanner/schema_tablets_scanner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/schema_scanner/schema_tablets_scanner.h"

#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/olap_common.pb.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <numeric>
#include <string>
#include <utility>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/status.h"
#include "exec/schema_scanner.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "olap/storage_engine.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "vec/common/string_ref.h"

namespace doris {
namespace vectorized {
class Block;
} // namespace vectorized

#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaTabletsScanner::_s_tbls_columns = {
// name, type, size, is_null
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"REPLICA_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"PARTITION_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"TABLET_PATH", TYPE_STRING, sizeof(StringRef), true},
{"TABLET_LOCAL_SIZE", TYPE_BIGINT, sizeof(int64_t), true},
{"TABLET_REMOTE_SIZE", TYPE_BIGINT, sizeof(int64_t), true},
{"VERSION_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"SEGMENT_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"NUM_COLUMNS", TYPE_BIGINT, sizeof(int64_t), true},
{"ROW_SIZE", TYPE_BIGINT, sizeof(int64_t), true},
{"COMPACTION_SCORE", TYPE_INT, sizeof(int32_t), true},
{"COMPRESS_KIND", TYPE_STRING, sizeof(StringRef), true},
{"IS_USED", TYPE_BOOLEAN, sizeof(bool), true},
{"IS_ALTER_FAILED", TYPE_BOOLEAN, sizeof(bool), true},
{"CREATE_TIME", TYPE_DATETIME, sizeof(int64_t), true},
{"UPDATE_TIME", TYPE_DATETIME, sizeof(int64_t), true},
{"IS_OVERLAP", TYPE_BOOLEAN, sizeof(bool), true},
};

SchemaTabletsScanner::SchemaTabletsScanner()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BACKEND_TABLETS) {};

Status SchemaTabletsScanner::start(RuntimeState* state) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}
_backend_id = state->backend_id();
RETURN_IF_ERROR(_get_all_tablets());
return Status::OK();
}

Status SchemaTabletsScanner::_get_all_tablets() {
if (config::is_cloud_mode()) {
auto tablets =
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr().get_all_tablet();
std::ranges::for_each(tablets, [&](auto& tablet) {
_tablets.push_back(std::static_pointer_cast<BaseTablet>(tablet));
});
} else {
auto tablets = ExecEnv::GetInstance()
->storage_engine()
.to_local()
.tablet_manager()
->get_all_tablet();
std::ranges::for_each(tablets, [&](auto& tablet) {
_tablets.push_back(std::static_pointer_cast<BaseTablet>(tablet));
});
}
return Status::OK();
}

Status SchemaTabletsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}
*eos = true;
return _fill_block_impl(block);
}

Status SchemaTabletsScanner::_fill_block_impl(vectorized::Block* block) {
SCOPED_TIMER(_fill_block_timer);

size_t row_num = _tablets.size();
if (row_num == 0) {
return Status::OK();
}

size_t fill_tablets_num = _tablets.size();
std::vector<void*> datas(fill_tablets_num);

for (int i = 0; i < _tablets.size(); i++) {
BaseTabletSPtr tablet = _tablets[i];
// BE_ID
SchemaScannerHelper::insert_int64_value(0, _backend_id, block);

// TABLET_ID
SchemaScannerHelper::insert_int64_value(1, tablet->tablet_meta()->tablet_id(), block);

// REPLICA_ID
SchemaScannerHelper::insert_int64_value(2, tablet->tablet_meta()->replica_id(), block);

// PARTITION_ID
SchemaScannerHelper::insert_int64_value(3, tablet->tablet_meta()->partition_id(), block);

// TABLET_PATH
SchemaScannerHelper::insert_string_value(4, tablet->tablet_path(), block);

// TABLET_LOCAL_SIZE
SchemaScannerHelper::insert_int64_value(5, tablet->tablet_meta()->tablet_local_size(),
block);

// TABLET_REMOTE_SIZE
SchemaScannerHelper::insert_int64_value(6, tablet->tablet_meta()->tablet_remote_size(),
block);

// VERSION_COUNT
SchemaScannerHelper::insert_int64_value(
7, static_cast<int64_t>(tablet->tablet_meta()->version_count()), block);

// SEGMENT_COUNT
SchemaScannerHelper::insert_int64_value(
8,
[&tablet]() {
auto rs_metas = tablet->tablet_meta()->all_rs_metas();
return std::accumulate(rs_metas.begin(), rs_metas.end(), 0,
[](int64_t val, RowsetMetaSharedPtr& rs_meta) {
return val + rs_meta->num_segments();
});
}(),
block);

// NUM_COLUMNS
SchemaScannerHelper::insert_int64_value(9, tablet->tablet_meta()->tablet_columns_num(),
block);

// ROW_SIZE
SchemaScannerHelper::insert_int64_value(10, static_cast<int64_t>(tablet->row_size()),
block);

// COMPACTION_SCORE
SchemaScannerHelper::insert_int32_value(11, tablet->get_real_compaction_score(), block);

// COMPRESS_KIND
SchemaScannerHelper::insert_string_value(12, CompressKind_Name(tablet->compress_kind()),
block);

// IS_USED
SchemaScannerHelper::insert_bool_value(
13,
[&tablet]() {
if (config::is_cloud_mode()) {
return true;
}
return std::static_pointer_cast<Tablet>(tablet)->is_used();
}(),
block);

// IS_ALTER_FAILED
SchemaScannerHelper::insert_bool_value(14, tablet->is_alter_failed(), block);

// CREATE_TIME
SchemaScannerHelper::insert_datetime_value(15, tablet->tablet_meta()->creation_time(),
TimezoneUtils::default_time_zone, block);

// UPDATE_TIME
SchemaScannerHelper::insert_datetime_value(
16,
[&tablet]() {
auto rowset = tablet->get_rowset_with_max_version();
return rowset == nullptr ? 0 : rowset->newest_write_timestamp();
}(),
TimezoneUtils::default_time_zone, block);

// IS_OVERLAP
SchemaScannerHelper::insert_bool_value(
17,
[&tablet]() {
const auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
return std::any_of(rs_metas.begin(), rs_metas.end(),
[](const RowsetMetaSharedPtr& rs_meta) {
return rs_meta->is_segments_overlapping();
});
}(),
block);
}

return Status::OK();
}
} // namespace doris
Loading
Loading