Skip to content

Commit

Permalink
[cherry-pick][Vectorized] Support ODBC sink for vec exec engine (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei authored and caiconghui committed Jul 21, 2022
1 parent cdd692f commit 6916517
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 8 deletions.
17 changes: 11 additions & 6 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
#include "runtime/result_file_sink.h"
#include "runtime/result_sink.h"
#include "runtime/runtime_state.h"

#include "vec/sink/result_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmysql_table_sink.h"
#include "vec/sink/vmysql_table_writer.h"
#include "vec/sink/vodbc_table_sink.h"
#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vtablet_sink.h"
#include "vec/sink/vmysql_table_sink.h"

namespace doris {

Expand Down Expand Up @@ -81,7 +81,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink

// TODO: figure out good buffer size based on size of output row
if (is_vec) {
tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs, thrift_sink.result_sink, 4096);
tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
thrift_sink.result_sink, 4096);
} else {
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
}
Expand Down Expand Up @@ -139,7 +140,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}
if (is_vec) {
doris::vectorized::VMysqlTableSink* vmysql_tbl_sink = new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
doris::vectorized::VMysqlTableSink* vmysql_tbl_sink =
new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
sink->reset(vmysql_tbl_sink);
} else {
// TODO: figure out good buffer size based on size of output row
Expand All @@ -156,8 +158,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
sink->reset(odbc_tbl_sink);
if (is_vec) {
sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs));
} else {
sink->reset(new OdbcTableSink(pool, row_desc, output_exprs));
}
break;
}

Expand Down
112 changes: 112 additions & 0 deletions be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "exprs/expr.h"
#include "runtime/primitive_type.h"
#include "util/types.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

#define ODBC_DISPOSE(h, ht, x, op) \
{ \
Expand Down Expand Up @@ -397,4 +400,113 @@ std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLI
return diagnostic_msg;
}

// Builds one multi-row "INSERT INTO <table_name> VALUES (...),(...)" statement from the rows
// of `block`, starting at `start_send_row`, and executes it on the ODBC statement handle.
//
// Rows are added until the block is exhausted or the statement buffer has reached
// INSERT_BUFFER_SIZE, so a single call may cover only part of the block; the caller advances
// `start_send_row` by `*num_rows_sent` and calls again for the remainder.
//
// @param table_name          target table name, written verbatim into the statement
// @param block               source of the values; one VALUES tuple is produced per row
// @param _output_vexpr_ctxs  one expression context per column; only the root expression's
//                            type is read, to choose how column j is rendered as text
// @param start_send_row      index of the first row of `block` to send
// @param num_rows_sent       incremented once for every row placed into the statement
// @return Status::OK() on success (including when there is nothing left to send),
//         InternalError for a column type that cannot be rendered, or the Status produced
//         by ODBC_DISPOSE when execution fails.
Status ODBCConnector::append(const std::string& table_name, vectorized::Block* block,
                             const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
                             uint32_t start_send_row, uint32_t* num_rows_sent) {
    _insert_stmt_buffer.clear();

    const size_t num_rows = block->rows();
    const size_t num_columns = block->columns();
    if (start_send_row >= num_rows) {
        // Nothing left to send. Executing "INSERT INTO t VALUES (" with no tuple would only
        // produce a syntax error on the remote side.
        return Status::OK();
    }

    std::u16string insert_stmt;
    // NUL-terminated UTF-8 copy of the statement, kept alive for error reporting below.
    std::string utf8_stmt;
    {
        SCOPED_TIMER(_convert_tuple_timer);
        fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);

        for (size_t i = start_send_row; i < num_rows; ++i) {
            ++(*num_rows_sent);

            // Construct insert statement of odbc table
            for (size_t j = 0; j < num_columns; ++j) {
                if (j != 0) {
                    fmt::format_to(_insert_stmt_buffer, "{}", ", ");
                }
                auto [item, size] = block->get_by_position(j).column->get_data_at(i);
                if (item == nullptr) {
                    // A null data pointer is rendered as SQL NULL.
                    fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
                    continue;
                }
                switch (_output_vexpr_ctxs[j]->root()->type().type) {
                case TYPE_BOOLEAN:
                case TYPE_TINYINT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int8_t*>(item));
                    break;
                case TYPE_SMALLINT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int16_t*>(item));
                    break;
                case TYPE_INT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int32_t*>(item));
                    break;
                case TYPE_BIGINT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int64_t*>(item));
                    break;
                case TYPE_FLOAT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const float*>(item));
                    break;
                case TYPE_DOUBLE:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const double*>(item));
                    break;
                case TYPE_DATE:
                case TYPE_DATETIME: {
                    // The column stores the date/time as a 64-bit integer that is
                    // reinterpreted as a VecDateTimeValue.
                    vectorized::VecDateTimeValue value =
                            binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
                                    *reinterpret_cast<const int64_t*>(item));

                    char buf[64];
                    char* pos = value.to_string(buf);
                    // NOTE(review): the -1 presumes to_string() returns a pointer one past
                    // the terminating NUL -- confirm against VecDateTimeValue.
                    std::string_view str(buf, pos - buf - 1);
                    fmt::format_to(_insert_stmt_buffer, "'{}'", str);
                    break;
                }
                case TYPE_VARCHAR:
                case TYPE_CHAR:
                case TYPE_STRING: {
                    // NOTE(review): the value is interpolated without escaping, so a string
                    // containing a single quote yields a malformed (or injected) statement.
                    // Escaping rules differ per ODBC backend; consider bound parameters.
                    fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size));
                    break;
                }
                case TYPE_DECIMALV2: {
                    DecimalV2Value value = *reinterpret_cast<const DecimalV2Value*>(item);
                    fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
                    break;
                }
                case TYPE_LARGEINT: {
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const __int128*>(item));
                    break;
                }
                default: {
                    // Report the type from the contexts passed to this call; the member
                    // _output_expr_ctxs is a different vector and must not be indexed here.
                    return Status::InternalError("can't convert this type to mysql type. type = {}",
                                                 _output_vexpr_ctxs[j]->root()->type().type);
                }
                }
            }

            if (i + 1 < num_rows && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
                fmt::format_to(_insert_stmt_buffer, "{}", "),(");
            } else {
                // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
                fmt::format_to(_insert_stmt_buffer, "{}", ")");
                break;
            }
        }
        // Translate the UTF-8 statement to UTF-16 so the wide-character ODBC call can be used.
        utf8_stmt.assign(_insert_stmt_buffer.data(), _insert_stmt_buffer.size());
        insert_stmt = utf8_to_wstring(utf8_stmt);
    }

    {
        SCOPED_TIMER(_result_send_timer);
        // Pass the NUL-terminated copy: the fmt buffer's data() is not NUL-terminated, so
        // handing it over as a bare char* would let a reader run past the end of the buffer.
        ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
                     SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS),
                     utf8_stmt.c_str());
    }
    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
    return Status::OK();
}

} // namespace doris
10 changes: 9 additions & 1 deletion be/src/exec/odbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@

namespace doris {

namespace vectorized {
class VExprContext;
}

struct ODBCConnectorParam {
std::string connect_string;

Expand Down Expand Up @@ -74,7 +78,11 @@ class ODBCConnector {
// write for ODBC table
Status init_to_write(RuntimeProfile* profile);
Status append(const std::string& table_name, RowBatch* batch, uint32_t start_send_row,
uint32_t* num_row_sent);
uint32_t* num_rows_sent);

Status append(const std::string& table_name, vectorized::Block* block,
const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent);

// use in ODBC transaction
Status begin_trans(); // should be call after connect and before query or init_to_write
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/odbc_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Status OdbcTableSink::prepare(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
std::stringstream title;
title << "ODBC_TABLE_SINK (frag_id=" << state->fragment_instance_id() << ")";
title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ set(VEC_FILES
sink/vtablet_sink.cpp
sink/vmysql_table_writer.cpp
sink/vmysql_table_sink.cpp
sink/vodbc_table_sink.cpp
sink/vresult_file_sink.cpp
runtime/vdatetime_value.cpp
runtime/vdata_stream_recvr.cpp
Expand Down
112 changes: 112 additions & 0 deletions be/src/vec/sink/vodbc_table_sink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/sink/vodbc_table_sink.h"

#include <sstream>

#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr.h"

namespace doris {
namespace vectorized {

// Constructs the vectorized ODBC table sink.
// Stores the object pool, the row descriptor and the thrift output expressions for later use,
// and creates a memory tracker labelled "VOdbcTableSink" (first argument -1, presumably
// meaning "no limit" -- TODO confirm against MemTracker::create_tracker).
VOdbcTableSink::VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_exprs)
: _pool(pool),
_row_desc(row_desc),
_t_output_expr(t_exprs),
_mem_tracker(MemTracker::create_tracker(-1, "VOdbcTableSink")) {
// Assigned in the body rather than the initializer list; presumably _name is declared in
// a base class -- TODO confirm.
_name = "VOdbcTableSink";
}

// Initializes the sink from its thrift description: runs the base-class init, copies the
// ODBC connection string, target table and transaction flag, and builds the vectorized
// output expression contexts from the thrift expressions given to the constructor.
// Returns the first non-OK Status encountered, otherwise OK.
Status VOdbcTableSink::init(const TDataSink& t_sink) {
    RETURN_IF_ERROR(DataSink::init(t_sink));

    // Pull the ODBC-specific settings out of the thrift struct.
    const auto& odbc_sink_desc = t_sink.odbc_table_sink;
    _use_transaction = odbc_sink_desc.use_transaction;
    _odbc_tbl = odbc_sink_desc.table;
    _odbc_param.connect_string = odbc_sink_desc.connect_string;

    // Turn the thrift expressions into executable expression contexts; its Status is the
    // result of init().
    return VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs);
}

// Prepares the sink for execution: runs the base-class prepare, prepares the output
// expression contexts against the sink's row descriptor and memory tracker, and creates the
// runtime profile that open() later hands to the writer.
Status VOdbcTableSink::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(DataSink::prepare(state));
    RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));

    // The profile title carries the fragment instance id so each instance is distinguishable.
    std::stringstream profile_title;
    profile_title << "VOdbcTableSink (frag_id=" << state->fragment_instance_id() << ")";

    // The profile is registered with the runtime state's object pool.
    RuntimeProfile* sink_profile = new RuntimeProfile(profile_title.str());
    _profile = state->obj_pool()->add(sink_profile);
    return Status::OK();
}

// Opens the sink: opens the output expression contexts, then creates the ODBC connector
// from the stored connection parameters, opens it, optionally begins a transaction, and
// finally sets it up for writing with the sink's profile.
// Returns the first non-OK Status encountered, otherwise OK.
Status VOdbcTableSink::open(RuntimeState* state) {
    // Open the output expressions before any connection is made.
    RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state));

    // Create the writer and connect.
    _writer.reset(new ODBCConnector(_odbc_param));
    RETURN_IF_ERROR(_writer->open());

    // begin_trans() has to come after the connection is open and before init_to_write().
    if (_use_transaction) {
        RETURN_IF_ERROR(_writer->begin_trans());
    }

    return _writer->init_to_write(_profile);
}

// Row-batch overload of send(). The vectorized sink does not implement it: both arguments
// are ignored and Status::NotSupported is always returned. The Block overload of send() is
// the implemented write path.
Status VOdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
return Status::NotSupported(
"Not Implemented VOdbcTableSink::send(RuntimeState* state, RowBatch* batch)");
}

// Evaluates the output expressions over `block` and writes the resulting rows to the ODBC
// table through the writer.
//
// A null or empty block is a no-op. Because one append() call may cover only part of the
// block, append() is called repeatedly, each time starting after the rows already sent.
//
// @param state  unused here
// @param block  input rows; not modified (a separate output block is produced)
// @return OK when every row was handed to the writer; otherwise the Status reported by
//         expression evaluation or by the writer.
Status VOdbcTableSink::send(RuntimeState* state, Block* block) {
    if (block == nullptr || block->rows() == 0) {
        return Status::OK();
    }

    Status status = Status::OK();
    auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
            _output_expr_ctxs, *block, status);
    // `status` is handed to the call above to receive its result; stop here if expression
    // evaluation failed, so that a block it could not fully produce is never written out.
    RETURN_IF_ERROR(status);
    materialize_block_inplace(output_block);

    uint32_t start_send_row = 0;
    uint32_t num_row_sent = 0;
    while (start_send_row < output_block.rows()) {
        status = _writer->append(_odbc_tbl, &output_block, _output_expr_ctxs, start_send_row,
                                 &num_row_sent);
        if (UNLIKELY(!status.ok())) return status;
        start_send_row += num_row_sent;
        // append() increments the counter, so it is reset before the next statement.
        num_row_sent = 0;
    }

    return Status::OK();
}

// Closes the output expression contexts and always returns Status::OK().
// `exec_status` is not inspected.
// NOTE(review): open() calls _writer->begin_trans() when _use_transaction is set, but nothing
// here touches _writer -- confirm that the transaction is finished (committed or rolled
// back) somewhere else, e.g. when the connector is destroyed.
Status VOdbcTableSink::close(RuntimeState* state, Status exec_status) {
VExpr::close(_output_expr_ctxs, state);
return Status::OK();
}
} // namespace vectorized
} // namespace doris
Loading

0 comments on commit 6916517

Please sign in to comment.