Skip to content

Commit

Permalink
Merge branch 'master' into fix-array-min-max-funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored Aug 15, 2024
2 parents 6e1356a + dedf15c commit 422e4f2
Show file tree
Hide file tree
Showing 53 changed files with 972 additions and 167 deletions.
12 changes: 11 additions & 1 deletion be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ CloudWarmUpManager::~CloudWarmUpManager() {
}
}

std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
auto visitor = [&id_to_rowset_meta_map](const RowsetSharedPtr& r) {
id_to_rowset_meta_map.emplace(r->rowset_meta()->rowset_id().to_string(), r->rowset_meta());
};
constexpr bool include_stale = false;
tablet->traverse_rowsets(visitor, include_stale);
return id_to_rowset_meta_map;
}

void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
Expand Down Expand Up @@ -78,7 +88,7 @@ void CloudWarmUpManager::handle_jobs() {
std::shared_ptr<bthread::CountdownEvent> wait =
std::make_shared<bthread::CountdownEvent>(0);
auto tablet_meta = tablet->tablet_meta();
auto rs_metas = tablet_meta->snapshot_rs_metas();
auto rs_metas = snapshot_rs_metas(tablet.get());
for (auto& [_, rs] : rs_metas) {
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
auto storage_resource = rs->remote_storage_resource();
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
}

_bloom_filter_alloced = data_size;
_inited = true;
return _bloom_filter->init(data, data_size);
}

Expand Down
12 changes: 11 additions & 1 deletion be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ void FileCacheBlockDownloader::check_download_task(const std::vector<int64_t>& t
}
}

std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
auto visitor = [&id_to_rowset_meta_map](const RowsetSharedPtr& r) {
id_to_rowset_meta_map.emplace(r->rowset_meta()->rowset_id().to_string(), r->rowset_meta());
};
constexpr bool include_stale = false;
tablet->traverse_rowsets(visitor, include_stale);
return id_to_rowset_meta_map;
}

void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
Expand All @@ -141,7 +151,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
tablet = std::move(res).value();
}

auto id_to_rowset_meta_map = tablet->tablet_meta()->snapshot_rs_metas();
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
if (find_it == id_to_rowset_meta_map.end()) {
return;
Expand Down
12 changes: 0 additions & 12 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,6 @@ class TabletMeta {
void revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap);

const std::vector<RowsetMetaSharedPtr>& all_stale_rs_metas() const;
// return the snapshot of rowset_meta
// the return value is map<rowset_id_str, rowset_meta_sptr>
std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas() const;
RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version) const;
void delete_stale_rs_meta_by_version(const Version& version);
RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version& version) const;
Expand Down Expand Up @@ -698,15 +695,6 @@ inline bool TabletMeta::all_beta() const {
return true;
}

inline std::unordered_map<std::string, RowsetMetaSharedPtr> TabletMeta::snapshot_rs_metas() const {
std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
std::shared_lock rlock(_meta_lock);
std::for_each(_rs_metas.cbegin(), _rs_metas.cend(), [&](const auto& rowset_meta) {
id_to_rowset_meta_map.emplace(rowset_meta->rowset_id().to_string(), rowset_meta);
});
return id_to_rowset_meta_map;
}

std::string tablet_state_name(TabletState state);

// Only for unit test now.
Expand Down
5 changes: 3 additions & 2 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2063,10 +2063,11 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont
st = Status::Error(ErrorCode::INTERNAL_ERROR,
"_exec_plan_fragment_impl meet unknown error");
}
closure_guard.release();
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id)
<< ", errmsg=" << st;
} else {
closure_guard.release();
for (int i = 0; i < request->data().size(); ++i) {
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
Expand Down
18 changes: 12 additions & 6 deletions be/src/vec/functions/function_regexp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ struct RegexpReplaceOneImpl {
}
};

template <bool ReturnNull>
struct RegexpExtractImpl {
static constexpr auto name = "regexp_extract";
static constexpr auto name = ReturnNull ? "regexp_extract_or_null" : "regexp_extract";
// 3 args
static void execute_impl(FunctionContext* context, ColumnPtr argument_columns[],
size_t input_rows_count, ColumnString::Chars& result_data,
Expand All @@ -201,7 +202,8 @@ struct RegexpExtractImpl {
}
const auto& index_data = index_col->get_int(i);
if (index_data < 0) {
StringOP::push_empty_string(i, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(i, result_data, result_offset, null_map)
: StringOP::push_empty_string(i, result_data, result_offset);
continue;
}
_execute_inner_loop<false>(context, str_col, pattern_col, index_data, result_data,
Expand All @@ -220,7 +222,8 @@ struct RegexpExtractImpl {
const auto& index_data = index_col->get_int(0);
if (index_data < 0) {
for (size_t i = 0; i < input_rows_count; ++i) {
StringOP::push_empty_string(i, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(i, result_data, result_offset, null_map)
: StringOP::push_empty_string(i, result_data, result_offset);
}
return;
}
Expand Down Expand Up @@ -260,15 +263,17 @@ struct RegexpExtractImpl {

int max_matches = 1 + re->NumberOfCapturingGroups();
if (index_data >= max_matches) {
StringOP::push_empty_string(index_now, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(index_now, result_data, result_offset, null_map)
: StringOP::push_empty_string(index_now, result_data, result_offset);
return;
}

std::vector<re2::StringPiece> matches(max_matches);
bool success =
re->Match(str_sp, 0, str.size, re2::RE2::UNANCHORED, &matches[0], max_matches);
if (!success) {
StringOP::push_empty_string(index_now, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(index_now, result_data, result_offset, null_map)
: StringOP::push_empty_string(index_now, result_data, result_offset);
return;
}
const re2::StringPiece& match = matches[index_data];
Expand Down Expand Up @@ -486,7 +491,8 @@ class FunctionRegexp : public IFunction {

void register_function_regexp_extract(SimpleFunctionFactory& factory) {
factory.register_function<FunctionRegexp<RegexpReplaceImpl>>();
factory.register_function<FunctionRegexp<RegexpExtractImpl>>();
factory.register_function<FunctionRegexp<RegexpExtractImpl<true>>>();
factory.register_function<FunctionRegexp<RegexpExtractImpl<false>>>();
factory.register_function<FunctionRegexp<RegexpReplaceOneImpl>>();
factory.register_function<FunctionRegexp<RegexpExtractAllImpl>>();
}
Expand Down
39 changes: 39 additions & 0 deletions be/test/vec/function/function_like_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,45 @@ TEST(FunctionLikeTest, regexp_extract) {
}
}

TEST(FunctionLikeTest, regexp_extract_or_null) {
std::string func_name = "regexp_extract_or_null";

DataSet data_set = {{{std::string("x=a3&x=18abc&x=2&y=3&x=4"),
std::string("x=([0-9]+)([a-z]+)"), (int64_t)0},
std::string("x=18abc")},
{{std::string("x=a3&x=18abc&x=2&y=3&x=4"),
std::string("^x=([a-z]+)([0-9]+)"), (int64_t)0},
std::string("x=a3")},
{{std::string("x=a3&x=18abc&x=2&y=3&x=4"),
std::string("^x=([a-z]+)([0-9]+)"), (int64_t)1},
std::string("a")},
{{std::string("http://a.m.baidu.com/i41915173660.htm"),
std::string("i([0-9]+)"), (int64_t)0},
std::string("i41915173660")},
{{std::string("http://a.m.baidu.com/i41915173660.htm"),
std::string("i([0-9]+)"), (int64_t)1},
std::string("41915173660")},

{{std::string("hitdecisiondlist"), std::string("(i)(.*?)(e)"), (int64_t)0},
std::string("itde")},
{{std::string("hitdecisiondlist"), std::string("(i)(.*?)(e)"), (int64_t)1},
std::string("i")},
{{std::string("hitdecisiondlist"), std::string("(i)(.*?)(e)"), (int64_t)2},
std::string("td")},
// null
{{std::string("abc"), Null(), (int64_t)0}, Null()},
{{Null(), std::string("i([0-9]+)"), (int64_t)0}, Null()}};

// pattern is constant value
InputTypeSet const_pattern_input_types = {TypeIndex::String, Consted {TypeIndex::String},
TypeIndex::Int64};
for (const auto& line : data_set) {
DataSet const_pattern_dataset = {line};
static_cast<void>(check_function<DataTypeString, true>(func_name, const_pattern_input_types,
const_pattern_dataset));
}
}

TEST(FunctionLikeTest, regexp_extract_all) {
std::string func_name = "regexp_extract_all";

Expand Down
14 changes: 13 additions & 1 deletion cloud/src/recycler/obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

#include "recycler/obj_storage_client.h"

#include <chrono>

#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"
#include "recycler/util.h"

using namespace std::chrono;

namespace doris::cloud {

Expand All @@ -28,6 +31,9 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
size_t num_deleted_objects = 0;
auto start_time = steady_clock::now();

auto list_iter = list_objects(path);

ObjectStorageResponse ret;
Expand All @@ -42,6 +48,7 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
continue;
}

num_deleted_objects++;
keys.emplace_back(std::move(obj->key));
if (keys.size() < batch_size) {
continue;
Expand Down Expand Up @@ -70,6 +77,11 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
}
}

auto elapsed = duration_cast<milliseconds>(steady_clock::now() - start_time).count();
LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key
<< " finished, ret=" << ret.ret << ", finished=" << finished
<< ", num_deleted_objects=" << num_deleted_objects << ", cost=" << elapsed << " ms";

ret = finished ? ret : -1;

return ret;
Expand Down
12 changes: 4 additions & 8 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
return {std::string_view(), range_move};
}
++num_recycled;
LOG_INFO("k is {}, is empty {}", k, k.empty());
return {k, range_move};
});
} else {
Expand All @@ -1157,10 +1156,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
}
return true;
}());
sync_executor.add([k]() mutable -> TabletKeyPair {
LOG_INFO("k is {}, is empty {}", k, k.empty());
return {k, true};
});
sync_executor.add([k]() mutable -> TabletKeyPair { return {k, true}; });
++num_recycled;
}
return 0;
Expand Down Expand Up @@ -1433,7 +1429,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {

std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle rowsets finished, cost={}s", cost)
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
});
Expand Down Expand Up @@ -1618,7 +1614,7 @@ int InstanceRecycler::recycle_rowsets() {
// old version `RecycleRowsetPB` may has empty resource_id, just remove the kv.
LOG(INFO) << "delete the recycle rowset kv that has empty resource_id, key="
<< hex(k) << " value=" << proto_to_json(rowset);
rowset_keys.push_back(std::string(k));
rowset_keys.emplace_back(k);
return -1;
}
// decode rowset_id
Expand Down Expand Up @@ -1664,7 +1660,7 @@ int InstanceRecycler::recycle_rowsets() {
return -1;
}
} else {
rowset_keys.push_back(std::string(k));
rowset_keys.emplace_back(k);
if (rowset_meta->num_segments() > 0) { // Skip empty rowset
rowsets.push_back(std::move(*rowset_meta));
}
Expand Down
6 changes: 2 additions & 4 deletions cloud/src/recycler/sync_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
#include <glog/logging.h>

#include <future>
#include <iostream>
#include <string>

#include "common/logging.h"
#include "common/simple_thread_pool.h"

namespace doris::cloud {
Expand Down Expand Up @@ -54,10 +52,10 @@ class SyncExecutor {
auto current_time_second = time(nullptr);
current_time.tv_sec = current_time_second + 300;
current_time.tv_nsec = 0;
auto msg = fmt::format("{} has already taken 5 min", _name_tag);
while (0 != _count.timed_wait(current_time)) {
current_time.tv_sec += 300;
LOG(WARNING) << msg;
LOG(WARNING) << _name_tag << " has already taken 5 min, cost: "
<< time(nullptr) - current_time_second << " seconds";
}
*finished = !_stop_token;
std::vector<T> res;
Expand Down
18 changes: 18 additions & 0 deletions docker/thirdparties/docker-compose/oceanbase/init/01-drop-db.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

drop database if exists doris_test;
18 changes: 18 additions & 0 deletions docker/thirdparties/docker-compose/oceanbase/init/02-create-db.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

create database doris_test;
Loading

0 comments on commit 422e4f2

Please sign in to comment.