Skip to content

Commit

Permalink
Merge branch 'master' into file_scanner_null_ptr
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Sep 10, 2024
2 parents 1266aa8 + eb4673f commit caf8722
Show file tree
Hide file tree
Showing 172 changed files with 2,029 additions and 515 deletions.
3 changes: 3 additions & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down Expand Up @@ -86,6 +87,7 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
Expand All @@ -107,6 +109,7 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
git clone https://github.com/DoozyX/clang-format-lint-action .github/actions/clang-format-lint-action
pushd .github/actions/clang-format-lint-action &>/dev/null
git checkout 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7
git checkout c71d0bf4e21876ebec3e5647491186f8797fde31 # v0.18.2
popd &>/dev/null
- name: Install Python dependencies
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ namespace {
std::mutex s_task_signatures_mtx;
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;

std::atomic_ulong s_report_version(time(nullptr) * 10000);
std::atomic_ulong s_report_version(time(nullptr) * 100000);

void increase_report_version() {
s_report_version.fetch_add(1, std::memory_order_relaxed);
Expand Down Expand Up @@ -1074,6 +1074,7 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;

increase_report_version();
uint64_t report_version;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
Expand Down
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
}

Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) {
GetObjStoreInfoRequest req;
GetObjStoreInfoResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -916,6 +916,8 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
return s;
}

*is_vault_mode = resp.enable_storage_vault();

auto add_obj_store = [&vault_infos](const auto& obj_store) {
vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
StorageVaultPB_PathFormat {});
Expand All @@ -931,6 +933,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
}
});

// desensitization, hide secret
for (int i = 0; i < resp.obj_info_size(); ++i) {
resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
}
Expand All @@ -940,7 +943,8 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}

LOG(INFO) << "get storage vault response: " << resp.ShortDebugString();
LOG(INFO) << "get storage vault, enable_storage_vault=" << is_vault_mode
<< " response=" << resp.ShortDebugString();
return Status::OK();
}

Expand Down
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,14 @@ class CloudMetaMgr {

Status precommit_txn(const StreamLoadContext& ctx);

Status get_storage_vault_info(StorageVaultInfos* vault_infos);
/**
* Gets storage vault (storage backends) from meta-service
*
* @param vault_info output param, all storage backends
* @param is_vault_mode output param, true for pure vault mode, false for legacy mode
* @return status
*/
Status get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode);

Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);

Expand Down
14 changes: 10 additions & 4 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ struct RefreshFSVaultVisitor {

Status CloudStorageEngine::open() {
cloud::StorageVaultInfos vault_infos;
bool enable_storage_vault = false;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (st.ok()) {
break;
}
Expand All @@ -177,7 +178,11 @@ Status CloudStorageEngine::open() {
return vault_process_error(id, vault_info, std::move(st));
}
}
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));

// vault mode should not support latest_fs to get rid of unexpected storage backends choosen
if (!enable_storage_vault) {
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
}

// TODO(plat1ko): DeleteBitmapTxnManager

Expand Down Expand Up @@ -340,7 +345,8 @@ void CloudStorageEngine::_check_file_cache_ttl_block_valid() {

void CloudStorageEngine::sync_storage_vault() {
cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
bool enable_storage_vault = false;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
return;
Expand All @@ -363,7 +369,7 @@ void CloudStorageEngine::sync_storage_vault() {
}

if (auto& id = std::get<0>(vault_infos.back());
latest_fs() == nullptr || latest_fs()->id() != id) {
(latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) {
set_latest_fs(get_filesystem(id));
}
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,17 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized:
break;
}

case TYPE_DATETIME: {
std::vector<void*> datas(1);
VecDateTimeValue src[1];
src[0].from_date_str(cell.stringVal.data(), cell.stringVal.size());
datas[0] = src;
auto data = datas[0];
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
nullable_column->get_null_map_data().emplace_back(0);
break;
}
default: {
std::stringstream ss;
ss << "unsupported column type:" << type;
Expand Down
26 changes: 4 additions & 22 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "exec/schema_scanner/schema_active_queries_scanner.h"

#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -101,27 +100,10 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {

for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];

SchemaScannerHelper::insert_string_value(0, row.column_value[0].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(1, row.column_value[1].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_int_value(2, row.column_value[2].longVal,
_active_query_block.get());
SchemaScannerHelper::insert_int_value(3, row.column_value[3].longVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(4, row.column_value[4].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(5, row.column_value[5].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(6, row.column_value[6].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(7, row.column_value[7].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(8, row.column_value[8].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(9, row.column_value[9].stringVal,
_active_query_block.get());
for (int j = 0; j < _s_tbls_columns.size(); j++) {
RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _active_query_block.get(),
_s_tbls_columns[j].type));
}
}
return Status::OK();
}
Expand Down
18 changes: 2 additions & 16 deletions be/src/exec/schema_scanner/schema_partitions_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <stdint.h>

#include "exec/schema_scanner/schema_helper.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -147,22 +146,9 @@ Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {

for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];

for (int j = 0; j < _s_tbls_columns.size(); j++) {
if ((_s_tbls_columns[j].type == TYPE_BIGINT) || _s_tbls_columns[j].type == TYPE_INT) {
SchemaScannerHelper::insert_int_value(j, row.column_value[j].longVal,
_partitions_block.get());
} else if (_s_tbls_columns[j].type == TYPE_DATETIME) {
std::vector<void*> datas(1);
VecDateTimeValue src[1];
src[0].from_date_str(row.column_value[j].stringVal.data(),
row.column_value[j].stringVal.size());
datas[0] = src;
SchemaScannerHelper::insert_datetime_value(j, datas, _partitions_block.get());
} else {
SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal,
_partitions_block.get());
}
RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _partitions_block.get(),
_s_tbls_columns[j].type));
}
}
return Status::OK();
Expand Down
15 changes: 2 additions & 13 deletions be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "exec/schema_scanner/schema_routine_scanner.h"

#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -103,19 +102,9 @@ Status SchemaRoutinesScanner::get_block_from_fe() {

for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];

for (int j = 0; j < _s_tbls_columns.size(); j++) {
if (_s_tbls_columns[j].type == TYPE_DATETIME) {
std::vector<void*> datas(1);
VecDateTimeValue src[1];
src[0].from_date_str(row.column_value[j].stringVal.data(),
row.column_value[j].stringVal.size());
datas[0] = src;
SchemaScannerHelper::insert_datetime_value(j, datas, _routines_block.get());
} else {
SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal,
_routines_block.get());
}
RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _routines_block.get(),
_s_tbls_columns[j].type));
}
}
return Status::OK();
Expand Down
Loading

0 comments on commit caf8722

Please sign in to comment.