Skip to content
10 changes: 9 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
std::unique_lock schema_change_lock(_base_tablet->get_schema_change_lock(), std::defer_lock);
bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT));

_new_tablet->set_alter_failed(false);
Defer defer([this] {
// if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
if (_new_tablet->tablet_state() != TABLET_RUNNING) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是在fix bug 么?
如果是,就单独提一个pr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schema_tablet_scanner requires IS_ALTER_FAILED, but CloudTablet does not record this. I am recording it here, but it is not a bug fix.

_new_tablet->set_alter_failed(true);
}
});

if (!owns_lock) {
LOG(WARNING) << "Failed to obtain schema change lock, there might be inverted index being "
"built on base_tablet="
Expand Down Expand Up @@ -142,7 +150,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
&rs_splits, false));
}
Defer defer {[&]() {
Defer defer2 {[&]() {
_new_tablet->set_alter_version(-1);
_base_tablet->set_alter_version(-1);
}};
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ bool CloudTablet::exceed_version_limit(int32_t limit) {
return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
}

const std::string& CloudTablet::tablet_path() const {
return get_rowset_with_max_version()->tablet_path();
}

Status CloudTablet::capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const {
Versions version_path;
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class CloudTablet final : public BaseTablet {
return _approximate_data_size.load(std::memory_order_relaxed);
}

const std::string& tablet_path() const override;

// clang-format off
int64_t fetch_add_approximate_num_rowsets (int64_t x) { return _approximate_num_rowsets .fetch_add(x, std::memory_order_relaxed); }
int64_t fetch_add_approximate_num_segments(int64_t x) { return _approximate_num_segments.fetch_add(x, std::memory_order_relaxed); }
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,15 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
<< max_base_rowset_delete_bitmap_score_tablet_id << ", tablets=[" << ss.str() << "]";
}

std::vector<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_all_tablet() {
std::vector<std::shared_ptr<CloudTablet>> tablets;
tablets.reserve(_tablet_map->size());
_tablet_map->traverse([&tablets](auto& t) { tablets.push_back(t); });
return tablets;
}

void CloudTabletMgr::put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet) {
_tablet_map->put(tablet);
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class CloudTabletMgr {
void get_topn_tablet_delete_bitmap_score(uint64_t* max_delete_bitmap_score,
uint64_t* max_base_rowset_delete_bitmap_score);

std::vector<std::shared_ptr<CloudTablet>> get_all_tablet();

// **ATTN: JUST FOR UT**
void put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet);

Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "exec/schema_scanner/schema_table_privileges_scanner.h"
#include "exec/schema_scanner/schema_table_properties_scanner.h"
#include "exec/schema_scanner/schema_tables_scanner.h"
#include "exec/schema_scanner/schema_tablets_scanner.h"
#include "exec/schema_scanner/schema_user_privileges_scanner.h"
#include "exec/schema_scanner/schema_user_scanner.h"
#include "exec/schema_scanner/schema_variables_scanner.h"
Expand Down Expand Up @@ -233,6 +234,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaBackendKerberosTicketCacheScanner::create_unique();
case TSchemaTableType::SCH_ROUTINE_LOAD_JOBS:
return SchemaRoutineLoadJobScanner::create_unique();
case TSchemaTableType::SCH_BACKEND_TABLETS:
return SchemaTabletsScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
31 changes: 26 additions & 5 deletions be/src/exec/schema_scanner/schema_scanner_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cctz/time_zone.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
#include "util/thrift_rpc_helper.h"
#include "vec/common/string_ref.h"
Expand All @@ -31,7 +32,7 @@ namespace doris {
void SchemaScannerHelper::insert_string_value(int col_index, std::string str_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(), str_val.size());
Expand All @@ -41,7 +42,7 @@ void SchemaScannerHelper::insert_string_value(int col_index, std::string str_val
void SchemaScannerHelper::insert_datetime_value(int col_index, const std::vector<void*>& datas,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
auto data = datas[0];
Expand All @@ -54,7 +55,7 @@ void SchemaScannerHelper::insert_datetime_value(int col_index, int64_t timestamp
const cctz::time_zone& ctz,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();

Expand All @@ -68,10 +69,30 @@ void SchemaScannerHelper::insert_datetime_value(int col_index, int64_t timestamp
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_bool_value(int col_index, bool bool_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnBool*>(col_ptr)->insert_value(bool_val);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_int32_value(int col_index, int32_t int_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnInt32*>(col_ptr)->insert_value(int_val);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_int64_value(int col_index, int64_t int_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnInt64*>(col_ptr)->insert_value(int_val);
Expand All @@ -81,7 +102,7 @@ void SchemaScannerHelper::insert_int64_value(int col_index, int64_t int_val,
void SchemaScannerHelper::insert_double_value(int col_index, double double_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
mutable_col_ptr = block->get_by_position(col_index).column->assume_mutable();
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
assert_cast<vectorized::ColumnFloat64*>(col_ptr)->insert_value(double_val);
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/schema_scanner/schema_scanner_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#ifndef _SCHEMA_SCANNER_HELPER_H_

#include <stdint.h>
Expand All @@ -39,6 +40,9 @@ class SchemaScannerHelper {
static void insert_datetime_value(int col_index, int64_t timestamp, const cctz::time_zone& ctz,
vectorized::Block* block);

static void insert_bool_value(int col_index, bool bool_val, vectorized::Block* block);

static void insert_int32_value(int col_index, int32_t int_val, vectorized::Block* block);
static void insert_int64_value(int col_index, int64_t int_val, vectorized::Block* block);
static void insert_double_value(int col_index, double double_val, vectorized::Block* block);
};
Expand Down
Loading
Loading