Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class ArrayTypeInfo : public TypeInfo {
inline uint32_t hash_code(const void* data, uint32_t seed) const override {
auto value = reinterpret_cast<const CollectionValue*>(data);
auto len = value->length();
uint32_t result = HashUtil::hash(&len, sizeof(size_t), seed);
uint32_t result = HashUtil::hash(&len, sizeof(len), seed);
for (size_t i = 0; i < len; ++i) {
if (value->is_null_at(i)) {
result = seed * result;
Expand Down
17 changes: 8 additions & 9 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -188,8 +188,8 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id,
this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)),
_timeout_second(-1),
_fragments_ctx(std::move(fragments_ctx)),
_set_rsc_info(false) {
_set_rsc_info(false),
_fragments_ctx(std::move(fragments_ctx)) {
_start_time = DateTimeValue::local_time();
_coord_addr = _fragments_ctx->coord_addr;
}
Expand Down Expand Up @@ -489,11 +489,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_cxt->need_commit_self = true;
stream_load_cxt->need_rollback = true;
// total_length == -1 means read one message from pipe in once time, don't care the length.
auto pipe = std::make_shared<StreamLoadPipe>(
1024 * 1024 /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
-1 /* total_length */,
true /* use_proto */);
auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_cxt->body_sink = pipe;
stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio;

Expand All @@ -508,7 +506,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
}
}

void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe) {
void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<StreamLoadPipe> pipe) {
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ std::string to_load_error_http_path(const std::string& file_name);
// This class used to manage all the fragment execute in this instance
class FragmentMgr : public RestMonitorIface {
public:
typedef std::function<void(PlanFragmentExecutor*)> FinishCallback;
using FinishCallback = std::function<void(PlanFragmentExecutor*)>;

FragmentMgr(ExecEnv* exec_env);
virtual ~FragmentMgr();
Expand Down
29 changes: 14 additions & 15 deletions be/src/runtime/tuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class Tuple {
return result;
}

void init(int size) { bzero(this, size); }
void init(int size) { bzero(_data, size); }

// The total size of all data represented in this tuple (tuple data and referenced
// string and collection data).
Expand Down Expand Up @@ -120,69 +120,68 @@ class Tuple {
// this is a no-op (but we don't have to branch to check is slots are nulalble).
void set_null(const NullIndicatorOffset& offset) {
//DCHECK(offset.bit_mask != 0);
char* null_indicator_byte = reinterpret_cast<char*>(this) + offset.byte_offset;
char* null_indicator_byte = &_data[offset.byte_offset];
*null_indicator_byte |= offset.bit_mask;
}

// Turn null indicator bit off.
void set_not_null(const NullIndicatorOffset& offset) {
char* null_indicator_byte = reinterpret_cast<char*>(this) + offset.byte_offset;
char* null_indicator_byte = &_data[offset.byte_offset];
*null_indicator_byte &= ~offset.bit_mask;
}

bool is_null(const NullIndicatorOffset& offset) const {
const char* null_indicator_byte = reinterpret_cast<const char*>(this) + offset.byte_offset;
const char* null_indicator_byte = &_data[offset.byte_offset];
return (*null_indicator_byte & offset.bit_mask) != 0;
}

void* get_slot(int offset) {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<char*>(this) + offset;
return &_data[offset];
}

const void* get_slot(int offset) const {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<const char*>(this) + offset;
return &_data[offset];
}

StringValue* get_string_slot(int offset) {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<StringValue*>(reinterpret_cast<char*>(this) + offset);
return reinterpret_cast<StringValue*>(&_data[offset]);
}

const StringValue* get_string_slot(int offset) const {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<const StringValue*>(reinterpret_cast<const char*>(this) + offset);
return reinterpret_cast<const StringValue*>(&_data[offset]);
}

CollectionValue* get_collection_slot(int offset) {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<CollectionValue*>(reinterpret_cast<char*>(this) + offset);
return reinterpret_cast<CollectionValue*>(&_data[offset]);
}

const CollectionValue* get_collection_slot(int offset) const {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<const CollectionValue*>(reinterpret_cast<const char*>(this) +
offset);
return reinterpret_cast<const CollectionValue*>(&_data[offset]);
}

DateTimeValue* get_datetime_slot(int offset) {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<DateTimeValue*>(reinterpret_cast<char*>(this) + offset);
return reinterpret_cast<DateTimeValue*>(&_data[offset]);
}

DecimalV2Value* get_decimalv2_slot(int offset) {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<DecimalV2Value*>(reinterpret_cast<char*>(this) + offset);
return reinterpret_cast<DecimalV2Value*>(&_data[offset]);
}

void* get_data() { return this; }
void* get_data() { return _data; }

std::string to_string(const TupleDescriptor& d) const;
static std::string to_string(const Tuple* t, const TupleDescriptor& d);

private:
void* _data;
char _data[0];
};

} // namespace doris
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/parse_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int64_t ParseUtil::parse_mem_spec(const std::string& mem_spec_str, bool* is_perc
}

StringParser::ParseResult result;
int64_t bytes;
int64_t bytes = -1;

if (multiplier != -1 || *is_percent) {
// Parse float - MB or GB or percent
Expand Down
2 changes: 2 additions & 0 deletions be/test/olap/column_vector_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ TEST_F(ColumnVectorTest, array_column_vector_test) {
test_read_write_array_column_vector<OLAP_FIELD_TYPE_TINYINT>(type_info, num_array,
array_val);

// Test hash_code in CollectionValue
type_info->hash_code(array_val, 0);
delete[] array_val;
delete[] item_val;
}
Expand Down