Skip to content

Commit

Permalink
feat: support batchrequest in ProcessQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Jun 26, 2024
1 parent cf86f04 commit 05ec61c
Showing 1 changed file with 151 additions and 88 deletions.
239 changes: 151 additions & 88 deletions src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stdlib.h>
#include <filesystem>
#include <memory>
#include "vm/sql_compiler.h"
#ifdef DISALLOW_COPY_AND_ASSIGN
#undef DISALLOW_COPY_AND_ASSIGN
#endif
Expand Down Expand Up @@ -1691,112 +1692,174 @@ void TabletImpl::ProcessQuery(bool is_sub, RpcController* ctrl, const openmldb::
auto mode = hybridse::vm::Engine::TryDetermineEngineMode(request->sql(), default_mode);

::hybridse::base::Status status;
// FIXME(someone): it does not handles batchrequest
if (mode == hybridse::vm::EngineMode::kBatchMode) {
// convert repeated openmldb:type::DataType into hybridse::codec::Schema
hybridse::codec::Schema parameter_schema;
for (int i = 0; i < request->parameter_types().size(); i++) {
auto column = parameter_schema.Add();
hybridse::type::Type hybridse_type;

if (!openmldb::schema::SchemaAdapter::ConvertType(request->parameter_types(i), &hybridse_type)) {
response->set_msg("Invalid parameter type: " +
openmldb::type::DataType_Name(request->parameter_types(i)));
response->set_code(::openmldb::base::kSQLCompileError);
return;
switch (mode) {
case hybridse::vm::EngineMode::kBatchMode: {
// convert repeated openmldb:type::DataType into hybridse::codec::Schema
hybridse::codec::Schema parameter_schema;
for (int i = 0; i < request->parameter_types().size(); i++) {
auto column = parameter_schema.Add();

Check warning on line 1700 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1700

Added line #L1700 was not covered by tests
hybridse::type::Type hybridse_type;

if (!openmldb::schema::SchemaAdapter::ConvertType(request->parameter_types(i), &hybridse_type)) {
response->set_msg("Invalid parameter type: " +
openmldb::type::DataType_Name(request->parameter_types(i)));
response->set_code(::openmldb::base::kSQLCompileError);
return;

Check warning on line 1707 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1703-L1707

Added lines #L1703 - L1707 were not covered by tests
}
column->set_type(hybridse_type);

Check warning on line 1709 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1709

Added line #L1709 was not covered by tests
}
column->set_type(hybridse_type);
}
::hybridse::vm::BatchRunSession session;
if (request->is_debug()) {
session.EnableDebug();
}
session.SetParameterSchema(parameter_schema);
{
bool ok = engine_->Get(request->sql(), request->db(), session, status);
if (!ok) {
response->set_msg(status.msg);
response->set_code(::openmldb::base::kSQLCompileError);
DLOG(WARNING) << "fail to compile sql " << request->sql() << ", message: " << status.msg;
return;
::hybridse::vm::BatchRunSession session;
if (request->is_debug()) {
session.EnableDebug();
}
session.SetParameterSchema(parameter_schema);
{
bool ok = engine_->Get(request->sql(), request->db(), session, status);
if (!ok) {
response->set_msg(status.msg);
response->set_code(::openmldb::base::kSQLCompileError);
DLOG(WARNING) << "fail to compile sql " << request->sql() << ", message: " << status.msg;
return;

Check warning on line 1722 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1719-L1722

Added lines #L1719 - L1722 were not covered by tests
}
}
}

::hybridse::codec::Row parameter_row;
auto& request_buf = static_cast<brpc::Controller*>(ctrl)->request_attachment();
if (request->parameter_row_size() > 0 &&
!codec::DecodeRpcRow(request_buf, 0, request->parameter_row_size(), request->parameter_row_slices(),
&parameter_row)) {
response->set_code(::openmldb::base::kSQLRunError);
response->set_msg("fail to decode parameter row");
return;
}
std::vector<::hybridse::codec::Row> output_rows;
int32_t run_ret = session.Run(parameter_row, output_rows);
if (run_ret != 0) {
response->set_msg(status.msg);
response->set_code(::openmldb::base::kSQLRunError);
DLOG(WARNING) << "fail to run sql: " << request->sql();
return;
}
uint32_t byte_size = 0;
uint32_t count = 0;
for (auto& output_row : output_rows) {
if (FLAGS_scan_max_bytes_size > 0 && byte_size > FLAGS_scan_max_bytes_size) {
LOG(WARNING) << "reach the max byte size " << FLAGS_scan_max_bytes_size << " truncate result";
response->set_schema(session.GetEncodedSchema());
response->set_byte_size(byte_size);
response->set_count(count);
response->set_code(::openmldb::base::kOk);
::hybridse::codec::Row parameter_row;
auto& request_buf = static_cast<brpc::Controller*>(ctrl)->request_attachment();
if (request->parameter_row_size() > 0 &&
!codec::DecodeRpcRow(request_buf, 0, request->parameter_row_size(), request->parameter_row_slices(),

Check warning on line 1729 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1729

Added line #L1729 was not covered by tests
&parameter_row)) {
response->set_code(::openmldb::base::kSQLRunError);
response->set_msg("fail to decode parameter row");
return;

Check warning on line 1733 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1731-L1733

Added lines #L1731 - L1733 were not covered by tests
}
std::vector<::hybridse::codec::Row> output_rows;
int32_t run_ret = session.Run(parameter_row, output_rows);
if (run_ret != 0) {
response->set_msg(status.msg);
response->set_code(::openmldb::base::kSQLRunError);
DLOG(WARNING) << "fail to run sql: " << request->sql();

Check warning on line 1740 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1738-L1740

Added lines #L1738 - L1740 were not covered by tests
return;
}
byte_size += output_row.size();
buf->append(reinterpret_cast<void*>(output_row.buf()), output_row.size());
count += 1;
uint32_t byte_size = 0;
uint32_t count = 0;
for (auto& output_row : output_rows) {
if (FLAGS_scan_max_bytes_size > 0 && byte_size > FLAGS_scan_max_bytes_size) {
LOG(WARNING) << "reach the max byte size " << FLAGS_scan_max_bytes_size << " truncate result";
response->set_schema(session.GetEncodedSchema());
response->set_byte_size(byte_size);
response->set_count(count);
response->set_code(::openmldb::base::kOk);
return;

Check warning on line 1752 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1747-L1752

Added lines #L1747 - L1752 were not covered by tests
}
byte_size += output_row.size();
buf->append(reinterpret_cast<void*>(output_row.buf()), output_row.size());
count += 1;
}
response->set_schema(session.GetEncodedSchema());
response->set_byte_size(byte_size);
response->set_count(count);
response->set_code(::openmldb::base::kOk);
DLOG(INFO) << "handle batch sql " << request->sql() << " with record cnt " << count << " byte size "
<< byte_size;
break;
}
response->set_schema(session.GetEncodedSchema());
response->set_byte_size(byte_size);
response->set_count(count);
response->set_code(::openmldb::base::kOk);
DLOG(INFO) << "handle batch sql " << request->sql() << " with record cnt " << count << " byte size "
<< byte_size;
} else {
::hybridse::vm::RequestRunSession session;
if (request->is_debug()) {
session.EnableDebug();
}
if (request->is_procedure()) {
const std::string& db_name = request->db();
const std::string& sp_name = request->sp_name();
std::shared_ptr<hybridse::vm::CompileInfo> request_compile_info;
{
hybridse::base::Status status;
request_compile_info = sp_cache_->GetRequestInfo(db_name, sp_name, status);
if (!status.isOK()) {
response->set_code(::openmldb::base::ReturnCode::kProcedureNotFound);
case hybridse::vm::kRequestMode: {
::hybridse::vm::RequestRunSession session;
if (request->is_debug()) {
session.EnableDebug();
}
if (request->is_procedure()) {
const std::string& db_name = request->db();
const std::string& sp_name = request->sp_name();
std::shared_ptr<hybridse::vm::CompileInfo> request_compile_info;

Check warning on line 1774 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1774

Added line #L1774 was not covered by tests
{
hybridse::base::Status status;
request_compile_info = sp_cache_->GetRequestInfo(db_name, sp_name, status);
if (!status.isOK()) {
response->set_code(::openmldb::base::ReturnCode::kProcedureNotFound);
response->set_msg(status.msg);
PDLOG(WARNING, status.msg.c_str());
return;

Check warning on line 1782 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1779-L1782

Added lines #L1779 - L1782 were not covered by tests
}
}
session.SetCompileInfo(request_compile_info);
session.SetSpName(sp_name);
RunRequestQuery(ctrl, *request, session, *response, *buf);
} else {
bool ok = engine_->Get(request->sql(), request->db(), session, status);
if (!ok || session.GetCompileInfo() == nullptr) {

Check warning on line 1790 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1789-L1790

Added lines #L1789 - L1790 were not covered by tests
response->set_msg(status.msg);
PDLOG(WARNING, status.msg.c_str());
response->set_code(::openmldb::base::kSQLCompileError);
DLOG(WARNING) << "fail to compile sql in request mode:\n" << request->sql();

Check warning on line 1793 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1792-L1793

Added lines #L1792 - L1793 were not covered by tests
return;
}
RunRequestQuery(ctrl, *request, session, *response, *buf);

Check warning on line 1796 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1796

Added line #L1796 was not covered by tests
}
const std::string& sql = session.GetCompileInfo()->GetSql();
if (response->code() != ::openmldb::base::kOk) {
DLOG(WARNING) << "fail to run sql " << sql << " error msg: " << response->msg();

Check warning on line 1800 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1800

Added line #L1800 was not covered by tests
} else {
DLOG(INFO) << "handle request sql " << sql;
}
break;
}
case hybridse::vm::kBatchRequestMode: {

Check warning on line 1806 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1806

Added line #L1806 was not covered by tests
// we support a simplified batch request query here
// not procedure
// no parameter input or bachrequst row
// batchrequest row must specified in CONFIG (values = ...)
::hybridse::base::Status status;
::hybridse::vm::BatchRequestRunSession session;
if (request->is_debug()) {
session.EnableDebug();

Check warning on line 1814 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1811-L1814

Added lines #L1811 - L1814 were not covered by tests
}
session.SetCompileInfo(request_compile_info);
session.SetSpName(sp_name);
RunRequestQuery(ctrl, *request, session, *response, *buf);
} else {
bool ok = engine_->Get(request->sql(), request->db(), session, status);
if (!ok || session.GetCompileInfo() == nullptr) {
response->set_msg(status.msg);
response->set_code(::openmldb::base::kSQLCompileError);
DLOG(WARNING) << "fail to compile sql in request mode:\n" << request->sql();
return;
}
RunRequestQuery(ctrl, *request, session, *response, *buf);
auto info = std::dynamic_pointer_cast<hybridse::vm::SqlCompileInfo>(session.GetCompileInfo());
if (info && info->get_sql_context().request_rows.empty()) {
response->set_msg("batch request values must specified in SQL CONFIG (values = [...])");
response->set_code(::openmldb::base::kSQLCompileError);
return;

Check warning on line 1827 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1823-L1827

Added lines #L1823 - L1827 were not covered by tests
}
std::vector<::hybridse::codec::Row> output_rows;
std::vector<::hybridse::codec::Row> empty_inputs;
int32_t run_ret = session.Run(empty_inputs, output_rows);
if (run_ret != 0) {
response->set_msg(status.msg);
response->set_code(::openmldb::base::kSQLRunError);
DLOG(WARNING) << "fail to run batchrequest sql: " << request->sql();
return;

Check warning on line 1836 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1829-L1836

Added lines #L1829 - L1836 were not covered by tests
}
uint32_t byte_size = 0;
uint32_t count = 0;
for (auto& output_row : output_rows) {
if (FLAGS_scan_max_bytes_size > 0 && byte_size > FLAGS_scan_max_bytes_size) {
LOG(WARNING) << "reach the max byte size " << FLAGS_scan_max_bytes_size << " truncate result";
response->set_schema(session.GetEncodedSchema());
response->set_byte_size(byte_size);
response->set_count(count);
response->set_code(::openmldb::base::kOk);
return;

Check warning on line 1847 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1838-L1847

Added lines #L1838 - L1847 were not covered by tests
}
byte_size += output_row.size();
buf->append(reinterpret_cast<void*>(output_row.buf()), output_row.size());
count += 1;

Check warning on line 1851 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1849-L1851

Added lines #L1849 - L1851 were not covered by tests
}
response->set_schema(session.GetEncodedSchema());
response->set_byte_size(byte_size);
response->set_count(count);
response->set_code(::openmldb::base::kOk);
break;

Check warning on line 1857 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1853-L1857

Added lines #L1853 - L1857 were not covered by tests
}
const std::string& sql = session.GetCompileInfo()->GetSql();
if (response->code() != ::openmldb::base::kOk) {
DLOG(WARNING) << "fail to run sql " << sql << " error msg: " << response->msg();
} else {
DLOG(INFO) << "handle request sql " << sql;
default: {
response->set_msg("un-implemented execute_mode: " + hybridse::vm::EngineModeName(mode));
response->set_code(::openmldb::base::kSQLCompileError);
break;

Check warning on line 1862 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1859-L1862

Added lines #L1859 - L1862 were not covered by tests
}
}
}
Expand Down

0 comments on commit 05ec61c

Please sign in to comment.