Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xiepengcheng01 committed May 6, 2022
1 parent 2655c09 commit bd54b7a
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 54 deletions.
6 changes: 3 additions & 3 deletions be/src/vec/exec/vbroker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
}

// All scanner has been finished, and all cached batch has been read
if (scanner_block == nullptr || scanner_block.get() == nullptr) {
if (_mutable_block.get() != nullptr && !_mutable_block->empty()) {
if (!scanner_block) {
if (_mutable_block && !_mutable_block->empty()) {
*block = _mutable_block->to_block();
reached_limit(block, eos);
LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit.";
Expand All @@ -101,7 +101,7 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
// notify one scanner
_queue_writer_cond.notify_one();

if (_mutable_block.get() == nullptr) {
if (UNLIKELY(!_mutable_block)) {
_mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
}

Expand Down
38 changes: 30 additions & 8 deletions be/src/vec/exec/vbroker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableCo
auto n_columns = 0;
for (const auto slot_desc : _src_slot_descs) {
tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
auto old_rows = tmp_block->rows();
// filter
Expand Down Expand Up @@ -131,7 +131,8 @@ Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableCo
return status;
}

Status VBrokerScanner::_fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns) {
Status VBrokerScanner::_fill_dest_columns(const Slice& line,
std::vector<MutableColumnPtr>& columns) {
RETURN_IF_ERROR(_line_split_to_values(line));
if (!_success) {
// If not success, which means we met an invalid row, return.
Expand Down Expand Up @@ -197,20 +198,41 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, std::vector<Mutable
return Status::OK();
}
// nullable
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
nullable_column->insert_data(nullptr, 0);
continue;
}
RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc, &columns[dest_index], _state));

RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc,
&columns[dest_index], _state));
}

const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file, range.columns_from_path, columns));
}

_success = true;
return Status::OK();
}

Status VBrokerScanner::_fill_columns_from_path(int start,
const std::vector<std::string>& columns_from_path,
std::vector<MutableColumnPtr>& columns) {
// values of columns from path can not be null
for (int i = 0; i < columns_from_path.size(); ++i) {
int dest_index = i + start;
auto slot_desc = _src_slot_descs.at(dest_index);
const std::string& column_from_path = columns_from_path[i];
RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()), column_from_path.size(),
slot_desc, &columns[dest_index], _state));
}
return Status::OK();
}

Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot,
vectorized::MutableColumnPtr* column_ptr,
RuntimeState* state) {
vectorized::MutableColumnPtr* column_ptr,
RuntimeState* state) {
if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) {
std::stringstream ss;
ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`"
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exec/vbroker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ class VBrokerScanner final : public BrokerScanner {
std::unique_ptr<TextConverter> _text_converter;

Status _write_text_column(char* value, int length, SlotDescriptor* slot,
MutableColumnPtr* column_ptr,
RuntimeState* state);
MutableColumnPtr* column_ptr, RuntimeState* state);

Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns);

Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);

Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path,
std::vector<MutableColumnPtr>& columns);
};
} // namespace doris::vectorized
7 changes: 4 additions & 3 deletions be/test/exec/multi_bytes_separator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ namespace doris {

class MultiBytesSeparatorTest : public testing::Test {
public:
MultiBytesSeparatorTest() {}

MultiBytesSeparatorTest() : _runtime_state(TQueryGlobals()) {}
private:
RuntimeState _runtime_state;
protected:
virtual void SetUp() {}
virtual void TearDown() {}
Expand All @@ -56,7 +57,7 @@ TEST_F(MultiBytesSeparatorTest, normal) {
const std::vector<TBrokerRangeDesc> ranges;
const std::vector<TNetworkAddress> broker_addresses;
const std::vector<TExpr> pre_filter_texprs;
BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_texprs,
BrokerScanner scanner(&_runtime_state, nullptr, params, ranges, broker_addresses, pre_filter_texprs,
nullptr);

#define private public
Expand Down
72 changes: 34 additions & 38 deletions be/test/vec/exec/vbroker_scan_node_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class VBrokerScanNodeTest : public testing::Test {
VBrokerScanNodeTest() : _runtime_state(TQueryGlobals()) {
init();
_runtime_state._instance_mem_tracker.reset(new MemTracker());
_runtime_state._query_options.enable_vectorized_engine = true;
}
void init();
static void SetUpTestCase() {
Expand Down Expand Up @@ -277,7 +278,7 @@ void VBrokerScanNodeTest::init_desc_table() {
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 1;
slot_desc.columnPos = 2;
slot_desc.byteOffset = 32;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
Expand All @@ -304,7 +305,7 @@ void VBrokerScanNodeTest::init_desc_table() {
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 1;
slot_desc.columnPos = 3;
slot_desc.byteOffset = 48;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
Expand Down Expand Up @@ -466,37 +467,31 @@ TEST_F(VBrokerScanNodeTest, normal) {
doris::vectorized::Block block;
bool eos = false;
status = scan_node.get_next(&_runtime_state, &block, &eos);
ASSERT_EQ(3, block.rows());
ASSERT_EQ(4, block.rows());
ASSERT_EQ(4, block.columns());
ASSERT_FALSE(eos);

auto columns = block.get_columns();
ASSERT_EQ(columns[0]->get_int(0), 1);
ASSERT_EQ(columns[0]->get_int(1), 4);
ASSERT_EQ(columns[0]->get_int(2), 8);

ASSERT_EQ(columns[1]->get_int(0), 2);
ASSERT_EQ(columns[1]->get_int(1), 5);
ASSERT_EQ(columns[1]->get_int(2), 9);
ASSERT_TRUE(eos);

ASSERT_EQ(columns[2]->get_int(0), 3);
ASSERT_EQ(columns[2]->get_int(1), 6);
ASSERT_EQ(columns[2]->get_int(2), 10);
auto columns = block.get_columns_with_type_and_name();
ASSERT_EQ(columns.size(), 4);
ASSERT_EQ(columns[0].to_string(0), "1");
ASSERT_EQ(columns[0].to_string(1), "4");
ASSERT_EQ(columns[0].to_string(2), "8");
ASSERT_EQ(columns[0].to_string(3), "4");

ASSERT_EQ(columns[3]->get_int(0), 1);
ASSERT_EQ(columns[3]->get_int(1), 1);
ASSERT_EQ(columns[3]->get_int(2), 1);
ASSERT_EQ(columns[1].to_string(0), "2");
ASSERT_EQ(columns[1].to_string(1), "5");
ASSERT_EQ(columns[1].to_string(2), "9");
ASSERT_EQ(columns[1].to_string(3), "5");

block.clear();
status = scan_node.get_next(&_runtime_state, &block, &eos);
ASSERT_EQ(1, block.rows());
ASSERT_FALSE(eos);
ASSERT_EQ(columns[2].to_string(0), "3");
ASSERT_EQ(columns[2].to_string(1), "6");
ASSERT_EQ(columns[2].to_string(2), "10");
ASSERT_EQ(columns[2].to_string(3), "6");

columns = block.get_columns();
ASSERT_EQ(columns[0]->get_int(0), 4);
ASSERT_EQ(columns[1]->get_int(0), 5);
ASSERT_EQ(columns[2]->get_int(0), 6);
ASSERT_EQ(columns[3]->get_int(0), 2);
ASSERT_EQ(columns[3].to_string(0), "1");
ASSERT_EQ(columns[3].to_string(1), "1");
ASSERT_EQ(columns[3].to_string(2), "1");
ASSERT_EQ(columns[3].to_string(3), "2");

block.clear();
status = scan_node.get_next(&_runtime_state, &block, &eos);
Expand Down Expand Up @@ -610,20 +605,21 @@ TEST_F(VBrokerScanNodeTest, where_binary_pre) {
ASSERT_EQ(2, block.rows());
ASSERT_EQ(4, block.columns());

auto columns = block.get_columns();
ASSERT_EQ(columns[0]->get_int(0), 1);
ASSERT_EQ(columns[0]->get_int(1), 4);
auto columns = block.get_columns_with_type_and_name();
ASSERT_EQ(columns.size(), 4);
ASSERT_EQ(columns[0].to_string(0), "1");
ASSERT_EQ(columns[0].to_string(1), "4");

ASSERT_EQ(columns[1]->get_int(0), 2);
ASSERT_EQ(columns[1]->get_int(1), 5);
ASSERT_EQ(columns[1].to_string(0), "2");
ASSERT_EQ(columns[1].to_string(1), "5");

ASSERT_EQ(columns[2]->get_int(0), 3);
ASSERT_EQ(columns[2]->get_int(1), 6);
ASSERT_EQ(columns[2].to_string(0), "3");
ASSERT_EQ(columns[2].to_string(1), "6");

ASSERT_EQ(columns[3]->get_int(0), 1);
ASSERT_EQ(columns[3]->get_int(1), 1);
ASSERT_EQ(columns[3].to_string(0), "1");
ASSERT_EQ(columns[3].to_string(1), "1");

ASSERT_FALSE(eos);
ASSERT_TRUE(eos);

block.clear();
status = scan_node.get_next(&_runtime_state, &block, &eos);
Expand Down

0 comments on commit bd54b7a

Please sign in to comment.