Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 29 additions & 61 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,6 @@

namespace doris {

// A non-owning, read-only view over a contiguous run of bytes.
//
// A Slice records only a pointer and a length. It never allocates, copies or
// frees the bytes it refers to, so the underlying buffer (for example the
// std::string handed to the constructor) must stay alive and unmodified for
// as long as the Slice is used. In particular, do not build a Slice from a
// temporary std::string.
//
// No destructor is declared on purpose: there is nothing to release, and
// leaving it implicit keeps the type trivially destructible.
class Slice {
public:
    // Views `size` bytes starting at `data`.
    Slice(const uint8_t* data, int size) : _data(data), _size(size) { }

    // Views all bytes of `str`. Left implicit so that existing callers which
    // pass a std::string where a Slice is expected keep compiling.
    // The length is stored as int, matching the return type of size().
    Slice(const std::string& str)
            : _data(reinterpret_cast<const uint8_t*>(str.data())),
              _size(static_cast<int>(str.size())) { }

    // Number of bytes in the view.
    int size() const {
        return _size;
    }

    // Pointer to the first byte of the view.
    const uint8_t* data() const {
        return _data;
    }

    // Pointer one past the last byte of the view (data() + size()).
    const uint8_t* end() const {
        return _data + _size;
    }

private:
    // The stream inserter reads _data/_size directly.
    friend std::ostream& operator<<(std::ostream& os, const Slice& slice);

    const uint8_t* _data;
    int _size;
};

// Streams the bytes viewed by `slice` to `os` as text.
// The bytes are first copied into a temporary std::string, so the output goes
// through the stream's normal formatted string insertion.
std::ostream& operator<<(std::ostream& os, const Slice& slice) {
    const char* bytes = reinterpret_cast<const char*>(slice._data);
    const std::string text(bytes, slice._size);
    return os << text;
}

BrokerScanner::BrokerScanner(RuntimeState* state,
RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
Expand All @@ -81,8 +49,8 @@ BrokerScanner::BrokerScanner(RuntimeState* state,
_ranges(ranges),
_broker_addresses(broker_addresses),
// _splittable(params.splittable),
_value_separator(params.column_separator),
_line_delimiter(params.line_delimiter),
_value_separator(static_cast<char>(params.column_separator)),
_line_delimiter(static_cast<char>(params.line_delimiter)),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
Expand Down Expand Up @@ -397,9 +365,9 @@ void BrokerScanner::close() {
void BrokerScanner::split_line(
const Slice& line, std::vector<Slice>* values) {
// line-begin char and line-end char are considered to be 'delimiter'
const uint8_t* value = line.data();
const uint8_t* ptr = line.data();
for (size_t i = 0; i < line.size(); ++i, ++ptr) {
const char* value = line.data;
const char* ptr = line.data;
for (size_t i = 0; i < line.size; ++i, ++ptr) {
if (*ptr == _value_separator) {
values->emplace_back(value, ptr - value);
value = ptr + 1;
Expand All @@ -411,12 +379,12 @@ void BrokerScanner::split_line(
void BrokerScanner::fill_fix_length_string(
const Slice& value, MemPool* pool,
char** new_value_p, const int new_value_length) {
if (new_value_length != 0 && value.size() < new_value_length) {
if (new_value_length != 0 && value.size < new_value_length) {
*new_value_p = reinterpret_cast<char*>(pool->allocate(new_value_length));

// 'value' is guaranteed not to be nullptr
memcpy(*new_value_p, value.data(), value.size());
for (int i = value.size(); i < new_value_length; ++i) {
memcpy(*new_value_p, value.data, value.size);
for (int i = value.size; i < new_value_length; ++i) {
(*new_value_p)[i] = '\0';
}
}
Expand All @@ -430,13 +398,13 @@ bool BrokerScanner::check_decimal_input(
const Slice& slice,
int precision, int scale,
std::stringstream* error_msg) {
const uint8_t* value = slice.data();
int value_length = slice.size();
const char* value = slice.data;
size_t value_length = slice.size;

if (value_length > (precision + 2)) {
(*error_msg) << "the length of decimal value is overflow. "
<< "precision in schema: (" << precision << ", " << scale << "); "
<< "value: [" << slice << "]; "
<< "value: [" << slice.to_string() << "]; "
<< "str actual length: " << value_length << ";";
return false;
}
Expand Down Expand Up @@ -479,21 +447,21 @@ bool BrokerScanner::check_decimal_input(
if (value_int_len > (precision - scale)) {
(*error_msg) << "the int part length longer than schema precision ["
<< precision << "]. "
<< "value [" << slice << "]. ";
<< "value [" << slice.to_string() << "]. ";
return false;
} else if (value_frac_len > scale) {
(*error_msg) << "the frac part length longer than schema scale ["
<< scale << "]. "
<< "value [" << slice << "]. ";
<< "value [" << slice.to_string() << "]. ";
return false;
}
return true;
}

// Returns true when `slice` is exactly two characters long and those
// characters are a backslash followed by an uppercase 'N' -- presumably the
// textual NULL marker, judging by the function name.
// NOTE(review): this block holds two return statements, one calling
// size()/data() as methods and one reading size/data as fields. It looks like
// the before and after sides of a diff hunk shown together; confirm which one
// belongs in the real file. As written, only the first return can execute.
bool is_null(const Slice& slice) {
return slice.size() == 2 &&
slice.data()[0] == '\\' &&
slice.data()[1] == 'N';
return slice.size == 2 &&
slice.data[0] == '\\' &&
slice.data[1] == 'N';
}

// Writes a slot in _tuple from an value containing text data.
Expand All @@ -503,29 +471,29 @@ bool BrokerScanner::write_slot(
Tuple* tuple, MemPool* tuple_pool,
std::stringstream* error_msg) {

if (value.size() == 0 && !slot->type().is_string_type()) {
if (value.size == 0 && !slot->type().is_string_type()) {
(*error_msg) << "the length of input should not be 0. "
<< "column_name: " << column_name << "; "
<< "type: " << slot->type();
return false;
}

char* value_to_convert = (char*)value.data();
int value_to_convert_length = value.size();
char* value_to_convert = value.data;
size_t value_to_convert_length = value.size;

// Fill all the spaces if it is 'TYPE_CHAR' type
if (slot->type().is_string_type()) {
int char_len = column_type.len;
if (value.size() > char_len) {
if (value.size > char_len) {
(*error_msg) << "the length of input is too long than schema. "
<< "column_name: " << column_name << "; "
<< "input_str: [" << value << "] "
<< "input_str: [" << value.to_string() << "] "
<< "type: " << slot->type() << "; "
<< "schema length: " << char_len << "; "
<< "actual length: " << value.size() << "; ";
<< "actual length: " << value.size << "; ";
return false;
}
if (slot->type().type == TYPE_CHAR && value.size() < char_len) {
if (slot->type().type == TYPE_CHAR && value.size < char_len) {
if (!is_null(value)) {
fill_fix_length_string(
value, tuple_pool,
Expand All @@ -547,7 +515,7 @@ bool BrokerScanner::write_slot(
(*error_msg) << "convert csv string to "
<< slot->type() << " failed. "
<< "column_name: " << column_name << "; "
<< "input_str: [" << value << "]; ";
<< "input_str: [" << value.to_string() << "]; ";
return false;
}

Expand Down Expand Up @@ -576,7 +544,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
error_msg << "actual column number is less than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string((const char*)line.data(), line.size()),
_state->append_error_msg_to_file(std::string(line.data, line.size),
error_msg.str());
_counter->num_rows_filtered++;
return false;
Expand All @@ -585,7 +553,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
error_msg << "actual column number is more than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string((const char*)line.data(), line.size()),
_state->append_error_msg_to_file(std::string(line.data, line.size),
error_msg.str());
_counter->num_rows_filtered++;
return false;
Expand All @@ -601,8 +569,8 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
str_slot->ptr = (char*)value.data();
str_slot->len = value.size();
str_slot->ptr = value.data;
str_slot->len = value.size;
}

return true;
Expand All @@ -624,7 +592,7 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo
std::stringstream error_msg;
error_msg << "column(" << slot_desc->col_name() << ") value is null";
_state->append_error_msg_to_file(
std::string((const char*)line.data(), line.size()), error_msg.str());
std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
return false;
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
#include "util/slice.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -121,8 +122,8 @@ class BrokerScanner {

std::unique_ptr<TextConverter> _text_converter;

uint8_t _value_separator;
uint8_t _line_delimiter;
char _value_separator;
char _line_delimiter;

// Reader
FileReader* _cur_file_reader;
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,15 @@ void OlapScanner::_convert_row_to_tuple(Tuple* tuple) {
size_t len = field->size();
switch (slot_desc->type().type) {
case TYPE_CHAR: {
StringSlice* slice = reinterpret_cast<StringSlice*>(ptr);
Slice* slice = reinterpret_cast<Slice*>(ptr);
StringValue *slot = tuple->get_string_slot(slot_desc->tuple_offset());
slot->ptr = slice->data;
slot->len = strnlen(slot->ptr, slice->size);
break;
}
case TYPE_VARCHAR:
case TYPE_HLL: {
StringSlice* slice = reinterpret_cast<StringSlice*>(ptr);
Slice* slice = reinterpret_cast<Slice*>(ptr);
StringValue *slot = tuple->get_string_slot(slot_desc->tuple_offset());
slot->ptr = slice->data;
slot->len = slice->size;
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/aggregate_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_REPLACE, OLAP_FIELD_TYPE_CHAR>
bool r_null = *reinterpret_cast<const bool*>(right);
*reinterpret_cast<bool*>(left) = r_null;
if (!r_null) {
StringSlice* l_slice = reinterpret_cast<StringSlice*>(left + 1);
const StringSlice* r_slice = reinterpret_cast<const StringSlice*>(right + 1);
Slice* l_slice = reinterpret_cast<Slice*>(left + 1);
const Slice* r_slice = reinterpret_cast<const Slice*>(right + 1);
if (arena == nullptr || l_slice->size >= r_slice->size) {
memory_copy(l_slice->data, r_slice->data, r_slice->size);
l_slice->size = r_slice->size;
Expand All @@ -224,13 +224,13 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_REPLACE, OLAP_FIELD_TYPE_VARCH
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL> {
static void aggregate(char* left, const char* right, Arena* arena) {
StringSlice* l_slice = reinterpret_cast<StringSlice*>(left + 1);
Slice* l_slice = reinterpret_cast<Slice*>(left + 1);
size_t hll_ptr = *(size_t*)(l_slice->data - sizeof(HllContext*));
HllContext* context = (reinterpret_cast<HllContext*>(hll_ptr));
HllSetHelper::fill_set(right + 1, context);
}
static void finalize(char* data) {
StringSlice* slice = reinterpret_cast<StringSlice*>(data);
Slice* slice = reinterpret_cast<Slice*>(data);
size_t hll_ptr = *(size_t*)(slice->data - sizeof(HllContext*));
HllContext* context = (reinterpret_cast<HllContext*>(hll_ptr));
std::map<int, uint8_t> index_to_value;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ OLAPStatus StringColumnDirectReader::init(
return OLAP_ERR_COLUMN_STREAM_NOT_EXIST;
}

_values = reinterpret_cast<StringSlice*>(mem_pool->allocate(size * sizeof(StringSlice)));
_values = reinterpret_cast<Slice*>(mem_pool->allocate(size * sizeof(Slice)));

ReadOnlyFileStream* length_stream = extract_stream(_column_unique_id,
StreamInfoMessage::LENGTH,
Expand Down Expand Up @@ -436,7 +436,7 @@ OLAPStatus StringColumnDictionaryReader::init(
}
*/

_values = reinterpret_cast<StringSlice*>(mem_pool->allocate(size * sizeof(StringSlice)));
_values = reinterpret_cast<Slice*>(mem_pool->allocate(size * sizeof(Slice)));
int64_t read_buffer_size = 1024;
char* _read_buffer = new(std::nothrow) char[read_buffer_size];

Expand Down
16 changes: 8 additions & 8 deletions be/src/olap/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class StringColumnDirectReader {
private:
bool _eof;
uint32_t _column_unique_id;
StringSlice* _values;
Slice* _values;
ReadOnlyFileStream* _data_stream;
RunLengthIntegerReader* _length_reader;
};
Expand Down Expand Up @@ -158,7 +158,7 @@ class StringColumnDictionaryReader {
bool _eof;
uint32_t _dictionary_size;
uint32_t _column_unique_id;
StringSlice* _values;
Slice* _values;
char* _read_buffer;
//uint64_t _dictionary_size;
//uint64_t* _offset_dictionary; // used to look up the offset that corresponds to a data value's number
Expand Down Expand Up @@ -335,27 +335,27 @@ class DefaultValueReader : public ColumnReader {
}
case OLAP_FIELD_TYPE_CHAR: {
_values =
reinterpret_cast<void*>(mem_pool->allocate(size * sizeof(StringSlice)));
reinterpret_cast<void*>(mem_pool->allocate(size * sizeof(Slice)));
int32_t length = _length;
char* string_buffer = reinterpret_cast<char*>(mem_pool->allocate(size * length));
memset(string_buffer, 0, size * length);
for (int i = 0; i < size; ++i) {
memory_copy(string_buffer, _default_value.c_str(), _default_value.length());
((StringSlice*)_values)[i].size = length;
((StringSlice*)_values)[i].data = string_buffer;
((Slice*)_values)[i].size = length;
((Slice*)_values)[i].data = string_buffer;
string_buffer += length;
}
break;
}
case OLAP_FIELD_TYPE_VARCHAR: {
_values =
reinterpret_cast<void*>(mem_pool->allocate(size * sizeof(StringSlice)));
reinterpret_cast<void*>(mem_pool->allocate(size * sizeof(Slice)));
int32_t length = _default_value.length();
char* string_buffer = reinterpret_cast<char*>(mem_pool->allocate(size * length));
for (int i = 0; i < size; ++i) {
memory_copy(string_buffer, _default_value.c_str(), length);
((StringSlice*)_values)[i].size = length;
((StringSlice*)_values)[i].data = string_buffer;
((Slice*)_values)[i].size = length;
((Slice*)_values)[i].data = string_buffer;
string_buffer += length;
}
break;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ OLAPStatus ColumnWriter::write(RowCursor* row_cursor) {
_field_info.type == OLAP_FIELD_TYPE_VARCHAR ||
_field_info.type == OLAP_FIELD_TYPE_HLL)
{
StringSlice* slice = reinterpret_cast<StringSlice*>(buf);
Slice* slice = reinterpret_cast<Slice*>(buf);
_bf->add_bytes(slice->data, slice->size);
} else {
_bf->add_bytes(buf, field->size());
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ class VarStringColumnWriter : public ColumnWriter {
bool is_null = field->is_null(cursor->get_buf());
if (!is_null) {
char* buf = field->get_ptr(cursor->get_buf());
StringSlice* slice = reinterpret_cast<StringSlice*>(buf);
Slice* slice = reinterpret_cast<Slice*>(buf);
res = write(slice->data, slice->size);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to write varchar, res=" << res;
Expand Down Expand Up @@ -521,7 +521,7 @@ class FixLengthStringColumnWriter : public VarStringColumnWriter {

if (!is_null) {
//const char* str = reinterpret_cast<const char*>(buf);
StringSlice* slice = reinterpret_cast<StringSlice*>(buf);
Slice* slice = reinterpret_cast<Slice*>(buf);
res = VarStringColumnWriter::write(slice->data, slice->size);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to write fix-length string, res=" << res;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/field.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Field::Field(const FieldInfo& field_info)
_type_info = get_type_info(field_info.type);
if (_type == OLAP_FIELD_TYPE_CHAR || _type == OLAP_FIELD_TYPE_VARCHAR
|| _type == OLAP_FIELD_TYPE_HLL) {
_size = sizeof(StringSlice);
_size = sizeof(Slice);
} else {
/*
* the field_info.size and field_info.index_length is equal to zero,
Expand Down
Loading