Skip to content

Commit

Permalink
[Vectorized] Support ODBC sink for vec exec engine (apache#11045)
Browse files Browse the repository at this point in the history
Co-authored-by: lihaopeng <lihaopeng@baidu.com>
  • Loading branch information
2 people authored and eldenmoon committed Aug 1, 2022
1 parent 03d2bbe commit 948e787
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 4 deletions.
8 changes: 6 additions & 2 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "runtime/runtime_state.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmysql_table_sink.h"
#include "vec/sink/vodbc_table_sink.h"
#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vresult_sink.h"
#include "vec/sink/vtablet_sink.h"
Expand Down Expand Up @@ -156,8 +157,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
sink->reset(odbc_tbl_sink);
if (is_vec) {
sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs));
} else {
sink->reset(new OdbcTableSink(pool, row_desc, output_exprs));
}
break;
}

Expand Down
112 changes: 112 additions & 0 deletions be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "runtime/primitive_type.h"
#include "util/mysql_global.h"
#include "util/types.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

#define ODBC_DISPOSE(h, ht, x, op) \
{ \
Expand Down Expand Up @@ -395,4 +398,113 @@ std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLI
return diagnostic_msg;
}

// Vectorized write path: renders rows of `block`, starting at `start_send_row`, into a single
// multi-row "INSERT INTO <table_name> VALUES (...),(...)" statement and executes it on `_stmt`.
//
// Rows are appended until either the block is exhausted or the statement buffer has reached
// INSERT_BUFFER_SIZE, so one call may cover only part of the block. `*num_rows_sent` is
// incremented once per rendered row; the caller is expected to advance `start_send_row` by that
// amount and call again until the whole block has been sent.
//
// The SQL literal form of column j is chosen from the result type of `_output_vexpr_ctxs[j]`.
// Returns InternalError for a column type that has no literal form here. Failures of
// SQLExecDirectW are handled by the ODBC_DISPOSE macro.
Status ODBCConnector::append(const std::string& table_name, vectorized::Block* block,
                             const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
                             uint32_t start_send_row, uint32_t* num_rows_sent) {
    const size_t num_rows = block->rows();
    const size_t num_columns = block->columns();
    // Nothing to send: without this guard an unterminated "INSERT INTO t VALUES (" would be
    // executed against the remote database.
    if (start_send_row >= num_rows) {
        return Status::OK();
    }

    _insert_stmt_buffer.clear();
    std::string utf8_stmt;
    std::u16string insert_stmt;
    {
        SCOPED_TIMER(_convert_tuple_timer);
        fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);

        for (size_t i = start_send_row; i < num_rows; ++i) {
            (*num_rows_sent)++;

            // Construct insert statement of odbc table
            for (size_t j = 0; j < num_columns; ++j) {
                if (j != 0) {
                    fmt::format_to(_insert_stmt_buffer, "{}", ", ");
                }
                auto [item, size] = block->get_by_position(j).column->get_data_at(i);
                // A null data pointer is rendered as SQL NULL.
                if (item == nullptr) {
                    fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
                    continue;
                }
                switch (_output_vexpr_ctxs[j]->root()->type().type) {
                case TYPE_BOOLEAN:
                case TYPE_TINYINT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int8_t*>(item));
                    break;
                case TYPE_SMALLINT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int16_t*>(item));
                    break;
                case TYPE_INT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int32_t*>(item));
                    break;
                case TYPE_BIGINT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const int64_t*>(item));
                    break;
                case TYPE_FLOAT:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const float*>(item));
                    break;
                case TYPE_DOUBLE:
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const double*>(item));
                    break;
                case TYPE_DATE:
                case TYPE_DATETIME: {
                    vectorized::VecDateTimeValue value =
                            binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
                                    *reinterpret_cast<const int64_t*>(item));

                    char buf[64];
                    char* pos = value.to_string(buf);
                    // The last character written by to_string() is left out of the literal
                    // (presumably the terminating NUL -- TODO confirm against to_string()).
                    std::string_view str(buf, pos - buf - 1);
                    fmt::format_to(_insert_stmt_buffer, "'{}'", str);
                    break;
                }
                case TYPE_VARCHAR:
                case TYPE_CHAR:
                case TYPE_STRING: {
                    // NOTE(review): the value is embedded verbatim between single quotes; a
                    // quote character inside the data will break (or alter) the statement.
                    // Escaping rules depend on the remote database, so this is flagged rather
                    // than changed here.
                    fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size));
                    break;
                }
                case TYPE_DECIMALV2: {
                    DecimalV2Value value = *reinterpret_cast<const DecimalV2Value*>(item);
                    fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
                    break;
                }
                case TYPE_LARGEINT: {
                    fmt::format_to(_insert_stmt_buffer, "{}",
                                   *reinterpret_cast<const __int128*>(item));
                    break;
                }
                default: {
                    // Report the type from the vectorized expr contexts passed to this call;
                    // the row-based member `_output_expr_ctxs` is not what this path iterates.
                    return Status::InternalError("can't convert this type to mysql type. type = {}",
                                                 _output_vexpr_ctxs[j]->root()->type().type);
                }
                }
            }

            if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
                fmt::format_to(_insert_stmt_buffer, "{}", "),(");
            } else {
                // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
                fmt::format_to(_insert_stmt_buffer, "{}", ")");
                break;
            }
        }
        // Translate utf8 string to utf16 to use unicode encoding
        utf8_stmt.assign(_insert_stmt_buffer.data(), _insert_stmt_buffer.size());
        insert_stmt = utf8_to_wstring(utf8_stmt);
    }

    {
        SCOPED_TIMER(_result_send_timer);
        // fmt::memory_buffer::data() is not NUL-terminated, so the NUL-terminated copy of the
        // statement is what gets handed to ODBC_DISPOSE as its operation text.
        ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
                     SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS),
                     utf8_stmt.c_str());
    }
    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
    return Status::OK();
}

} // namespace doris
10 changes: 9 additions & 1 deletion be/src/exec/odbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@

namespace doris {

namespace vectorized {
class VExprContext;
}

struct ODBCConnectorParam {
std::string connect_string;

Expand Down Expand Up @@ -72,7 +76,11 @@ class ODBCConnector {
// write for ODBC table
Status init_to_write(RuntimeProfile* profile);
Status append(const std::string& table_name, RowBatch* batch, uint32_t start_send_row,
uint32_t* num_row_sent);
uint32_t* num_rows_sent);

Status append(const std::string& table_name, vectorized::Block* block,
const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent);

// use in ODBC transaction
Status begin_trans(); // should be call after connect and before query or init_to_write
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/odbc_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Status OdbcTableSink::prepare(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
std::stringstream title;
title << "ODBC_TABLE_SINK (frag_id=" << state->fragment_instance_id() << ")";
title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ set(VEC_FILES
sink/vtablet_sink.cpp
sink/vmysql_table_writer.cpp
sink/vmysql_table_sink.cpp
sink/vodbc_table_sink.cpp
sink/vresult_file_sink.cpp
runtime/vdatetime_value.cpp
runtime/vdata_stream_recvr.cpp
Expand Down
112 changes: 112 additions & 0 deletions be/src/vec/sink/vodbc_table_sink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/sink/vodbc_table_sink.h"

#include <sstream>

#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr.h"

namespace doris {
namespace vectorized {

VOdbcTableSink::VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_exprs)
: _pool(pool),
_row_desc(row_desc),
_t_output_expr(t_exprs),
_mem_tracker(MemTracker::create_tracker(-1, "VOdbcTableSink")) {
_name = "VOdbcTableSink";
}

// Initializes the sink from its thrift description: runs the base-class init, copies the
// ODBC connect string, target table name and transaction flag out of
// `t_sink.odbc_table_sink`, then builds the vectorized output expression trees.
// Returns the first non-OK status encountered, otherwise the result of building the trees.
Status VOdbcTableSink::init(const TDataSink& t_sink) {
    RETURN_IF_ERROR(DataSink::init(t_sink));

    const auto& odbc_sink_desc = t_sink.odbc_table_sink;
    _odbc_param.connect_string = odbc_sink_desc.connect_string;
    _odbc_tbl = odbc_sink_desc.table;
    _use_transaction = odbc_sink_desc.use_transaction;

    // Turn the thrift expressions into real expression contexts.
    return VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs);
}

// Prepares the sink for execution: runs the base-class prepare, prepares the output
// expressions against the input row descriptor, and creates the runtime profile.
// The profile is added to the state's object pool and is titled
// "<sink name> (frag_id=<fragment instance id>)".
Status VOdbcTableSink::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(DataSink::prepare(state));
    // Prepare the exprs to run.
    RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
    std::stringstream title;
    // Use the sink's own name (set in the constructor) instead of repeating the literal,
    // matching how OdbcTableSink::prepare builds its title.
    title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
    // create profile
    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
    return Status::OK();
}

// Opens the output expressions, creates the ODBC connector from `_odbc_param` and opens it,
// starts a transaction when `_use_transaction` is set, and finally initializes the
// connector for writing with this sink's profile.
// Returns the first non-OK status from any of those steps.
Status VOdbcTableSink::open(RuntimeState* state) {
    RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state));

    // Create the writer and connect.
    _writer = std::make_unique<ODBCConnector>(_odbc_param);
    RETURN_IF_ERROR(_writer->open());

    if (_use_transaction) {
        RETURN_IF_ERROR(_writer->begin_trans());
    }

    return _writer->init_to_write(_profile);
}

// Row-batch entry point. This sink only implements the Block overload, so this one always
// returns a NotSupported status.
Status VOdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
    Status not_supported = Status::NotSupported(
            "Not Implemented VOdbcTableSink::send(RuntimeState* state, RowBatch* batch)");
    return not_supported;
}

// Evaluates the output expressions on `block` and writes the resulting rows to the ODBC table.
// A null or empty block is a no-op that returns OK.
// The writer may send only part of the block per append() call, so append() is called
// repeatedly, advancing by the number of rows it reports as sent, until every row is written.
// Returns the first non-OK status from expression evaluation or from the writer.
Status VOdbcTableSink::send(RuntimeState* state, Block* block) {
    Status status = Status::OK();
    if (block == nullptr || block->rows() == 0) {
        return status;
    }

    auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
            _output_expr_ctxs, *block, status);
    // `status` is handed to the call above as its error out-argument. It has to be checked
    // here: this function ends with `return Status::OK()`, so a failure left unchecked would
    // be reported to the caller as success.
    if (UNLIKELY(!status.ok())) return status;
    materialize_block_inplace(output_block);

    uint32_t start_send_row = 0;
    uint32_t num_row_sent = 0;
    while (start_send_row < output_block.rows()) {
        status = _writer->append(_odbc_tbl, &output_block, _output_expr_ctxs, start_send_row,
                                 &num_row_sent);
        if (UNLIKELY(!status.ok())) return status;
        start_send_row += num_row_sent;
        // append() increments the counter, so reset it before the next call.
        num_row_sent = 0;
    }

    return Status::OK();
}

// Closes the output expressions and, when the fragment finished successfully and the sink
// runs in transaction mode, commits the transaction that open() started with begin_trans().
// When `exec_status` is not OK no commit is issued.
// Returns the commit status if a commit was attempted and failed, otherwise OK.
Status VOdbcTableSink::close(RuntimeState* state, Status exec_status) {
    VExpr::close(_output_expr_ctxs, state);
    // `_writer` is only created in open(); guard against close() being reached without it.
    // NOTE(review): relies on ODBCConnector::finish_trans() being the commit counterpart of
    // begin_trans() in exec/odbc_connector.h -- confirm.
    if (exec_status.ok() && _use_transaction && _writer != nullptr) {
        RETURN_IF_ERROR(_writer->finish_trans());
    }
    return Status::OK();
}
} // namespace vectorized
} // namespace doris
72 changes: 72 additions & 0 deletions be/src/vec/sink/vodbc_table_sink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/odbc_connector.h"
#include "vec/sink/vmysql_table_writer.h"

namespace doris {

class RowDescriptor;
class TExpr;
class RuntimeState;
class RuntimeProfile;
class MemTracker;
namespace vectorized {

// Data sink for the vectorized execution engine that writes its input blocks to an external
// table through an ODBC connection.
class VOdbcTableSink : public DataSink {
public:
    // `row_desc` and `t_exprs` are kept as references (see the members below), so both must
    // outlive the sink.
    VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                   const std::vector<TExpr>& t_exprs);

    // Reads the ODBC settings from the thrift sink description and builds the output exprs.
    Status init(const TDataSink& thrift_sink) override;

    // Prepares the output exprs and creates the runtime profile.
    Status prepare(RuntimeState* state) override;

    // Opens the output exprs and the ODBC connector.
    Status open(RuntimeState* state) override;

    // Row-batch input is not supported by this sink; always returns NotSupported.
    Status send(RuntimeState* state, RowBatch* batch) override;

    // Evaluates the output exprs on `block` and writes the result through the connector.
    Status send(RuntimeState* state, vectorized::Block* block) override;

    // Closes the output expressions.
    Status close(RuntimeState* state, Status exec_status) override;

    // Null until prepare() has created the profile.
    RuntimeProfile* profile() override { return _profile; }

private:
    // owned by RuntimeState
    ObjectPool* _pool;
    const RowDescriptor& _row_desc;
    const std::vector<TExpr>& _t_output_expr;

    // Expression contexts built from `_t_output_expr` in init().
    std::vector<VExprContext*> _output_expr_ctxs;

    // Created in prepare(); initialized to null so profile() never returns an indeterminate
    // pointer when called earlier.
    RuntimeProfile* _profile = nullptr;
    std::shared_ptr<MemTracker> _mem_tracker;

    ODBCConnectorParam _odbc_param;
    // Name of the target ODBC table.
    std::string _odbc_tbl;
    // Created in open().
    std::unique_ptr<ODBCConnector> _writer;
    // whether use transaction; set from the thrift sink description in init()
    bool _use_transaction = false;
};
} // namespace vectorized
} // namespace doris

0 comments on commit 948e787

Please sign in to comment.