fix

mymeiyi committed Aug 7, 2023
1 parent b5ecf84 commit 7eb0fe5
Showing 10 changed files with 79 additions and 27 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.cpp
@@ -571,7 +571,7 @@ Status ExecNode::get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& func,
bool clear_data) {
LOG(INFO) << "sout: block type=" << typeid(block).name();
// LOG(INFO) << "sout: block type=" << typeid(block).name();
if (_output_row_descriptor) {
if (clear_data) {
clear_origin_block();
27 changes: 20 additions & 7 deletions be/src/runtime/group_commit_mgr.cpp
@@ -105,7 +105,7 @@ Status LoadInstanceInfo::get_block(vectorized::Block* block, bool* find_block, b
#endif
}
if (!_block_queue.empty()) {
LOG(INFO) << "sout: block type=" << typeid(*block).name();
// LOG(INFO) << "sout: block type=" << typeid(*block).name();
auto& future_block = _block_queue.front();
auto* fblock = static_cast<vectorized::FutureBlock*>(block);
fblock->swap_future_block(future_block);
@@ -309,7 +309,8 @@ Status GroupCommitTable::_exe_plan_fragment(int64_t db_id, int64_t table_id, int
<< ", executor status=" << status->to_string()
<< ", request commit status=" << st.to_string()
<< ", instance_id=" << print_id(instance_id)
<< ", rows=" << state->num_rows_load_total();
<< ", rows=" << state->num_rows_load_total()
<< ", url=" << state->get_error_log_file_path();
if (!st.ok()) {
LOG(WARNING) << "request commit error, table_id=" << table_id
<< ", executor status=" << status->to_string()
@@ -403,6 +404,10 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
std::bind<void>(&GroupCommitMgr::_append_row, this, pipe, request));

std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique();
TQueryOptions query_options;
query_options.query_type = TQueryType::LOAD;
TQueryGlobals query_globals;
runtime_state->init(pipe_id, query_options, query_globals, _exec_env);
runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(pipe_id)), -1));
DescriptorTbl* desc_tbl = nullptr;
@@ -426,11 +431,11 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
bool eof = false;
bool first = true;
while (!eof) {
// LOG(INFO) << "sout: start get block, thread_id=" << std::this_thread::get_id();
LOG(INFO) << "sout: start get block, thread_id=" << std::this_thread::get_id();
// TODO what to do if read one block error
RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof));
/*LOG(INFO) << "sout: finish get block, thread_id=" << std::this_thread::get_id()
<< ", rows=" << _block->rows() << ", eof=" << eof;*/
LOG(INFO) << "sout: finish get block, thread_id="
<< ", rows=" << _block->rows() << ", eof=" << eof;
/*if (UNLIKELY(_block->rows() == 0)) {
continue;
}*/
@@ -445,13 +450,17 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
RETURN_IF_ERROR(_get_block_load_instance_info(request->db_id(), table_id,
future_block, load_instance_info));
}
// RETURN_IF_ERROR(_add_block(table_id, future_block));
RETURN_IF_ERROR(load_instance_info->add_block(future_block));
if (future_block->rows() > 0) {
future_blocks.emplace_back(future_block);
}
first = false;
}
LOG(INFO) << "sout: url=" << runtime_state->get_error_log_file_path()
<< ", load rows=" << runtime_state->num_rows_load_total()
<< ", filter rows=" << runtime_state->num_rows_load_filtered()
<< ", unselect rows=" << runtime_state->num_rows_load_unselected()
<< ", success rows=" << runtime_state->num_rows_load_success();
}

for (const auto& future_block : future_blocks) {
@@ -462,7 +471,11 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
auto st = std::get<1>(*(future_block->block_status));
*total_rows += std::get<2>(*(future_block->block_status));
*loaded_rows += std::get<3>(*(future_block->block_status));
LOG(INFO) << "sout: rows 0=" << request->data().size() << ", total=" << *total_rows
<< ", loaded=" << *loaded_rows;
}
LOG(INFO) << "sout: rows=" << request->data().size() << ", total=" << *total_rows
<< ", loaded=" << *loaded_rows << ", block.size=" << future_blocks.size();
// TODO
return Status::OK();
}
@@ -471,7 +484,7 @@ Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
const PGroupCommitInsertRequest* request) {
/*LOG(INFO) << "sout: start add row num=" << request->data_size()
<< ", thread_id=" << std::this_thread::get_id();*/
for (int i = 0; i < request->data_size(); ++i) {
for (int i = 0; i < request->data().size(); ++i) {
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
// TODO append may error when pipe is cancelled
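Note on the pattern above: group commit wraps every scanned block in a FutureBlock whose load result is filled in later by whichever instance actually commits it; group_commit_insert then reads total and loaded row counts out of std::get<2> and std::get<3> of block_status. A minimal standalone sketch of that handoff, with the (done, ok, total_rows, loaded_rows) tuple layout and all names assumed for illustration rather than taken from the Doris API:

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <tuple>

// Hypothetical stand-in for vectorized::FutureBlock: a data block plus a
// shared status slot that the committing side fills in once the block lands.
struct FutureBlock {
    // (done, ok, total_rows, loaded_rows) -- layout assumed for illustration.
    using LoadStatus = std::tuple<bool, bool, int64_t, int64_t>;

    std::shared_ptr<std::mutex> mu = std::make_shared<std::mutex>();
    std::shared_ptr<std::condition_variable> cv =
            std::make_shared<std::condition_variable>();
    std::shared_ptr<LoadStatus> block_status =
            std::make_shared<LoadStatus>(false, true, 0, 0);

    // Committing side: publish the per-block result and wake waiters.
    void set_result(bool ok, int64_t total, int64_t loaded) {
        std::lock_guard<std::mutex> lock(*mu);
        *block_status = LoadStatus{true, ok, total, loaded};
        cv->notify_all();
    }

    // Submitting side: block until the result is available.
    LoadStatus wait_result() {
        std::unique_lock<std::mutex> lock(*mu);
        cv->wait(lock, [&] { return std::get<0>(*block_status); });
        return *block_status;
    }
};

Sharing the status through shared_ptr keeps the handle valid even if the submitting side's copy of the FutureBlock is destroyed first, which mirrors how the loop at the end of group_commit_insert accumulates *total_rows and *loaded_rows from each block's status tuple.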
1 change: 1 addition & 0 deletions be/src/runtime/runtime_state.cpp
@@ -327,6 +327,7 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
bool* stop_processing, bool is_summary) {
*stop_processing = false;
if (query_type() != TQueryType::LOAD) {
LOG(INFO) << "sout: skip append error msg";
return Status::OK();
}
// If file haven't been opened, open it here
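This guard is why the commit adds the TQueryOptions/init call in group_commit_mgr.cpp: a RuntimeState that never declares itself a LOAD query makes append_error_msg_to_file return early, so no error log file (and no error URL) is ever produced. A reduced sketch of the interaction, with simplified signatures that are assumptions, not the real Doris ones:

#include <functional>
#include <iostream>
#include <string>

enum class TQueryType { SELECT, LOAD };

struct RuntimeState {
    TQueryType query_type = TQueryType::SELECT;  // default: not a load

    // Error rows are persisted only for LOAD queries; for anything else the
    // call is a silent no-op, which is what the commit's extra log exposes.
    void append_error_msg_to_file(const std::function<std::string()>& line,
                                  const std::function<std::string()>& error_msg) {
        if (query_type != TQueryType::LOAD) {
            std::cout << "skip append error msg\n";
            return;
        }
        std::cout << "bad row: " << line() << ", reason: " << error_msg() << "\n";
    }
};

int main() {
    RuntimeState state;
    state.query_type = TQueryType::LOAD;  // what the added init() effectively does
    state.append_error_msg_to_file([] { return std::string("a,b,c"); },
                                   [] { return std::string("column count mismatch"); });
}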
12 changes: 11 additions & 1 deletion be/src/vec/core/block.cpp
@@ -700,6 +700,11 @@ void Block::update_hash(SipHash& hash) const {
void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
const IColumn::Filter& filter) {
size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
LOG(INFO) << "sout: columns_to_filter size=" << columns_to_filter.size()
<< ", filter size=" << filter.size() << ", count=" << count;
for (const auto& item : columns_to_filter) {
LOG(INFO) << "sout: column to filter=" << item;
}
if (count == 0) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
@@ -753,8 +758,10 @@ void Block::append_block_by_selector(MutableBlock* dst, const IColumn::Selector&

Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter,
int filter_column_id, int column_to_keep) {
LOG(INFO) << "sout: before filter block, rows=" << block->rows();
const auto& filter_column = block->get_by_position(filter_column_id).column;
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
LOG(INFO) << "sout: filter 1";
const auto& nested_column = nullable_column->get_nested_column_ptr();

MutableColumnPtr mutable_holder =
@@ -773,20 +780,23 @@ Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to
}
RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
} else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
LOG(INFO) << "sout: filter 2";
bool ret = const_column->get_bool(0);
if (!ret) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
}
} else {
LOG(INFO) << "sout: filter 3, block=" << block->dump_data(0);
const IColumn::Filter& filter =
assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
.get_data();
RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
}

LOG(INFO) << "sout: after filter block 1, rows=" << block->rows();
erase_useless_column(block, column_to_keep);
LOG(INFO) << "sout: after filter block 2, rows=" << block->rows();
return Status::OK();
}

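For context, filter_block_internal keeps row i of each selected column when filter[i] is non-zero, clears the columns outright when nothing survives, and leaves them untouched when everything does. A minimal sketch of the same contract over plain vectors (an assumed simplification; Doris works on IColumn and counts zeros with SIMD):

#include <cstdint>
#include <vector>

// Keep row i of every column iff filter[i] != 0 -- the contract of
// Block::filter_block_internal, reduced to plain vectors.
void filter_columns(std::vector<std::vector<int64_t>>& columns,
                    const std::vector<uint8_t>& filter) {
    std::size_t count = 0;
    for (uint8_t f : filter) count += (f != 0);  // Doris uses simd::count_zero_num here

    if (count == 0) {  // nothing survives: clear every selected column
        for (auto& col : columns) col.clear();
        return;
    }
    if (count == filter.size()) return;  // everything survives: leave columns alone

    for (auto& col : columns) {  // general case: compact each column in place
        std::size_t out = 0;
        for (std::size_t i = 0; i < filter.size(); ++i) {
            if (filter[i]) col[out++] = col[i];
        }
        col.resize(out);
    }
}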
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/csv/csv_reader.cpp
@@ -448,6 +448,7 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
},
&_line_reader_eof));
_counter->num_rows_filtered++;
LOG(INFO) << "sout: filter one row, " << _counter->num_rows_filtered;
*success = false;
return Status::OK();
}
@@ -482,6 +483,7 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
},
&_line_reader_eof));
_counter->num_rows_filtered++;
LOG(INFO) << "sout: filter one row, " << _counter->num_rows_filtered;
*success = false;
return Status::OK();
}
@@ -629,6 +631,7 @@ Status CsvReader::_check_array_format(std::vector<Slice>& split_values, bool* is
},
&_line_reader_eof));
_counter->num_rows_filtered++;
LOG(INFO) << "sout: filter one row, " << _counter->num_rows_filtered;
*is_success = false;
return Status::OK();
}
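All three touched call sites share one pattern: the offending line and the failure reason are handed to the error-reporting call as callables, so the strings are built only if a message is actually written, while num_rows_filtered is bumped either way. A hedged sketch of that shape, with hypothetical names:

#include <functional>
#include <iostream>
#include <string>

// Hypothetical counter mirroring _counter->num_rows_filtered.
struct Counter { long num_rows_filtered = 0; };

// The caller passes lambdas rather than ready-made strings, so a potentially
// expensive dump of the raw line is built only when it will really be logged.
void report_bad_row(Counter& counter, bool is_load_query,
                    const std::function<std::string()>& line,
                    const std::function<std::string()>& reason) {
    counter.num_rows_filtered++;  // counted whether or not a message is written
    if (!is_load_query) return;   // the lambdas are never invoked on this path
    std::cout << line() << " -> " << reason() << "\n";  // stand-in for the error log file
}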
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/group_commit_scan_node.cpp
@@ -34,7 +34,7 @@ GroupCommitScanNode::GroupCommitScanNode(ObjectPool* pool, const TPlanNode& tnod

Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
bool find_node = false;
LOG(INFO) << "sout: block type=" << typeid(block).name();
// LOG(INFO) << "sout: block type=" << typeid(block).name();
while (!find_node && !*eos) {
RETURN_IF_ERROR(load_instance_info->get_block(block, &find_node, eos));
}
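get_next here loops until a load instance yields a block or signals end-of-stream; the matching LoadInstanceInfo::get_block (first hunk of group_commit_mgr.cpp) pops the front of a guarded queue and swaps it into the caller's block. A condition-variable sketch of such a consumer queue, with all shapes assumed for illustration:

#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

struct Block { /* columns elided */ };

// Assumed shape of a per-instance queue behind get_block(block, &find_block, &eos).
struct BlockQueue {
    std::mutex mu;
    std::condition_variable cv;
    std::deque<Block> blocks;
    bool load_finished = false;  // producer sets this when the instance is done

    void get_block(Block* out, bool* find_block, bool* eos) {
        std::unique_lock<std::mutex> lock(mu);
        cv.wait(lock, [&] { return !blocks.empty() || load_finished; });
        *find_block = !blocks.empty();
        if (*find_block) {  // hand the front block to the caller
            *out = std::move(blocks.front());
            blocks.pop_front();
        }
        *eos = load_finished && blocks.empty();
    }
};

The scan node's while (!find_node && !*eos) loop then simply re-calls this until one of the two flags flips.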
8 changes: 8 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -466,6 +466,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
LOG(INFO) << "sout: block 0 = " << _src_block.dump_data(0);

for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
@@ -512,6 +513,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
},
&_scanner_eof));
filter_map[i] = false;
LOG(INFO) << "sout: filter 1 row=" << i;
} else if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
@@ -527,6 +529,10 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
},
&_scanner_eof));
filter_map[i] = false;
LOG(INFO) << "sout: filter 2 row=" << i
<< ", slot=" << slot_desc->col_name()
<< ", is nullable=" << slot_desc->is_nullable()
<< ", url=" << _state->get_error_log_file_path();
}
}
}
@@ -545,6 +551,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
// after do the dest block insert operation, clear _src_block to remove the reference of origin column
_src_block.clear();

LOG(INFO) << "sout: block 1 = " << block->dump_data(0);
size_t dest_size = block->columns();
// do filter
block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
@@ -553,6 +560,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));

_counter.num_rows_filtered += rows - block->rows();
LOG(INFO) << "sout: filter rows, " << (rows - block->rows());
return Status::OK();
}

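_convert_to_output_block marks bad rows in a UInt8 map while converting source columns, appends the map as a temporary filter column, lets Block::filter_block drop the marked rows, and then derives num_rows_filtered from the before/after row counts. A compact toy sketch of that bookkeeping (the types and the validation rule are assumptions):

#include <cstdint>
#include <optional>
#include <vector>

// Toy conversion step: each source row either passes validation or is marked
// in a keep/drop map, exactly the role filter_map plays in the scanner.
std::size_t convert_rows(const std::vector<std::optional<int64_t>>& src,
                         std::vector<int64_t>& dst,  // assumed empty on entry
                         long& num_rows_filtered) {
    std::size_t rows = src.size();
    std::vector<uint8_t> filter_map(rows, 1);  // like ColumnUInt8::create(rows, 1)

    for (std::size_t i = 0; i < rows; ++i) {
        if (!src[i].has_value()) {  // e.g. a null bound for a non-nullable slot
            filter_map[i] = 0;      // filter_map[i] = false in the scanner
        }
    }

    for (std::size_t i = 0; i < rows; ++i) {  // the filter pass, normally filter_block
        if (filter_map[i]) dst.push_back(*src[i]);
    }
    num_rows_filtered += static_cast<long>(rows - dst.size());  // same accounting as the commit
    return dst.size();
}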
@@ -50,6 +50,7 @@
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.FrontendOptions;
@@ -325,7 +326,7 @@ public void analyze(Analyzer analyzer) throws UserException {
return;
}

analyzeSubquery(analyzer);
analyzeSubquery(analyzer, false);

analyzePlanHints();

@@ -459,7 +460,7 @@ private void checkColumnCoverage(Set<String> mentionedCols, List<Column> baseCol
}
}

private void analyzeSubquery(Analyzer analyzer) throws UserException {
private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserException {
// Analyze columns mentioned in the statement.
Set<String> mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
List<String> realTargetColumnNames;
@@ -601,7 +602,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// INSERT INTO VALUES(...)
List<ArrayList<Expr>> rows = selectStmt.getValueList().getRows();
for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex);
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex, skipCheck);
}

// clear these 2 structures, rebuild them using VALUES exprs
Expand All @@ -619,7 +620,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing
// error.
rows.add(Lists.newArrayList(selectStmt.getResultExprs()));
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex);
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex, skipCheck);
// rows may be changed in analyzeRow(), so rebuild the result exprs
selectStmt.getResultExprs().clear();
for (Expr expr : rows.get(0)) {
@@ -698,14 +699,17 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
}

private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows,
int rowIdx, List<Pair<Integer, Column>> origColIdxsForExtendCols, Map<String, Expr> slotToIndex)
throws AnalysisException {
int rowIdx, List<Pair<Integer, Column>> origColIdxsForExtendCols, Map<String, Expr> slotToIndex,
boolean skipCheck) throws AnalysisException {
// 1. check number of fields if equal with first row
// targetColumns contains some shadow columns, which is added by system,
// so we should minus this
if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForExtendCols.size()) {
throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1));
}
if (skipCheck) {
return;
}

ArrayList<Expr> row = rows.get(rowIdx);
if (!origColIdxsForExtendCols.isEmpty()) {
Expand Down Expand Up @@ -983,7 +987,14 @@ public void planForGroupCommit(TUniqueId queryId) throws UserException, TExcepti
this.analyzer = analyzerTmp;
}
// analyzeSubquery(analyzer, true);
analyzeSubquery(analyzer);
/*SelectStmt selectStmt = (SelectStmt) getQueryStmt();
for (List<Expr> row : selectStmt.getValueList().getRows()) {
LOG.info("sout: before row: {}", row);
}*/
analyzeSubquery(analyzer, true);
/*for (List<Expr> row : selectStmt.getValueList().getRows()) {
LOG.info("sout: after row: {}", row);
}*/
TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
if (targetColumnNames != null) {
streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
@@ -1847,6 +1847,9 @@ private void handleInsertStmt() throws Exception {
txnId = response.getTxnId();
loadedRows = response.getLoadedRows();
filteredRows = (int) response.getFilteredRows();
LOG.info("sout: stmt={}, rows={}, total={}, filter={}, values={}, retry={}, st={}",
parsedStmt.getOrigStmt().originStmt, rows.size(), loadedRows, filteredRows,
parsedStmt.getPlaceHolders().toString(), i, code);
break;
}
} else {