Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/action-pr-title
Submodule action-pr-title updated 548 files
2 changes: 1 addition & 1 deletion .github/actions/get-workflow-origin
2 changes: 1 addition & 1 deletion be/src/apache-orc
2 changes: 1 addition & 1 deletion be/src/clucene
53 changes: 39 additions & 14 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@
#include <new>
#include <queue>
#include <shared_mutex>
#include <type_traits>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/hdfs_builder.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/pending_rowset_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
Expand All @@ -54,10 +55,11 @@
#include "olap/txn_manager.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/simple_function_factory.h"
Expand Down Expand Up @@ -355,8 +357,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
_file_params.expr_of_dest_slot = _params.expr_of_dest_slot;
_file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans;
_file_params.strict_mode = _params.strict_mode;
_file_params.__isset.broker_addresses = true;
_file_params.broker_addresses = t_scan_range.broker_addresses;
if (_ranges[0].file_type == TFileType::FILE_HDFS) {
_file_params.hdfs_params = parse_properties(_params.properties);
} else {
_file_params.__isset.broker_addresses = true;
_file_params.broker_addresses = t_scan_range.broker_addresses;
}

for (const auto& range : _ranges) {
TFileRangeDesc file_range;
Expand Down Expand Up @@ -485,17 +491,36 @@ Status PushBrokerReader::_cast_to_input_block() {
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// Remove nullable here and let get_function decide whether the result is nullable.
auto return_type = slot_desc->get_data_type_ptr();
vectorized::ColumnsWithTypeAndName arguments {
arg,
{vectorized::DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<vectorized::DataTypeString>(), ""}};
auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
// Bitmap conversion: src -> to_base64 -> bitmap_from_base64
if (slot_desc->type().is_bitmap_type()) {
auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
vectorized::DataTypeString().get_type_as_type_descriptor(),
slot_desc->is_nullable());
auto func_to_base64 = vectorized::SimpleFunctionFactory::instance().get_function(
"to_base64", {arg}, base64_return_type);
RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
auto func_bitmap_from_base64 =
vectorized::SimpleFunctionFactory::instance().get_function(
"bitmap_from_base64", {arg_base64}, return_type);
RETURN_IF_ERROR(func_bitmap_from_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg_base64.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
} else {
vectorized::ColumnsWithTypeAndName arguments {
arg,
{vectorized::DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<vectorized::DataTypeString>(), ""}};
auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type);
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
}
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = EnvUtils.getDorisHome() + "/lib/yarn-config";

@ConfField(mutable = true, masterOnly = true, description = {"Ingestion load 的默认超时时间,单位是秒。",
"Default timeout for ingestion load job, in seconds."})
public static int ingestion_load_default_timeout_second = 86400; // 1 day

@ConfField(mutable = true, masterOnly = true, description = {"Broker Load 的最大等待 job 数量。"
+ "这个值是一个期望值。在某些情况下,比如切换 master,当前等待的 job 数量可能会超过这个值。",
"Maximal number of waiting jobs for Broker Load. This is a desired number. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,17 @@ public static class EtlIndex implements Serializable {
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
@SerializedName(value = "schemaVersion")
public int schemaVersion;

public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
String indexType, boolean isBaseIndex) {
String indexType, boolean isBaseIndex, int schemaVersion) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
this.schemaVersion = schemaVersion;
}

public EtlColumn getColumn(String name) {
Expand All @@ -398,6 +401,7 @@ public String toString() {
+ ", schemaHash=" + schemaHash
+ ", indexType='" + indexType + '\''
+ ", isBaseIndex=" + isBaseIndex
+ ", schemaVersion=" + schemaVersion
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
*
* DROP RESOURCE "spark0";
*/
@Deprecated
public class SparkResource extends Resource {
private static final Logger LOG = LogManager.getLogger(SparkResource.class);

Expand Down
Loading