Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 64 additions & 10 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
#include "olap/iterators.h"
#include "util/slice.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_struct.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -685,23 +689,23 @@ Status OrcReader::_decode_string_column(const std::string& col_name,
}

// Appends offsets for `num_values` rows to a Doris offsets column, derived from
// an ORC offsets buffer, and reports how many nested elements those rows cover.
//
// When num_values > 0 the ORC buffer must hold at least num_values + 1 entries;
// otherwise an InternalError naming `col_name` is returned. Each appended offset
// is rebased: the last existing Doris offset plus the ORC offset's distance from
// orc_offsets[0]. *element_size receives orc_offsets[num_values] - orc_offsets[0],
// or 0 when num_values == 0.
//
// NOTE(review): this span appears to interleave the pre-change and post-change
// lines of a diff (two parameter lists, `prev_offset` declared twice, two
// emplace_back calls). Presumably only the variant taking `doris_offsets` /
// `orc_offsets` directly is meant to survive -- confirm against the real file.
Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
const MutableColumnPtr& data_column,
orc::ListVectorBatch* lvb, size_t num_values,
size_t* element_size) {
ColumnArray::Offsets64& doris_offsets,
orc::DataBuffer<int64_t>& orc_offsets,
size_t num_values, size_t* element_size) {
// Hands the decode_value_time statistic to the timer macro (presumably so the
// time spent in this call is charged to it -- TODO confirm macro semantics).
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
if (num_values > 0) {
auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets();
auto& orc_offsets = lvb->offsets;
// num_values rows need num_values + 1 boundaries (a start plus one end per row).
if (orc_offsets.size() < num_values + 1) {
return Status::InternalError("Wrong array offsets in orc file for column '{}'",
col_name);
}
// Continue from the last offset already stored on the Doris side.
auto prev_offset = offsets_data.back();
auto prev_offset = doris_offsets.back();
// The ORC offsets are rebased against their first entry, so the batch does not
// have to start at 0.
auto base_offset = orc_offsets[0];
for (int i = 1; i < num_values + 1; ++i) {
offsets_data.emplace_back(prev_offset + orc_offsets[i] - base_offset);
doris_offsets.emplace_back(prev_offset + orc_offsets[i] - base_offset);
}
// Total number of nested elements spanned by the rows just appended.
*element_size = orc_offsets[num_values] - base_offset;
} else {
*element_size = 0;
}
return Status::OK();
}
Expand Down Expand Up @@ -779,17 +783,67 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
return Status::InternalError("Wrong data type for colum '{}'", col_name);
}
auto* orc_list = down_cast<orc::ListVectorBatch*>(cvb);
size_t element_size;
RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, data_column, orc_list, num_values,
auto& doris_offsets = static_cast<ColumnArray&>(*data_column).get_offsets();
auto& orc_offsets = orc_list->offsets;
size_t element_size = 0;
RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_offsets, orc_offsets, num_values,
&element_size));
DataTypePtr& nested_type = const_cast<DataTypePtr&>(
(reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()))
reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get())
->get_nested_type());
const orc::Type* nested_orc_type = orc_column_type->getSubtype(0);
return _orc_column_to_doris_column(
col_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
nested_orc_type, orc_list->elements.get(), element_size);
}
case TypeIndex::Map: {
    // Converts an ORC MAP batch into a Doris ColumnMap: the row offsets are
    // appended first, then the key and value children are converted recursively
    // with the number of entries reported by the offsets step.
    if (orc_column_type->getKind() != orc::TypeKind::MAP) {
        return Status::InternalError("Wrong data type for colum '{}'", col_name);
    }
    auto* orc_map = down_cast<orc::MapVectorBatch*>(cvb);
    auto& doris_map = static_cast<ColumnMap&>(*data_column);

    // The key and value columns are accessed as ColumnArray. Validate that before
    // any offsets are appended, so a layout mismatch yields an error instead of a
    // null dereference and leaves the column untouched.
    // NOTE(review): relies on the pointer form of typeid_cast returning nullptr
    // on a type mismatch -- confirm against typeid_cast.h.
    const auto* doris_key_array = typeid_cast<const ColumnArray*>(doris_map.get_keys_ptr().get());
    const auto* doris_value_array =
            typeid_cast<const ColumnArray*>(doris_map.get_values_ptr().get());
    if (doris_key_array == nullptr || doris_value_array == nullptr) {
        return Status::InternalError("Wrong map column layout for column '{}'", col_name);
    }

    size_t element_size = 0;
    RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_map.get_offsets(),
                                              orc_map->offsets, num_values, &element_size));

    // Keep the non-nullable type alive in a local and downcast with static_cast;
    // the key/value types are only read, so no const_cast is needed.
    const auto plain_type = remove_nullable(data_type);
    const auto* doris_map_type = static_cast<const DataTypeMap*>(plain_type.get());
    const DataTypePtr& doris_key_type = doris_map_type->get_key_type();
    const DataTypePtr& doris_value_type = doris_map_type->get_value_type();

    // Subtype 0 is used for the keys and subtype 1 for the values.
    const orc::Type* orc_key_type = orc_column_type->getSubtype(0);
    const orc::Type* orc_value_type = orc_column_type->getSubtype(1);

    const ColumnPtr& doris_key_column = doris_key_array->get_data_ptr();
    const ColumnPtr& doris_value_column = doris_value_array->get_data_ptr();
    RETURN_IF_ERROR(_orc_column_to_doris_column(col_name, doris_key_column, doris_key_type,
                                                orc_key_type, orc_map->keys.get(),
                                                element_size));
    return _orc_column_to_doris_column(col_name, doris_value_column, doris_value_type,
                                       orc_value_type, orc_map->elements.get(), element_size);
}
case TypeIndex::Struct: {
    // Converts an ORC STRUCT batch into a Doris ColumnStruct by converting each
    // field recursively. Fields are paired by position: ORC field i goes to
    // Doris field i.
    if (orc_column_type->getKind() != orc::TypeKind::STRUCT) {
        return Status::InternalError("Wrong data type for colum '{}'", col_name);
    }
    auto* orc_struct = down_cast<orc::StructVectorBatch*>(cvb);
    auto& doris_struct = static_cast<ColumnStruct&>(*data_column);

    // Read the field count once; it bounds the loop below and is compared as an
    // unsigned value to avoid a signed/unsigned mismatch.
    const size_t field_count = doris_struct.tuple_size();
    if (orc_struct->fields.size() != field_count) {
        return Status::InternalError("Wrong number of struct fields for column '{}'", col_name);
    }

    // Keep the non-nullable type alive in a local and downcast with static_cast
    // (a class-hierarchy conversion, not a bit reinterpretation).
    const auto plain_type = remove_nullable(data_type);
    const auto* doris_struct_type = static_cast<const DataTypeStruct*>(plain_type.get());

    for (size_t i = 0; i < field_count; ++i) {
        orc::ColumnVectorBatch* orc_field = orc_struct->fields[i];
        const orc::Type* orc_type = orc_column_type->getSubtype(i);
        const ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
        const DataTypePtr& doris_type = doris_struct_type->get_element(i);
        // num_values is forwarded unchanged to every field.
        RETURN_IF_ERROR(_orc_column_to_doris_column(col_name, doris_field, doris_type, orc_type,
                                                    orc_field, num_values));
    }
    return Status::OK();
}
default:
break;
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/config.h"
#include "exec/olap_common.h"
#include "io/fs/file_reader.h"
#include "vec/columns/column_array.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/exec/format/format_common.h"
Expand Down Expand Up @@ -251,8 +252,9 @@ class OrcReader : public GenericReader {
size_t num_values);

// Appends `num_values` rebased offsets to a Doris offsets column from an ORC
// offsets buffer and writes the number of covered nested elements to
// *element_size (0 when num_values is 0). Returns an error status when the ORC
// buffer has fewer than num_values + 1 entries.
//
// NOTE(review): two parameter lists appear here (one taking `data_column`/`lvb`,
// one taking `doris_offsets`/`orc_offsets`); this looks like the old and new
// lines of a diff shown together -- confirm which declaration is current.
Status _fill_doris_array_offsets(const std::string& col_name,
const MutableColumnPtr& data_column, orc::ListVectorBatch* lvb,
size_t num_values, size_t* element_size);
ColumnArray::Offsets64& doris_offsets,
orc::DataBuffer<int64_t>& orc_offsets, size_t num_values,
size_t* element_size);

std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos);

Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/exec/format/parquet/level_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ class LevelDecoder {
return _rle_decoder.GetNextRun(val, max_run);
}

// Asks the RLE decoder for a single level and returns it.
// The local starts at -1 and is handed to Get() by pointer, so -1 is what the
// caller sees if Get() leaves it untouched; Get()'s own return value is not
// inspected here.
inline level_t get_next() {
    level_t level = -1;
    _rle_decoder.Get(&level);
    return level;
}

// Forwards to the RLE decoder's RewindOne().
// NOTE(review): presumably steps the decoder back by one value so the level
// just read can be read again -- confirm against the RleDecoder implementation.
inline void rewind_one() {
    _rle_decoder.RewindOne();
}

private:
tparquet::Encoding::type _encoding;
level_t _bit_width = 0;
Expand Down
Loading